add logic to verify blocks in announce phase (#3897)

* add logic to verify blocks in announce phase

* add block data in announce message
pull/3872/merge
Rongjian Lan 3 years ago committed by GitHub
parent e08fd4199f
commit 471af49477
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      consensus/checks.go
  2. 4
      consensus/consensus.go
  3. 6
      consensus/consensus_v2.go
  4. 1
      consensus/construct.go
  5. 4
      consensus/fbft_log.go
  6. 2
      consensus/leader.go
  7. 118
      consensus/validator.go

@ -118,7 +118,7 @@ func (consensus *Consensus) isRightBlockNumCheck(recvMsg *FBFTMessage) bool {
return true return true
} }
func (consensus *Consensus) onPreparedSanityChecks( func (consensus *Consensus) newBlockSanityChecks(
blockObj *types.Block, recvMsg *FBFTMessage, blockObj *types.Block, recvMsg *FBFTMessage,
) bool { ) bool {
if blockObj.NumberU64() != recvMsg.BlockNum || if blockObj.NumberU64() != recvMsg.BlockNum ||
@ -126,7 +126,7 @@ func (consensus *Consensus) onPreparedSanityChecks(
consensus.getLogger().Warn(). consensus.getLogger().Warn().
Uint64("MsgBlockNum", recvMsg.BlockNum). Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("blockNum", blockObj.NumberU64()). Uint64("blockNum", blockObj.NumberU64()).
Msg("[OnPrepared] BlockNum not match") Msg("[newBlockSanityChecks] BlockNum not match")
return false return false
} }
if blockObj.Header().Hash() != recvMsg.BlockHash { if blockObj.Header().Hash() != recvMsg.BlockHash {
@ -134,7 +134,7 @@ func (consensus *Consensus) onPreparedSanityChecks(
Uint64("MsgBlockNum", recvMsg.BlockNum). Uint64("MsgBlockNum", recvMsg.BlockNum).
Hex("MsgBlockHash", recvMsg.BlockHash[:]). Hex("MsgBlockHash", recvMsg.BlockHash[:]).
Str("blockObjHash", blockObj.Header().Hash().Hex()). Str("blockObjHash", blockObj.Header().Hash().Hex()).
Msg("[OnPrepared] BlockHash not match") Msg("[newBlockSanityChecks] BlockHash not match")
return false return false
} }
return true return true

@ -84,6 +84,8 @@ type Consensus struct {
IgnoreViewIDCheck *abool.AtomicBool IgnoreViewIDCheck *abool.AtomicBool
// consensus mutex // consensus mutex
mutex sync.Mutex mutex sync.Mutex
// mutex for verify new block
verifyBlockMutex sync.Mutex
// ViewChange struct // ViewChange struct
vc *viewChange vc *viewChange
// Signal channel for proposing a new block and start new consensus // Signal channel for proposing a new block and start new consensus
@ -136,7 +138,7 @@ type Consensus struct {
// VerifyBlock is a function used to verify the block and keep trace of verified blocks // VerifyBlock is a function used to verify the block and keep trace of verified blocks
func (consensus *Consensus) VerifyBlock(block *types.Block) error { func (consensus *Consensus) VerifyBlock(block *types.Block) error {
if !consensus.FBFTLog.IsBlockVerified(block) { if !consensus.FBFTLog.IsBlockVerified(block.Hash()) {
if err := consensus.BlockVerifier(block); err != nil { if err := consensus.BlockVerifier(block); err != nil {
return errors.New("Block verification failed") return errors.New("Block verification failed")
} }

@ -495,7 +495,7 @@ func (iter *LastMileBlockIter) Next() *types.Block {
block := iter.blockCandidates[iter.curIndex] block := iter.blockCandidates[iter.curIndex]
iter.curIndex++ iter.curIndex++
if !iter.fbftLog.IsBlockVerified(block) { if !iter.fbftLog.IsBlockVerified(block.Hash()) {
if err := iter.verify(block); err != nil { if err := iter.verify(block); err != nil {
iter.logger.Debug().Err(err).Msg("block verification failed in consensus last mile block") iter.logger.Debug().Err(err).Msg("block verification failed in consensus last mile block")
return nil return nil
@ -558,7 +558,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
go func() { go func() {
blk.SetCurrentCommitSig(bareMinimumCommit) blk.SetCurrentCommitSig(bareMinimumCommit)
if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk)); err != nil { if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain") consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return return
} }
@ -658,7 +658,7 @@ func (consensus *Consensus) tryCatchup() error {
func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error {
if consensus.Blockchain.CurrentBlock().NumberU64() < blk.NumberU64() { if consensus.Blockchain.CurrentBlock().NumberU64() < blk.NumberU64() {
if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk)); err != nil { if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain") consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain")
return err return err
} }

@ -99,6 +99,7 @@ func (consensus *Consensus) construct(
needMsgSig := true needMsgSig := true
switch p { switch p {
case msg_pb.MessageType_ANNOUNCE: case msg_pb.MessageType_ANNOUNCE:
consensusMsg.Block = consensus.block
consensusMsg.Payload = consensus.blockHash[:] consensusMsg.Payload = consensus.blockHash[:]
case msg_pb.MessageType_PREPARE: case msg_pb.MessageType_PREPARE:
needMsgSig = false needMsgSig = false

@ -135,11 +135,11 @@ func (log *FBFTLog) MarkBlockVerified(block *types.Block) {
} }
// IsBlockVerified checks whether the block is verified // IsBlockVerified checks whether the block is verified
func (log *FBFTLog) IsBlockVerified(block *types.Block) bool { func (log *FBFTLog) IsBlockVerified(hash common.Hash) bool {
log.blockLock.RLock() log.blockLock.RLock()
defer log.blockLock.RUnlock() defer log.blockLock.RUnlock()
_, exist := log.verifiedBlocks[block.Hash()] _, exist := log.verifiedBlocks[hash]
return exist return exist
} }

@ -29,7 +29,7 @@ func (consensus *Consensus) announce(block *types.Block) {
//// Lock Write - Start //// Lock Write - Start
consensus.mutex.Lock() consensus.mutex.Lock()
copy(consensus.blockHash[:], blockHash[:]) copy(consensus.blockHash[:], blockHash[:])
consensus.block = encodedBlock consensus.block = encodedBlock // Must set block bytes before consensus.construct()
consensus.mutex.Unlock() consensus.mutex.Unlock()
//// Lock Write - End //// Lock Write - End

@ -3,6 +3,8 @@ package consensus
import ( import (
"encoding/hex" "encoding/hex"
"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -31,7 +33,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
} }
consensus.StartFinalityCount() consensus.StartFinalityCount()
consensus.getLogger().Debug(). consensus.getLogger().Info().
Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum). Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnAnnounce] Announce message Added") Msg("[OnAnnounce] Announce message Added")
@ -58,6 +60,81 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
} }
consensus.prepare() consensus.prepare()
consensus.switchPhase("Announce", FBFTPrepare) consensus.switchPhase("Announce", FBFTPrepare)
if len(recvMsg.Block) > 0 {
go func() {
// Best effort check, no need to error out.
_, err := consensus.validateNewBlock(recvMsg)
if err == nil {
consensus.getLogger().Info().
Msg("[Announce] Block verified")
}
}()
}
}
func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block, error) {
// Lock to prevent race condition between announce and prepare
consensus.verifyBlockMutex.Lock()
defer consensus.verifyBlockMutex.Unlock()
if consensus.FBFTLog.IsBlockVerified(recvMsg.BlockHash) {
var blockObj *types.Block
blockObj = consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash)
if blockObj == nil {
if err := rlp.DecodeBytes(recvMsg.Block, &blockObj); err != nil {
consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[validateNewBlock] Unparseable block header data")
return nil, errors.New("Failed parsing new block")
}
}
consensus.getLogger().Info().
Msg("[validateNewBlock] Block Already verified")
return blockObj, nil
}
// check validity of block if any
var blockObj types.Block
if err := rlp.DecodeBytes(recvMsg.Block, &blockObj); err != nil {
consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[validateNewBlock] Unparseable block header data")
return nil, errors.New("Failed parsing new block")
}
consensus.FBFTLog.AddBlock(&blockObj)
// let this handle it own logs
if !consensus.newBlockSanityChecks(&blockObj, recvMsg) {
return nil, errors.New("new block failed sanity checks")
}
// add block field
blockPayload := make([]byte, len(recvMsg.Block))
copy(blockPayload[:], recvMsg.Block[:])
consensus.block = blockPayload
recvMsg.Block = []byte{} // save memory space
consensus.FBFTLog.AddVerifiedMessage(recvMsg)
consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Hex("blockHash", recvMsg.BlockHash[:]).
Msg("[validateNewBlock] Prepared message and block added")
if consensus.BlockVerifier == nil {
consensus.getLogger().Debug().Msg("[validateNewBlock] consensus received message before init. Ignoring")
return nil, errors.New("nil block verifier")
}
if err := consensus.VerifyBlock(&blockObj); err != nil {
consensus.getLogger().Error().Err(err).Msg("[validateNewBlock] Block verification failed")
return nil, errors.New("Block verification failed")
}
return &blockObj, nil
} }
func (consensus *Consensus) prepare() { func (consensus *Consensus) prepare() {
@ -149,41 +226,12 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) {
return return
} }
// check validity of block var blockObj *types.Block
var blockObj types.Block if blockObj, err = consensus.validateNewBlock(recvMsg); err != nil {
if err := rlp.DecodeBytes(recvMsg.Block, &blockObj); err != nil {
consensus.getLogger().Warn(). consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum). Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnPrepared] Unparseable block header data") Uint64("MsgViewID", recvMsg.ViewID).
return Msg("[OnPrepared] failed to verify new block")
}
// let this handle it own logs
if !consensus.onPreparedSanityChecks(&blockObj, recvMsg) {
return
}
consensus.FBFTLog.AddBlock(&blockObj)
// add block field
blockPayload := make([]byte, len(recvMsg.Block))
copy(blockPayload[:], recvMsg.Block[:])
consensus.block = blockPayload
recvMsg.Block = []byte{} // save memory space
consensus.FBFTLog.AddVerifiedMessage(recvMsg)
consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Hex("blockHash", recvMsg.BlockHash[:]).
Msg("[OnPrepared] Prepared message and block added")
if consensus.BlockVerifier == nil {
consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring")
return
}
if err := consensus.VerifyBlock(&blockObj); err != nil {
consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed")
return
} }
if consensus.checkViewID(recvMsg) != nil { if consensus.checkViewID(recvMsg) != nil {
@ -212,7 +260,7 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) {
// tryCatchup is also run in onCommitted(), so need to lock with commitMutex. // tryCatchup is also run in onCommitted(), so need to lock with commitMutex.
if consensus.current.Mode() == Normal { if consensus.current.Mode() == Normal {
consensus.sendCommitMessages(&blockObj) consensus.sendCommitMessages(blockObj)
consensus.switchPhase("onPrepared", FBFTCommit) consensus.switchPhase("onPrepared", FBFTCommit)
} else { } else {
// don't sign the block that is not verified // don't sign the block that is not verified

Loading…
Cancel
Save