fix node syncing integration

pull/905/head
chao 6 years ago committed by chaosma
parent 52cf621b2c
commit 00d002caa6
  1. 29
      consensus/consensus_service.go
  2. 35
      consensus/consensus_v2.go
  3. 18
      consensus/view_change.go
  4. 4
      node/node_syncing.go

@ -551,7 +551,9 @@ func (consensus *Consensus) checkConsensusMessage(message *msg_pb.Message, publi
// just ignore consensus check for the first time when node join
if consensus.ignoreViewIDCheck {
consensus.viewID = viewID
consensus.ToggleConsensusCheck()
consensus.mutex.Lock()
consensus.ignoreViewIDCheck = false
consensus.mutex.Unlock()
return nil
} else if viewID != consensus.viewID {
utils.GetLogInstance().Warn("Wrong consensus Id", "myViewId", consensus.viewID, "theirViewId", viewID, "consensus", consensus)
@ -566,6 +568,31 @@ func (consensus *Consensus) checkConsensusMessage(message *msg_pb.Message, publi
return nil
}
// Check viewID
func (consensus *Consensus) checkViewID(message *msg_pb.Message) error {
consensusMsg := message.GetConsensus()
viewID := consensusMsg.ViewId
// just ignore consensus check for the first time when node join
if consensus.ignoreViewIDCheck {
consensus.viewID = viewID
consensus.mutex.Lock()
consensus.ignoreViewIDCheck = false
consensus.mutex.Unlock()
return nil
} else if viewID != consensus.viewID {
utils.GetLogger().Warn("wrong consensus id", "myViewId", consensus.viewID, "theirViewId", viewID, "consensus", consensus)
// notify state syncing to start
select {
case consensus.ViewIDLowChan <- struct{}{}:
default:
}
return consensus_engine.ErrViewIDNotMatch
}
return nil
}
// SetBlockNum sets the blockNum in consensus object, called at node bootstrap
func (consensus *Consensus) SetBlockNum(blockNum uint64) {
consensus.mutex.Lock()

@ -151,7 +151,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
ctxerror.Log15(utils.GetLogInstance().Warn, err)
return
}
//blockObj.Logger(utils.GetLogger()).Debug("received announce", "viewID", recvMsg.ViewID, "msgBlockNum", recvMsg.BlockNum)
logMsgs := consensus.pbftLog.GetMessagesByTypeSeqView(msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID)
if len(logMsgs) > 0 {
if logMsgs[0].BlockHash != blockObj.Header().Hash() {
@ -160,13 +160,17 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
}
return
}
blockPayload := make([]byte, len(block))
copy(blockPayload[:], block[:])
consensus.block = blockPayload
consensus.blockHash = recvMsg.BlockHash
consensus.pbftLog.AddMessage(recvMsg)
consensus.pbftLog.AddBlock(&blockObj)
if consensus.checkViewID(msg) != nil {
utils.GetLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID)
return
}
consensus.tryPrepare(blockObj.Header().Hash())
consensus.consensusTimeout["bootstrap"].Stop()
@ -315,6 +319,9 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
return
}
if recvMsg.BlockNum < consensus.blockNum {
utils.GetLogger().Debug("old block received, ignoring",
"receivedNumber", recvMsg.BlockNum,
"expectedNumber", consensus.blockNum)
return
}
@ -359,10 +366,24 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
return
}
// TODO ek/cm - make sure we update blocks for syncing
consensus.pbftLog.AddMessage(recvMsg)
if recvMsg.BlockNum > consensus.blockNum || consensus.phase != Prepare || recvMsg.ViewID != consensus.viewID {
if consensus.checkViewID(msg) != nil {
utils.GetLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID)
return
}
if recvMsg.BlockNum > consensus.blockNum {
utils.GetLogger().Debug("future block received, ignoring",
"receivedNumber", recvMsg.BlockNum,
"expectedNumber", consensus.blockNum)
return
}
if consensus.phase != Prepare {
utils.GetLogger().Debug("we are in a wrong phase",
"actualPhase", consensus.phase,
"expectedPhase", Prepare)
// return // TODO: check if we should return
}
consensus.aggregatedPrepareSig = &deserializedMultiSig
consensus.prepareBitmap = mask
@ -575,6 +596,11 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.pbftLog.AddMessage(recvMsg)
if consensus.checkViewID(msg) != nil {
utils.GetLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID)
return
}
if recvMsg.BlockNum > consensus.blockNum || consensus.phase != Commit || recvMsg.ViewID != consensus.viewID {
return
}
@ -688,6 +714,9 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
case <-ticker.C:
if !consensus.PubKey.IsEqual(consensus.LeaderPubKey) {
for k, v := range consensus.consensusTimeout {
if consensus.mode.Mode() == Syncing {
v.Stop()
}
if !v.CheckExpire() {
continue
}

@ -31,6 +31,7 @@ type Mode int
const (
Normal Mode = iota
ViewChanging
Syncing
)
// PbftMode contains mode and viewID of viewchanging
@ -52,6 +53,11 @@ func (pm *PbftMode) SetMode(m Mode) {
pm.mode = m
}
// SetMode set mode for consensus
func (consensus *Consensus) SetMode(m Mode) {
consensus.mode.mode = m
}
// ViewID return the current viewchanging id
func (pm *PbftMode) ViewID() uint32 {
return pm.viewID
@ -108,7 +114,7 @@ func (consensus *Consensus) getIndexOfPubKey(pubKey *bls.PublicKey) int {
// ResetViewChangeState reset the state for viewchange
func (consensus *Consensus) ResetViewChangeState() {
consensus.mode.SetMode(Normal)
consensus.SetMode(Normal)
bhpBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil)
nilBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil)
consensus.bhpBitmap = bhpBitmap
@ -138,8 +144,8 @@ func (consensus *Consensus) startViewChange(viewID uint32) {
consensus.consensusTimeout[k].Stop()
}
}
consensus.mode.SetMode(ViewChanging)
consensus.mode.SetViewID(viewID)
consensus.SetMode(ViewChanging)
consensus.SetViewID(viewID)
nextLeaderKey := consensus.GetNextLeaderKey()
consensus.LeaderPubKey = consensus.GetNextLeaderKey()
if nextLeaderKey.IsEqual(consensus.PubKey) {
@ -161,7 +167,7 @@ func (consensus *Consensus) startViewChange(viewID uint32) {
// new leader send new view message
func (consensus *Consensus) startNewView() {
utils.GetLogInstance().Info("startNewView", "viewID", consensus.mode.GetViewID())
consensus.mode.SetMode(Normal)
consensus.SetMode(Normal)
consensus.switchPhase(Announce)
msgToSend := consensus.constructNewViewMessage()
@ -201,7 +207,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.vcLock.Lock()
defer consensus.vcLock.Unlock()
consensus.mode.SetMode(ViewChanging)
consensus.SetMode(ViewChanging)
consensus.mode.SetViewID(recvMsg.ViewID)
_, ok1 := consensus.nilSigs[consensus.SelfAddress]
@ -289,7 +295,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
}
if (len(consensus.bhpSigs) + len(consensus.nilSigs)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
consensus.mode.SetMode(Normal)
consensus.SetMode(Normal)
consensus.LeaderPubKey = consensus.PubKey
if len(consensus.m1Payload) == 0 {
consensus.phase = Announce

@ -12,6 +12,7 @@ import (
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/api/service/syncing/downloader"
downloader_pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -104,6 +105,7 @@ SyncingLoop:
utils.GetLogInstance().Debug("[SYNC] out of sync, doing syncing", "willJoinConsensus", willJoinConsensus)
node.stateMutex.Lock()
node.State = NodeNotInSync
node.Consensus.SetMode(consensus.Syncing)
node.stateMutex.Unlock()
node.stateSync.SyncLoop(bc, worker, willJoinConsensus, false)
getLogger().Debug("now in sync")
@ -112,6 +114,8 @@ SyncingLoop:
node.stateMutex.Lock()
node.State = NodeReadyForConsensus
node.stateMutex.Unlock()
node.Consensus.SetMode(consensus.Normal)
node.Consensus.SetBlockNum(node.Consensus.ChainReader.CurrentHeader().Number.Uint64() + 1)
node.Consensus.ToggleConsensusCheck()
}
}

Loading…
Cancel
Save