diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index 44be6f380..02db9f66a 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -19,6 +19,7 @@ import ( "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" + p2p_host "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/p2pimpl" peerstore "github.com/libp2p/go-libp2p-peerstore" multiaddr "github.com/multiformats/go-multiaddr" @@ -70,40 +71,64 @@ func main() { // Add GOMAXPROCS to achieve max performance. runtime.GOMAXPROCS(1024) - var bcPeer *p2p.Peer - var shardIDLeaderMap map[uint32]p2p.Peer - priKey, _, err := utils.LoadKeyFromFile(*keyFile) - if err != nil { - panic(err) - } + // Logging setup + utils.SetPortAndIP(*port, *ip) - if *bcAddr != "" { - // Turn the destination into a multiaddr. - maddr, err := multiaddr.NewMultiaddr(*bcAddr) + if len(utils.BootNodes) == 0 { + bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings) if err != nil { panic(err) } + utils.BootNodes = bootNodeAddrs + } - // Extract the peer ID from the multiaddr. - info, err := peerstore.InfoFromP2pAddr(maddr) - if err != nil { - panic(err) - } + var shardIDLeaderMap map[uint32]p2p.Peer = make(map[uint32]p2p.Peer) + shardIDLeaderMap[0] = p2p.Peer{} - bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID} - } else { - bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort} + nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile) + if err != nil { + panic(err) } - candidateNode := newnode.New(*ip, *port, priKey) - candidateNode.AddPeer(bcPeer) - candidateNode.ContactBeaconChain(*bcPeer) - selfPeer := candidateNode.GetSelfPeer() - selfPeer.PubKey = candidateNode.PubK + peerPriKey, peerPubKey := utils.GenKey(*ip, *port) + if peerPriKey == nil || peerPubKey == nil { + panic(fmt.Errorf("generate key error")) + } + + selfPeer := p2p.Peer{IP: *ip, Port: *port, ValidatorID: -1, PubKey: peerPubKey} + + if !*libp2pPD { + var bcPeer *p2p.Peer + if *bcAddr != "" { + // Turn the destination into a multiaddr. + maddr, err := multiaddr.NewMultiaddr(*bcAddr) + if err != nil { + panic(err) + } + + // Extract the peer ID from the multiaddr. + info, err := peerstore.InfoFromP2pAddr(maddr) + if err != nil { + panic(err) + } - shardIDLeaderMap = candidateNode.Leaders + bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID} + } else { + bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort} + } - debugPrintShardIDLeaderMap(shardIDLeaderMap) + candidateNode := newnode.New(*ip, *port, nodePriKey) + candidateNode.AddPeer(bcPeer) + candidateNode.ContactBeaconChain(*bcPeer) + selfPeer := candidateNode.GetSelfPeer() + selfPeer.PubKey = candidateNode.PubK + + shardIDLeaderMap = candidateNode.Leaders + + debugPrintShardIDLeaderMap(shardIDLeaderMap) + } else { + utils.UseLibP2P = true + } // Do cross shard tx if there are more than one shard setting := txgen.Settings{ @@ -124,7 +149,7 @@ func main() { // Nodes containing blockchain data to mirror the shards' data in the network nodes := []*node.Node{} - host, err := p2pimpl.NewHost(&selfPeer, priKey) + host, err := p2pimpl.NewHost(&selfPeer, nodePriKey) if err != nil { panic("unable to new host in txgen") } @@ -147,15 +172,15 @@ func main() { }() // This func is used to update the client's blockchain when new blocks are received from the leaders updateBlocksFunc := func(blocks []*types.Block) { - log.Info("[Txgen] Received new block", "block", blocks) + utils.GetLogInstance().Info("[Txgen] Received new block", "block", blocks) for _, block := range blocks { for _, node := range nodes { shardID := block.ShardID() if node.Consensus.ShardID == shardID { // Add it to blockchain - log.Info("Current Block", "hash", node.Blockchain().CurrentBlock().Hash().Hex()) - log.Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex()) + utils.GetLogInstance().Info("Current Block", "hash", node.Blockchain().CurrentBlock().Hash().Hex()) + utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex()) node.AddNewBlock(block) stateMutex.Lock() node.Worker.UpdateCurrent() @@ -169,24 +194,24 @@ func main() { } clientNode.Client.UpdateBlocks = updateBlocksFunc - // Start the client server to listen to leader's message - go clientNode.StartServer() - for _, leader := range shardIDLeaderMap { - log.Debug("Client Join Shard", "leader", leader) - clientNode.GetHost().AddPeer(&leader) if *libp2pPD { clientNode.Role = node.NewNode } else { + clientNode.GetHost().AddPeer(&leader) + utils.GetLogInstance().Debug("Client Join Shard", "leader", leader) go clientNode.JoinShard(leader) } clientNode.State = node.NodeReadyForConsensus } + if *libp2pPD { clientNode.ServiceManagerSetup() clientNode.RunServices() - clientNode.StartServer() + go clientNode.StartServer() } else { + // Start the client server to listen to leader's message + go clientNode.StartServer() // wait for 1 seconds for client to send ping message to leader time.Sleep(time.Second) clientNode.StopPing <- struct{}{} @@ -194,14 +219,14 @@ func main() { clientNode.State = node.NodeReadyForConsensus // Transaction generation process - time.Sleep(2 * time.Second) // wait for nodes to be ready + time.Sleep(5 * time.Second) // wait for nodes to be ready start := time.Now() totalTime := float64(*duration) for { t := time.Now() if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { - log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) + utils.GetLogInstance().Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) break } select { @@ -210,7 +235,7 @@ func main() { lock := sync.Mutex{} stateMutex.Lock() - log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) + utils.GetLogInstance().Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting) lock.Lock() @@ -226,26 +251,34 @@ func main() { }(shardID, txs) } lock.Unlock() - case <-time.After(2 * time.Second): - log.Warn("No new block is received so far") + case <-time.After(10 * time.Second): + utils.GetLogInstance().Warn("No new block is received so far") } } // Send a stop message to stop the nodes at the end msg := proto_node.ConstructStopMessage() - clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg) + if utils.UseLibP2P { + clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + } else { + clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg) + } time.Sleep(3000 * time.Millisecond) } // SendTxsToLeader sends txs to leader account. func SendTxsToLeader(clientNode *node.Node, leader p2p.Peer, txs types.Transactions) { - log.Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) + utils.GetLogInstance().Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessageAccount(txs) - clientNode.SendMessage(leader, msg) + if utils.UseLibP2P { + clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + } else { + clientNode.SendMessage(leader, msg) + } } func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) { for k, v := range leaderMap { - log.Debug("Leader", "ShardID", k, "Leader", v) + utils.GetLogInstance().Debug("Leader", "ShardID", k, "Leader", v) } }