Merge pull request #1147 from rlan35/p2p_cpu_fix

Fix p2p cpu issue
pull/1148/head
Rongjian Lan 5 years ago committed by GitHub
commit 4487509457
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 101
      api/service/networkinfo/service.go
  2. 2
      consensus/consensus_v2.go
  3. 3
      node/node.go
  4. 22
      node/node_newblock.go

@ -7,6 +7,8 @@ import (
"sync"
"time"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils"
@ -14,7 +16,6 @@ import (
libp2pdis "github.com/libp2p/go-libp2p-discovery"
libp2pdht "github.com/libp2p/go-libp2p-kad-dht"
peerstore "github.com/libp2p/go-libp2p-peerstore"
manet "github.com/multiformats/go-multiaddr-net"
)
// Service is the network info service.
@ -45,6 +46,7 @@ var (
const (
waitInRetry = 2 * time.Second
connectionTimeout = 3 * time.Minute
findPeerInterval = 30 * time.Second
// register to bootnode every ticker
dhtTicker = 6 * time.Hour
@ -143,66 +145,77 @@ func (s *Service) Run() {
return
}
var err error
s.peerInfo, err = s.discovery.FindPeers(ctx, string(s.Rendezvous))
if err != nil {
utils.GetLogInstance().Error("FindPeers", "error", err)
return
}
go s.DoService()
}
// DoService does network info.
func (s *Service) DoService() {
_, cgnPrefix, err := net.ParseCIDR("100.64.0.0/10")
if err != nil {
utils.GetLogInstance().Error("can't parse CIDR", "error", err)
return
}
tick := time.NewTicker(dhtTicker)
for {
select {
case peer := <-s.peerInfo:
if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 {
// utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID())
if err := s.Host.GetP2PHost().Connect(ctx, peer); err != nil {
utils.GetLogInstance().Warn("can't connect to peer node", "error", err, "peer", peer)
// break if the node can't connect to peers, waiting for another peer
break
} else {
utils.GetLogInstance().Info("connected to peer node", "peer", peer)
}
// figure out the public ip/port
var ip, port string
for _, addr := range peer.Addrs {
netaddr, err := manet.ToNetAddr(addr)
if err != nil {
continue
}
nip := netaddr.(*net.TCPAddr).IP
if (nip.IsGlobalUnicast() && !utils.IsPrivateIP(nip)) || cgnPrefix.Contains(nip) {
ip = nip.String()
port = fmt.Sprintf("%d", netaddr.(*net.TCPAddr).Port)
break
}
}
p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs}
utils.GetLogInstance().Info("Notify peerChan", "peer", p)
if s.peerChan != nil {
s.peerChan <- p
}
}
case <-s.stopChan:
return
case <-tick.C:
libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous))
utils.GetLogInstance().Info("Successfully announced!", "Rendezvous", string(s.Rendezvous))
default:
var err error
s.peerInfo, err = s.discovery.FindPeers(ctx, string(s.Rendezvous))
if err != nil {
utils.GetLogInstance().Error("FindPeers", "error", err)
return
}
s.findPeers()
time.Sleep(findPeerInterval)
}
}
}
func (s *Service) findPeers() {
_, cgnPrefix, err := net.ParseCIDR("100.64.0.0/10")
if err != nil {
utils.GetLogInstance().Error("can't parse CIDR", "error", err)
return
}
for peer := range s.peerInfo {
utils.GetLogInstance().Info("Got peers", "peer", peer)
if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 {
// utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID())
if err := s.Host.GetP2PHost().Connect(ctx, peer); err != nil {
utils.GetLogInstance().Warn("can't connect to peer node", "error", err, "peer", peer)
// break if the node can't connect to peers, waiting for another peer
break
} else {
utils.GetLogInstance().Info("connected to peer node", "peer", peer)
}
// figure out the public ip/port
var ip, port string
for _, addr := range peer.Addrs {
netaddr, err := manet.ToNetAddr(addr)
if err != nil {
continue
}
nip := netaddr.(*net.TCPAddr).IP
if (nip.IsGlobalUnicast() && !utils.IsPrivateIP(nip)) || cgnPrefix.Contains(nip) {
ip = nip.String()
port = fmt.Sprintf("%d", netaddr.(*net.TCPAddr).Port)
break
}
}
p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs}
utils.GetLogInstance().Info("Notify peerChan", "peer", p)
if s.peerChan != nil {
s.peerChan <- p
}
}
}
utils.GetLogInstance().Info("PeerInfo Channel Closed.")
return
}
// StopService stops network info service.
func (s *Service) StopService() {
utils.GetLogInstance().Info("Stopping network info service.")

@ -403,7 +403,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
if consensus.BlockVerifier == nil {
// do nothing
} else if err := consensus.BlockVerifier(&blockObj); err != nil {
consensus.getLogger().Info("[OnPrepared] Block verification faied")
consensus.getLogger().Info("[OnPrepared] Block verification failed", "error", err)
return
}
}

@ -370,8 +370,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
}
}
utils.GetLogInstance().Info("Genesis block hash",
"hash", node.Blockchain().GetBlockByNumber(0).Hash().Hex(), "state root", node.Blockchain().GetBlockByNumber(0).Header().Root.Hex())
utils.GetLogInstance().Info("Genesis block hash", "genesis block header", node.Blockchain().GetBlockByNumber(0).Header())
if consensusObj != nil && nodeconfig.GetDefaultConfig().IsLeader() {
node.State = NodeLeader
} else {

@ -16,10 +16,8 @@ import (
// Constants of lower bound limit of a new block.
const (
DefaultThreshold = 1
FirstTimeThreshold = 2
ConsensusTimeOut = 30
PeriodicBlock = 1 * time.Second
ConsensusTimeOut = 30
PeriodicBlock = 200 * time.Millisecond
)
// WaitForConsensusReadyv2 listen for the readiness signal from consensus and generate new block for consensus.
@ -33,7 +31,6 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
utils.GetLogInstance().Debug("Waiting for Consensus ready")
time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only)
firstTime := true
timeoutCount := 0
var newBlock *types.Block
@ -57,14 +54,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
}
case <-readySignal:
for {
// threshold and firstTime are for the test-only built-in smart contract tx.
// TODO: remove in production
threshold := DefaultThreshold
if firstTime {
threshold = FirstTimeThreshold
firstTime = false
}
if len(node.pendingTransactions) < threshold && time.Now().Before(deadline) {
if time.Now().Before(deadline) {
time.Sleep(PeriodicBlock)
continue
}
@ -75,7 +65,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet {
selectedTxs = node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock, coinbase)
}
utils.GetLogInstance().Info("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "threshold", threshold, "selectedTxs", len(selectedTxs))
utils.GetLogInstance().Info("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "selectedTxs", len(selectedTxs))
if err := node.Worker.CommitTransactions(selectedTxs, coinbase); err != nil {
ctxerror.Log15(utils.GetLogger().Error,
ctxerror.New("cannot commit transactions").
@ -96,7 +86,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
log.Debug("Failed updating worker's state", "Error", err)
continue
}
newBlock, err := node.Worker.Commit(sig, mask, viewID, coinbase)
newBlock, err = node.Worker.Commit(sig, mask, viewID, coinbase)
if err != nil {
ctxerror.Log15(utils.GetLogger().Error,
@ -110,7 +100,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
} else {
utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", newBlock.NumberU64(), "numTxs", newBlock.Transactions().Len())
// Set deadline will be BlockPeriod from now at this place. Anounce stage happens right after this.
// Set deadline will be BlockPeriod from now at this place. Announce stage happens right after this.
deadline = time.Now().Add(node.BlockPeriod)
// Send the new block to Consensus so it can be confirmed.
node.BlockChannel <- newBlock

Loading…
Cancel
Save