From 471af49477afd6d054e7640cbbfe10d87425378a Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 12 Oct 2021 17:00:17 -0700 Subject: [PATCH] add logic to verify blocks in announce phase (#3897) * add logic to verify blocks in announce phase * add block data in announce message --- consensus/checks.go | 6 +- consensus/consensus.go | 4 +- consensus/consensus_v2.go | 6 +- consensus/construct.go | 1 + consensus/fbft_log.go | 4 +- consensus/leader.go | 2 +- consensus/validator.go | 118 +++++++++++++++++++++++++++----------- 7 files changed, 96 insertions(+), 45 deletions(-) diff --git a/consensus/checks.go b/consensus/checks.go index 45676898a..745fdfb59 100644 --- a/consensus/checks.go +++ b/consensus/checks.go @@ -118,7 +118,7 @@ func (consensus *Consensus) isRightBlockNumCheck(recvMsg *FBFTMessage) bool { return true } -func (consensus *Consensus) onPreparedSanityChecks( +func (consensus *Consensus) newBlockSanityChecks( blockObj *types.Block, recvMsg *FBFTMessage, ) bool { if blockObj.NumberU64() != recvMsg.BlockNum || @@ -126,7 +126,7 @@ func (consensus *Consensus) onPreparedSanityChecks( consensus.getLogger().Warn(). Uint64("MsgBlockNum", recvMsg.BlockNum). Uint64("blockNum", blockObj.NumberU64()). - Msg("[OnPrepared] BlockNum not match") + Msg("[newBlockSanityChecks] BlockNum not match") return false } if blockObj.Header().Hash() != recvMsg.BlockHash { @@ -134,7 +134,7 @@ func (consensus *Consensus) onPreparedSanityChecks( Uint64("MsgBlockNum", recvMsg.BlockNum). Hex("MsgBlockHash", recvMsg.BlockHash[:]). Str("blockObjHash", blockObj.Header().Hash().Hex()). - Msg("[OnPrepared] BlockHash not match") + Msg("[newBlockSanityChecks] BlockHash not match") return false } return true diff --git a/consensus/consensus.go b/consensus/consensus.go index c9994e745..246c93852 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -84,6 +84,8 @@ type Consensus struct { IgnoreViewIDCheck *abool.AtomicBool // consensus mutex mutex sync.Mutex + // mutex for verify new block + verifyBlockMutex sync.Mutex // ViewChange struct vc *viewChange // Signal channel for proposing a new block and start new consensus @@ -136,7 +138,7 @@ type Consensus struct { // VerifyBlock is a function used to verify the block and keep trace of verified blocks func (consensus *Consensus) VerifyBlock(block *types.Block) error { - if !consensus.FBFTLog.IsBlockVerified(block) { + if !consensus.FBFTLog.IsBlockVerified(block.Hash()) { if err := consensus.BlockVerifier(block); err != nil { return errors.New("Block verification failed") } diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 0dfab2a8f..77b4c8d9d 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -495,7 +495,7 @@ func (iter *LastMileBlockIter) Next() *types.Block { block := iter.blockCandidates[iter.curIndex] iter.curIndex++ - if !iter.fbftLog.IsBlockVerified(block) { + if !iter.fbftLog.IsBlockVerified(block.Hash()) { if err := iter.verify(block); err != nil { iter.logger.Debug().Err(err).Msg("block verification failed in consensus last mile block") return nil @@ -558,7 +558,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { go func() { blk.SetCurrentCommitSig(bareMinimumCommit) - if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk)); err != nil { + if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil { consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain") return } @@ -658,7 +658,7 @@ func (consensus *Consensus) tryCatchup() error { func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { if consensus.Blockchain.CurrentBlock().NumberU64() < blk.NumberU64() { - if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk)); err != nil { + if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil { consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain") return err } diff --git a/consensus/construct.go b/consensus/construct.go index 753df2e56..ebf72c1d9 100644 --- a/consensus/construct.go +++ b/consensus/construct.go @@ -99,6 +99,7 @@ func (consensus *Consensus) construct( needMsgSig := true switch p { case msg_pb.MessageType_ANNOUNCE: + consensusMsg.Block = consensus.block consensusMsg.Payload = consensus.blockHash[:] case msg_pb.MessageType_PREPARE: needMsgSig = false diff --git a/consensus/fbft_log.go b/consensus/fbft_log.go index c028c88ef..1a5966572 100644 --- a/consensus/fbft_log.go +++ b/consensus/fbft_log.go @@ -135,11 +135,11 @@ func (log *FBFTLog) MarkBlockVerified(block *types.Block) { } // IsBlockVerified checks whether the block is verified -func (log *FBFTLog) IsBlockVerified(block *types.Block) bool { +func (log *FBFTLog) IsBlockVerified(hash common.Hash) bool { log.blockLock.RLock() defer log.blockLock.RUnlock() - _, exist := log.verifiedBlocks[block.Hash()] + _, exist := log.verifiedBlocks[hash] return exist } diff --git a/consensus/leader.go b/consensus/leader.go index b6ad925af..335b59bbf 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -29,7 +29,7 @@ func (consensus *Consensus) announce(block *types.Block) { //// Lock Write - Start consensus.mutex.Lock() copy(consensus.blockHash[:], blockHash[:]) - consensus.block = encodedBlock + consensus.block = encodedBlock // Must set block bytes before consensus.construct() consensus.mutex.Unlock() //// Lock Write - End diff --git a/consensus/validator.go b/consensus/validator.go index 1fb424504..d8ea8133d 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -3,6 +3,8 @@ package consensus import ( "encoding/hex" + "github.com/pkg/errors" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" @@ -31,7 +33,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { } consensus.StartFinalityCount() - consensus.getLogger().Debug(). + consensus.getLogger().Info(). Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgBlockNum", recvMsg.BlockNum). Msg("[OnAnnounce] Announce message Added") @@ -58,6 +60,81 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { } consensus.prepare() consensus.switchPhase("Announce", FBFTPrepare) + + if len(recvMsg.Block) > 0 { + go func() { + // Best effort check, no need to error out. + _, err := consensus.validateNewBlock(recvMsg) + + if err == nil { + consensus.getLogger().Info(). + Msg("[Announce] Block verified") + } + }() + } +} + +func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block, error) { + // Lock to prevent race condition between announce and prepare + consensus.verifyBlockMutex.Lock() + defer consensus.verifyBlockMutex.Unlock() + + if consensus.FBFTLog.IsBlockVerified(recvMsg.BlockHash) { + var blockObj *types.Block + + blockObj = consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) + if blockObj == nil { + if err := rlp.DecodeBytes(recvMsg.Block, &blockObj); err != nil { + consensus.getLogger().Warn(). + Err(err). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Msg("[validateNewBlock] Unparseable block header data") + return nil, errors.New("Failed parsing new block") + } + } + consensus.getLogger().Info(). + Msg("[validateNewBlock] Block Already verified") + return blockObj, nil + } + // check validity of block if any + var blockObj types.Block + if err := rlp.DecodeBytes(recvMsg.Block, &blockObj); err != nil { + consensus.getLogger().Warn(). + Err(err). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Msg("[validateNewBlock] Unparseable block header data") + return nil, errors.New("Failed parsing new block") + } + + consensus.FBFTLog.AddBlock(&blockObj) + + // let this handle it own logs + if !consensus.newBlockSanityChecks(&blockObj, recvMsg) { + return nil, errors.New("new block failed sanity checks") + } + + // add block field + blockPayload := make([]byte, len(recvMsg.Block)) + copy(blockPayload[:], recvMsg.Block[:]) + consensus.block = blockPayload + recvMsg.Block = []byte{} // save memory space + consensus.FBFTLog.AddVerifiedMessage(recvMsg) + consensus.getLogger().Debug(). + Uint64("MsgViewID", recvMsg.ViewID). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Hex("blockHash", recvMsg.BlockHash[:]). + Msg("[validateNewBlock] Prepared message and block added") + + if consensus.BlockVerifier == nil { + consensus.getLogger().Debug().Msg("[validateNewBlock] consensus received message before init. Ignoring") + return nil, errors.New("nil block verifier") + } + + if err := consensus.VerifyBlock(&blockObj); err != nil { + consensus.getLogger().Error().Err(err).Msg("[validateNewBlock] Block verification failed") + return nil, errors.New("Block verification failed") + } + return &blockObj, nil } func (consensus *Consensus) prepare() { @@ -149,41 +226,12 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { return } - // check validity of block - var blockObj types.Block - if err := rlp.DecodeBytes(recvMsg.Block, &blockObj); err != nil { + var blockObj *types.Block + if blockObj, err = consensus.validateNewBlock(recvMsg); err != nil { consensus.getLogger().Warn(). - Err(err). Uint64("MsgBlockNum", recvMsg.BlockNum). - Msg("[OnPrepared] Unparseable block header data") - return - } - // let this handle it own logs - if !consensus.onPreparedSanityChecks(&blockObj, recvMsg) { - return - } - - consensus.FBFTLog.AddBlock(&blockObj) - // add block field - blockPayload := make([]byte, len(recvMsg.Block)) - copy(blockPayload[:], recvMsg.Block[:]) - consensus.block = blockPayload - recvMsg.Block = []byte{} // save memory space - consensus.FBFTLog.AddVerifiedMessage(recvMsg) - consensus.getLogger().Debug(). - Uint64("MsgViewID", recvMsg.ViewID). - Uint64("MsgBlockNum", recvMsg.BlockNum). - Hex("blockHash", recvMsg.BlockHash[:]). - Msg("[OnPrepared] Prepared message and block added") - - if consensus.BlockVerifier == nil { - consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring") - return - } - - if err := consensus.VerifyBlock(&blockObj); err != nil { - consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed") - return + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnPrepared] failed to verify new block") } if consensus.checkViewID(recvMsg) != nil { @@ -212,7 +260,7 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { // tryCatchup is also run in onCommitted(), so need to lock with commitMutex. if consensus.current.Mode() == Normal { - consensus.sendCommitMessages(&blockObj) + consensus.sendCommitMessages(blockObj) consensus.switchPhase("onPrepared", FBFTCommit) } else { // don't sign the block that is not verified