diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 92926e1a6..51dc251c8 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -551,7 +551,9 @@ func (consensus *Consensus) checkConsensusMessage(message *msg_pb.Message, publi // just ignore consensus check for the first time when node join if consensus.ignoreViewIDCheck { consensus.viewID = viewID - consensus.ToggleConsensusCheck() + consensus.mutex.Lock() + consensus.ignoreViewIDCheck = false + consensus.mutex.Unlock() return nil } else if viewID != consensus.viewID { utils.GetLogInstance().Warn("Wrong consensus Id", "myViewId", consensus.viewID, "theirViewId", viewID, "consensus", consensus) @@ -566,6 +568,31 @@ func (consensus *Consensus) checkConsensusMessage(message *msg_pb.Message, publi return nil } +// Check viewID +func (consensus *Consensus) checkViewID(message *msg_pb.Message) error { + consensusMsg := message.GetConsensus() + viewID := consensusMsg.ViewId + + // just ignore consensus check for the first time when node join + if consensus.ignoreViewIDCheck { + consensus.viewID = viewID + consensus.mutex.Lock() + consensus.ignoreViewIDCheck = false + consensus.mutex.Unlock() + return nil + } else if viewID != consensus.viewID { + utils.GetLogger().Warn("wrong consensus id", "myViewId", consensus.viewID, "theirViewId", viewID, "consensus", consensus) + // notify state syncing to start + select { + case consensus.ViewIDLowChan <- struct{}{}: + default: + } + + return consensus_engine.ErrViewIDNotMatch + } + return nil +} + // SetBlockNum sets the blockNum in consensus object, called at node bootstrap func (consensus *Consensus) SetBlockNum(blockNum uint64) { consensus.mutex.Lock() diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index b5a0d86d8..a21573670 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -151,7 +151,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { ctxerror.Log15(utils.GetLogInstance().Warn, err) return } - + //blockObj.Logger(utils.GetLogger()).Debug("received announce", "viewID", recvMsg.ViewID, "msgBlockNum", recvMsg.BlockNum) logMsgs := consensus.pbftLog.GetMessagesByTypeSeqView(msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID) if len(logMsgs) > 0 { if logMsgs[0].BlockHash != blockObj.Header().Hash() { @@ -160,13 +160,17 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { } return } - blockPayload := make([]byte, len(block)) copy(blockPayload[:], block[:]) consensus.block = blockPayload consensus.blockHash = recvMsg.BlockHash consensus.pbftLog.AddMessage(recvMsg) consensus.pbftLog.AddBlock(&blockObj) + + if consensus.checkViewID(msg) != nil { + utils.GetLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID) + return + } consensus.tryPrepare(blockObj.Header().Hash()) consensus.consensusTimeout["bootstrap"].Stop() @@ -315,6 +319,9 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { return } if recvMsg.BlockNum < consensus.blockNum { + utils.GetLogger().Debug("old block received, ignoring", + "receivedNumber", recvMsg.BlockNum, + "expectedNumber", consensus.blockNum) return } @@ -359,10 +366,24 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { return } + // TODO ek/cm - make sure we update blocks for syncing consensus.pbftLog.AddMessage(recvMsg) - if recvMsg.BlockNum > consensus.blockNum || consensus.phase != Prepare || recvMsg.ViewID != consensus.viewID { + if consensus.checkViewID(msg) != nil { + utils.GetLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID) + return + } + if recvMsg.BlockNum > consensus.blockNum { + utils.GetLogger().Debug("future block received, ignoring", + "receivedNumber", recvMsg.BlockNum, + "expectedNumber", consensus.blockNum) return } + if consensus.phase != Prepare { + utils.GetLogger().Debug("we are in a wrong phase", + "actualPhase", consensus.phase, + "expectedPhase", Prepare) + // return // TODO: check if we should return + } consensus.aggregatedPrepareSig = &deserializedMultiSig consensus.prepareBitmap = mask @@ -575,6 +596,11 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.pbftLog.AddMessage(recvMsg) + if consensus.checkViewID(msg) != nil { + utils.GetLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID) + return + } + if recvMsg.BlockNum > consensus.blockNum || consensus.phase != Commit || recvMsg.ViewID != consensus.viewID { return } @@ -688,6 +714,9 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan case <-ticker.C: if !consensus.PubKey.IsEqual(consensus.LeaderPubKey) { for k, v := range consensus.consensusTimeout { + if consensus.mode.Mode() == Syncing { + v.Stop() + } if !v.CheckExpire() { continue } diff --git a/consensus/view_change.go b/consensus/view_change.go index ad384c08a..50bbe2968 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -31,6 +31,7 @@ type Mode int const ( Normal Mode = iota ViewChanging + Syncing ) // PbftMode contains mode and viewID of viewchanging @@ -52,6 +53,11 @@ func (pm *PbftMode) SetMode(m Mode) { pm.mode = m } +// SetMode set mode for consensus +func (consensus *Consensus) SetMode(m Mode) { + consensus.mode.mode = m +} + // ViewID return the current viewchanging id func (pm *PbftMode) ViewID() uint32 { return pm.viewID @@ -108,7 +114,7 @@ func (consensus *Consensus) getIndexOfPubKey(pubKey *bls.PublicKey) int { // ResetViewChangeState reset the state for viewchange func (consensus *Consensus) ResetViewChangeState() { - consensus.mode.SetMode(Normal) + consensus.SetMode(Normal) bhpBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil) nilBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil) consensus.bhpBitmap = bhpBitmap @@ -138,8 +144,8 @@ func (consensus *Consensus) startViewChange(viewID uint32) { consensus.consensusTimeout[k].Stop() } } - consensus.mode.SetMode(ViewChanging) - consensus.mode.SetViewID(viewID) + consensus.SetMode(ViewChanging) + consensus.SetViewID(viewID) nextLeaderKey := consensus.GetNextLeaderKey() consensus.LeaderPubKey = consensus.GetNextLeaderKey() if nextLeaderKey.IsEqual(consensus.PubKey) { @@ -161,7 +167,7 @@ func (consensus *Consensus) startViewChange(viewID uint32) { // new leader send new view message func (consensus *Consensus) startNewView() { utils.GetLogInstance().Info("startNewView", "viewID", consensus.mode.GetViewID()) - consensus.mode.SetMode(Normal) + consensus.SetMode(Normal) consensus.switchPhase(Announce) msgToSend := consensus.constructNewViewMessage() @@ -201,7 +207,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { consensus.vcLock.Lock() defer consensus.vcLock.Unlock() - consensus.mode.SetMode(ViewChanging) + consensus.SetMode(ViewChanging) consensus.mode.SetViewID(recvMsg.ViewID) _, ok1 := consensus.nilSigs[consensus.SelfAddress] @@ -289,7 +295,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { } if (len(consensus.bhpSigs) + len(consensus.nilSigs)) >= ((len(consensus.PublicKeys)*2)/3 + 1) { - consensus.mode.SetMode(Normal) + consensus.SetMode(Normal) consensus.LeaderPubKey = consensus.PubKey if len(consensus.m1Payload) == 0 { consensus.phase = Announce diff --git a/node/node_syncing.go b/node/node_syncing.go index 525c35b75..4cf3ca02d 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -12,6 +12,7 @@ import ( "github.com/harmony-one/harmony/api/service/syncing" "github.com/harmony-one/harmony/api/service/syncing/downloader" downloader_pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" @@ -104,6 +105,7 @@ SyncingLoop: utils.GetLogInstance().Debug("[SYNC] out of sync, doing syncing", "willJoinConsensus", willJoinConsensus) node.stateMutex.Lock() node.State = NodeNotInSync + node.Consensus.SetMode(consensus.Syncing) node.stateMutex.Unlock() node.stateSync.SyncLoop(bc, worker, willJoinConsensus, false) getLogger().Debug("now in sync") @@ -112,6 +114,8 @@ SyncingLoop: node.stateMutex.Lock() node.State = NodeReadyForConsensus node.stateMutex.Unlock() + node.Consensus.SetMode(consensus.Normal) + node.Consensus.SetBlockNum(node.Consensus.ChainReader.CurrentHeader().Number.Uint64() + 1) node.Consensus.ToggleConsensusCheck() } }