diff --git a/.gitignore b/.gitignore index f20cf6adf..46971b7d9 100644 --- a/.gitignore +++ b/.gitignore @@ -83,3 +83,4 @@ db-127.0.0.1-* # testdata .testdata +coverage.txt diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index 455ba692c..873da9e90 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -927,7 +927,7 @@ func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, isBeac utils.Logger().Info(). Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight) - return + break } utils.Logger().Info(). Msgf("[SYNC] Node is OUT OF SYNC (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", @@ -943,15 +943,39 @@ func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, isBeac utils.Logger().Error().Err(err). Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight) + ss.purgeOldBlocksFromCache() + break } ss.purgeOldBlocksFromCache() - if consensus != nil { - consensus.SetMode(consensus.UpdateConsensusInformation()) + } + if consensus != nil { + if err := ss.addConsensusLastMile(bc, consensus); err != nil { + utils.Logger().Error().Err(err).Msg("[SYNC] Add consensus last mile") } + consensus.SetMode(consensus.UpdateConsensusInformation()) } ss.purgeAllBlocksFromCache() } +func (ss *StateSync) addConsensusLastMile(bc *core.BlockChain, consensus *consensus.Consensus) error { + curNumber := bc.CurrentBlock().NumberU64() + blockIter, err := consensus.GetLastMileBlockIter(curNumber + 1) + if err != nil { + return err + } + for { + block := blockIter.Next() + if block == nil { + break + } + if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil { + return errors.Wrap(err, "failed to InsertChain") + } + } + consensus.FBFTLog.PruneCacheBeforeBlock(bc.CurrentBlock().NumberU64() + 1) + return nil +} + // GetSyncingPort returns the syncing port. func GetSyncingPort(nodePort string) string { if port, err := strconv.Atoi(nodePort); err == nil { diff --git a/consensus/checks.go b/consensus/checks.go index 4091ff851..138f9d637 100644 --- a/consensus/checks.go +++ b/consensus/checks.go @@ -10,7 +10,6 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/hash" - "github.com/harmony-one/harmony/internal/chain" "github.com/pkg/errors" ) @@ -136,24 +135,6 @@ func (consensus *Consensus) onPreparedSanityChecks( Msg("[OnPrepared] BlockHash not match") return false } - if consensus.current.Mode() == Normal { - err := chain.Engine.VerifyHeader(consensus.ChainReader, blockObj.Header(), true) - if err != nil { - consensus.getLogger().Error(). - Err(err). - Str("inChain", consensus.ChainReader.CurrentHeader().Number().String()). - Str("MsgBlockNum", blockObj.Header().Number().String()). - Msg("[OnPrepared] Block header is not verified successfully") - return false - } - if consensus.BlockVerifier == nil { - // do nothing - } else if err := consensus.BlockVerifier(blockObj); err != nil { - consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed") - return false - } - } - return true } diff --git a/consensus/consensus.go b/consensus/consensus.go index 45f80657f..2fb79a7aa 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -79,7 +79,7 @@ type Consensus struct { ReadySignal chan struct{} // The post-consensus processing func passed from Node object // Called when consensus on a new block is done - OnConsensusDone func(*types.Block) + OnConsensusDone func(*types.Block) error // The verifier func passed from Node object BlockVerifier BlockVerifierFunc // verified block to state sync broadcast @@ -176,7 +176,7 @@ func New( consensus.Decider = Decider consensus.host = host consensus.msgSender = NewMessageSender(host) - consensus.BlockNumLowChan = make(chan struct{}) + consensus.BlockNumLowChan = make(chan struct{}, 1) // FBFT related consensus.FBFTLog = NewFBFTLog() consensus.phase = FBFTAnnounce diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index a9a480b9a..2cada40e4 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -37,7 +37,6 @@ func TestConsensusInitialization(t *testing.T) { // FBFTLog assert.Equal(t, fbtLog, consensus.FBFTLog) - assert.Equal(t, maxLogSize, consensus.FBFTLog.maxLogSize) assert.Equal(t, FBFTAnnounce, consensus.phase) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 10f52a16a..ce21feb73 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -7,12 +7,13 @@ import ( "sync/atomic" "time" - "github.com/harmony-one/harmony/crypto/bls" + "github.com/rs/zerolog" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/consensus/quorum" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/crypto/bls" vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/p2p" @@ -222,103 +223,6 @@ func (consensus *Consensus) BlockCommitSig(blockNum uint64) ([]byte, []byte, err return aggSig, bitmap, nil } -// try to catch up if fall behind -func (consensus *Consensus) tryCatchup() { - consensus.getLogger().Info().Msg("[TryCatchup] commit new blocks") - currentBlockNum := consensus.blockNum - for { - msgs := consensus.FBFTLog.GetMessagesByTypeSeq( - msg_pb.MessageType_COMMITTED, consensus.blockNum, - ) - if len(msgs) == 0 { - break - } - if len(msgs) > 1 { - consensus.getLogger().Error(). - Int("numMsgs", len(msgs)). - Msg("[TryCatchup] DANGER!!! we should only get one committed message for a given blockNum") - } - - var committedMsg *FBFTMessage - var block *types.Block - for i := range msgs { - tmpBlock := consensus.FBFTLog.GetBlockByHash(msgs[i].BlockHash) - if tmpBlock == nil { - blksRepr, msgsRepr, incomingMsg := - consensus.FBFTLog.Blocks().String(), - consensus.FBFTLog.Messages().String(), - msgs[i].String() - consensus.getLogger().Debug(). - Str("FBFT-log-blocks", blksRepr). - Str("FBFT-log-messages", msgsRepr). - Str("incoming-message", incomingMsg). - Uint64("blockNum", msgs[i].BlockNum). - Uint64("viewID", msgs[i].ViewID). - Str("blockHash", msgs[i].BlockHash.Hex()). - Msg("[TryCatchup] Failed finding a matching block for committed message") - continue - } - - committedMsg = msgs[i] - block = tmpBlock - break - } - if block == nil || committedMsg == nil { - consensus.getLogger().Error().Msg("[TryCatchup] Failed finding a valid committed message.") - break - } - - if block.ParentHash() != consensus.ChainReader.CurrentHeader().Hash() { - consensus.getLogger().Debug().Msg("[TryCatchup] parent block hash not match") - break - } - consensus.getLogger().Info().Msg("[TryCatchup] block found to commit") - - preparedMsgs := consensus.FBFTLog.GetMessagesByTypeSeqHash( - msg_pb.MessageType_PREPARED, committedMsg.BlockNum, committedMsg.BlockHash, - ) - msg := consensus.FBFTLog.FindMessageByMaxViewID(preparedMsgs) - if msg == nil { - break - } - consensus.getLogger().Info().Msg("[TryCatchup] prepared message found to commit") - - atomic.AddUint64(&consensus.blockNum, 1) - consensus.SetCurBlockViewID(committedMsg.ViewID + 1) - consensus.LeaderPubKey = committedMsg.SenderPubkey - - consensus.getLogger().Info().Msg("[TryCatchup] Adding block to chain") - - // Fill in the commit signatures - block.SetCurrentCommitSig(committedMsg.Payload) - consensus.OnConsensusDone(block) - consensus.ResetState() - - select { - case consensus.VerifiedNewBlock <- block: - default: - consensus.getLogger().Info(). - Str("blockHash", block.Hash().String()). - Msg("[TryCatchup] consensus verified block send to chan failed") - continue - } - - break - } - if currentBlockNum < consensus.blockNum { - consensus.switchPhase("TryCatchup", FBFTAnnounce) - } - // catup up and skip from view change trap - if currentBlockNum < consensus.blockNum && - consensus.IsViewChangingMode() { - consensus.current.SetMode(Normal) - consensus.consensusTimeout[timeoutViewChange].Stop() - } - // clean up old log - consensus.FBFTLog.DeleteBlocksLessThan(consensus.blockNum - 1) - consensus.FBFTLog.DeleteMessagesLessThan(consensus.blockNum - 1) -} - // Start waits for the next new block and run consensus func (consensus *Consensus) Start( blockChannel chan *types.Block, stopChan, stoppedChan, startChannel chan struct{}, @@ -372,7 +276,7 @@ func (consensus *Consensus) Start( break } else { consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!") - viewID := consensus.GetViewChangingID() + viewID := consensus.GetCurBlockViewID() consensus.startViewChange(viewID + 1) break } @@ -501,6 +405,146 @@ func (consensus *Consensus) Start( }() } +// LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache. +// All blocks returned are guaranteed to pass the verification. +type LastMileBlockIter struct { + blockCandidates []*types.Block + fbftLog FBFTLog + verify func(*types.Block) error + curIndex int + logger *zerolog.Logger +} + +// GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart +func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64) (*LastMileBlockIter, error) { + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + + if consensus.BlockVerifier == nil { + return nil, errors.New("consensus haven't initialized yet") + } + blocks, _, err := consensus.getLastMileBlocksAndMsg(bnStart) + if err != nil { + return nil, err + } + return &LastMileBlockIter{ + blockCandidates: blocks, + verify: consensus.BlockVerifier, + curIndex: 0, + logger: consensus.getLogger(), + }, nil +} + +// Next iterate to the next last mile block +func (iter *LastMileBlockIter) Next() *types.Block { + if iter.curIndex >= len(iter.blockCandidates) { + return nil + } + block := iter.blockCandidates[iter.curIndex] + iter.curIndex++ + + if !iter.fbftLog.IsBlockVerified(block) { + if err := iter.verify(block); err != nil { + iter.logger.Debug().Err(err).Msg("block verification failed in consensus last mile block") + return nil + } + iter.fbftLog.MarkBlockVerified(block) + } + return block +} + +func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Block, []*FBFTMessage, error) { + var ( + blocks []*types.Block + msgs []*FBFTMessage + ) + for blockNum := bnStart; ; blockNum++ { + blk, msg, err := consensus.FBFTLog.GetCommittedBlockAndMsgsFromNumber(blockNum, consensus.getLogger()) + if err != nil { + if err == errFBFTLogNotFound { + break + } + return nil, nil, err + } + blocks = append(blocks, blk) + msgs = append(msgs, msg) + } + return blocks, msgs, nil +} + +// tryCatchup add the last mile block in PBFT log memory cache to blockchain. +func (consensus *Consensus) tryCatchup() error { + // TODO: change this to a more systematic symbol + if consensus.BlockVerifier == nil { + return errors.New("consensus haven't finished initialization") + } + initBN := consensus.blockNum + defer consensus.postCatchup(initBN) + + blks, msgs, err := consensus.getLastMileBlocksAndMsg(initBN) + if err != nil { + return errors.Wrapf(err, "[TryCatchup] Failed to get last mile blocks: %v", err) + } + for i := range blks { + blk, msg := blks[i], msgs[i] + if blk == nil { + return nil + } + blk.SetCurrentCommitSig(msg.Payload) + + if !consensus.FBFTLog.IsBlockVerified(blk) { + if err := consensus.BlockVerifier(blk); err != nil { + consensus.getLogger().Err(err).Msg("[TryCatchup] failed block verifier") + return err + } + consensus.FBFTLog.MarkBlockVerified(blk) + } + consensus.getLogger().Info().Msg("[TryCatchup] Adding block to chain") + if err := consensus.commitBlock(blk, msgs[i]); err != nil { + consensus.getLogger().Error().Err(err).Msg("[TryCatchup] Failed to add block to chain") + return err + } + select { + case consensus.VerifiedNewBlock <- blk: + default: + consensus.getLogger().Info(). + Str("blockHash", blk.Hash().String()). + Msg("[TryCatchup] consensus verified block send to chan failed") + continue + } + } + return nil +} + +func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { + if err := consensus.OnConsensusDone(blk); err != nil { + return err + } + + atomic.AddUint64(&consensus.blockNum, 1) + consensus.SetCurBlockViewID(committedMsg.ViewID + 1) + consensus.LeaderPubKey = committedMsg.SenderPubkey + consensus.ResetState() + return nil +} + +func (consensus *Consensus) postCatchup(initBN uint64) { + if initBN < consensus.blockNum { + consensus.getLogger().Info(). + Uint64("From", initBN). + Uint64("To", consensus.blockNum). + Msg("[TryCatchup] Caught up!") + consensus.switchPhase("TryCatchup", FBFTAnnounce) + } + // catch up and skip from view change trap + if initBN < consensus.blockNum && consensus.IsViewChangingMode() { + consensus.current.SetMode(Normal) + consensus.consensusTimeout[timeoutViewChange].Stop() + } + // clean up old log + consensus.FBFTLog.PruneCacheBeforeBlock(consensus.blockNum) +} + // GenerateVrfAndProof generates new VRF/Proof from hash of previous block func (consensus *Consensus) GenerateVrfAndProof(newBlock *types.Block, vrfBlockNumbers []uint64) []uint64 { key, err := consensus.GetConsensusLeaderPrivateKey() diff --git a/consensus/fbft_log.go b/consensus/fbft_log.go index 0591a4689..a684fbff3 100644 --- a/consensus/fbft_log.go +++ b/consensus/fbft_log.go @@ -1,25 +1,21 @@ package consensus import ( + "encoding/binary" "fmt" + "sync" - "github.com/harmony-one/harmony/crypto/bls" - - mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" bls_core "github.com/harmony-one/bls/ffi/go/bls" + "github.com/pkg/errors" + "github.com/rs/zerolog" + msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/crypto/bls" bls_cosi "github.com/harmony-one/harmony/crypto/bls" ) -// FBFTLog represents the log stored by a node during FBFT process -type FBFTLog struct { - blocks mapset.Set //store blocks received in FBFT - messages mapset.Set // store messages received in FBFT - maxLogSize uint32 -} - // FBFTMessage is the record of pbft messages received by a node during FBFT process type FBFTMessage struct { MessageType msg_pb.MessageType @@ -59,106 +55,153 @@ func (m *FBFTMessage) String() string { ) } +const ( + idTypeBytes = 4 + idHashBytes = common.HashLength + idSenderBytes = bls.PublicKeySizeInBytes + + idBytes = idTypeBytes + idHashBytes + idSenderBytes +) + +type ( + // fbftMsgID is the id that uniquely defines a fbft message. + fbftMsgID [idBytes]byte +) + +// id return the ID of the FBFT message which uniquely identifies a FBFT message. +// The ID is a concatenation of MsgType, BlockHash, and sender key +func (m *FBFTMessage) id() fbftMsgID { + var id fbftMsgID + binary.LittleEndian.PutUint32(id[:], uint32(m.MessageType)) + copy(id[idTypeBytes:], m.BlockHash[:]) + if m.SenderPubkey != nil { + copy(id[idTypeBytes+idHashBytes:], m.SenderPubkey.Bytes[:]) + } + return id +} + +// FBFTLog represents the log stored by a node during FBFT process +type FBFTLog struct { + blocks map[common.Hash]*types.Block // store blocks received in FBFT + verifiedBlocks map[common.Hash]struct{} // store block hashes for blocks that has already been verified + blockLock sync.RWMutex + + messages map[fbftMsgID]*FBFTMessage // store messages received in FBFT + msgLock sync.RWMutex +} + // NewFBFTLog returns new instance of FBFTLog func NewFBFTLog() *FBFTLog { - blocks := mapset.NewSet() - messages := mapset.NewSet() - logSize := maxLogSize - pbftLog := FBFTLog{blocks: blocks, messages: messages, maxLogSize: logSize} + pbftLog := FBFTLog{ + blocks: make(map[common.Hash]*types.Block), + messages: make(map[fbftMsgID]*FBFTMessage), + verifiedBlocks: make(map[common.Hash]struct{}), + } return &pbftLog } -// Blocks return the blocks stored in the log -func (log *FBFTLog) Blocks() mapset.Set { - return log.blocks +// AddBlock add a new block into the log +func (log *FBFTLog) AddBlock(block *types.Block) { + log.blockLock.Lock() + defer log.blockLock.Unlock() + + log.blocks[block.Hash()] = block } -// Messages return the messages stored in the log -func (log *FBFTLog) Messages() mapset.Set { - return log.messages +// MarkBlockVerified marks the block as verified +func (log *FBFTLog) MarkBlockVerified(block *types.Block) { + log.blockLock.Lock() + defer log.blockLock.Unlock() + + log.verifiedBlocks[block.Hash()] = struct{}{} } -// AddBlock add a new block into the log -func (log *FBFTLog) AddBlock(block *types.Block) { - log.blocks.Add(block) +// IsBlockVerified checks whether the block is verified +func (log *FBFTLog) IsBlockVerified(block *types.Block) bool { + log.blockLock.RLock() + defer log.blockLock.RUnlock() + + _, exist := log.verifiedBlocks[block.Hash()] + return exist } // GetBlockByHash returns the block matches the given block hash func (log *FBFTLog) GetBlockByHash(hash common.Hash) *types.Block { - var found *types.Block - it := log.Blocks().Iterator() - for block := range it.C { - if block.(*types.Block).Header().Hash() == hash { - found = block.(*types.Block) - it.Stop() - } - } - return found + log.blockLock.RLock() + defer log.blockLock.RUnlock() + + return log.blocks[hash] } // GetBlocksByNumber returns the blocks match the given block number func (log *FBFTLog) GetBlocksByNumber(number uint64) []*types.Block { - found := []*types.Block{} - it := log.Blocks().Iterator() - for block := range it.C { - if block.(*types.Block).NumberU64() == number { - found = append(found, block.(*types.Block)) + log.blockLock.RLock() + defer log.blockLock.RUnlock() + + var blocks []*types.Block + for _, block := range log.blocks { + if block.NumberU64() == number { + blocks = append(blocks, block) } } - return found + return blocks } // DeleteBlocksLessThan deletes blocks less than given block number func (log *FBFTLog) DeleteBlocksLessThan(number uint64) { - found := mapset.NewSet() - it := log.Blocks().Iterator() - for block := range it.C { - if block.(*types.Block).NumberU64() < number { - found.Add(block) + log.blockLock.Lock() + defer log.blockLock.Unlock() + + for h, block := range log.blocks { + if block.NumberU64() < number { + delete(log.blocks, h) + delete(log.verifiedBlocks, h) } } - log.blocks = log.blocks.Difference(found) } // DeleteBlockByNumber deletes block of specific number func (log *FBFTLog) DeleteBlockByNumber(number uint64) { - found := mapset.NewSet() - it := log.Blocks().Iterator() - for block := range it.C { - if block.(*types.Block).NumberU64() == number { - found.Add(block) + log.blockLock.Lock() + defer log.blockLock.Unlock() + + for h, block := range log.blocks { + if block.NumberU64() == number { + delete(log.blocks, h) + delete(log.verifiedBlocks, h) } } - log.blocks = log.blocks.Difference(found) } // DeleteMessagesLessThan deletes messages less than given block number func (log *FBFTLog) DeleteMessagesLessThan(number uint64) { - found := mapset.NewSet() - it := log.Messages().Iterator() - for msg := range it.C { - if msg.(*FBFTMessage).BlockNum < number { - found.Add(msg) + log.msgLock.Lock() + defer log.msgLock.Unlock() + + for h, msg := range log.messages { + if msg.BlockNum < number { + delete(log.messages, h) } } - log.messages = log.messages.Difference(found) } // AddMessage adds a pbft message into the log func (log *FBFTLog) AddMessage(msg *FBFTMessage) { - log.messages.Add(msg) + log.msgLock.Lock() + defer log.msgLock.Unlock() + + log.messages[msg.id()] = msg } // GetMessagesByTypeSeqViewHash returns pbft messages with matching type, blockNum, viewID and blockHash func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage { - found := []*FBFTMessage{} - it := log.Messages().Iterator() - for msg := range it.C { - if msg.(*FBFTMessage).MessageType == typ && - msg.(*FBFTMessage).BlockNum == blockNum && - msg.(*FBFTMessage).ViewID == viewID && - msg.(*FBFTMessage).BlockHash == blockHash { - found = append(found, msg.(*FBFTMessage)) + log.msgLock.RLock() + defer log.msgLock.RUnlock() + + var found []*FBFTMessage + for _, msg := range log.messages { + if msg.MessageType == typ && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash { + found = append(found, msg) } } return found @@ -166,12 +209,13 @@ func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNu // GetMessagesByTypeSeq returns pbft messages with matching type, blockNum func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage { - found := []*FBFTMessage{} - it := log.Messages().Iterator() - for msg := range it.C { - if msg.(*FBFTMessage).MessageType == typ && - msg.(*FBFTMessage).BlockNum == blockNum { - found = append(found, msg.(*FBFTMessage)) + log.msgLock.RLock() + defer log.msgLock.RUnlock() + + var found []*FBFTMessage + for _, msg := range log.messages { + if msg.MessageType == typ && msg.BlockNum == blockNum { + found = append(found, msg) } } return found @@ -179,13 +223,13 @@ func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64 // GetMessagesByTypeSeqHash returns pbft messages with matching type, blockNum func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage { - found := []*FBFTMessage{} - it := log.Messages().Iterator() - for msg := range it.C { - if msg.(*FBFTMessage).MessageType == typ && - msg.(*FBFTMessage).BlockNum == blockNum && - msg.(*FBFTMessage).BlockHash == blockHash { - found = append(found, msg.(*FBFTMessage)) + log.msgLock.RLock() + defer log.msgLock.RUnlock() + + var found []*FBFTMessage + for _, msg := range log.messages { + if msg.MessageType == typ && msg.BlockNum == blockNum && msg.BlockHash == blockHash { + found = append(found, msg) } } return found @@ -217,13 +261,15 @@ func (log *FBFTLog) HasMatchingViewPrepared(blockNum uint64, viewID uint64, bloc // GetMessagesByTypeSeqView returns pbft messages with matching type, blockNum and viewID func (log *FBFTLog) GetMessagesByTypeSeqView(typ msg_pb.MessageType, blockNum uint64, viewID uint64) []*FBFTMessage { - found := []*FBFTMessage{} - it := log.Messages().Iterator() - for msg := range it.C { - if msg.(*FBFTMessage).MessageType != typ || msg.(*FBFTMessage).BlockNum != blockNum || msg.(*FBFTMessage).ViewID != viewID { + log.msgLock.RLock() + defer log.msgLock.RUnlock() + + var found []*FBFTMessage + for _, msg := range log.messages { + if msg.MessageType != typ || msg.BlockNum != blockNum || msg.ViewID != viewID { continue } - found = append(found, msg.(*FBFTMessage)) + found = append(found, msg) } return found } @@ -267,3 +313,37 @@ func ParseFBFTMessage(msg *msg_pb.Message) (*FBFTMessage, error) { return &pbftMsg, nil } + +var errFBFTLogNotFound = errors.New("FBFT log not found") + +// GetCommittedBlockAndMsgsFromNumber get committed block and message starting from block number bn. +func (log *FBFTLog) GetCommittedBlockAndMsgsFromNumber(bn uint64, logger *zerolog.Logger) (*types.Block, *FBFTMessage, error) { + msgs := log.GetMessagesByTypeSeq( + msg_pb.MessageType_COMMITTED, bn, + ) + if len(msgs) == 0 { + return nil, nil, errFBFTLogNotFound + } + if len(msgs) > 1 { + logger.Error().Int("numMsgs", len(msgs)).Err(errors.New("DANGER!!! multiple COMMITTED message in PBFT log observed")) + } + for i := range msgs { + block := log.GetBlockByHash(msgs[i].BlockHash) + if block == nil { + logger.Debug(). + Uint64("blockNum", msgs[i].BlockNum). + Uint64("viewID", msgs[i].ViewID). + Str("blockHash", msgs[i].BlockHash.Hex()). + Err(errors.New("failed finding a matching block for committed message")) + continue + } + return block, msgs[i], nil + } + return nil, nil, errFBFTLogNotFound +} + +// PruneCacheBeforeBlock prune all blocks before bn +func (log *FBFTLog) PruneCacheBeforeBlock(bn uint64) { + log.DeleteBlocksLessThan(bn - 1) + log.DeleteMessagesLessThan(bn - 1) +} diff --git a/consensus/fbft_log_test.go b/consensus/fbft_log_test.go index 48926a9ab..3dc1f2ce5 100644 --- a/consensus/fbft_log_test.go +++ b/consensus/fbft_log_test.go @@ -1,11 +1,57 @@ package consensus import ( + "bytes" + "encoding/binary" "testing" msg_pb "github.com/harmony-one/harmony/api/proto/message" + "github.com/harmony-one/harmony/crypto/bls" ) +func TestFBFTLog_id(t *testing.T) { + tests := []FBFTMessage{ + { + MessageType: msg_pb.MessageType_ANNOUNCE, + BlockHash: [32]byte{01, 02}, + SenderPubkey: &bls.PublicKeyWrapper{ + Bytes: bls.SerializedPublicKey{0x01, 0x02}, + }, + }, + { + MessageType: msg_pb.MessageType_COMMIT, + BlockHash: [32]byte{02, 03}, + }, + } + for _, msg := range tests { + id := msg.id() + + if uint32(msg.MessageType) != binary.LittleEndian.Uint32(id[:]) { + t.Errorf("message type not expected") + } + if !bytes.Equal(id[idTypeBytes:idTypeBytes+idHashBytes], msg.BlockHash[:]) { + t.Errorf("block hash not expected") + } + if msg.SenderPubkey == nil { + var emptyKey bls.SerializedPublicKey + if !bytes.Equal(id[idTypeBytes+idHashBytes:], emptyKey[:]) { + t.Errorf("sender key not expected when empty") + } + } else { + if !bytes.Equal(id[idTypeBytes+idHashBytes:], msg.SenderPubkey.Bytes[:]) { + t.Errorf("sender key not expected") + } + } + if idDup := msg.id(); idDup != id { + t.Errorf("id not replicable") + } + msg.MessageType = 100 + if idDiff := msg.id(); idDiff == id { + t.Errorf("id not unique") + } + } +} + func TestGetMessagesByTypeSeqViewHash(t *testing.T) { pbftMsg := FBFTMessage{ MessageType: msg_pb.MessageType_ANNOUNCE, diff --git a/consensus/validator.go b/consensus/validator.go index c39297265..df15c2011 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -146,6 +146,10 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { Msg("Wrong BlockNum Received, ignoring!") return } + if recvMsg.BlockNum > consensus.blockNum { + consensus.getLogger().Warn().Msgf("[OnPrepared] low consensus block number. Spin sync") + consensus.spinUpStateSync() + } // check validity of prepared signature blockHash := recvMsg.BlockHash @@ -154,13 +158,10 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!") return } - if !consensus.Decider.IsQuorumAchievedByMask(mask) { - consensus.getLogger().Warn(). - Msgf("[OnPrepared] Quorum Not achieved") + consensus.getLogger().Warn().Msgf("[OnPrepared] Quorum Not achieved.") return } - if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) { myBlockHash := common.Hash{} myBlockHash.SetBytes(consensus.blockHash[:]) @@ -201,13 +202,20 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { Msg("[OnPrepared] Prepared message and block added") // tryCatchup is also run in onCommitted(), so need to lock with commitMutex. - consensus.tryCatchup() - if consensus.current.Mode() != Normal { // don't sign the block that is not verified consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!") return } + if consensus.BlockVerifier == nil { + consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring") + return + } + if err := consensus.BlockVerifier(&blockObj); err != nil { + consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed") + return + } + consensus.FBFTLog.MarkBlockVerified(&blockObj) if consensus.checkViewID(recvMsg) != nil { if consensus.current.Mode() == Normal { @@ -255,16 +263,18 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { if !consensus.isRightBlockNumCheck(recvMsg) { return } + if recvMsg.BlockNum > consensus.blockNum { + consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync") + consensus.spinUpStateSync() + } aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) if err != nil { consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed") return } - if !consensus.Decider.IsQuorumAchievedByMask(mask) { - consensus.getLogger().Warn(). - Msgf("[OnCommitted] Quorum Not achieved") + consensus.getLogger().Warn().Msgf("[OnCommitted] Quorum Not achieved.") return } @@ -295,22 +305,12 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.aggregatedCommitSig = aggSig consensus.commitBitmap = mask - if recvMsg.BlockNum > consensus.blockNum && recvMsg.BlockNum-consensus.blockNum > consensusBlockNumBuffer { + consensus.tryCatchup() + if recvMsg.BlockNum > consensus.blockNum { consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC") - go func() { - select { - case consensus.BlockNumLowChan <- struct{}{}: - consensus.current.SetMode(Syncing) - for _, v := range consensus.consensusTimeout { - v.Stop() - } - case <-time.After(1 * time.Second): - } - }() return } - consensus.tryCatchup() if consensus.IsViewChangingMode() { consensus.getLogger().Info().Msg("[OnCommitted] Still in ViewChanging mode, Exiting!!") return @@ -324,3 +324,14 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { } consensus.consensusTimeout[timeoutConsensus].Start() } + +func (consensus *Consensus) spinUpStateSync() { + select { + case consensus.BlockNumLowChan <- struct{}{}: + consensus.current.SetMode(Syncing) + for _, v := range consensus.consensusTimeout { + v.Stop() + } + default: + } +} diff --git a/node/node.go b/node/node.go index bbf42e3fb..8dd212826 100644 --- a/node/node.go +++ b/node/node.go @@ -919,7 +919,7 @@ func New( } node.pendingCXReceipts = map[string]*types.CXReceiptsProof{} - node.Consensus.VerifiedNewBlock = make(chan *types.Block) + node.Consensus.VerifiedNewBlock = make(chan *types.Block, 1) chain.Engine.SetBeaconchain(beaconChain) // the sequence number is the next block number to be added in consensus protocol, which is // always one more than current chain header block diff --git a/node/node_handler.go b/node/node_handler.go index 4013bc1a4..42c8a7920 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -352,17 +352,9 @@ func (node *Node) numSignaturesIncludedInBlock(block *types.Block) uint32 { // 1. add the new block to blockchain // 2. [leader] send new block to the client // 3. [leader] send cross shard tx receipts to destination shard -func (node *Node) PostConsensusProcessing( - newBlock *types.Block, -) { +func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { if _, err := node.Blockchain().InsertChain([]*types.Block{newBlock}, true); err != nil { - utils.Logger().Error(). - Err(err). - Uint64("blockNum", newBlock.NumberU64()). - Str("parentHash", newBlock.Header().ParentHash().Hex()). - Str("hash", newBlock.Header().Hash().Hex()). - Msg("Error Adding new block to blockchain") - return + return err } utils.Logger().Info(). Uint64("blockNum", newBlock.NumberU64()). @@ -412,11 +404,11 @@ func (node *Node) PostConsensusProcessing( for _, addr := range node.GetAddresses(newBlock.Epoch()) { wrapper, err := node.Beaconchain().ReadValidatorInformation(addr) if err != nil { - return + return err } snapshot, err := node.Beaconchain().ReadValidatorSnapshot(addr) if err != nil { - return + return err } computed := availability.ComputeCurrentSigning( snapshot.Validator, wrapper, @@ -435,6 +427,7 @@ func (node *Node) PostConsensusProcessing( } } } + return nil } // BootstrapConsensus is the a goroutine to check number of peers and start the consensus