From 2cbb3fb2e1ffb0ca73da891e04e83fd5b138b8e6 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sat, 21 Mar 2020 10:20:17 -0700 Subject: [PATCH] Fix node sync issue which caused failure to sign and node being kicked out (#2564) * Only rollup last crosslink for beacon chain * Fix node sync'ing issue * add more log * Fix sync loop * Remove prepared check during sync'ing * update debug info --- api/service/syncing/syncing.go | 2 +- consensus/checks.go | 15 ++++++ consensus/consensus.go | 9 +--- consensus/consensus_v2.go | 18 ++----- consensus/validator.go | 44 +++++++++-------- core/offchain.go | 19 +++++--- node/node_handler.go | 38 ++++++++------- node/node_syncing.go | 89 ++++++++++++++++++---------------- 8 files changed, 125 insertions(+), 109 deletions(-) diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index ec8231826..e1d07c2c7 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -805,7 +805,7 @@ Loop: } ss.purgeOldBlocksFromCache() if consensus != nil { - consensus.UpdateConsensusInformation() + consensus.SetMode(consensus.UpdateConsensusInformation()) } } } diff --git a/consensus/checks.go b/consensus/checks.go index 110ea1ebf..a6812b66f 100644 --- a/consensus/checks.go +++ b/consensus/checks.go @@ -11,6 +11,11 @@ import ( const MaxBlockNumDiff = 100 func (consensus *Consensus) validatorSanityChecks(msg *msg_pb.Message) bool { + consensus.getLogger().Debug(). + Uint64("blockNum", msg.GetConsensus().BlockNum). + Uint64("viewID", msg.GetConsensus().ViewId). + Str("msgType", msg.Type.String()). + Msg("[validatorSanityChecks] Checking new message") senderKey, err := consensus.verifySenderKey(msg) if err != nil { if err == shard.ErrValidNotInCommittee { @@ -42,6 +47,11 @@ func (consensus *Consensus) validatorSanityChecks(msg *msg_pb.Message) bool { } func (consensus *Consensus) leaderSanityChecks(msg *msg_pb.Message) bool { + consensus.getLogger().Debug(). + Uint64("blockNum", msg.GetConsensus().BlockNum). + Uint64("viewID", msg.GetConsensus().ViewId). + Str("msgType", msg.Type.String()). + Msg("[leaderSanityChecks] Checking new message") senderKey, err := consensus.verifySenderKey(msg) if err != nil { if err == shard.ErrValidNotInCommittee { @@ -186,6 +196,11 @@ func (consensus *Consensus) onPreparedSanityChecks( } func (consensus *Consensus) viewChangeSanityCheck(msg *msg_pb.Message) bool { + consensus.getLogger().Debug(). + Uint64("blockNum", msg.GetConsensus().BlockNum). + Uint64("viewID", msg.GetConsensus().ViewId). + Str("msgType", msg.Type.String()). + Msg("[viewChangeSanityCheck] Checking new message") senderKey, err := consensus.verifyViewChangeSenderKey(msg) if err != nil { consensus.getLogger().Error().Err(err).Msgf( diff --git a/consensus/consensus.go b/consensus/consensus.go index 84cb6c766..57e8d85a9 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -112,7 +112,7 @@ type Consensus struct { // verified block to state sync broadcast VerifiedNewBlock chan *types.Block // will trigger state syncing when blockNum is low - blockNumLowChan chan struct{} + BlockNumLowChan chan struct{} // Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf // randomnesses) to consensus. The first 32 bytes are randomness, the rest is for bitmap. 
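In the api/service/syncing hunk above (and again in `PostConsensusProcessing` further down), the return value of `UpdateConsensusInformation` is no longer dropped: the recomputed `Mode` is committed through `SetMode`, which is what lets a freshly synced node that is not in the current committee fall back to Listening instead of trying to sign. A minimal, runnable sketch of the pattern, with a toy `Consensus` type and committee check standing in for the real ones:

```go
package main

import "fmt"

// Mode mirrors the consensus mode enum; only the values used here are defined.
type Mode int

const (
	Normal Mode = iota
	Listening
)

// Consensus is a toy stand-in holding just enough state for the pattern.
type Consensus struct {
	mode        Mode
	inCommittee bool
}

// UpdateConsensusInformation recomputes the node's role from the latest
// committee and returns the mode it should run in, leaving state untouched.
func (c *Consensus) UpdateConsensusInformation() Mode {
	if c.inCommittee {
		return Normal
	}
	return Listening
}

// SetMode is where the caller commits the decision.
func (c *Consensus) SetMode(m Mode) { c.mode = m }

func main() {
	c := &Consensus{inCommittee: false}
	// The shape the patch uses at both call sites:
	c.SetMode(c.UpdateConsensusInformation())
	fmt.Println(c.mode == Listening) // true: a non-member listens instead of signing
}
```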
PRndChannel chan []byte @@ -163,11 +163,6 @@ func (consensus *Consensus) BlocksNotSynchronized() { consensus.syncNotReadyChan <- struct{}{} } -// WaitForSyncing informs the node syncing service to start syncing -func (consensus *Consensus) WaitForSyncing() { - <-consensus.blockNumLowChan -} - // VdfSeedSize returns the number of VRFs for VDF computation func (consensus *Consensus) VdfSeedSize() int { return int(consensus.Decider.ParticipantsCount()) * 2 / 3 @@ -204,7 +199,7 @@ func New( consensus.Decider = Decider consensus.host = host consensus.msgSender = NewMessageSender(host) - consensus.blockNumLowChan = make(chan struct{}) + consensus.BlockNumLowChan = make(chan struct{}) // FBFT related consensus.FBFTLog = NewFBFTLog() consensus.phase = FBFTAnnounce diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index d4a8ac979..26f3a7820 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -32,12 +32,9 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) { // when node is in ViewChanging mode, it still accepts normal messages into FBFTLog // in order to avoid possible trap forever but drop PREPARE and COMMIT // which are message types specifically for a node acting as leader - switch { - case (consensus.current.Mode() == ViewChanging) && + if (consensus.current.Mode() == ViewChanging) && (msg.Type == msg_pb.MessageType_PREPARE || - msg.Type == msg_pb.MessageType_COMMIT): - return - case consensus.current.Mode() == Listening: + msg.Type == msg_pb.MessageType_COMMIT) { return } @@ -62,16 +59,8 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) { } } - notMemberButStillCatchup := !consensus.Decider.AmIMemberOfCommitee() && - msg.Type == msg_pb.MessageType_COMMITTED - - if notMemberButStillCatchup { - consensus.onCommitted(msg) - return - } - intendedForValidator, intendedForLeader := - !(consensus.IsLeader() && consensus.current.Mode() == Normal), + !consensus.IsLeader(), consensus.IsLeader() switch t := msg.Type; true { @@ -485,7 +474,6 @@ func (consensus *Consensus) Start( consensus.announce(newBlock) case msg := <-consensus.MsgChan: - consensus.getLogger().Debug().Msg("[ConsensusMainLoop] MsgChan") consensus.handleMessageUpdate(msg) case viewID := <-consensus.commitFinishChan: diff --git a/consensus/validator.go b/consensus/validator.go index 26b4f9f7b..023cb1ff5 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -69,15 +69,17 @@ func (consensus *Consensus) prepare() { } // TODO: this will not return immediatey, may block - if err := consensus.msgSender.SendWithoutRetry( - groupID, - host.ConstructP2pMessage(byte(17), networkMessage.Bytes), - ); err != nil { - consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") - } else { - consensus.getLogger().Info(). - Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). - Msg("[OnAnnounce] Sent Prepare Message!!") + if consensus.current.Mode() != Listening { + if err := consensus.msgSender.SendWithoutRetry( + groupID, + host.ConstructP2pMessage(byte(17), networkMessage.Bytes), + ); err != nil { + consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") + } else { + consensus.getLogger().Info(). + Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). + Msg("[OnAnnounce] Sent Prepare Message!!") + } } } consensus.getLogger().Debug(). 
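The guard added here, and mirrored in `onPrepared` in the next hunk, is the core of the signing fix: a node in Listening mode keeps validating messages and updating its FBFT log so it stays in step with the chain, but it must not broadcast a vote, because its signature is not part of the committee's quorum. A self-contained sketch of that control flow, with `sendVote` as a stand-in for `msgSender.SendWithoutRetry`:

```go
package main

import (
	"errors"
	"fmt"
)

type Mode int

const (
	Normal Mode = iota
	Listening
)

// sendVote is a stand-in for msgSender.SendWithoutRetry(groupID, ...).
func sendVote(payload []byte) error {
	if len(payload) == 0 {
		return errors.New("empty payload")
	}
	fmt.Printf("sent %d-byte vote\n", len(payload))
	return nil
}

// maybeVote mirrors the new control flow in prepare()/onPrepared():
// record and validate unconditionally, broadcast only as a committee member.
func maybeVote(mode Mode, payload []byte) {
	if mode != Listening {
		if err := sendVote(payload); err != nil {
			fmt.Println("cannot send vote:", err)
		}
	}
	// Trailing bookkeeping runs in both modes, which is why the patch keeps
	// the closing log call outside the new guard.
}

func main() {
	maybeVote(Listening, []byte{0x01}) // observer: no network send
	maybeVote(Normal, []byte{0x01})    // committee member: vote goes out
}
```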
@@ -207,16 +209,18 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { time.Sleep(consensus.delayCommit) } - if err := consensus.msgSender.SendWithoutRetry( - groupID, - host.ConstructP2pMessage(byte(17), networkMessage.Bytes), - ); err != nil { - consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") - } else { - consensus.getLogger().Info(). - Uint64("blockNum", consensus.blockNum). - Hex("blockHash", consensus.blockHash[:]). - Msg("[OnPrepared] Sent Commit Message!!") + if consensus.current.Mode() != Listening { + if err := consensus.msgSender.SendWithoutRetry( + groupID, + host.ConstructP2pMessage(byte(17), networkMessage.Bytes), + ); err != nil { + consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") + } else { + consensus.getLogger().Info(). + Uint64("blockNum", consensus.blockNum). + Hex("blockHash", consensus.blockHash[:]). + Msg("[OnPrepared] Sent Commit Message!!") + } } } consensus.getLogger().Debug(). @@ -277,7 +281,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.getLogger().Debug().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] out of sync") go func() { select { - case consensus.blockNumLowChan <- struct{}{}: + case consensus.BlockNumLowChan <- struct{}{}: consensus.current.SetMode(Syncing) for _, v := range consensus.consensusTimeout { v.Stop() diff --git a/core/offchain.go b/core/offchain.go index 58cdcc618..9321e4210 100644 --- a/core/offchain.go +++ b/core/offchain.go @@ -147,7 +147,7 @@ func (bc *BlockChain) CommitOffChainData( } // Writing beacon chain cross links - if header.ShardID() == shard.BeaconChainShardID && + if isBeaconChain && bc.chainConfig.IsCrossLink(block.Epoch()) && len(header.CrossLinks()) > 0 { crossLinks := &types.CrossLinks{} @@ -192,13 +192,16 @@ func (bc *BlockChain) CommitOffChainData( Msgf(msg, len(*crossLinks), num) utils.Logger().Debug().Msgf(msg, len(*crossLinks), num) } - // Roll up latest crosslinks - for i, c := uint32(0), shard.Schedule.InstanceForEpoch( - epoch, - ).NumShards(); i < c; i++ { - if err := bc.LastContinuousCrossLink(batch, i); err != nil { - utils.Logger().Info(). - Err(err).Msg("could not batch process last continuous crosslink") + + if isBeaconChain { + // Roll up latest crosslinks + for i, c := uint32(0), shard.Schedule.InstanceForEpoch( + epoch, + ).NumShards(); i < c; i++ { + if err := bc.LastContinuousCrossLink(batch, i); err != nil { + utils.Logger().Info(). + Err(err).Msg("could not batch process last continuous crosslink") + } } } diff --git a/node/node_handler.go b/node/node_handler.go index ddb66842e..97155031e 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -7,6 +7,8 @@ import ( "math/rand" "time" + "github.com/harmony-one/harmony/consensus" + "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/api/proto" @@ -456,23 +458,25 @@ func (node *Node) PostConsensusProcessing( } node.BroadcastCXReceipts(newBlock, commitSigAndBitmap) } else { - utils.Logger().Info(). - Uint64("blockNum", newBlock.NumberU64()). - Uint64("epochNum", newBlock.Epoch().Uint64()). - Uint64("ViewId", newBlock.Header().ViewID().Uint64()). - Str("blockHash", newBlock.Hash().String()). - Int("numTxns", len(newBlock.Transactions())). - Int("numStakingTxns", len(newBlock.StakingTransactions())). - Msg("BINGO !!! 
Reached Consensus") - // 1% of the validator also need to do broadcasting - rand.Seed(time.Now().UTC().UnixNano()) - rnd := rand.Intn(100) - if rnd < 1 { - // Beacon validators also broadcast new blocks to make sure beacon sync is strong. - if node.NodeConfig.ShardID == shard.BeaconChainShardID { - node.BroadcastNewBlock(newBlock) + if node.Consensus.Mode() != consensus.Listening { + utils.Logger().Info(). + Uint64("blockNum", newBlock.NumberU64()). + Uint64("epochNum", newBlock.Epoch().Uint64()). + Uint64("ViewId", newBlock.Header().ViewID().Uint64()). + Str("blockHash", newBlock.Hash().String()). + Int("numTxns", len(newBlock.Transactions())). + Int("numStakingTxns", len(newBlock.StakingTransactions())). + Msg("BINGO !!! Reached Consensus") + // 1% of the validator also need to do broadcasting + rand.Seed(time.Now().UTC().UnixNano()) + rnd := rand.Intn(100) + if rnd < 1 { + // Beacon validators also broadcast new blocks to make sure beacon sync is strong. + if node.NodeConfig.ShardID == shard.BeaconChainShardID { + node.BroadcastNewBlock(newBlock) + } + node.BroadcastCXReceipts(newBlock, commitSigAndBitmap) } - node.BroadcastCXReceipts(newBlock, commitSigAndBitmap) } } @@ -481,7 +485,7 @@ func (node *Node) PostConsensusProcessing( // Update consensus keys at last so the change of leader status doesn't mess up normal flow if len(newBlock.Header().ShardState()) > 0 { - node.Consensus.UpdateConsensusInformation() + node.Consensus.SetMode(node.Consensus.UpdateConsensusInformation()) } if h := node.NodeConfig.WebHooks.Hooks; h != nil { if h.Availability != nil { diff --git a/node/node_syncing.go b/node/node_syncing.go index b6c9ffa56..39aa9bead 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -203,55 +203,62 @@ func (node *Node) DoBeaconSyncing() { // DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) { - + ticker := time.NewTicker(time.Duration(node.syncFreq) * time.Second) // TODO ek – infinite loop; add shutdown/cleanup logic -SyncingLoop: for { - if node.stateSync == nil { - node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) - utils.Logger().Debug().Msg("[SYNC] initialized state sync") + select { + case <-ticker.C: + node.doSync(bc, worker, willJoinConsensus) + case <-node.Consensus.BlockNumLowChan: + node.doSync(bc, worker, willJoinConsensus) } - if node.stateSync.GetActivePeerNumber() < MinConnectedPeers { - shardID := bc.ShardID() - peers, err := node.SyncingPeerProvider.SyncingPeers(shardID) - if err != nil { - utils.Logger().Warn(). - Err(err). - Uint32("shard_id", shardID). - Msg("cannot retrieve syncing peers") - continue SyncingLoop - } - if err := node.stateSync.CreateSyncConfig(peers, false); err != nil { - utils.Logger().Warn(). - Err(err). - Interface("peers", peers). 
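The `onCommitted` change above and the reworked `DoSyncing` loop form a producer/consumer pair: consensus pushes on the now-exported `BlockNumLowChan` when a COMMITTED message runs well ahead of the local block number, and the sync loop selects on that channel alongside the periodic ticker, replacing the deleted `WaitForSyncing`. A reduced sketch of the wiring; the threshold, timings, and the `notifyOutOfSync` helper name are illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// Consensus is a toy holding just the pieces the patch touches.
type Consensus struct {
	BlockNumLowChan chan struct{}
	blockNum        uint64
}

// notifyOutOfSync mirrors onCommitted: signal from a goroutine so the
// consensus message handler itself never blocks on a busy sync loop. The
// actual patch wraps the send in a select and flips the mode to Syncing
// only once the signal is accepted.
func (c *Consensus) notifyOutOfSync(msgBlockNum uint64) {
	if msgBlockNum <= c.blockNum {
		return // not behind; nothing to do
	}
	go func() { c.BlockNumLowChan <- struct{}{} }()
}

func main() {
	c := &Consensus{BlockNumLowChan: make(chan struct{}), blockNum: 10}
	ticker := time.NewTicker(50 * time.Millisecond) // stands in for syncFreq
	defer ticker.Stop()

	c.notifyOutOfSync(120) // a COMMITTED message from far ahead triggers sync now

	// Same shape as the new DoSyncing loop: periodic pass plus on-demand pass.
	for i := 0; i < 2; i++ {
		select {
		case <-ticker.C:
			fmt.Println("periodic sync pass")
		case <-c.BlockNumLowChan:
			fmt.Println("on-demand sync pass")
		}
	}
}
```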
- Msg("[SYNC] create peers error") - continue SyncingLoop - } - utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers") + } +} + +// doSync keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up +func (node *Node) doSync(bc *core.BlockChain, worker *worker.Worker, willJoinConsensus bool) { + if node.stateSync == nil { + node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) + utils.Logger().Debug().Msg("[SYNC] initialized state sync") + } + if node.stateSync.GetActivePeerNumber() < MinConnectedPeers { + shardID := bc.ShardID() + peers, err := node.SyncingPeerProvider.SyncingPeers(shardID) + if err != nil { + utils.Logger().Warn(). + Err(err). + Uint32("shard_id", shardID). + Msg("cannot retrieve syncing peers") + return } - // TODO: treat fake maximum height - if node.stateSync.IsOutOfSync(bc) { - node.stateMutex.Lock() - node.State = NodeNotInSync - node.stateMutex.Unlock() - if willJoinConsensus { - node.Consensus.BlocksNotSynchronized() - } - node.stateSync.SyncLoop(bc, worker, false, node.Consensus) - if willJoinConsensus { - node.stateMutex.Lock() - node.State = NodeReadyForConsensus - node.stateMutex.Unlock() - node.Consensus.BlocksSynchronized() - } + if err := node.stateSync.CreateSyncConfig(peers, false); err != nil { + utils.Logger().Warn(). + Err(err). + Interface("peers", peers). + Msg("[SYNC] create peers error") + return } + utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers") + } + // TODO: treat fake maximum height + if node.stateSync.IsOutOfSync(bc) { node.stateMutex.Lock() - node.State = NodeReadyForConsensus + node.State = NodeNotInSync node.stateMutex.Unlock() - // TODO on demand syncing - time.Sleep(time.Duration(node.syncFreq) * time.Second) + if willJoinConsensus { + node.Consensus.BlocksNotSynchronized() + } + node.stateSync.SyncLoop(bc, worker, false, node.Consensus) + if willJoinConsensus { + node.stateMutex.Lock() + node.State = NodeReadyForConsensus + node.stateMutex.Unlock() + node.Consensus.BlocksSynchronized() + } } + node.stateMutex.Lock() + node.State = NodeReadyForConsensus + node.stateMutex.Unlock() } // SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node
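Back on the core/offchain.go hunk (the first bullet in the commit message): crosslinks are beacon-chain state, so the per-shard `LastContinuousCrossLink` rollup now runs behind the same `isBeaconChain` test as crosslink writing, rather than on every shard, where it could only fail against data those chains never store. A reduced sketch of the guard; the chain type and shard count are stand-ins:

```go
package main

import "fmt"

// chain is a toy stand-in for BlockChain with just enough state for the guard.
type chain struct {
	shardID       uint32
	beaconShardID uint32
}

func (bc chain) lastContinuousCrossLink(shard uint32) error {
	fmt.Printf("rolled up crosslink for shard %d\n", shard)
	return nil
}

func (bc chain) commitOffChain(numShards uint32) {
	isBeaconChain := bc.shardID == bc.beaconShardID
	if isBeaconChain {
		// Before the patch this loop ran unconditionally, so non-beacon
		// shards attempted rollups over crosslink data they do not keep.
		for i := uint32(0); i < numShards; i++ {
			if err := bc.lastContinuousCrossLink(i); err != nil {
				fmt.Println("could not batch process last continuous crosslink:", err)
			}
		}
	}
}

func main() {
	chain{shardID: 0, beaconShardID: 0}.commitOffChain(4) // beacon chain: rolls up all shards
	chain{shardID: 1, beaconShardID: 0}.commitOffChain(4) // shard 1: now a no-op
}
```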