modify txgen using libp2p

Signed-off-by: Leo Chen <leo@harmony.one>
pull/467/head
Leo Chen 6 years ago
parent 7a1ac90a5b
commit f8e3da4ee4
  1. 75
      cmd/client/txgen/main.go

@ -19,6 +19,7 @@ import (
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl" "github.com/harmony-one/harmony/p2p/p2pimpl"
peerstore "github.com/libp2p/go-libp2p-peerstore" peerstore "github.com/libp2p/go-libp2p-peerstore"
multiaddr "github.com/multiformats/go-multiaddr" multiaddr "github.com/multiformats/go-multiaddr"
@ -70,13 +71,34 @@ func main() {
// Add GOMAXPROCS to achieve max performance. // Add GOMAXPROCS to achieve max performance.
runtime.GOMAXPROCS(1024) runtime.GOMAXPROCS(1024)
var bcPeer *p2p.Peer // Logging setup
var shardIDLeaderMap map[uint32]p2p.Peer utils.SetPortAndIP(*port, *ip)
priKey, _, err := utils.LoadKeyFromFile(*keyFile)
if len(utils.BootNodes) == 0 {
bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings)
if err != nil {
panic(err)
}
utils.BootNodes = bootNodeAddrs
}
var shardIDLeaderMap map[uint32]p2p.Peer = make(map[uint32]p2p.Peer)
shardIDLeaderMap[0] = p2p.Peer{}
nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile)
if err != nil { if err != nil {
panic(err) panic(err)
} }
peerPriKey, peerPubKey := utils.GenKey(*ip, *port)
if peerPriKey == nil || peerPubKey == nil {
panic(fmt.Errorf("generate key error"))
}
selfPeer := p2p.Peer{IP: *ip, Port: *port, ValidatorID: -1, PubKey: peerPubKey}
if !*libp2pPD {
var bcPeer *p2p.Peer
if *bcAddr != "" { if *bcAddr != "" {
// Turn the destination into a multiaddr. // Turn the destination into a multiaddr.
maddr, err := multiaddr.NewMultiaddr(*bcAddr) maddr, err := multiaddr.NewMultiaddr(*bcAddr)
@ -95,7 +117,7 @@ func main() {
bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort} bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort}
} }
candidateNode := newnode.New(*ip, *port, priKey) candidateNode := newnode.New(*ip, *port, nodePriKey)
candidateNode.AddPeer(bcPeer) candidateNode.AddPeer(bcPeer)
candidateNode.ContactBeaconChain(*bcPeer) candidateNode.ContactBeaconChain(*bcPeer)
selfPeer := candidateNode.GetSelfPeer() selfPeer := candidateNode.GetSelfPeer()
@ -104,6 +126,9 @@ func main() {
shardIDLeaderMap = candidateNode.Leaders shardIDLeaderMap = candidateNode.Leaders
debugPrintShardIDLeaderMap(shardIDLeaderMap) debugPrintShardIDLeaderMap(shardIDLeaderMap)
} else {
utils.UseLibP2P = true
}
// Do cross shard tx if there are more than one shard // Do cross shard tx if there are more than one shard
setting := txgen.Settings{ setting := txgen.Settings{
@ -124,7 +149,7 @@ func main() {
// Nodes containing blockchain data to mirror the shards' data in the network // Nodes containing blockchain data to mirror the shards' data in the network
nodes := []*node.Node{} nodes := []*node.Node{}
host, err := p2pimpl.NewHost(&selfPeer, priKey) host, err := p2pimpl.NewHost(&selfPeer, nodePriKey)
if err != nil { if err != nil {
panic("unable to new host in txgen") panic("unable to new host in txgen")
} }
@ -147,15 +172,15 @@ func main() {
}() }()
// This func is used to update the client's blockchain when new blocks are received from the leaders // This func is used to update the client's blockchain when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*types.Block) { updateBlocksFunc := func(blocks []*types.Block) {
log.Info("[Txgen] Received new block", "block", blocks) utils.GetLogInstance().Info("[Txgen] Received new block", "block", blocks)
for _, block := range blocks { for _, block := range blocks {
for _, node := range nodes { for _, node := range nodes {
shardID := block.ShardID() shardID := block.ShardID()
if node.Consensus.ShardID == shardID { if node.Consensus.ShardID == shardID {
// Add it to blockchain // Add it to blockchain
log.Info("Current Block", "hash", node.Blockchain().CurrentBlock().Hash().Hex()) utils.GetLogInstance().Info("Current Block", "hash", node.Blockchain().CurrentBlock().Hash().Hex())
log.Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex()) utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex())
node.AddNewBlock(block) node.AddNewBlock(block)
stateMutex.Lock() stateMutex.Lock()
node.Worker.UpdateCurrent() node.Worker.UpdateCurrent()
@ -169,24 +194,24 @@ func main() {
} }
clientNode.Client.UpdateBlocks = updateBlocksFunc clientNode.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go clientNode.StartServer()
for _, leader := range shardIDLeaderMap { for _, leader := range shardIDLeaderMap {
log.Debug("Client Join Shard", "leader", leader)
clientNode.GetHost().AddPeer(&leader)
if *libp2pPD { if *libp2pPD {
clientNode.Role = node.NewNode clientNode.Role = node.NewNode
} else { } else {
clientNode.GetHost().AddPeer(&leader)
utils.GetLogInstance().Debug("Client Join Shard", "leader", leader)
go clientNode.JoinShard(leader) go clientNode.JoinShard(leader)
} }
clientNode.State = node.NodeReadyForConsensus clientNode.State = node.NodeReadyForConsensus
} }
if *libp2pPD { if *libp2pPD {
clientNode.ServiceManagerSetup() clientNode.ServiceManagerSetup()
clientNode.RunServices() clientNode.RunServices()
clientNode.StartServer() go clientNode.StartServer()
} else { } else {
// Start the client server to listen to leader's message
go clientNode.StartServer()
// wait for 1 seconds for client to send ping message to leader // wait for 1 seconds for client to send ping message to leader
time.Sleep(time.Second) time.Sleep(time.Second)
clientNode.StopPing <- struct{}{} clientNode.StopPing <- struct{}{}
@ -194,14 +219,14 @@ func main() {
clientNode.State = node.NodeReadyForConsensus clientNode.State = node.NodeReadyForConsensus
// Transaction generation process // Transaction generation process
time.Sleep(2 * time.Second) // wait for nodes to be ready time.Sleep(5 * time.Second) // wait for nodes to be ready
start := time.Now() start := time.Now()
totalTime := float64(*duration) totalTime := float64(*duration)
for { for {
t := time.Now() t := time.Now()
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) utils.GetLogInstance().Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break break
} }
select { select {
@ -210,7 +235,7 @@ func main() {
lock := sync.Mutex{} lock := sync.Mutex{}
stateMutex.Lock() stateMutex.Lock()
log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) utils.GetLogInstance().Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0))
txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting) txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting)
lock.Lock() lock.Lock()
@ -226,26 +251,34 @@ func main() {
}(shardID, txs) }(shardID, txs)
} }
lock.Unlock() lock.Unlock()
case <-time.After(2 * time.Second): case <-time.After(10 * time.Second):
log.Warn("No new block is received so far") utils.GetLogInstance().Warn("No new block is received so far")
} }
} }
// Send a stop message to stop the nodes at the end // Send a stop message to stop the nodes at the end
msg := proto_node.ConstructStopMessage() msg := proto_node.ConstructStopMessage()
if utils.UseLibP2P {
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg) clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg)
}
time.Sleep(3000 * time.Millisecond) time.Sleep(3000 * time.Millisecond)
} }
// SendTxsToLeader sends txs to leader account. // SendTxsToLeader sends txs to leader account.
func SendTxsToLeader(clientNode *node.Node, leader p2p.Peer, txs types.Transactions) { func SendTxsToLeader(clientNode *node.Node, leader p2p.Peer, txs types.Transactions) {
log.Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) utils.GetLogInstance().Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs))
msg := proto_node.ConstructTransactionListMessageAccount(txs) msg := proto_node.ConstructTransactionListMessageAccount(txs)
if utils.UseLibP2P {
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientNode.SendMessage(leader, msg) clientNode.SendMessage(leader, msg)
}
} }
func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) { func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) {
for k, v := range leaderMap { for k, v := range leaderMap {
log.Debug("Leader", "ShardID", k, "Leader", v) utils.GetLogInstance().Debug("Leader", "ShardID", k, "Leader", v)
} }
} }

Loading…
Cancel
Save