diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go new file mode 100644 index 000000000..2725c055b --- /dev/null +++ b/cmd/client/txgen/main.go @@ -0,0 +1,328 @@ +package main + +import ( + "flag" + "fmt" + "math/big" + "math/rand" + "os" + "path" + "sync" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + bls2 "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/api/client" + proto_node "github.com/harmony-one/harmony/api/proto/node" + "github.com/harmony-one/harmony/common/denominations" + "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/crypto/bls" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/genesis" + "github.com/harmony-one/harmony/internal/params" + "github.com/harmony-one/harmony/internal/shardchain" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/node" + "github.com/harmony-one/harmony/p2p" + p2p_host "github.com/harmony-one/harmony/p2p/host" + "github.com/harmony-one/harmony/p2p/p2pimpl" + "github.com/harmony-one/harmony/shard" +) + +var ( + version string + builtBy string + builtAt string + commit string + stateMutex sync.Mutex +) + +const ( + checkFrequency = 2 //checkfrequency checks whether the transaction generator is ready to send the next batch of transactions. +) + +// Settings is the settings for TX generation. No Cross-Shard Support! +type Settings struct { + NumOfAddress int + MaxNumTxsPerBatch int +} + +func printVersion(me string) { + fmt.Fprintf(os.Stderr, "Harmony (C) 2019. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt) + os.Exit(0) +} + +// The main entrance for the transaction generator program which simulate transactions and send to the network for +// processing. + +var ( + ip = flag.String("ip", "127.0.0.1", "IP of the node") + port = flag.String("port", "9999", "port of the node.") + numTxns = flag.Int("numTxns", 100, "number of transactions to send per message") + logFolder = flag.String("log_folder", "latest", "the folder collecting the logs of this execution") + duration = flag.Int("duration", 30, "duration of the tx generation in second. If it's negative, the experiment runs forever.") + versionFlag = flag.Bool("version", false, "Output version info") + crossShardRatio = flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") //Keeping this for backward compatibility + shardIDFlag = flag.Int("shardID", 0, "The shardID the node belongs to.") + // Key file to store the private key + keyFile = flag.String("key", "./.txgenkey", "the private key file of the txgen") + // logging verbosity + verbosity = flag.Int("verbosity", 5, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)") +) + +func setUpTXGen() *node.Node { + nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile) + if err != nil { + panic(err) + } + + peerPubKey := bls.RandPrivateKey().GetPublicKey() + if peerPubKey == nil { + panic(fmt.Errorf("generate key error")) + } + shardID := *shardIDFlag + selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: peerPubKey} + + // Nodes containing blockchain data to mirror the shards' data in the network + + myhost, err := p2pimpl.NewHost(&selfPeer, nodePriKey) + if err != nil { + panic("unable to new host in txgen") + } + if err != nil { + fmt.Fprintf(os.Stderr, "Error :%v \n", err) + os.Exit(1) + } + decider := quorum.NewDecider(quorum.SuperMajorityVote) + consensusObj, err := consensus.New(myhost, uint32(shardID), p2p.Peer{}, nil, decider) + chainDBFactory := &shardchain.MemDBFactory{} + txGen := node.New(myhost, consensusObj, chainDBFactory, false) //Changed it : no longer archival node. + txGen.Client = client.NewClient(txGen.GetHost(), uint32(shardID)) + consensusObj.ChainReader = txGen.Blockchain() + genesisShardingConfig := shard.Schedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch)) + startIdx := 0 + endIdx := startIdx + genesisShardingConfig.NumNodesPerShard() + pubs := []*bls2.PublicKey{} + for _, acct := range genesis.HarmonyAccounts[startIdx:endIdx] { + pub := &bls2.PublicKey{} + if err := pub.DeserializeHexStr(acct.BlsPublicKey); err != nil { + fmt.Printf("Can not deserialize public key. err: %v", err) + os.Exit(1) + } + pubs = append(pubs, pub) + } + consensusObj.Decider.UpdateParticipants(pubs) + txGen.NodeConfig.SetRole(nodeconfig.ClientNode) + if shardID == 0 { + txGen.NodeConfig.SetShardGroupID(nodeconfig.GroupIDBeacon) + } else { + txGen.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(shardID))) + } + + txGen.NodeConfig.SetIsClient(true) + + return txGen +} + +func main() { + flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress") + flag.Parse() + if *versionFlag { + printVersion(os.Args[0]) + } + // Logging setup + utils.SetLogContext(*port, *ip) + utils.SetLogVerbosity(log.Lvl(*verbosity)) + if len(utils.BootNodes) == 0 { + bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings) + if err != nil { + panic(err) + } + utils.BootNodes = bootNodeAddrs + } + // Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard + setting := Settings{ + NumOfAddress: 10000, + MaxNumTxsPerBatch: *numTxns, + } + shardID := *shardIDFlag + utils.Logger().Debug(). + Int("cx ratio", *crossShardRatio). + Msg("Cross Shard Ratio Is Set But not used") + + // TODO(Richard): refactor this chuck to a single method + // Setup a logger to stdout and log file. + logFileName := fmt.Sprintf("./%v/txgen.log", *logFolder) + h := log.MultiHandler( + log.StreamHandler(os.Stdout, log.TerminalFormat(false)), + log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file + ) + log.Root().SetHandler(h) + txGen := setUpTXGen() + txGen.ServiceManagerSetup() + txGen.RunServices() + start := time.Now() + totalTime := float64(*duration) + utils.Logger().Debug(). + Float64("totalTime", totalTime). + Bool("RunForever", isDurationForever(totalTime)). + Msg("Total Duration") + ticker := time.NewTicker(checkFrequency * time.Second) + txGen.DoSyncWithoutConsensus() +syncLoop: + for { + t := time.Now() + if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { + utils.Logger().Debug(). + Int("duration", (int(t.Sub(start)))). + Time("startTime", start). + Float64("totalTime", totalTime). + Msg("Generator timer ended in syncLoop.") + break syncLoop + } + select { + case <-ticker.C: + if txGen.State.String() == "NodeReadyForConsensus" { + utils.Logger().Debug(). + Str("txgen node", txGen.SelfPeer.String()). + Str("Node State", txGen.State.String()). + Msg("Generator is now in Sync.") + ticker.Stop() + break syncLoop + } + } + } + readySignal := make(chan uint32) + // This func is used to update the client's blockchain when new blocks are received from the leaders + updateBlocksFunc := func(blocks []*types.Block) { + utils.Logger().Info(). + Uint64("block num", blocks[0].NumberU64()). + Msg("[Txgen] Received new block") + for _, block := range blocks { + shardID := block.ShardID() + if txGen.Consensus.ShardID == shardID { + utils.Logger().Info(). + Int("txNum", len(block.Transactions())). + Uint32("shardID", shardID). + Str("preHash", block.ParentHash().Hex()). + Uint64("currentBlock", txGen.Blockchain().CurrentBlock().NumberU64()). + Uint64("incoming block", block.NumberU64()). + Msg("Got block from leader") + if block.NumberU64()-txGen.Blockchain().CurrentBlock().NumberU64() == 1 { + if err := txGen.AddNewBlock(block); err != nil { + utils.Logger().Error(). + Err(err). + Msg("Error when adding new block") + } + stateMutex.Lock() + if err := txGen.Worker.UpdateCurrent(block.Coinbase()); err != nil { + utils.Logger().Warn().Err(err).Msg("(*Worker).UpdateCurrent failed") + } + stateMutex.Unlock() + readySignal <- shardID + } + } else { + continue + } + } + } + txGen.Client.UpdateBlocks = updateBlocksFunc + // Start the client server to listen to leader's message + go func() { + // wait for 3 seconds for client to send ping message to leader + // FIXME (leo) the readySignal should be set once we really sent ping message to leader + time.Sleep(1 * time.Second) // wait for nodes to be ready + readySignal <- uint32(shardID) + }() +pushLoop: + for { + t := time.Now() + utils.Logger().Debug(). + Float64("running time", t.Sub(start).Seconds()). + Float64("totalTime", totalTime). + Msg("Current running time") + if !isDurationForever(totalTime) && t.Sub(start).Seconds() >= totalTime { + utils.Logger().Debug(). + Int("duration", (int(t.Sub(start)))). + Time("startTime", start). + Float64("totalTime", totalTime). + Msg("Generator timer ended.") + break pushLoop + } + if shardID != 0 { + if otherHeight, flag := txGen.IsSameHeight(); flag { + if otherHeight >= 1 { + go func() { + readySignal <- uint32(shardID) + utils.Logger().Debug().Msg("Same blockchain height so readySignal generated") + time.Sleep(3 * time.Second) // wait for nodes to be ready + }() + } + } + } + select { + case shardID := <-readySignal: + lock := sync.Mutex{} + txs, err := GenerateSimulatedTransactionsAccount(uint32(shardID), txGen, setting) + if err != nil { + utils.Logger().Debug(). + Err(err). + Msg("Error in Generating Txns") + } + lock.Lock() + SendTxsToShard(txGen, txs, uint32(shardID)) + lock.Unlock() + case <-time.After(10 * time.Second): + utils.Logger().Warn().Msg("No new block is received so far") + } + } +} + +// SendTxsToShard sends txs to shard, currently just to beacon shard +func SendTxsToShard(clientNode *node.Node, txs types.Transactions, shardID uint32) { + msg := proto_node.ConstructTransactionListMessageAccount(txs) + var err error + if shardID == 0 { + err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + } else { + clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID)) + err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg)) + } + if err != nil { + utils.Logger().Debug(). + Err(err). + Msg("Error in Sending Txns") + } +} + +// GenerateSimulatedTransactionsAccount generates simulated transaction for account model. +func GenerateSimulatedTransactionsAccount(shardID uint32, node *node.Node, setting Settings) (types.Transactions, error) { + TxnsToGenerate := setting.MaxNumTxsPerBatch // TODO: make use of settings + txs := make([]*types.Transaction, TxnsToGenerate) + rounds := (TxnsToGenerate / 100) + remainder := TxnsToGenerate % 100 + for i := 0; i < 100; i++ { + baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey)) + for j := 0; j < rounds; j++ { + randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey) + randAmount := rand.Float32() + tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, shardID, big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i]) + txs[100*j+i] = tx + } + if i < remainder { + randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey) + randAmount := rand.Float32() + tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(rounds), randomUserAddress, shardID, big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i]) + txs[100*rounds+i] = tx + } + } + return txs, nil +} + +func isDurationForever(duration float64) bool { + return duration <= 0 +} diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go new file mode 100644 index 000000000..7d62c5c05 --- /dev/null +++ b/cmd/harmony/main.go @@ -0,0 +1,517 @@ +package main + +import ( + "encoding/hex" + "flag" + "fmt" + "math/big" + "math/rand" + "os" + "path" + "runtime" + "strconv" + "time" + + ethCommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/api/service/syncing" + "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/internal/blsgen" + "github.com/harmony-one/harmony/internal/common" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" + "github.com/harmony-one/harmony/internal/genesis" + hmykey "github.com/harmony-one/harmony/internal/keystore" + "github.com/harmony-one/harmony/internal/memprofiling" + "github.com/harmony-one/harmony/internal/shardchain" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/node" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/p2pimpl" + "github.com/harmony-one/harmony/shard" +) + +// Version string variables +var ( + version string + builtBy string + builtAt string + commit string +) + +// Host +var ( + myHost p2p.Host +) + +// InitLDBDatabase initializes a LDBDatabase. isGenesis=true will return the beacon chain database for normal shard nodes +func InitLDBDatabase(ip string, port string, freshDB bool, isBeacon bool) (*ethdb.LDBDatabase, error) { + var dbFileName string + if isBeacon { + dbFileName = fmt.Sprintf("./db/harmony_beacon_%s_%s", ip, port) + } else { + dbFileName = fmt.Sprintf("./db/harmony_%s_%s", ip, port) + } + if freshDB { + var err = os.RemoveAll(dbFileName) + if err != nil { + fmt.Println(err.Error()) + } + } + return ethdb.NewLDBDatabase(dbFileName, 0, 0) +} + +func printVersion() { + fmt.Fprintln(os.Stderr, nodeconfig.GetVersion()) + os.Exit(0) +} + +var ( + ip = flag.String("ip", "127.0.0.1", "ip of the node") + port = flag.String("port", "9000", "port of the node.") + logFolder = flag.String("log_folder", "latest", "the folder collecting the logs of this execution") + logMaxSize = flag.Int("log_max_size", 100, "the max size in megabytes of the log file before it gets rotated") + freshDB = flag.Bool("fresh_db", false, "true means the existing disk based db will be removed") + profile = flag.Bool("profile", false, "Turn on profiling (CPU, Memory).") + metricsReportURL = flag.String("metrics_report_url", "", "If set, reports metrics to this URL.") + versionFlag = flag.Bool("version", false, "Output version info") + onlyLogTps = flag.Bool("only_log_tps", false, "Only log TPS if true") + dnsZone = flag.String("dns_zone", "", "if given and not empty, use peers from the zone (default: use libp2p peer discovery instead)") + dnsFlag = flag.Bool("dns", true, "[deprecated] equivalent to -dns_zone t.hmny.io") + //Leader needs to have a minimal number of peers to start consensus + minPeers = flag.Int("min_peers", 32, "Minimal number of Peers in shard") + // Key file to store the private key + keyFile = flag.String("key", "./.hmykey", "the p2p key file of the harmony node") + // isGenesis indicates this node is a genesis node + isGenesis = flag.Bool("is_genesis", true, "true means this node is a genesis node") + // isArchival indicates this node is an archival node that will save and archive current blockchain + isArchival = flag.Bool("is_archival", true, "false makes node faster by turning caching off") + // delayCommit is the commit-delay timer, used by Harmony nodes + delayCommit = flag.String("delay_commit", "0ms", "how long to delay sending commit messages in consensus, ex: 500ms, 1s") + // nodeType indicates the type of the node: validator, explorer + nodeType = flag.String("node_type", "validator", "node type: validator, explorer") + // networkType indicates the type of the network + networkType = flag.String("network_type", "mainnet", "type of the network: mainnet, testnet, devnet, localnet") + // syncFreq indicates sync frequency + syncFreq = flag.Int("sync_freq", 60, "unit in seconds") + // beaconSyncFreq indicates beaconchain sync frequency + beaconSyncFreq = flag.Int("beacon_sync_freq", 60, "unit in seconds") + // blockPeriod indicates the how long the leader waits to propose a new block. + blockPeriod = flag.Int("block_period", 8, "how long in second the leader waits to propose a new block.") + leaderOverride = flag.Bool("leader_override", false, "true means override the default leader role and acts as validator") + // shardID indicates the shard ID of this node + shardID = flag.Int("shard_id", -1, "the shard ID of this node") + enableMemProfiling = flag.Bool("enableMemProfiling", false, "Enable memsize logging.") + enableGC = flag.Bool("enableGC", true, "Enable calling garbage collector manually .") + blsKeyFile = flag.String("blskey_file", "", "The encrypted file of bls serialized private key by passphrase.") + blsPass = flag.String("blspass", "", "The file containing passphrase to decrypt the encrypted bls file.") + blsPassphrase string + // Sharding configuration parameters for devnet + devnetNumShards = flag.Uint("dn_num_shards", 2, "number of shards for -network_type=devnet (default: 2)") + devnetShardSize = flag.Int("dn_shard_size", 10, "number of nodes per shard for -network_type=devnet (default 10)") + devnetHarmonySize = flag.Int("dn_hmy_size", -1, "number of Harmony-operated nodes per shard for -network_type=devnet; negative (default) means equal to -dn_shard_size") + // logConn logs incoming/outgoing connections + logConn = flag.Bool("log_conn", false, "log incoming/outgoing connections") + keystoreDir = flag.String("keystore", hmykey.DefaultKeyStoreDir, "The default keystore directory") + initialAccount = &genesis.DeployAccount{} + // logging verbosity + verbosity = flag.Int("verbosity", 5, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)") + // dbDir is the database directory. + dbDir = flag.String("db_dir", "", "blockchain database directory") + // Disable view change. + disableViewChange = flag.Bool("disable_view_change", false, "Do not propose view change (testing only)") + // metrics flag to collct meetrics or not, pushgateway ip and port for metrics + metricsFlag = flag.Bool("metrics", false, "Collect and upload node metrics") + pushgatewayIP = flag.String("pushgateway_ip", "grafana.harmony.one", "Metrics view ip") + pushgatewayPort = flag.String("pushgateway_port", "9091", "Metrics view port") + publicRPC = flag.Bool("public_rpc", false, "Enable Public RPC Access (default: false)") +) + +func initSetup() { + + // maybe request passphrase for bls key. + passphraseForBls() + + // Configure log parameters + utils.SetLogContext(*port, *ip) + utils.SetLogVerbosity(log.Lvl(*verbosity)) + utils.AddLogFile(fmt.Sprintf("%v/validator-%v-%v.log", *logFolder, *ip, *port), *logMaxSize) + + if *onlyLogTps { + matchFilterHandler := log.MatchFilterHandler("msg", "TPS Report", utils.GetLogInstance().GetHandler()) + utils.GetLogInstance().SetHandler(matchFilterHandler) + } + + // Add GOMAXPROCS to achieve max performance. + runtime.GOMAXPROCS(runtime.NumCPU() * 4) + + // Set port and ip to global config. + nodeconfig.GetDefaultConfig().Port = *port + nodeconfig.GetDefaultConfig().IP = *ip + + // Setup mem profiling. + memprofiling.GetMemProfiling().Config() + + // Set default keystore Dir + hmykey.DefaultKeyStoreDir = *keystoreDir + + // Set up randomization seed. + rand.Seed(int64(time.Now().Nanosecond())) + + if len(utils.BootNodes) == 0 { + bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings) + if err != nil { + panic(err) + } + utils.BootNodes = bootNodeAddrs + } +} + +func passphraseForBls() { + // If FN node running, they should either specify blsPrivateKey or the file with passphrase + // However, explorer or non-validator nodes need no blskey + if *nodeType != "validator" { + return + } + + if *blsKeyFile == "" || *blsPass == "" { + fmt.Println("Internal nodes need to have pass to decrypt blskey") + os.Exit(101) + } + passphrase, err := utils.GetPassphraseFromSource(*blsPass) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR when reading passphrase file: %v\n", err) + os.Exit(100) + } + blsPassphrase = passphrase +} + +func setupInitialAccount() (isLeader bool) { + genesisShardingConfig := shard.Schedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch)) + pubKey := setupConsensusKey(nodeconfig.GetDefaultConfig()) + + reshardingEpoch := genesisShardingConfig.ReshardingEpoch() + if reshardingEpoch != nil && len(reshardingEpoch) > 0 { + for _, epoch := range reshardingEpoch { + config := shard.Schedule.InstanceForEpoch(epoch) + isLeader, initialAccount = config.FindAccount(pubKey.SerializeToHexStr()) + if initialAccount != nil { + break + } + } + } else { + isLeader, initialAccount = genesisShardingConfig.FindAccount(pubKey.SerializeToHexStr()) + } + + if initialAccount == nil { + fmt.Fprintf(os.Stderr, "ERROR cannot find your BLS key in the genesis/FN tables: %s\n", pubKey.SerializeToHexStr()) + os.Exit(100) + } + + fmt.Printf("My Genesis Account: %v\n", *initialAccount) + + return isLeader +} + +func setupConsensusKey(nodeConfig *nodeconfig.ConfigType) *bls.PublicKey { + consensusPriKey, err := blsgen.LoadBlsKeyWithPassPhrase(*blsKeyFile, blsPassphrase) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR when loading bls key, err :%v\n", err) + os.Exit(100) + } + pubKey := consensusPriKey.GetPublicKey() + + // Consensus keys are the BLS12-381 keys used to sign consensus messages + nodeConfig.ConsensusPriKey, nodeConfig.ConsensusPubKey = consensusPriKey, consensusPriKey.GetPublicKey() + if nodeConfig.ConsensusPriKey == nil || nodeConfig.ConsensusPubKey == nil { + fmt.Println("error to get consensus keys.") + os.Exit(100) + } + return pubKey +} + +func createGlobalConfig() *nodeconfig.ConfigType { + var err error + + nodeConfig := nodeconfig.GetShardConfig(initialAccount.ShardID) + if *nodeType == "validator" { + // Set up consensus keys. + setupConsensusKey(nodeConfig) + } else { + nodeConfig.ConsensusPriKey = &bls.SecretKey{} // set dummy bls key for consensus object + } + + // Set network type + netType := nodeconfig.NetworkType(*networkType) + switch netType { + case nodeconfig.Mainnet, nodeconfig.Testnet, nodeconfig.Pangaea, nodeconfig.Localnet, nodeconfig.Devnet: + nodeconfig.SetNetworkType(netType) + default: + panic(fmt.Sprintf("invalid network type: %s", *networkType)) + } + + nodeConfig.SetPushgatewayIP(*pushgatewayIP) + nodeConfig.SetPushgatewayPort(*pushgatewayPort) + nodeConfig.SetMetricsFlag(*metricsFlag) + + // P2p private key is used for secure message transfer between p2p nodes. + nodeConfig.P2pPriKey, _, err = utils.LoadKeyFromFile(*keyFile) + if err != nil { + panic(err) + } + + selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey} + + myHost, err = p2pimpl.NewHost(&selfPeer, nodeConfig.P2pPriKey) + if *logConn && nodeConfig.GetNetworkType() != nodeconfig.Mainnet { + myHost.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogger())) + } + if err != nil { + panic("unable to new host in harmony") + } + + nodeConfig.DBDir = *dbDir + + return nodeConfig +} + +func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { + // Consensus object. + // TODO: consensus object shouldn't start here + // TODO(minhdoan): During refactoring, found out that the peers list is actually empty. Need to clean up the logic of consensus later. + decider := quorum.NewDecider(quorum.SuperMajorityVote) + currentConsensus, err := consensus.New( + myHost, nodeConfig.ShardID, p2p.Peer{}, nodeConfig.ConsensusPriKey, decider, + ) + currentConsensus.Decider.SetShardIDProvider(func() (uint32, error) { + return currentConsensus.ShardID, nil + }) + currentConsensus.SelfAddress = common.ParseAddr(initialAccount.Address) + + if err != nil { + fmt.Fprintf(os.Stderr, "Error :%v \n", err) + os.Exit(1) + } + commitDelay, err := time.ParseDuration(*delayCommit) + if err != nil || commitDelay < 0 { + _, _ = fmt.Fprintf(os.Stderr, "ERROR invalid commit delay %#v", *delayCommit) + os.Exit(1) + } + currentConsensus.SetCommitDelay(commitDelay) + currentConsensus.MinPeers = *minPeers + + if *disableViewChange { + currentConsensus.DisableViewChangeForTestingOnly() + } + + // Current node. + chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir} + currentNode := node.New(myHost, currentConsensus, chainDBFactory, *isArchival) + + switch { + case *networkType == nodeconfig.Localnet: + epochConfig := shard.Schedule.InstanceForEpoch(ethCommon.Big0) + selfPort, err := strconv.ParseUint(*port, 10, 16) + if err != nil { + utils.Logger().Fatal(). + Err(err). + Str("self_port_string", *port). + Msg("cannot convert self port string into port number") + } + currentNode.SyncingPeerProvider = node.NewLocalSyncingPeerProvider( + 6000, uint16(selfPort), epochConfig.NumShards(), uint32(epochConfig.NumNodesPerShard())) + case *dnsZone != "": + currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(*dnsZone, syncing.GetSyncingPort(*port)) + case *dnsFlag: + currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider("t.hmny.io", syncing.GetSyncingPort(*port)) + default: + currentNode.SyncingPeerProvider = node.NewLegacySyncingPeerProvider(currentNode) + + } + + // TODO: refactor the creation of blockchain out of node.New() + currentConsensus.ChainReader = currentNode.Blockchain() + + // Set up prometheus pushgateway for metrics monitoring serivce. + currentNode.NodeConfig.SetPushgatewayIP(nodeConfig.PushgatewayIP) + currentNode.NodeConfig.SetPushgatewayPort(nodeConfig.PushgatewayPort) + currentNode.NodeConfig.SetMetricsFlag(nodeConfig.MetricsFlag) + + currentNode.NodeConfig.SetBeaconGroupID(nodeconfig.NewGroupIDByShardID(0)) + + switch *nodeType { + case "explorer": + currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode) + currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(*shardID))) + currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(*shardID))) + case "validator": + currentNode.NodeConfig.SetRole(nodeconfig.Validator) + if nodeConfig.ShardID == shard.BeaconChainShardID { + currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID)) + currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(shard.BeaconChainShardID)) + } else { + currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID))) + currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID))) + } + } + currentNode.NodeConfig.ConsensusPubKey = nodeConfig.ConsensusPubKey + currentNode.NodeConfig.ConsensusPriKey = nodeConfig.ConsensusPriKey + + // Setup block period for currentNode. + currentNode.BlockPeriod = time.Duration(*blockPeriod) * time.Second + + // TODO: Disable drand. Currently drand isn't functioning but we want to compeletely turn it off for full protection. + // Enable it back after mainnet. + // dRand := drand.New(nodeConfig.Host, nodeConfig.ShardID, []p2p.Peer{}, nodeConfig.Leader, currentNode.ConfirmedBlockChannel, nodeConfig.ConsensusPriKey) + // currentNode.Consensus.RegisterPRndChannel(dRand.PRndChannel) + // currentNode.Consensus.RegisterRndChannel(dRand.RndChannel) + // currentNode.DRand = dRand + + // This needs to be executed after consensus and drand are setup + if err := currentNode.InitConsensusWithValidators(); err != nil { + utils.Logger().Warn(). + Int("shardID", *shardID). + Err(err). + Msg("InitConsensusWithMembers failed") + } + + // Set the consensus ID to be the current block number + viewID := currentNode.Blockchain().CurrentBlock().Header().ViewID().Uint64() + currentConsensus.SetViewID(viewID) + utils.Logger().Info(). + Uint64("viewID", viewID). + Msg("Init Blockchain") + + // Assign closure functions to the consensus object + currentConsensus.BlockVerifier = currentNode.VerifyNewBlock + currentConsensus.OnConsensusDone = currentNode.PostConsensusProcessing + currentNode.State = node.NodeWaitToJoin + + // update consensus information based on the blockchain + mode := currentConsensus.UpdateConsensusInformation() + currentConsensus.SetMode(mode) + + // Watching currentNode and currentConsensus. + memprofiling.GetMemProfiling().Add("currentNode", currentNode) + memprofiling.GetMemProfiling().Add("currentConsensus", currentConsensus) + return currentNode +} + +func main() { + flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress (delimited by ,)") + flag.Parse() + + switch *nodeType { + case "validator": + case "explorer": + break + default: + fmt.Fprintf(os.Stderr, "Unknown node type: %s\n", *nodeType) + os.Exit(1) + } + + nodeconfig.SetPublicRPC(*publicRPC) + nodeconfig.SetVersion(fmt.Sprintf("Harmony (C) 2019. %v, version %v-%v (%v %v)", path.Base(os.Args[0]), version, commit, builtBy, builtAt)) + if *versionFlag { + printVersion() + } + + switch *networkType { + case nodeconfig.Mainnet: + shard.Schedule = shardingconfig.MainnetSchedule + case nodeconfig.Testnet: + shard.Schedule = shardingconfig.TestnetSchedule + case nodeconfig.Pangaea: + shard.Schedule = shardingconfig.PangaeaSchedule + case nodeconfig.Localnet: + shard.Schedule = shardingconfig.LocalnetSchedule + case nodeconfig.Devnet: + if *devnetHarmonySize < 0 { + *devnetHarmonySize = *devnetShardSize + } + // TODO (leo): use a passing list of accounts here + devnetConfig, err := shardingconfig.NewInstance( + uint32(*devnetNumShards), *devnetShardSize, *devnetHarmonySize, genesis.HarmonyAccounts, genesis.FoundationalNodeAccounts, nil) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "ERROR invalid devnet sharding config: %s", + err) + os.Exit(1) + } + shard.Schedule = shardingconfig.NewFixedSchedule(devnetConfig) + } + + initSetup() + + // Set up manual call for garbage collection. + if *enableGC { + memprofiling.MaybeCallGCPeriodically() + } + + if *nodeType == "validator" { + setupInitialAccount() + } + + if *shardID >= 0 { + utils.Logger().Info(). + Uint32("original", initialAccount.ShardID). + Int("override", *shardID). + Msg("ShardID Override") + initialAccount.ShardID = uint32(*shardID) + } + + nodeConfig := createGlobalConfig() + currentNode := setupConsensusAndNode(nodeConfig) + //setup state syncing and beacon syncing frequency + currentNode.SetSyncFreq(*syncFreq) + currentNode.SetBeaconSyncFreq(*beaconSyncFreq) + + if nodeConfig.ShardID != shard.BeaconChainShardID && currentNode.NodeConfig.Role() != nodeconfig.ExplorerNode { + utils.Logger().Info().Uint32("shardID", currentNode.Blockchain().ShardID()).Uint32("shardID", nodeConfig.ShardID).Msg("SupportBeaconSyncing") + go currentNode.SupportBeaconSyncing() + } + + startMsg := "==== New Harmony Node ====" + if *nodeType == "explorer" { + startMsg = "==== New Explorer Node ====" + } + + utils.Logger().Info(). + Str("BlsPubKey", hex.EncodeToString(nodeConfig.ConsensusPubKey.Serialize())). + Uint32("ShardID", nodeConfig.ShardID). + Str("ShardGroupID", nodeConfig.GetShardGroupID().String()). + Str("BeaconGroupID", nodeConfig.GetBeaconGroupID().String()). + Str("ClientGroupID", nodeConfig.GetClientGroupID().String()). + Str("Role", currentNode.NodeConfig.Role().String()). + Str("multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, myHost.GetID().Pretty())). + Msg(startMsg) + + if *enableMemProfiling { + memprofiling.GetMemProfiling().Start() + } + go currentNode.SupportSyncing() + currentNode.ServiceManagerSetup() + + currentNode.RunServices() + // RPC for SDK not supported for mainnet. + if err := currentNode.StartRPC(*port); err != nil { + utils.Logger().Warn(). + Err(err). + Msg("StartRPC failed") + } + + // Run additional node collectors + // Collect node metrics if metrics flag is set + if currentNode.NodeConfig.GetMetricsFlag() { + go currentNode.CollectMetrics() + } + // Commit committtee if node role is explorer + if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode { + go currentNode.CommitCommittee() + } + + currentNode.StartServer() +} diff --git a/node/node.go b/node/node.go new file mode 100644 index 000000000..0abb2ce0f --- /dev/null +++ b/node/node.go @@ -0,0 +1,685 @@ +package node + +import ( + "crypto/ecdsa" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/harmony-one/harmony/accounts" + "github.com/harmony-one/harmony/api/client" + clientService "github.com/harmony-one/harmony/api/client/service" + msg_pb "github.com/harmony-one/harmony/api/proto/message" + proto_node "github.com/harmony-one/harmony/api/proto/node" + "github.com/harmony-one/harmony/api/service" + "github.com/harmony-one/harmony/api/service/syncing" + "github.com/harmony-one/harmony/api/service/syncing/downloader" + "github.com/harmony-one/harmony/block" + "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/consensus/reward" + "github.com/harmony-one/harmony/contracts" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/drand" + "github.com/harmony-one/harmony/internal/chain" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/ctxerror" + "github.com/harmony-one/harmony/internal/params" + "github.com/harmony-one/harmony/internal/shardchain" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/msgq" + "github.com/harmony-one/harmony/node/worker" + "github.com/harmony-one/harmony/p2p" + p2p_host "github.com/harmony-one/harmony/p2p/host" + "github.com/harmony-one/harmony/shard" + "github.com/harmony-one/harmony/shard/committee" + staking "github.com/harmony-one/harmony/staking/types" +) + +// State is a state of a node. +type State byte + +// All constants except the NodeLeader below are for validators only. +const ( + NodeInit State = iota // Node just started, before contacting BeaconChain + NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard + NodeNotInSync // Node out of sync, might be just joined Shard or offline for a period of time + NodeOffline // Node is offline + NodeReadyForConsensus // Node is ready for doing consensus + NodeDoingConsensus // Node is already doing consensus + NodeLeader // Node is the leader of some shard. +) + +const ( + // TxPoolLimit is the limit of transaction pool. + TxPoolLimit = 20000 + // NumTryBroadCast is the number of times trying to broadcast + NumTryBroadCast = 3 + // ClientRxQueueSize is the number of client messages to queue before tail-dropping. + ClientRxQueueSize = 16384 + // ShardRxQueueSize is the number of shard messages to queue before tail-dropping. + ShardRxQueueSize = 16384 + // GlobalRxQueueSize is the number of global messages to queue before tail-dropping. + GlobalRxQueueSize = 16384 + // ClientRxWorkers is the number of concurrent client message handlers. + ClientRxWorkers = 8 + // ShardRxWorkers is the number of concurrent shard message handlers. + ShardRxWorkers = 32 + // GlobalRxWorkers is the number of concurrent global message handlers. + GlobalRxWorkers = 32 +) + +func (state State) String() string { + switch state { + case NodeInit: + return "NodeInit" + case NodeWaitToJoin: + return "NodeWaitToJoin" + case NodeNotInSync: + return "NodeNotInSync" + case NodeOffline: + return "NodeOffline" + case NodeReadyForConsensus: + return "NodeReadyForConsensus" + case NodeDoingConsensus: + return "NodeDoingConsensus" + case NodeLeader: + return "NodeLeader" + } + return "Unknown" +} + +const ( + maxBroadcastNodes = 10 // broadcast at most maxBroadcastNodes peers that need in sync + broadcastTimeout int64 = 60 * 1000000000 // 1 mins + //SyncIDLength is the length of bytes for syncID + SyncIDLength = 20 +) + +// use to push new block to outofsync node +type syncConfig struct { + timestamp int64 + client *downloader.Client +} + +// Node represents a protocol-participating node in the network +type Node struct { + Consensus *consensus.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits) + BlockChannel chan *types.Block // The channel to send newly proposed blocks + ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks + BeaconBlockChannel chan *types.Block // The channel to send beacon blocks for non-beaconchain nodes + DRand *drand.DRand // The instance for distributed randomness protocol + pendingCrossLinks []*block.Header + pendingClMutex sync.Mutex + + pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus + pendingCXMutex sync.Mutex + + // Shard databases + shardChains shardchain.Collection + + Client *client.Client // The presence of a client object means this node will also act as a client + SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. + BCPeers []p2p.Peer // list of Beacon Chain Peers. This is needed by all nodes. + + // TODO: Neighbors should store only neighbor nodes in the same shard + Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer + numPeers int // Number of Peers + State State // State of the Node + stateMutex sync.Mutex // mutex for change node state + + // BeaconNeighbors store only neighbor nodes in the beacon chain shard + BeaconNeighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer + + TxPool *core.TxPool // TODO migrate to TxPool from pendingTransactions list below + + CxPool *core.CxPool // pool for missing cross shard receipts resend + + pendingTransactions map[common.Hash]*types.Transaction // All the transactions received but not yet processed for Consensus + pendingTxMutex sync.Mutex + recentTxsStats types.RecentTxsStats + + pendingStakingTransactions map[common.Hash]*staking.StakingTransaction // All the staking transactions received but not yet processed for Consensus + pendingStakingTxMutex sync.Mutex + + Worker *worker.Worker + BeaconWorker *worker.Worker // worker for beacon chain + + // Client server (for wallet requests) + clientServer *clientService.Server + + // Syncing component. + syncID [SyncIDLength]byte // a unique ID for the node during the state syncing process with peers + downloaderServer *downloader.Server + stateSync *syncing.StateSync + beaconSync *syncing.StateSync + peerRegistrationRecord map[string]*syncConfig // record registration time (unixtime) of peers begin in syncing + SyncingPeerProvider SyncingPeerProvider + + // syncing frequency parameters + syncFreq int + beaconSyncFreq int + + // The p2p host used to send/receive p2p messages + host p2p.Host + + // Incoming messages to process. + clientRxQueue *msgq.Queue + shardRxQueue *msgq.Queue + globalRxQueue *msgq.Queue + + // Service manager. + serviceManager *service.Manager + + // Demo account. + DemoContractAddress common.Address + LotteryManagerPrivateKey *ecdsa.PrivateKey + + // Puzzle account. + PuzzleContractAddress common.Address + PuzzleManagerPrivateKey *ecdsa.PrivateKey + + // For test only; TODO ek – remove this + TestBankKeys []*ecdsa.PrivateKey + + ContractDeployerKey *ecdsa.PrivateKey + ContractDeployerCurrentNonce uint64 // The nonce of the deployer contract at current block + ContractAddresses []common.Address + + // For puzzle contracts + AddressNonce sync.Map + + // Shard group Message Receiver + shardGroupReceiver p2p.GroupReceiver + + // Global group Message Receiver, communicate with beacon chain, or cross-shard TX + globalGroupReceiver p2p.GroupReceiver + + // Client Message Receiver to handle light client messages + // Beacon leader needs to use this receiver to talk to new node + clientReceiver p2p.GroupReceiver + + // Duplicated Ping Message Received + duplicatedPing sync.Map + + // Channel to notify consensus service to really start consensus + startConsensus chan struct{} + + // node configuration, including group ID, shard ID, etc + NodeConfig *nodeconfig.ConfigType + + // Chain configuration. + chainConfig params.ChainConfig + + // map of service type to its message channel. + serviceMessageChan map[service.Type]chan *msg_pb.Message + + // Used to call smart contract locally + ContractCaller *contracts.ContractCaller + + accountManager *accounts.Manager + + // Next shard state + nextShardState struct { + // The received master shard state + master *shard.EpochShardState + + // When for a leader to propose the next shard state, + // or for a validator to wait for a proposal before view change. + // TODO ek – replace with retry-based logic instead of delay + proposeTime time.Time + } + + isFirstTime bool // the node was started with a fresh database + // How long in second the leader needs to wait to propose a new block. + BlockPeriod time.Duration + + // last time consensus reached for metrics + lastConsensusTime int64 +} + +// Blockchain returns the blockchain for the node's current shard. +func (node *Node) Blockchain() *core.BlockChain { + shardID := node.NodeConfig.ShardID + bc, err := node.shardChains.ShardChain(shardID) + if err != nil { + utils.Logger().Error(). + Uint32("shardID", shardID). + Err(err). + Msg("cannot get shard chain") + } + return bc +} + +// Beaconchain returns the beaconchain from node. +func (node *Node) Beaconchain() *core.BlockChain { + bc, err := node.shardChains.ShardChain(0) + if err != nil { + utils.Logger().Error().Err(err).Msg("cannot get beaconchain") + } + return bc +} + +func (node *Node) tryBroadcast(tx *types.Transaction) { + msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx}) + + shardGroupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(tx.ShardID())) + utils.Logger().Info().Str("shardGroupID", string(shardGroupID)).Msg("tryBroadcast") + + for attempt := 0; attempt < NumTryBroadCast; attempt++ { + if err := node.host.SendMessageToGroups([]nodeconfig.GroupID{shardGroupID}, p2p_host.ConstructP2pMessage(byte(0), msg)); err != nil && attempt < NumTryBroadCast { + utils.Logger().Error().Int("attempt", attempt).Msg("Error when trying to broadcast tx") + } else { + break + } + } +} + +// Add new transactions to the pending transaction list. +func (node *Node) addPendingTransactions(newTxs types.Transactions) { + txPoolLimit := shard.Schedule.MaxTxPoolSizeLimit() + node.pendingTxMutex.Lock() + for _, tx := range newTxs { + if _, ok := node.pendingTransactions[tx.Hash()]; !ok { + node.pendingTransactions[tx.Hash()] = tx + } + if len(node.pendingTransactions) > txPoolLimit { + break + } + } + node.pendingTxMutex.Unlock() + utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more transactions") +} + +// Add new staking transactions to the pending staking transaction list. +func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) { + txPoolLimit := shard.Schedule.MaxTxPoolSizeLimit() + node.pendingStakingTxMutex.Lock() + for _, tx := range newStakingTxs { + if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok { + node.pendingStakingTransactions[tx.Hash()] = tx + } + if len(node.pendingStakingTransactions) > txPoolLimit { + break + } + } + node.pendingStakingTxMutex.Unlock() + utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more staking transactions") +} + +// AddPendingStakingTransaction staking transactions +func (node *Node) AddPendingStakingTransaction( + newStakingTx *staking.StakingTransaction) { + node.addPendingStakingTransactions(staking.StakingTransactions{newStakingTx}) +} + +// AddPendingTransaction adds one new transaction to the pending transaction list. +// This is only called from SDK. +func (node *Node) AddPendingTransaction(newTx *types.Transaction) { + if node.Consensus.IsLeader() && newTx.ShardID() == node.NodeConfig.ShardID { + node.addPendingTransactions(types.Transactions{newTx}) + } else { + utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx") + node.tryBroadcast(newTx) + } + utils.Logger().Debug().Int("totalPending", len(node.pendingTransactions)).Msg("Got ONE more transaction") +} + +// AddPendingReceipts adds one receipt message to pending list. +func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) { + node.pendingCXMutex.Lock() + defer node.pendingCXMutex.Unlock() + + if receipts.ContainsEmptyField() { + utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("CXReceiptsProof contains empty field") + return + } + + blockNum := receipts.Header.Number().Uint64() + shardID := receipts.Header.ShardID() + key := utils.GetPendingCXKey(shardID, blockNum) + + if _, ok := node.pendingCXReceipts[key]; ok { + utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Already Got Same Receipt message") + return + } + node.pendingCXReceipts[key] = receipts + utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Got ONE more receipt message") +} + +// Take out a subset of valid transactions from the pending transaction list +// Note the pending transaction list will then contain the rest of the txs +func (node *Node) getTransactionsForNewBlock(coinbase common.Address) (types.Transactions, staking.StakingTransactions) { + txsThrottleConfig := shard.Schedule.TxsThrottleConfig() + + // the next block number to be added in consensus protocol, which is always one more than current chain header block + newBlockNum := node.Blockchain().CurrentBlock().NumberU64() + 1 + // remove old (> txsThrottleConfigRecentTxDuration) blockNum keys from recentTxsStats and initiailize for the new block + for blockNum := range node.recentTxsStats { + recentTxsBlockNumGap := uint64(txsThrottleConfig.RecentTxDuration / node.BlockPeriod) + if recentTxsBlockNumGap < newBlockNum-blockNum { + delete(node.recentTxsStats, blockNum) + } + } + node.recentTxsStats[newBlockNum] = make(types.BlockTxsCounts) + // Must update to the correct current state before processing potential txns + if err := node.Worker.UpdateCurrent(coinbase); err != nil { + utils.Logger().Error(). + Err(err). + Msg("Failed updating worker's state before txn selection") + return types.Transactions{}, staking.StakingTransactions{} + } + + node.pendingTxMutex.Lock() + defer node.pendingTxMutex.Unlock() + node.pendingStakingTxMutex.Lock() + defer node.pendingStakingTxMutex.Unlock() + pendingTransactions := types.Transactions{} + pendingStakingTransactions := staking.StakingTransactions{} + for _, tx := range node.pendingTransactions { + pendingTransactions = append(pendingTransactions, tx) + } + for _, tx := range node.pendingStakingTransactions { + pendingStakingTransactions = append(pendingStakingTransactions, tx) + } + + selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase) + + selectedStaking, unselectedStaking, invalidStaking := + node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, coinbase) + + node.pendingTransactions = make(map[common.Hash]*types.Transaction) + for _, unselectedTx := range unselected { + node.pendingTransactions[unselectedTx.Hash()] = unselectedTx + } + utils.Logger().Info(). + Int("remainPending", len(node.pendingTransactions)). + Int("selected", len(selected)). + Int("invalidDiscarded", len(invalid)). + Msg("Selecting Transactions") + + node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction) + for _, unselectedStakingTx := range unselectedStaking { + node.pendingStakingTransactions[unselectedStakingTx.Hash()] = unselectedStakingTx + } + utils.Logger().Info(). + Int("remainPending", len(node.pendingStakingTransactions)). + Int("selected", len(unselectedStaking)). + Int("invalidDiscarded", len(invalidStaking)). + Msg("Selecting Staking Transactions") + + return selected, selectedStaking +} + +func (node *Node) startRxPipeline( + receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int, +) { + // consumers + for i := 0; i < numWorkers; i++ { + go queue.HandleMessages(node) + } + // provider + go node.receiveGroupMessage(receiver, queue) +} + +// StartServer starts a server and process the requests by a handler. +func (node *Node) StartServer() { + + // client messages are sent by clients, like txgen, wallet + node.startRxPipeline(node.clientReceiver, node.clientRxQueue, ClientRxWorkers) + + // start the goroutine to receive group message + node.startRxPipeline(node.shardGroupReceiver, node.shardRxQueue, ShardRxWorkers) + + // start the goroutine to receive global message, used for cross-shard TX + // FIXME (leo): we use beacon client topic as the global topic for now + node.startRxPipeline(node.globalGroupReceiver, node.globalRxQueue, GlobalRxWorkers) + + select {} +} + +// Count the total number of transactions in the blockchain +// Currently used for stats reporting purpose +func (node *Node) countNumTransactionsInBlockchain() int { + count := 0 + for block := node.Blockchain().CurrentBlock(); block != nil; block = node.Blockchain().GetBlockByHash(block.Header().ParentHash()) { + count += len(block.Transactions()) + } + return count +} + +// GetSyncID returns the syncID of this node +func (node *Node) GetSyncID() [SyncIDLength]byte { + return node.syncID +} + +// New creates a new node. +func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardchain.DBFactory, isArchival bool) *Node { + node := Node{} + + node.syncFreq = SyncFrequency + node.beaconSyncFreq = SyncFrequency + + // Get the node config that's created in the harmony.go program. + if consensusObj != nil { + node.NodeConfig = nodeconfig.GetShardConfig(consensusObj.ShardID) + } else { + node.NodeConfig = nodeconfig.GetDefaultConfig() + } + + copy(node.syncID[:], GenerateRandomString(SyncIDLength)) + if host != nil { + node.host = host + node.SelfPeer = host.GetSelfPeer() + } + + chainConfig := *params.TestnetChainConfig + switch node.NodeConfig.GetNetworkType() { + case nodeconfig.Mainnet: + chainConfig = *params.MainnetChainConfig + case nodeconfig.Pangaea: + chainConfig = *params.PangaeaChainConfig + } + node.chainConfig = chainConfig + + collection := shardchain.NewCollection( + chainDBFactory, &genesisInitializer{&node}, chain.Engine, &chainConfig, + ) + if isArchival { + collection.DisableCache() + } + node.shardChains = collection + + if host != nil && consensusObj != nil { + // Consensus and associated channel to communicate blocks + node.Consensus = consensusObj + + // Load the chains. + blockchain := node.Blockchain() // this also sets node.isFirstTime if the DB is fresh + beaconChain := node.Beaconchain() + + node.BlockChannel = make(chan *types.Block) + node.ConfirmedBlockChannel = make(chan *types.Block) + node.BeaconBlockChannel = make(chan *types.Block) + node.recentTxsStats = make(types.RecentTxsStats) + node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain) + node.CxPool = core.NewCxPool(core.CxPoolSize) + node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine) + + if node.Blockchain().ShardID() != shard.BeaconChainShardID { + node.BeaconWorker = worker.New(node.Beaconchain().Config(), beaconChain, chain.Engine) + } + + node.pendingCXReceipts = make(map[string]*types.CXReceiptsProof) + node.pendingTransactions = make(map[common.Hash]*types.Transaction) + node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction) + node.Consensus.VerifiedNewBlock = make(chan *types.Block) + chain.Engine.SetRewarder(node.Consensus.Decider.(reward.Distributor)) + // the sequence number is the next block number to be added in consensus protocol, which is always one more than current chain header block + node.Consensus.SetBlockNum(blockchain.CurrentBlock().NumberU64() + 1) + + // Add Faucet contract to all shards, so that on testnet, we can demo wallet in explorer + // TODO (leo): we need to have support of cross-shard tx later so that the token can be transferred from beacon chain shard to other tx shards. + if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet { + if node.isFirstTime { + // Setup one time smart contracts + node.AddFaucetContractToPendingTransactions() + } else { + node.AddContractKeyAndAddress(scFaucet) + } + node.ContractCaller = contracts.NewContractCaller(node.Blockchain(), node.Blockchain().Config()) + // Create test keys. Genesis will later need this. + var err error + node.TestBankKeys, err = CreateTestBankKeys(TestAccountNumber) + if err != nil { + utils.Logger().Error().Err(err).Msg("Error while creating test keys") + } + } + } + + utils.Logger().Info(). + Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)). + Msg("Genesis block hash") + + node.clientRxQueue = msgq.New(ClientRxQueueSize) + node.shardRxQueue = msgq.New(ShardRxQueueSize) + node.globalRxQueue = msgq.New(GlobalRxQueueSize) + + // Setup initial state of syncing. + node.peerRegistrationRecord = make(map[string]*syncConfig) + node.startConsensus = make(chan struct{}) + go node.bootstrapConsensus() + return &node +} + +// InitConsensusWithValidators initialize shard state from latest epoch and update committee pub +// keys for consensus and drand +func (node *Node) InitConsensusWithValidators() (err error) { + if node.Consensus == nil { + utils.Logger().Error().Msg("[InitConsensusWithValidators] consenus is nil; Cannot figure out shardID") + return ctxerror.New("[InitConsensusWithValidators] consenus is nil; Cannot figure out shardID") + } + shardID := node.Consensus.ShardID + blockNum := node.Blockchain().CurrentBlock().NumberU64() + node.Consensus.SetMode(consensus.Listening) + epoch := shard.Schedule.CalcEpochNumber(blockNum) + utils.Logger().Info(). + Uint64("blockNum", blockNum). + Uint32("shardID", shardID). + Uint64("epoch", epoch.Uint64()). + Msg("[InitConsensusWithValidators] Try To Get PublicKeys") + _, pubKeys := committee.WithStakingEnabled.ComputePublicKeys( + epoch, node.Consensus.ChainReader, int(shardID), + ) + if len(pubKeys) == 0 { + utils.Logger().Error(). + Uint32("shardID", shardID). + Uint64("blockNum", blockNum). + Msg("[InitConsensusWithValidators] PublicKeys is Empty, Cannot update public keys") + return ctxerror.New( + "[InitConsensusWithValidators] PublicKeys is Empty, Cannot update public keys", + "shardID", shardID, + "blockNum", blockNum) + } + + for i := range pubKeys { + if pubKeys[i].IsEqual(node.Consensus.PubKey) { + utils.Logger().Info(). + Uint64("blockNum", blockNum). + Int("numPubKeys", len(pubKeys)). + Msg("[InitConsensusWithValidators] Successfully updated public keys") + node.Consensus.UpdatePublicKeys(pubKeys) + node.Consensus.SetMode(consensus.Normal) + return nil + } + } + // TODO: Disable drand. Currently drand isn't functioning but we want to compeletely turn it off for full protection. + // node.DRand.UpdatePublicKeys(pubKeys) + return nil +} + +// AddPeers adds neighbors nodes +func (node *Node) AddPeers(peers []*p2p.Peer) int { + count := 0 + for _, p := range peers { + key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID) + _, ok := node.Neighbors.LoadOrStore(key, *p) + if !ok { + // !ok means new peer is stored + count++ + node.host.AddPeer(p) + node.numPeers++ + continue + } + } + + return count +} + +// AddBeaconPeer adds beacon chain neighbors nodes +// Return false means new neighbor peer was added +// Return true means redundant neighbor peer wasn't added +func (node *Node) AddBeaconPeer(p *p2p.Peer) bool { + key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID) + _, ok := node.BeaconNeighbors.LoadOrStore(key, *p) + return ok +} + +// isBeacon = true if the node is beacon node +// isClient = true if the node light client(wallet) +func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) { + chanPeer := make(chan p2p.Peer) + + nodeConfig := service.NodeConfig{ + PushgatewayIP: node.NodeConfig.GetPushgatewayIP(), + PushgatewayPort: node.NodeConfig.GetPushgatewayPort(), + IsClient: node.NodeConfig.IsClient(), + Beacon: nodeconfig.NewGroupIDByShardID(0), + ShardGroupID: node.NodeConfig.GetShardGroupID(), + Actions: make(map[nodeconfig.GroupID]nodeconfig.ActionType), + } + + if nodeConfig.IsClient { + nodeConfig.Actions[nodeconfig.NewClientGroupIDByShardID(0)] = nodeconfig.ActionStart + } else { + nodeConfig.Actions[node.NodeConfig.GetShardGroupID()] = nodeconfig.ActionStart + } + + var err error + node.shardGroupReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetShardGroupID()) + if err != nil { + utils.Logger().Error().Err(err).Msg("Failed to create shard receiver") + } + + node.globalGroupReceiver, err = node.host.GroupReceiver(nodeconfig.NewClientGroupIDByShardID(0)) + if err != nil { + utils.Logger().Error().Err(err).Msg("Failed to create global receiver") + } + + node.clientReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetClientGroupID()) + if err != nil { + utils.Logger().Error().Err(err).Msg("Failed to create client receiver") + } + return nodeConfig, chanPeer +} + +// AccountManager ... +func (node *Node) AccountManager() *accounts.Manager { + return node.accountManager +} + +// ServiceManager ... +func (node *Node) ServiceManager() *service.Manager { + return node.serviceManager +} + +// SetSyncFreq sets the syncing frequency in the loop +func (node *Node) SetSyncFreq(syncFreq int) { + node.syncFreq = syncFreq +} + +// SetBeaconSyncFreq sets the syncing frequency in the loop +func (node *Node) SetBeaconSyncFreq(syncFreq int) { + node.beaconSyncFreq = syncFreq +}