Merge pull request #1153 from rlan35/p2p_cpu_fix

Fine tune maxprocs and minpeer interval
pull/1155/head
Rongjian Lan 6 years ago committed by GitHub
commit e8cfe86ad7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      api/service/networkinfo/service.go
  2. 6
      cmd/harmony/main.go
  3. 2
      consensus/consensus.go
  4. 22
      consensus/consensus_v2.go
  5. 6
      consensus/pbft_log.go
  6. 1
      node/node_genesis.go
  7. 2
      node/node_newblock.go

@ -46,7 +46,7 @@ var (
const ( const (
waitInRetry = 2 * time.Second waitInRetry = 2 * time.Second
connectionTimeout = 3 * time.Minute connectionTimeout = 3 * time.Minute
findPeerInterval = 30 * time.Second findPeerInterval = 60 * time.Second
// register to bootnode every ticker // register to bootnode every ticker
dhtTicker = 6 * time.Hour dhtTicker = 6 * time.Hour
@ -179,7 +179,6 @@ func (s *Service) findPeers() {
return return
} }
for peer := range s.peerInfo { for peer := range s.peerInfo {
utils.GetLogInstance().Info("Got peers", "peer", peer)
if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 {
// utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) // utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID())
if err := s.Host.GetP2PHost().Connect(ctx, peer); err != nil { if err := s.Host.GetP2PHost().Connect(ctx, peer); err != nil {

@ -7,6 +7,7 @@ import (
"math/rand" "math/rand"
"os" "os"
"path" "path"
"runtime"
"time" "time"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
@ -140,6 +141,9 @@ var (
) )
func initSetup() { func initSetup() {
// Add GOMAXPROCS to achieve max performance.
runtime.GOMAXPROCS(runtime.NumCPU() * 4)
// Set port and ip to global config. // Set port and ip to global config.
nodeconfig.GetDefaultConfig().Port = *port nodeconfig.GetDefaultConfig().Port = *port
nodeconfig.GetDefaultConfig().IP = *ip nodeconfig.GetDefaultConfig().IP = *ip
@ -281,7 +285,7 @@ func createGlobalConfig() *nodeconfig.ConfigType {
} }
nodeConfig.Host, err = p2pimpl.NewHost(&nodeConfig.SelfPeer, nodeConfig.P2pPriKey) nodeConfig.Host, err = p2pimpl.NewHost(&nodeConfig.SelfPeer, nodeConfig.P2pPriKey)
if *logConn { if *logConn && nodeConfig.GetNetworkType() != nodeconfig.Mainnet {
nodeConfig.Host.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogInstance())) nodeConfig.Host.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogInstance()))
} }
if err != nil { if err != nil {

@ -28,7 +28,7 @@ import (
) )
// BlockReward is the block reward, to be split evenly among block signers. // BlockReward is the block reward, to be split evenly among block signers.
var BlockReward = new(big.Int).Mul(big.NewInt(30), big.NewInt(denominations.One)) var BlockReward = new(big.Int).Mul(big.NewInt(24), big.NewInt(denominations.One))
// Consensus is the main struct with all states and data related to consensus process. // Consensus is the main struct with all states and data related to consensus process.
type Consensus struct { type Consensus struct {

@ -103,6 +103,7 @@ func (consensus *Consensus) announce(block *types.Block) {
} }
consensus.PbftLog.AddMessage(pbftMsg) consensus.PbftLog.AddMessage(pbftMsg)
consensus.getLogger().Debug("[Announce] Added Announce message in pbftLog", "MsgblockHash", pbftMsg.BlockHash, "MsgViewID", pbftMsg.ViewID, "MsgBlockNum", pbftMsg.BlockNum)
consensus.PbftLog.AddBlock(block) consensus.PbftLog.AddBlock(block)
// Leader sign the block hash itself // Leader sign the block hash itself
@ -116,7 +117,7 @@ func (consensus *Consensus) announce(block *types.Block) {
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn("[Announce] Cannot send announce message", "groupID", p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))) consensus.getLogger().Warn("[Announce] Cannot send announce message", "groupID", p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))
} else { } else {
consensus.getLogger().Debug("[Announce] Sent Announce Message!!", "BlockHash", block.Hash(), "BlockNum", block.NumberU64()) consensus.getLogger().Info("[Announce] Sent Announce Message!!", "BlockHash", block.Hash(), "BlockNum", block.NumberU64())
} }
consensus.getLogger().Debug("[Announce] Switching phase", "From", consensus.phase, "To", Prepare) consensus.getLogger().Debug("[Announce] Switching phase", "From", consensus.phase, "To", Prepare)
@ -175,7 +176,8 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
consensus.getLogger().Debug("[OnAnnounce] Leader is malicious", "leaderKey", consensus.LeaderPubKey.SerializeToHexStr()) consensus.getLogger().Debug("[OnAnnounce] Leader is malicious", "leaderKey", consensus.LeaderPubKey.SerializeToHexStr())
consensus.startViewChange(consensus.viewID + 1) consensus.startViewChange(consensus.viewID + 1)
} }
return consensus.getLogger().Debug("[OnAnnounce] Announce message received again", "leaderKey", consensus.LeaderPubKey.SerializeToHexStr())
//return
} }
consensus.getLogger().Debug("[OnAnnounce] Announce message Added", "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug("[OnAnnounce] Announce message Added", "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum)
@ -247,7 +249,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
if !consensus.PbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) { if !consensus.PbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) {
consensus.getLogger().Debug("[OnPrepare] No Matching Announce message", "MsgblockHash", recvMsg.BlockHash, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug("[OnPrepare] No Matching Announce message", "MsgblockHash", recvMsg.BlockHash, "MsgBlockNum", recvMsg.BlockNum)
return //return
} }
validatorPubKey := recvMsg.SenderPubkey.SerializeToHexStr() validatorPubKey := recvMsg.SenderPubkey.SerializeToHexStr()
@ -260,7 +262,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
if len(prepareSigs) >= consensus.Quorum() { if len(prepareSigs) >= consensus.Quorum() {
// already have enough signatures // already have enough signatures
consensus.getLogger().Info("[OnPrepare] Received Additional Prepare Message", "ValidatorPubKey", validatorPubKey) consensus.getLogger().Debug("[OnPrepare] Received Additional Prepare Message", "ValidatorPubKey", validatorPubKey)
return return
} }
// proceed only when the message is not received before // proceed only when the message is not received before
@ -282,7 +284,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
return return
} }
consensus.getLogger().Debug("[OnPrepare] Received New Prepare Signature", "NumReceivedSoFar", len(prepareSigs), "validatorPubKey", validatorPubKey, "PublicKeys", len(consensus.PublicKeys)) consensus.getLogger().Info("[OnPrepare] Received New Prepare Signature", "NumReceivedSoFar", len(prepareSigs), "validatorPubKey", validatorPubKey, "PublicKeys", len(consensus.PublicKeys))
prepareSigs[validatorPubKey] = &sign prepareSigs[validatorPubKey] = &sign
// Set the bitmap indicating that this validator signed. // Set the bitmap indicating that this validator signed.
if err := prepareBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil { if err := prepareBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil {
@ -463,7 +465,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn("[OnPrepared] Cannot send commit message!!") consensus.getLogger().Warn("[OnPrepared] Cannot send commit message!!")
} else { } else {
consensus.getLogger().Debug("[OnPrepared] Sent Commit Message!!", "BlockHash", consensus.blockHash, "BlockNum", consensus.blockNum) consensus.getLogger().Info("[OnPrepared] Sent Commit Message!!", "BlockHash", consensus.blockHash, "BlockNum", consensus.blockNum)
} }
consensus.getLogger().Debug("[OnPrepared] Switching phase", "From", consensus.phase, "To", Commit) consensus.getLogger().Debug("[OnPrepared] Switching phase", "From", consensus.phase, "To", Commit)
@ -527,7 +529,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
// proceed only when the message is not received before // proceed only when the message is not received before
_, ok := commitSigs[validatorPubKey] _, ok := commitSigs[validatorPubKey]
if ok { if ok {
consensus.getLogger().Info("[OnCommit] Already received commit message from the validator", "validatorPubKey", validatorPubKey) consensus.getLogger().Debug("[OnCommit] Already received commit message from the validator", "validatorPubKey", validatorPubKey)
return return
} }
@ -548,7 +550,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
return return
} }
consensus.getLogger().Debug("[OnCommit] Received new commit message", "numReceivedSoFar", len(commitSigs), "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum, "validatorPubKey", validatorPubKey) consensus.getLogger().Info("[OnCommit] Received new commit message", "numReceivedSoFar", len(commitSigs), "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum, "validatorPubKey", validatorPubKey)
commitSigs[validatorPubKey] = &sign commitSigs[validatorPubKey] = &sign
// Set the bitmap indicating that this validator signed. // Set the bitmap indicating that this validator signed.
if err := commitBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil { if err := commitBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil {
@ -571,7 +573,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
if rewardThresholdIsMet { if rewardThresholdIsMet {
go func(viewID uint64) { go func(viewID uint64) {
consensus.commitFinishChan <- viewID consensus.commitFinishChan <- viewID
consensus.getLogger().Debug("[OnCommit] 90% Enough commits received", "NumCommits", len(commitSigs)) consensus.getLogger().Info("[OnCommit] 90% Enough commits received", "NumCommits", len(commitSigs))
}(consensus.viewID) }(consensus.viewID)
} }
} }
@ -612,7 +614,7 @@ func (consensus *Consensus) finalizeCommits() {
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn("[Finalizing] Cannot send committed message", "error", err) consensus.getLogger().Warn("[Finalizing] Cannot send committed message", "error", err)
} else { } else {
consensus.getLogger().Debug("[Finalizing] Sent Committed Message", "BlockHash", consensus.blockHash, "BlockNum", consensus.blockNum) consensus.getLogger().Info("[Finalizing] Sent Committed Message", "BlockHash", consensus.blockHash, "BlockNum", consensus.blockNum)
} }
consensus.reportMetrics(*block) consensus.reportMetrics(*block)

@ -168,13 +168,13 @@ func (log *PbftLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum ui
// HasMatchingAnnounce returns whether the log contains announce type message with given blockNum, blockHash // HasMatchingAnnounce returns whether the log contains announce type message with given blockNum, blockHash
func (log *PbftLog) HasMatchingAnnounce(blockNum uint64, blockHash common.Hash) bool { func (log *PbftLog) HasMatchingAnnounce(blockNum uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqHash(msg_pb.MessageType_ANNOUNCE, blockNum, blockHash) found := log.GetMessagesByTypeSeqHash(msg_pb.MessageType_ANNOUNCE, blockNum, blockHash)
return len(found) == 1 return len(found) >= 1
} }
// HasMatchingViewAnnounce returns whether the log contains announce type message with given blockNum, viewID and blockHash // HasMatchingViewAnnounce returns whether the log contains announce type message with given blockNum, viewID and blockHash
func (log *PbftLog) HasMatchingViewAnnounce(blockNum uint64, viewID uint64, blockHash common.Hash) bool { func (log *PbftLog) HasMatchingViewAnnounce(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_ANNOUNCE, blockNum, viewID, blockHash) found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_ANNOUNCE, blockNum, viewID, blockHash)
return len(found) == 1 return len(found) >= 1
} }
// HasMatchingPrepared returns whether the log contains prepared message with given blockNum, viewID and blockHash // HasMatchingPrepared returns whether the log contains prepared message with given blockNum, viewID and blockHash
@ -186,7 +186,7 @@ func (log *PbftLog) HasMatchingPrepared(blockNum uint64, blockHash common.Hash)
// HasMatchingViewPrepared returns whether the log contains prepared message with given blockNum, viewID and blockHash // HasMatchingViewPrepared returns whether the log contains prepared message with given blockNum, viewID and blockHash
func (log *PbftLog) HasMatchingViewPrepared(blockNum uint64, viewID uint64, blockHash common.Hash) bool { func (log *PbftLog) HasMatchingViewPrepared(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_PREPARED, blockNum, viewID, blockHash) found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_PREPARED, blockNum, viewID, blockHash)
return len(found) == 1 return len(found) >= 1
} }
// GetMessagesByTypeSeqView returns pbft messages with matching type, blockNum and viewID // GetMessagesByTypeSeqView returns pbft messages with matching type, blockNum and viewID

@ -102,6 +102,7 @@ func (node *Node) SetupGenesisBlock(db ethdb.Database, shardID uint32, myShardSt
GasLimit: params.GenesisGasLimit * 1000, GasLimit: params.GenesisGasLimit * 1000,
ShardStateHash: myShardState.Hash(), ShardStateHash: myShardState.Hash(),
ShardState: myShardState.DeepCopy(), ShardState: myShardState.DeepCopy(),
ExtraData: []byte("Harmony for One and All. Open Consensus for 10B."),
} }
// Store genesis block into db. // Store genesis block into db.

@ -45,7 +45,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
case <-time.After(ConsensusTimeOut * time.Second): case <-time.After(ConsensusTimeOut * time.Second):
if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) { if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) {
utils.GetLogInstance().Debug("Leader consensus timeout, retry!", "count", timeoutCount) utils.GetLogInstance().Debug("Leader consensus timeout, retry!", "count", timeoutCount)
node.Consensus.ResetState() //node.Consensus.ResetState()
timeoutCount++ timeoutCount++
if newBlock != nil { if newBlock != nil {
// Send the new block to Consensus so it can be confirmed. // Send the new block to Consensus so it can be confirmed.

Loading…
Cancel
Save