From 2c0fc8d2f955de8f8b78656d888527bc60bb1665 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sun, 23 Jun 2019 00:33:11 -0700 Subject: [PATCH 1/8] Fix p2p cpu issue --- api/service/networkinfo/service.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 7145399a5..d527edaed 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -143,14 +143,19 @@ func (s *Service) Run() { return } - var err error - s.peerInfo, err = s.discovery.FindPeers(ctx, string(s.Rendezvous)) - if err != nil { - utils.GetLogInstance().Error("FindPeers", "error", err) - return - } + go func() { + for { + var err error + s.peerInfo, err = s.discovery.FindPeers(ctx, string(s.Rendezvous)) + if err != nil { + utils.GetLogInstance().Error("FindPeers", "error", err) + return + } - go s.DoService() + s.DoService() + time.Sleep(60 * time.Second) + } + }() } // DoService does network info. @@ -163,7 +168,11 @@ func (s *Service) DoService() { tick := time.NewTicker(dhtTicker) for { select { - case peer := <-s.peerInfo: + case peer, ok := <-s.peerInfo: + if !ok { + utils.GetLogInstance().Info("PeerInfo Channel Closed.") + return + } if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { // utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) if err := s.Host.GetP2PHost().Connect(ctx, peer); err != nil { From dc4e79d2695c03ba0e6775b6b2f3f29685c03684 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sun, 23 Jun 2019 00:51:07 -0700 Subject: [PATCH 2/8] Fix mistake --- api/service/networkinfo/service.go | 95 +++++++++++++++--------------- 1 file changed, 49 insertions(+), 46 deletions(-) diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index d527edaed..6751ae131 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -7,6 +7,8 @@ import ( "sync" "time" + manet "github.com/multiformats/go-multiaddr-net" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" @@ -14,7 +16,6 @@ import ( libp2pdis "github.com/libp2p/go-libp2p-discovery" libp2pdht "github.com/libp2p/go-libp2p-kad-dht" peerstore "github.com/libp2p/go-libp2p-peerstore" - manet "github.com/multiformats/go-multiaddr-net" ) // Service is the network info service. @@ -143,8 +144,20 @@ func (s *Service) Run() { return } - go func() { - for { + go s.DoService() +} + +// DoService does network info. +func (s *Service) DoService() { + tick := time.NewTicker(dhtTicker) + for { + select { + case <-s.stopChan: + return + case <-tick.C: + libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous)) + utils.GetLogInstance().Info("Successfully announced!", "Rendezvous", string(s.Rendezvous)) + default: var err error s.peerInfo, err = s.discovery.FindPeers(ctx, string(s.Rendezvous)) if err != nil { @@ -152,64 +165,54 @@ func (s *Service) Run() { return } - s.DoService() + s.findPeers() time.Sleep(60 * time.Second) } - }() + } } -// DoService does network info. -func (s *Service) DoService() { +func (s *Service) findPeers() { _, cgnPrefix, err := net.ParseCIDR("100.64.0.0/10") if err != nil { utils.GetLogInstance().Error("can't parse CIDR", "error", err) return } - tick := time.NewTicker(dhtTicker) - for { - select { - case peer, ok := <-s.peerInfo: - if !ok { - utils.GetLogInstance().Info("PeerInfo Channel Closed.") - return + for peer := range s.peerInfo { + utils.GetLogInstance().Info("Got peers", "peer", peer) + if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { + // utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) + if err := s.Host.GetP2PHost().Connect(ctx, peer); err != nil { + utils.GetLogInstance().Warn("can't connect to peer node", "error", err, "peer", peer) + // break if the node can't connect to peers, waiting for another peer + break + } else { + utils.GetLogInstance().Info("connected to peer node", "peer", peer) } - if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { - // utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) - if err := s.Host.GetP2PHost().Connect(ctx, peer); err != nil { - utils.GetLogInstance().Warn("can't connect to peer node", "error", err, "peer", peer) - // break if the node can't connect to peers, waiting for another peer - break - } else { - utils.GetLogInstance().Info("connected to peer node", "peer", peer) - } - // figure out the public ip/port - var ip, port string - - for _, addr := range peer.Addrs { - netaddr, err := manet.ToNetAddr(addr) - if err != nil { - continue - } - nip := netaddr.(*net.TCPAddr).IP - if (nip.IsGlobalUnicast() && !utils.IsPrivateIP(nip)) || cgnPrefix.Contains(nip) { - ip = nip.String() - port = fmt.Sprintf("%d", netaddr.(*net.TCPAddr).Port) - break - } + // figure out the public ip/port + var ip, port string + + for _, addr := range peer.Addrs { + netaddr, err := manet.ToNetAddr(addr) + if err != nil { + continue } - p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs} - utils.GetLogInstance().Info("Notify peerChan", "peer", p) - if s.peerChan != nil { - s.peerChan <- p + nip := netaddr.(*net.TCPAddr).IP + if (nip.IsGlobalUnicast() && !utils.IsPrivateIP(nip)) || cgnPrefix.Contains(nip) { + ip = nip.String() + port = fmt.Sprintf("%d", netaddr.(*net.TCPAddr).Port) + break } } - case <-s.stopChan: - return - case <-tick.C: - libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous)) - utils.GetLogInstance().Info("Successfully announced!", "Rendezvous", string(s.Rendezvous)) + p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs} + utils.GetLogInstance().Info("Notify peerChan", "peer", p) + if s.peerChan != nil { + s.peerChan <- p + } } } + + utils.GetLogInstance().Info("PeerInfo Channel Closed.") + return } // StopService stops network info service. From 2644d59bea495da114501b9db071e002f7125eb4 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sun, 23 Jun 2019 00:56:58 -0700 Subject: [PATCH 3/8] Make find peer interval const --- api/service/networkinfo/service.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 6751ae131..0435c8dff 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -46,6 +46,7 @@ var ( const ( waitInRetry = 2 * time.Second connectionTimeout = 3 * time.Minute + findPeerInterval = 1 * time.Minute // register to bootnode every ticker dhtTicker = 6 * time.Hour @@ -166,7 +167,7 @@ func (s *Service) DoService() { } s.findPeers() - time.Sleep(60 * time.Second) + time.Sleep(findPeerInterval) } } } From 6e0cb6ac3cdb0046ad24bab475d8ae9b683269fd Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sun, 23 Jun 2019 02:21:13 -0700 Subject: [PATCH 4/8] Remove threshold --- consensus/consensus_v2.go | 2 +- node/node.go | 3 +-- node/node_newblock.go | 20 +++++--------------- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 08d36dafc..d571431b6 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -403,7 +403,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { if consensus.BlockVerifier == nil { // do nothing } else if err := consensus.BlockVerifier(&blockObj); err != nil { - consensus.getLogger().Info("[OnPrepared] Block verification faied") + consensus.getLogger().Info("[OnPrepared] Block verification failed", "error", err) return } } diff --git a/node/node.go b/node/node.go index 07c0b0791..cf0c85211 100644 --- a/node/node.go +++ b/node/node.go @@ -370,8 +370,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc } } - utils.GetLogInstance().Info("Genesis block hash", - "hash", node.Blockchain().GetBlockByNumber(0).Hash().Hex(), "state root", node.Blockchain().GetBlockByNumber(0).Header().Root.Hex()) + utils.GetLogInstance().Info("Genesis block hash", "genesis block header", node.Blockchain().GetBlockByNumber(0).Header()) if consensusObj != nil && nodeconfig.GetDefaultConfig().IsLeader() { node.State = NodeLeader } else { diff --git a/node/node_newblock.go b/node/node_newblock.go index 76eaa3481..2cb4c98b5 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -16,10 +16,8 @@ import ( // Constants of lower bound limit of a new block. const ( - DefaultThreshold = 1 - FirstTimeThreshold = 2 - ConsensusTimeOut = 30 - PeriodicBlock = 1 * time.Second + ConsensusTimeOut = 30 + PeriodicBlock = 1 * time.Second ) // WaitForConsensusReadyv2 listen for the readiness signal from consensus and generate new block for consensus. @@ -33,7 +31,6 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch utils.GetLogInstance().Debug("Waiting for Consensus ready") time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only) - firstTime := true timeoutCount := 0 var newBlock *types.Block @@ -57,14 +54,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch } case <-readySignal: for { - // threshold and firstTime are for the test-only built-in smart contract tx. - // TODO: remove in production - threshold := DefaultThreshold - if firstTime { - threshold = FirstTimeThreshold - firstTime = false - } - if len(node.pendingTransactions) < threshold && time.Now().Before(deadline) { + if time.Now().Before(deadline) { time.Sleep(PeriodicBlock) continue } @@ -75,7 +65,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet { selectedTxs = node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock, coinbase) } - utils.GetLogInstance().Info("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "threshold", threshold, "selectedTxs", len(selectedTxs)) + utils.GetLogInstance().Info("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "selectedTxs", len(selectedTxs)) if err := node.Worker.CommitTransactions(selectedTxs, coinbase); err != nil { ctxerror.Log15(utils.GetLogger().Error, ctxerror.New("cannot commit transactions"). @@ -110,7 +100,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch } else { utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", newBlock.NumberU64(), "numTxs", newBlock.Transactions().Len()) - // Set deadline will be BlockPeriod from now at this place. Anounce stage happens right after this. + // Set deadline will be BlockPeriod from now at this place. Announce stage happens right after this. deadline = time.Now().Add(node.BlockPeriod) // Send the new block to Consensus so it can be confirmed. node.BlockChannel <- newBlock From 00b7a0a74266aa6c8282c73ccc0fe96be1c53c8b Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sun, 23 Jun 2019 02:23:03 -0700 Subject: [PATCH 5/8] Adjust findpeer interval --- api/service/networkinfo/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 0435c8dff..f0b51f993 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -46,7 +46,7 @@ var ( const ( waitInRetry = 2 * time.Second connectionTimeout = 3 * time.Minute - findPeerInterval = 1 * time.Minute + findPeerInterval = 6 * time.Hour // register to bootnode every ticker dhtTicker = 6 * time.Hour From a41602b60139c35ee2fb1d7ac02326aa58b56b69 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sun, 23 Jun 2019 03:03:57 -0700 Subject: [PATCH 6/8] Enable consensus retry --- node/node_newblock.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/node_newblock.go b/node/node_newblock.go index 2cb4c98b5..e11e8a5c7 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -17,7 +17,7 @@ import ( // Constants of lower bound limit of a new block. const ( ConsensusTimeOut = 30 - PeriodicBlock = 1 * time.Second + PeriodicBlock = 200 * time.Millisecond ) // WaitForConsensusReadyv2 listen for the readiness signal from consensus and generate new block for consensus. @@ -86,7 +86,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch log.Debug("Failed updating worker's state", "Error", err) continue } - newBlock, err := node.Worker.Commit(sig, mask, viewID, coinbase) + newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase) if err != nil { ctxerror.Log15(utils.GetLogger().Error, From 7a64cc1a7d60d0621e9356d80c3f4495ee85a9cf Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sun, 23 Jun 2019 04:02:35 -0700 Subject: [PATCH 7/8] Revert findPeerInterval --- api/service/networkinfo/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index f0b51f993..0435c8dff 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -46,7 +46,7 @@ var ( const ( waitInRetry = 2 * time.Second connectionTimeout = 3 * time.Minute - findPeerInterval = 6 * time.Hour + findPeerInterval = 1 * time.Minute // register to bootnode every ticker dhtTicker = 6 * time.Hour From daadeb37528001be09abf32fcfbcbbd89903a282 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sun, 23 Jun 2019 04:28:14 -0700 Subject: [PATCH 8/8] Lower findpeer interval --- api/service/networkinfo/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 0435c8dff..8162dae68 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -46,7 +46,7 @@ var ( const ( waitInRetry = 2 * time.Second connectionTimeout = 3 * time.Minute - findPeerInterval = 1 * time.Minute + findPeerInterval = 30 * time.Second // register to bootnode every ticker dhtTicker = 6 * time.Hour