[consensus][sync] Better coordination between state sync and consensus module. (#3352)

* [rawdb] add error handling to all rawdb writes. Add fdlimit module. Fix the node getting stuck

* [core] switch back the batch write condition in InsertReceiptChain

* [rawdb] add error handling to all rawdb writes. Add fdlimit module. Fix the node getting stuck

* [consensus] refactored and optimized tryCatchup logic

* [sync] added consensus last mile block in sync.

* [consensus] remove time wait for consensus inform sync. Make block low chan a buffered chan

* [consensus] fix rebase errors, and optimize one line of code

* [consensus][sync] fix golint error and added prune logic in sync

* [consensus] move header verify after adding FBFT log in onPrepared

* [consensus] more change on block verification logic

* [consensus] fix the verified panic issue

* [consensus][sync] add block verification in consensus last mile, change it to iterator

* [consensus] fix two nil pointer references when running local node (Still cannot find the root cause for it)

* remove coverage.txt and add to gitignore

* [consensus] add leader key check. Move quorum check logic after tryCatchup and can spin state sync

* [consensus] remove the leader sender check for now. Will add later

* [consensus] refactor fbftlog to get rid of unsafe mapset module. Replace with map

* [consensus] move the isQuorumAchieved logic back. We surely need to check it before adding to the FBFTLog

* [consensus] remove the redundant block nil check

* [test] fix the consensus test

* [consensus] rebase main and fix stuff. Removed isSendByLeader

* [consensus] added logic to spin up sync when received message is greater than consensus block number

* [consensus] more changes in consensus. Remove some spin sync logic.

* fix error in main

* [consensus] change the hash algorithm of the FBFTLog to get rid of rlp error

* [consensus] use separate mutex in FBFT message

* [consensus] change fbft log id to a shorter form. Added unit test case
pull/3392/head
Jacky Wang 4 years ago committed by GitHub
parent 6407afc747
commit 7801a1d678
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .gitignore
  2. 30
      api/service/syncing/syncing.go
  3. 19
      consensus/checks.go
  4. 4
      consensus/consensus.go
  5. 1
      consensus/consensus_test.go
  6. 242
      consensus/consensus_v2.go
  7. 246
      consensus/fbft_log.go
  8. 46
      consensus/fbft_log_test.go
  9. 53
      consensus/validator.go
  10. 2
      node/node.go
  11. 17
      node/node_handler.go

1
.gitignore vendored

@ -83,3 +83,4 @@ db-127.0.0.1-*
# testdata # testdata
.testdata .testdata
coverage.txt

@ -927,7 +927,7 @@ func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, isBeac
utils.Logger().Info(). utils.Logger().Info().
Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight) isBeacon, bc.ShardID(), otherHeight, currentHeight)
return break
} }
utils.Logger().Info(). utils.Logger().Info().
Msgf("[SYNC] Node is OUT OF SYNC (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", Msgf("[SYNC] Node is OUT OF SYNC (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
@ -943,15 +943,39 @@ func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, isBeac
utils.Logger().Error().Err(err). utils.Logger().Error().Err(err).
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight) isBeacon, bc.ShardID(), otherHeight, currentHeight)
ss.purgeOldBlocksFromCache()
break
} }
ss.purgeOldBlocksFromCache() ss.purgeOldBlocksFromCache()
if consensus != nil { }
consensus.SetMode(consensus.UpdateConsensusInformation()) if consensus != nil {
if err := ss.addConsensusLastMile(bc, consensus); err != nil {
utils.Logger().Error().Err(err).Msg("[SYNC] Add consensus last mile")
} }
consensus.SetMode(consensus.UpdateConsensusInformation())
} }
ss.purgeAllBlocksFromCache() ss.purgeAllBlocksFromCache()
} }
// addConsensusLastMile inserts the blocks held in the consensus last-mile
// cache on top of the current chain head.
//
// It asks consensus for an iterator starting at the block after bc's current
// block and inserts every block the iterator yields, one at a time, until the
// iterator returns nil. If an insert fails the wrapped error is returned
// immediately and the cache is not pruned. On success the FBFT log cache is
// pruned relative to the new chain head.
func (ss *StateSync) addConsensusLastMile(bc *core.BlockChain, consensus *consensus.Consensus) error {
	nextNumber := bc.CurrentBlock().NumberU64() + 1
	lastMile, err := consensus.GetLastMileBlockIter(nextNumber)
	if err != nil {
		return err
	}
	for blk := lastMile.Next(); blk != nil; blk = lastMile.Next() {
		if _, insertErr := bc.InsertChain(types.Blocks{blk}, true); insertErr != nil {
			return errors.Wrap(insertErr, "failed to InsertChain")
		}
	}
	// Re-read the head: it has moved if any last-mile block was inserted.
	head := bc.CurrentBlock().NumberU64()
	consensus.FBFTLog.PruneCacheBeforeBlock(head + 1)
	return nil
}
// GetSyncingPort returns the syncing port. // GetSyncingPort returns the syncing port.
func GetSyncingPort(nodePort string) string { func GetSyncingPort(nodePort string) string {
if port, err := strconv.Atoi(nodePort); err == nil { if port, err := strconv.Atoi(nodePort); err == nil {

@ -10,7 +10,6 @@ import (
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash" "github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/chain"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -136,24 +135,6 @@ func (consensus *Consensus) onPreparedSanityChecks(
Msg("[OnPrepared] BlockHash not match") Msg("[OnPrepared] BlockHash not match")
return false return false
} }
if consensus.current.Mode() == Normal {
err := chain.Engine.VerifyHeader(consensus.ChainReader, blockObj.Header(), true)
if err != nil {
consensus.getLogger().Error().
Err(err).
Str("inChain", consensus.ChainReader.CurrentHeader().Number().String()).
Str("MsgBlockNum", blockObj.Header().Number().String()).
Msg("[OnPrepared] Block header is not verified successfully")
return false
}
if consensus.BlockVerifier == nil {
// do nothing
} else if err := consensus.BlockVerifier(blockObj); err != nil {
consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed")
return false
}
}
return true return true
} }

@ -79,7 +79,7 @@ type Consensus struct {
ReadySignal chan struct{} ReadySignal chan struct{}
// The post-consensus processing func passed from Node object // The post-consensus processing func passed from Node object
// Called when consensus on a new block is done // Called when consensus on a new block is done
OnConsensusDone func(*types.Block) OnConsensusDone func(*types.Block) error
// The verifier func passed from Node object // The verifier func passed from Node object
BlockVerifier BlockVerifierFunc BlockVerifier BlockVerifierFunc
// verified block to state sync broadcast // verified block to state sync broadcast
@ -176,7 +176,7 @@ func New(
consensus.Decider = Decider consensus.Decider = Decider
consensus.host = host consensus.host = host
consensus.msgSender = NewMessageSender(host) consensus.msgSender = NewMessageSender(host)
consensus.BlockNumLowChan = make(chan struct{}) consensus.BlockNumLowChan = make(chan struct{}, 1)
// FBFT related // FBFT related
consensus.FBFTLog = NewFBFTLog() consensus.FBFTLog = NewFBFTLog()
consensus.phase = FBFTAnnounce consensus.phase = FBFTAnnounce

@ -37,7 +37,6 @@ func TestConsensusInitialization(t *testing.T) {
// FBFTLog // FBFTLog
assert.Equal(t, fbtLog, consensus.FBFTLog) assert.Equal(t, fbtLog, consensus.FBFTLog)
assert.Equal(t, maxLogSize, consensus.FBFTLog.maxLogSize)
assert.Equal(t, FBFTAnnounce, consensus.phase) assert.Equal(t, FBFTAnnounce, consensus.phase)

@ -7,12 +7,13 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/harmony-one/harmony/crypto/bls" "github.com/rs/zerolog"
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus/quorum" "github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls" vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -222,103 +223,6 @@ func (consensus *Consensus) BlockCommitSig(blockNum uint64) ([]byte, []byte, err
return aggSig, bitmap, nil return aggSig, bitmap, nil
} }
// tryCatchup tries to catch up when this node has fallen behind: it commits
// blocks already cached in the FBFT log, starting at consensus.blockNum.
//
// A block is committed only when the log holds a COMMITTED message for the
// current block number, the block that message refers to, and a PREPARED
// message for the same number and hash, and the block's parent hash equals the
// hash of the chain's current header. The loop stops at the first height where
// any of these is missing.
func (consensus *Consensus) tryCatchup() {
consensus.getLogger().Info().Msg("[TryCatchup] commit new blocks")
// Remember the starting height so progress can be detected after the loop.
currentBlockNum := consensus.blockNum
for {
msgs := consensus.FBFTLog.GetMessagesByTypeSeq(
msg_pb.MessageType_COMMITTED, consensus.blockNum,
)
if len(msgs) == 0 {
break
}
if len(msgs) > 1 {
consensus.getLogger().Error().
Int("numMsgs", len(msgs)).
Msg("[TryCatchup] DANGER!!! we should only get one committed message for a given blockNum")
}
var committedMsg *FBFTMessage
var block *types.Block
// Pick the first committed message whose block is present in the log.
for i := range msgs {
tmpBlock := consensus.FBFTLog.GetBlockByHash(msgs[i].BlockHash)
if tmpBlock == nil {
// Dump the whole log alongside the offending message for debugging.
blksRepr, msgsRepr, incomingMsg :=
consensus.FBFTLog.Blocks().String(),
consensus.FBFTLog.Messages().String(),
msgs[i].String()
consensus.getLogger().Debug().
Str("FBFT-log-blocks", blksRepr).
Str("FBFT-log-messages", msgsRepr).
Str("incoming-message", incomingMsg).
Uint64("blockNum", msgs[i].BlockNum).
Uint64("viewID", msgs[i].ViewID).
Str("blockHash", msgs[i].BlockHash.Hex()).
Msg("[TryCatchup] Failed finding a matching block for committed message")
continue
}
committedMsg = msgs[i]
block = tmpBlock
break
}
if block == nil || committedMsg == nil {
consensus.getLogger().Error().Msg("[TryCatchup] Failed finding a valid committed message.")
break
}
// The candidate must extend the current chain head.
if block.ParentHash() != consensus.ChainReader.CurrentHeader().Hash() {
consensus.getLogger().Debug().Msg("[TryCatchup] parent block hash not match")
break
}
consensus.getLogger().Info().Msg("[TryCatchup] block found to commit")
preparedMsgs := consensus.FBFTLog.GetMessagesByTypeSeqHash(
msg_pb.MessageType_PREPARED, committedMsg.BlockNum, committedMsg.BlockHash,
)
msg := consensus.FBFTLog.FindMessageByMaxViewID(preparedMsgs)
if msg == nil {
break
}
consensus.getLogger().Info().Msg("[TryCatchup] prepared message found to commit")
// Advance consensus state before handing the block to OnConsensusDone:
// next block number, next view ID, and the leader taken from the
// committed message's sender.
atomic.AddUint64(&consensus.blockNum, 1)
consensus.SetCurBlockViewID(committedMsg.ViewID + 1)
consensus.LeaderPubKey = committedMsg.SenderPubkey
consensus.getLogger().Info().Msg("[TryCatchup] Adding block to chain")
// Fill in the commit signatures
block.SetCurrentCommitSig(committedMsg.Payload)
consensus.OnConsensusDone(block)
consensus.ResetState()
// Non-blocking notification of the newly verified block.
// NOTE(review): a successful send falls through to the break below and ends
// the loop after one block; only a failed send continues to the next block
// number. Confirm this asymmetry is intended.
select {
case consensus.VerifiedNewBlock <- block:
default:
consensus.getLogger().Info().
Str("blockHash", block.Hash().String()).
Msg("[TryCatchup] consensus verified block send to chan failed")
continue
}
break
}
// At least one block was committed: restart from the announce phase.
if currentBlockNum < consensus.blockNum {
consensus.switchPhase("TryCatchup", FBFTAnnounce)
}
// catch up and skip from view change trap
if currentBlockNum < consensus.blockNum &&
consensus.IsViewChangingMode() {
consensus.current.SetMode(Normal)
consensus.consensusTimeout[timeoutViewChange].Stop()
}
// clean up old log: drop blocks and messages numbered below blockNum-1
consensus.FBFTLog.DeleteBlocksLessThan(consensus.blockNum - 1)
consensus.FBFTLog.DeleteMessagesLessThan(consensus.blockNum - 1)
}
// Start waits for the next new block and run consensus // Start waits for the next new block and run consensus
func (consensus *Consensus) Start( func (consensus *Consensus) Start(
blockChannel chan *types.Block, stopChan, stoppedChan, startChannel chan struct{}, blockChannel chan *types.Block, stopChan, stoppedChan, startChannel chan struct{},
@ -372,7 +276,7 @@ func (consensus *Consensus) Start(
break break
} else { } else {
consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!") consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!")
viewID := consensus.GetViewChangingID() viewID := consensus.GetCurBlockViewID()
consensus.startViewChange(viewID + 1) consensus.startViewChange(viewID + 1)
break break
} }
@ -501,6 +405,146 @@ func (consensus *Consensus) Start(
}() }()
} }
// LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache.
// All blocks returned are guaranteed to pass the verification.
type LastMileBlockIter struct {
	// blockCandidates are the cached blocks to hand out, in ascending order.
	blockCandidates []*types.Block
	// fbftLog is the consensus FBFT log that records which blocks are already
	// verified. It is a pointer so that the iterator shares the log (and its
	// locks) with consensus instead of operating on a copy.
	fbftLog *FBFTLog
	// verify is the block verifier applied to blocks not yet marked verified.
	verify func(*types.Block) error
	// curIndex is the index into blockCandidates of the next block to return.
	curIndex int
	logger   *zerolog.Logger
}

// GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart.
//
// It holds consensus.mutex while the candidate blocks are collected from the
// FBFT log. An error is returned when no BlockVerifier has been set yet or
// when collecting the blocks fails.
func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64) (*LastMileBlockIter, error) {
	consensus.mutex.Lock()
	defer consensus.mutex.Unlock()

	if consensus.BlockVerifier == nil {
		return nil, errors.New("consensus haven't initialized yet")
	}
	blocks, _, err := consensus.getLastMileBlocksAndMsg(bnStart)
	if err != nil {
		return nil, err
	}
	return &LastMileBlockIter{
		blockCandidates: blocks,
		// Without this the iterator would call MarkBlockVerified on a log with
		// nil maps and panic on the first unverified block.
		fbftLog:  consensus.FBFTLog,
		verify:   consensus.BlockVerifier,
		curIndex: 0,
		logger:   consensus.getLogger(),
	}, nil
}

// Next iterate to the next last mile block.
//
// It returns nil when the candidates are exhausted or when the next candidate
// fails verification. A block that passes verification is marked verified in
// the FBFT log so it is not verified again.
func (iter *LastMileBlockIter) Next() *types.Block {
	if iter.curIndex >= len(iter.blockCandidates) {
		return nil
	}
	block := iter.blockCandidates[iter.curIndex]
	iter.curIndex++

	if !iter.fbftLog.IsBlockVerified(block) {
		if err := iter.verify(block); err != nil {
			iter.logger.Debug().Err(err).Msg("block verification failed in consensus last mile block")
			return nil
		}
		iter.fbftLog.MarkBlockVerified(block)
	}
	return block
}
// getLastMileBlocksAndMsg collects consecutive committed blocks, and the
// committed message belonging to each, from the FBFT log starting at block
// number bnStart. Collection stops at the first number for which the log
// reports errFBFTLogNotFound; any other error aborts and is returned with nil
// slices. The two returned slices are index-aligned.
func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Block, []*FBFTMessage, error) {
	var (
		lastMileBlocks []*types.Block
		committedMsgs  []*FBFTMessage
	)
	bn := bnStart
	for {
		blk, msg, err := consensus.FBFTLog.GetCommittedBlockAndMsgsFromNumber(bn, consensus.getLogger())
		switch {
		case err == errFBFTLogNotFound:
			// Reached the end of the contiguous run of committed blocks.
			return lastMileBlocks, committedMsgs, nil
		case err != nil:
			return nil, nil, err
		}
		lastMileBlocks = append(lastMileBlocks, blk)
		committedMsgs = append(committedMsgs, msg)
		bn++
	}
}
// tryCatchup add the last mile block in PBFT log memory cache to blockchain.
//
// Starting at consensus.blockNum it fetches the consecutive committed blocks
// cached in the FBFT log, verifies each one that is not already marked
// verified, and commits them in order. postCatchup always runs on return, so
// phase/mode updates and log pruning happen even when a later block fails.
//
// It returns an error when no BlockVerifier has been set, when the cached
// blocks cannot be read, or when verifying or committing a block fails.
func (consensus *Consensus) tryCatchup() error {
	// TODO: change this to a more systematic symbol
	if consensus.BlockVerifier == nil {
		return errors.New("consensus haven't finished initialization")
	}
	initBN := consensus.blockNum
	defer consensus.postCatchup(initBN)

	blks, msgs, err := consensus.getLastMileBlocksAndMsg(initBN)
	if err != nil {
		// Wrap already appends the cause; formatting err into the message as
		// well would print it twice.
		return errors.Wrap(err, "[TryCatchup] Failed to get last mile blocks")
	}
	for i, blk := range blks {
		if blk == nil {
			return nil
		}
		msg := msgs[i]
		// Fill in the commit signatures carried by the committed message.
		blk.SetCurrentCommitSig(msg.Payload)

		if !consensus.FBFTLog.IsBlockVerified(blk) {
			if err := consensus.BlockVerifier(blk); err != nil {
				consensus.getLogger().Err(err).Msg("[TryCatchup] failed block verifier")
				return err
			}
			consensus.FBFTLog.MarkBlockVerified(blk)
		}
		consensus.getLogger().Info().Msg("[TryCatchup] Adding block to chain")
		if err := consensus.commitBlock(blk, msg); err != nil {
			consensus.getLogger().Error().Err(err).Msg("[TryCatchup] Failed to add block to chain")
			return err
		}
		// Non-blocking notification; a full channel must not stall catch-up.
		select {
		case consensus.VerifiedNewBlock <- blk:
		default:
			consensus.getLogger().Info().
				Str("blockHash", blk.Hash().String()).
				Msg("[TryCatchup] consensus verified block send to chan failed")
		}
	}
	return nil
}
// commitBlock hands blk to the OnConsensusDone callback and, only if that
// succeeds, advances the local consensus state: the block number is
// incremented, the current view ID is set to committedMsg.ViewID+1, the leader
// key is taken from the committed message's sender, and ResetState is called.
// If OnConsensusDone fails its error is returned and no state is changed here.
func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error {
if err := consensus.OnConsensusDone(blk); err != nil {
return err
}
// blockNum is bumped atomically.
// NOTE(review): presumably because it is read from other goroutines; confirm.
atomic.AddUint64(&consensus.blockNum, 1)
consensus.SetCurBlockViewID(committedMsg.ViewID + 1)
consensus.LeaderPubKey = committedMsg.SenderPubkey
consensus.ResetState()
return nil
}
// postCatchup finalizes a catch-up attempt that started at block number initBN.
//
// When the consensus block number has moved past initBN it logs the range and
// switches the phase to FBFTAnnounce; if the node is additionally in view
// changing mode it is put back to Normal mode and the view change timeout is
// stopped. The FBFT log cache is pruned in every case.
func (consensus *Consensus) postCatchup(initBN uint64) {
	if consensus.blockNum > initBN {
		logger := consensus.getLogger()
		logger.Info().
			Uint64("From", initBN).
			Uint64("To", consensus.blockNum).
			Msg("[TryCatchup] Caught up!")
		consensus.switchPhase("TryCatchup", FBFTAnnounce)
	}

	// catch up and skip from view change trap
	if consensus.blockNum > initBN && consensus.IsViewChangingMode() {
		consensus.current.SetMode(Normal)
		viewChangeTimeout := consensus.consensusTimeout[timeoutViewChange]
		viewChangeTimeout.Stop()
	}

	// clean up old log
	consensus.FBFTLog.PruneCacheBeforeBlock(consensus.blockNum)
}
// GenerateVrfAndProof generates new VRF/Proof from hash of previous block // GenerateVrfAndProof generates new VRF/Proof from hash of previous block
func (consensus *Consensus) GenerateVrfAndProof(newBlock *types.Block, vrfBlockNumbers []uint64) []uint64 { func (consensus *Consensus) GenerateVrfAndProof(newBlock *types.Block, vrfBlockNumbers []uint64) []uint64 {
key, err := consensus.GetConsensusLeaderPrivateKey() key, err := consensus.GetConsensusLeaderPrivateKey()

@ -1,25 +1,21 @@
package consensus package consensus
import ( import (
"encoding/binary"
"fmt" "fmt"
"sync"
"github.com/harmony-one/harmony/crypto/bls"
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
bls_core "github.com/harmony-one/bls/ffi/go/bls" bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/pkg/errors"
"github.com/rs/zerolog"
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
bls_cosi "github.com/harmony-one/harmony/crypto/bls" bls_cosi "github.com/harmony-one/harmony/crypto/bls"
) )
// FBFTLog represents the log stored by a node during the FBFT process.
// Blocks and messages are kept in mapset sets.
type FBFTLog struct {
blocks mapset.Set // blocks received in FBFT
messages mapset.Set // messages received in FBFT
// NOTE(review): presumably an upper bound on the number of log entries; no
// code in view reads it, so confirm before relying on it.
maxLogSize uint32
}
// FBFTMessage is the record of pbft messages received by a node during FBFT process // FBFTMessage is the record of pbft messages received by a node during FBFT process
type FBFTMessage struct { type FBFTMessage struct {
MessageType msg_pb.MessageType MessageType msg_pb.MessageType
@ -59,106 +55,153 @@ func (m *FBFTMessage) String() string {
) )
} }
// Byte widths of the components that are concatenated to form a fbftMsgID.
const (
	idTypeBytes   = 4                        // message type, stored as a uint32
	idHashBytes   = common.HashLength        // block hash
	idSenderBytes = bls.PublicKeySizeInBytes // serialized sender public key
	idBytes       = idTypeBytes + idHashBytes + idSenderBytes
)

// fbftMsgID is the id that uniquely defines a fbft message.
type fbftMsgID [idBytes]byte
// id returns the identifier of the FBFT message, used as its key in the log.
// Layout: the message type as a little-endian uint32, followed by the block
// hash, followed by the sender's serialized public key. The key section stays
// zeroed when the message has no sender key.
func (m *FBFTMessage) id() fbftMsgID {
	const (
		hashOffset   = idTypeBytes
		senderOffset = idTypeBytes + idHashBytes
	)

	var out fbftMsgID
	binary.LittleEndian.PutUint32(out[:hashOffset], uint32(m.MessageType))
	copy(out[hashOffset:senderOffset], m.BlockHash[:])
	if sender := m.SenderPubkey; sender != nil {
		copy(out[senderOffset:], sender.Bytes[:])
	}
	return out
}
// FBFTLog represents the log stored by a node during the FBFT process.
// Blocks are indexed by block hash and messages by their fbftMsgID; each group
// has its own read/write lock.
type FBFTLog struct {
blocks map[common.Hash]*types.Block // blocks received in FBFT, keyed by block hash
verifiedBlocks map[common.Hash]struct{} // hashes of blocks that have already been verified
// blockLock is taken by the accessors of blocks and verifiedBlocks.
blockLock sync.RWMutex
messages map[fbftMsgID]*FBFTMessage // messages received in FBFT, keyed by message id
// msgLock is taken by the accessors of messages.
msgLock sync.RWMutex
}
// NewFBFTLog returns new instance of FBFTLog // NewFBFTLog returns new instance of FBFTLog
func NewFBFTLog() *FBFTLog { func NewFBFTLog() *FBFTLog {
blocks := mapset.NewSet() pbftLog := FBFTLog{
messages := mapset.NewSet() blocks: make(map[common.Hash]*types.Block),
logSize := maxLogSize messages: make(map[fbftMsgID]*FBFTMessage),
pbftLog := FBFTLog{blocks: blocks, messages: messages, maxLogSize: logSize} verifiedBlocks: make(map[common.Hash]struct{}),
}
return &pbftLog return &pbftLog
} }
// Blocks return the blocks stored in the log // AddBlock add a new block into the log
func (log *FBFTLog) Blocks() mapset.Set { func (log *FBFTLog) AddBlock(block *types.Block) {
return log.blocks log.blockLock.Lock()
defer log.blockLock.Unlock()
log.blocks[block.Hash()] = block
} }
// Messages return the messages stored in the log // MarkBlockVerified marks the block as verified
func (log *FBFTLog) Messages() mapset.Set { func (log *FBFTLog) MarkBlockVerified(block *types.Block) {
return log.messages log.blockLock.Lock()
defer log.blockLock.Unlock()
log.verifiedBlocks[block.Hash()] = struct{}{}
} }
// AddBlock add a new block into the log // IsBlockVerified checks whether the block is verified
func (log *FBFTLog) AddBlock(block *types.Block) { func (log *FBFTLog) IsBlockVerified(block *types.Block) bool {
log.blocks.Add(block) log.blockLock.RLock()
defer log.blockLock.RUnlock()
_, exist := log.verifiedBlocks[block.Hash()]
return exist
} }
// GetBlockByHash returns the block matches the given block hash // GetBlockByHash returns the block matches the given block hash
func (log *FBFTLog) GetBlockByHash(hash common.Hash) *types.Block { func (log *FBFTLog) GetBlockByHash(hash common.Hash) *types.Block {
var found *types.Block log.blockLock.RLock()
it := log.Blocks().Iterator() defer log.blockLock.RUnlock()
for block := range it.C {
if block.(*types.Block).Header().Hash() == hash { return log.blocks[hash]
found = block.(*types.Block)
it.Stop()
}
}
return found
} }
// GetBlocksByNumber returns the blocks match the given block number // GetBlocksByNumber returns the blocks match the given block number
func (log *FBFTLog) GetBlocksByNumber(number uint64) []*types.Block { func (log *FBFTLog) GetBlocksByNumber(number uint64) []*types.Block {
found := []*types.Block{} log.blockLock.RLock()
it := log.Blocks().Iterator() defer log.blockLock.RUnlock()
for block := range it.C {
if block.(*types.Block).NumberU64() == number { var blocks []*types.Block
found = append(found, block.(*types.Block)) for _, block := range log.blocks {
if block.NumberU64() == number {
blocks = append(blocks, block)
} }
} }
return found return blocks
} }
// DeleteBlocksLessThan deletes blocks less than given block number // DeleteBlocksLessThan deletes blocks less than given block number
func (log *FBFTLog) DeleteBlocksLessThan(number uint64) { func (log *FBFTLog) DeleteBlocksLessThan(number uint64) {
found := mapset.NewSet() log.blockLock.Lock()
it := log.Blocks().Iterator() defer log.blockLock.Unlock()
for block := range it.C {
if block.(*types.Block).NumberU64() < number { for h, block := range log.blocks {
found.Add(block) if block.NumberU64() < number {
delete(log.blocks, h)
delete(log.verifiedBlocks, h)
} }
} }
log.blocks = log.blocks.Difference(found)
} }
// DeleteBlockByNumber deletes block of specific number // DeleteBlockByNumber deletes block of specific number
func (log *FBFTLog) DeleteBlockByNumber(number uint64) { func (log *FBFTLog) DeleteBlockByNumber(number uint64) {
found := mapset.NewSet() log.blockLock.Lock()
it := log.Blocks().Iterator() defer log.blockLock.Unlock()
for block := range it.C {
if block.(*types.Block).NumberU64() == number { for h, block := range log.blocks {
found.Add(block) if block.NumberU64() == number {
delete(log.blocks, h)
delete(log.verifiedBlocks, h)
} }
} }
log.blocks = log.blocks.Difference(found)
} }
// DeleteMessagesLessThan deletes messages less than given block number // DeleteMessagesLessThan deletes messages less than given block number
func (log *FBFTLog) DeleteMessagesLessThan(number uint64) { func (log *FBFTLog) DeleteMessagesLessThan(number uint64) {
found := mapset.NewSet() log.msgLock.Lock()
it := log.Messages().Iterator() defer log.msgLock.Unlock()
for msg := range it.C {
if msg.(*FBFTMessage).BlockNum < number { for h, msg := range log.messages {
found.Add(msg) if msg.BlockNum < number {
delete(log.messages, h)
} }
} }
log.messages = log.messages.Difference(found)
} }
// AddMessage adds a pbft message into the log // AddMessage adds a pbft message into the log
func (log *FBFTLog) AddMessage(msg *FBFTMessage) { func (log *FBFTLog) AddMessage(msg *FBFTMessage) {
log.messages.Add(msg) log.msgLock.Lock()
defer log.msgLock.Unlock()
log.messages[msg.id()] = msg
} }
// GetMessagesByTypeSeqViewHash returns pbft messages with matching type, blockNum, viewID and blockHash // GetMessagesByTypeSeqViewHash returns pbft messages with matching type, blockNum, viewID and blockHash
func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage { func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage {
found := []*FBFTMessage{} log.msgLock.RLock()
it := log.Messages().Iterator() defer log.msgLock.RUnlock()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType == typ && var found []*FBFTMessage
msg.(*FBFTMessage).BlockNum == blockNum && for _, msg := range log.messages {
msg.(*FBFTMessage).ViewID == viewID && if msg.MessageType == typ && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash {
msg.(*FBFTMessage).BlockHash == blockHash { found = append(found, msg)
found = append(found, msg.(*FBFTMessage))
} }
} }
return found return found
@ -166,12 +209,13 @@ func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNu
// GetMessagesByTypeSeq returns pbft messages with matching type, blockNum // GetMessagesByTypeSeq returns pbft messages with matching type, blockNum
func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage { func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage {
found := []*FBFTMessage{} log.msgLock.RLock()
it := log.Messages().Iterator() defer log.msgLock.RUnlock()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType == typ && var found []*FBFTMessage
msg.(*FBFTMessage).BlockNum == blockNum { for _, msg := range log.messages {
found = append(found, msg.(*FBFTMessage)) if msg.MessageType == typ && msg.BlockNum == blockNum {
found = append(found, msg)
} }
} }
return found return found
@ -179,13 +223,13 @@ func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64
// GetMessagesByTypeSeqHash returns pbft messages with matching type, blockNum // GetMessagesByTypeSeqHash returns pbft messages with matching type, blockNum
func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage { func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage {
found := []*FBFTMessage{} log.msgLock.RLock()
it := log.Messages().Iterator() defer log.msgLock.RUnlock()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType == typ && var found []*FBFTMessage
msg.(*FBFTMessage).BlockNum == blockNum && for _, msg := range log.messages {
msg.(*FBFTMessage).BlockHash == blockHash { if msg.MessageType == typ && msg.BlockNum == blockNum && msg.BlockHash == blockHash {
found = append(found, msg.(*FBFTMessage)) found = append(found, msg)
} }
} }
return found return found
@ -217,13 +261,15 @@ func (log *FBFTLog) HasMatchingViewPrepared(blockNum uint64, viewID uint64, bloc
// GetMessagesByTypeSeqView returns pbft messages with matching type, blockNum and viewID // GetMessagesByTypeSeqView returns pbft messages with matching type, blockNum and viewID
func (log *FBFTLog) GetMessagesByTypeSeqView(typ msg_pb.MessageType, blockNum uint64, viewID uint64) []*FBFTMessage { func (log *FBFTLog) GetMessagesByTypeSeqView(typ msg_pb.MessageType, blockNum uint64, viewID uint64) []*FBFTMessage {
found := []*FBFTMessage{} log.msgLock.RLock()
it := log.Messages().Iterator() defer log.msgLock.RUnlock()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType != typ || msg.(*FBFTMessage).BlockNum != blockNum || msg.(*FBFTMessage).ViewID != viewID { var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType != typ || msg.BlockNum != blockNum || msg.ViewID != viewID {
continue continue
} }
found = append(found, msg.(*FBFTMessage)) found = append(found, msg)
} }
return found return found
} }
@ -267,3 +313,37 @@ func ParseFBFTMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
return &pbftMsg, nil return &pbftMsg, nil
} }
var errFBFTLogNotFound = errors.New("FBFT log not found")
// GetCommittedBlockAndMsgsFromNumber returns the committed block at block
// number bn together with the COMMITTED message that refers to it.
//
// It looks up the COMMITTED messages logged for bn and returns the first one
// whose block is also present in the log. errFBFTLogNotFound is returned when
// there is no COMMITTED message for bn, or when none of them has a matching
// block. Anomalies are reported through logger.
func (log *FBFTLog) GetCommittedBlockAndMsgsFromNumber(bn uint64, logger *zerolog.Logger) (*types.Block, *FBFTMessage, error) {
	msgs := log.GetMessagesByTypeSeq(msg_pb.MessageType_COMMITTED, bn)
	if len(msgs) == 0 {
		return nil, nil, errFBFTLogNotFound
	}
	if len(msgs) > 1 {
		// A zerolog event is only written once Msg/Msgf/Send is called; ending
		// the chain on Err(...) silently dropped this warning.
		logger.Error().
			Int("numMsgs", len(msgs)).
			Msg("DANGER!!! multiple COMMITTED message in PBFT log observed")
	}
	for _, msg := range msgs {
		block := log.GetBlockByHash(msg.BlockHash)
		if block == nil {
			logger.Debug().
				Uint64("blockNum", msg.BlockNum).
				Uint64("viewID", msg.ViewID).
				Str("blockHash", msg.BlockHash.Hex()).
				Msg("failed finding a matching block for committed message")
			continue
		}
		return block, msg, nil
	}
	return nil, nil, errFBFTLogNotFound
}
// PruneCacheBeforeBlock prunes the cached blocks and messages whose block
// number is lower than bn-1, i.e. everything older than the block right before
// bn. It is a no-op for bn == 0.
func (log *FBFTLog) PruneCacheBeforeBlock(bn uint64) {
	if bn == 0 {
		// bn-1 would wrap around to the maximum uint64 and wipe the whole cache.
		return
	}
	threshold := bn - 1
	log.DeleteBlocksLessThan(threshold)
	log.DeleteMessagesLessThan(threshold)
}

@ -1,11 +1,57 @@
package consensus package consensus
import ( import (
"bytes"
"encoding/binary"
"testing" "testing"
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/crypto/bls"
) )
func TestFBFTLog_id(t *testing.T) {
tests := []FBFTMessage{
{
MessageType: msg_pb.MessageType_ANNOUNCE,
BlockHash: [32]byte{01, 02},
SenderPubkey: &bls.PublicKeyWrapper{
Bytes: bls.SerializedPublicKey{0x01, 0x02},
},
},
{
MessageType: msg_pb.MessageType_COMMIT,
BlockHash: [32]byte{02, 03},
},
}
for _, msg := range tests {
id := msg.id()
if uint32(msg.MessageType) != binary.LittleEndian.Uint32(id[:]) {
t.Errorf("message type not expected")
}
if !bytes.Equal(id[idTypeBytes:idTypeBytes+idHashBytes], msg.BlockHash[:]) {
t.Errorf("block hash not expected")
}
if msg.SenderPubkey == nil {
var emptyKey bls.SerializedPublicKey
if !bytes.Equal(id[idTypeBytes+idHashBytes:], emptyKey[:]) {
t.Errorf("sender key not expected when empty")
}
} else {
if !bytes.Equal(id[idTypeBytes+idHashBytes:], msg.SenderPubkey.Bytes[:]) {
t.Errorf("sender key not expected")
}
}
if idDup := msg.id(); idDup != id {
t.Errorf("id not replicable")
}
msg.MessageType = 100
if idDiff := msg.id(); idDiff == id {
t.Errorf("id not unique")
}
}
}
func TestGetMessagesByTypeSeqViewHash(t *testing.T) { func TestGetMessagesByTypeSeqViewHash(t *testing.T) {
pbftMsg := FBFTMessage{ pbftMsg := FBFTMessage{
MessageType: msg_pb.MessageType_ANNOUNCE, MessageType: msg_pb.MessageType_ANNOUNCE,

@ -146,6 +146,10 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
Msg("Wrong BlockNum Received, ignoring!") Msg("Wrong BlockNum Received, ignoring!")
return return
} }
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Warn().Msgf("[OnPrepared] low consensus block number. Spin sync")
consensus.spinUpStateSync()
}
// check validity of prepared signature // check validity of prepared signature
blockHash := recvMsg.BlockHash blockHash := recvMsg.BlockHash
@ -154,13 +158,10 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!") consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!")
return return
} }
if !consensus.Decider.IsQuorumAchievedByMask(mask) { if !consensus.Decider.IsQuorumAchievedByMask(mask) {
consensus.getLogger().Warn(). consensus.getLogger().Warn().Msgf("[OnPrepared] Quorum Not achieved.")
Msgf("[OnPrepared] Quorum Not achieved")
return return
} }
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) { if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) {
myBlockHash := common.Hash{} myBlockHash := common.Hash{}
myBlockHash.SetBytes(consensus.blockHash[:]) myBlockHash.SetBytes(consensus.blockHash[:])
@ -201,13 +202,20 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
Msg("[OnPrepared] Prepared message and block added") Msg("[OnPrepared] Prepared message and block added")
// tryCatchup is also run in onCommitted(), so need to lock with commitMutex. // tryCatchup is also run in onCommitted(), so need to lock with commitMutex.
consensus.tryCatchup()
if consensus.current.Mode() != Normal { if consensus.current.Mode() != Normal {
// don't sign the block that is not verified // don't sign the block that is not verified
consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!") consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!")
return return
} }
if consensus.BlockVerifier == nil {
consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring")
return
}
if err := consensus.BlockVerifier(&blockObj); err != nil {
consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed")
return
}
consensus.FBFTLog.MarkBlockVerified(&blockObj)
if consensus.checkViewID(recvMsg) != nil { if consensus.checkViewID(recvMsg) != nil {
if consensus.current.Mode() == Normal { if consensus.current.Mode() == Normal {
@ -255,16 +263,18 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
if !consensus.isRightBlockNumCheck(recvMsg) { if !consensus.isRightBlockNumCheck(recvMsg) {
return return
} }
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync")
consensus.spinUpStateSync()
}
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil { if err != nil {
consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed") consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed")
return return
} }
if !consensus.Decider.IsQuorumAchievedByMask(mask) { if !consensus.Decider.IsQuorumAchievedByMask(mask) {
consensus.getLogger().Warn(). consensus.getLogger().Warn().Msgf("[OnCommitted] Quorum Not achieved.")
Msgf("[OnCommitted] Quorum Not achieved")
return return
} }
@ -295,22 +305,12 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.aggregatedCommitSig = aggSig consensus.aggregatedCommitSig = aggSig
consensus.commitBitmap = mask consensus.commitBitmap = mask
if recvMsg.BlockNum > consensus.blockNum && recvMsg.BlockNum-consensus.blockNum > consensusBlockNumBuffer { consensus.tryCatchup()
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC") consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC")
go func() {
select {
case consensus.BlockNumLowChan <- struct{}{}:
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
case <-time.After(1 * time.Second):
}
}()
return return
} }
consensus.tryCatchup()
if consensus.IsViewChangingMode() { if consensus.IsViewChangingMode() {
consensus.getLogger().Info().Msg("[OnCommitted] Still in ViewChanging mode, Exiting!!") consensus.getLogger().Info().Msg("[OnCommitted] Still in ViewChanging mode, Exiting!!")
return return
@ -324,3 +324,14 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
} }
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
} }
// spinUpStateSync makes a best-effort attempt to signal, via BlockNumLowChan,
// that the local consensus block number is behind. The send never blocks: if
// the channel cannot accept the signal right now, the call is a no-op. When
// the signal is delivered, the consensus mode is switched to Syncing and every
// timer held in consensusTimeout is stopped.
// NOTE(review): presumably the receiver of BlockNumLowChan is the state sync
// loop; confirm against the node package.
func (consensus *Consensus) spinUpStateSync() {
	select {
	case consensus.BlockNumLowChan <- struct{}{}:
		// Signal delivered; fall through to the mode switch below.
	default:
		// Channel not ready to receive the signal; leave state untouched.
		return
	}

	consensus.current.SetMode(Syncing)
	for _, timer := range consensus.consensusTimeout {
		timer.Stop()
	}
}

@ -919,7 +919,7 @@ func New(
} }
node.pendingCXReceipts = map[string]*types.CXReceiptsProof{} node.pendingCXReceipts = map[string]*types.CXReceiptsProof{}
node.Consensus.VerifiedNewBlock = make(chan *types.Block) node.Consensus.VerifiedNewBlock = make(chan *types.Block, 1)
chain.Engine.SetBeaconchain(beaconChain) chain.Engine.SetBeaconchain(beaconChain)
// the sequence number is the next block number to be added in consensus protocol, which is // the sequence number is the next block number to be added in consensus protocol, which is
// always one more than current chain header block // always one more than current chain header block

@ -352,17 +352,9 @@ func (node *Node) numSignaturesIncludedInBlock(block *types.Block) uint32 {
// 1. add the new block to blockchain // 1. add the new block to blockchain
// 2. [leader] send new block to the client // 2. [leader] send new block to the client
// 3. [leader] send cross shard tx receipts to destination shard // 3. [leader] send cross shard tx receipts to destination shard
func (node *Node) PostConsensusProcessing( func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
newBlock *types.Block,
) {
if _, err := node.Blockchain().InsertChain([]*types.Block{newBlock}, true); err != nil { if _, err := node.Blockchain().InsertChain([]*types.Block{newBlock}, true); err != nil {
utils.Logger().Error(). return err
Err(err).
Uint64("blockNum", newBlock.NumberU64()).
Str("parentHash", newBlock.Header().ParentHash().Hex()).
Str("hash", newBlock.Header().Hash().Hex()).
Msg("Error Adding new block to blockchain")
return
} }
utils.Logger().Info(). utils.Logger().Info().
Uint64("blockNum", newBlock.NumberU64()). Uint64("blockNum", newBlock.NumberU64()).
@ -412,11 +404,11 @@ func (node *Node) PostConsensusProcessing(
for _, addr := range node.GetAddresses(newBlock.Epoch()) { for _, addr := range node.GetAddresses(newBlock.Epoch()) {
wrapper, err := node.Beaconchain().ReadValidatorInformation(addr) wrapper, err := node.Beaconchain().ReadValidatorInformation(addr)
if err != nil { if err != nil {
return return err
} }
snapshot, err := node.Beaconchain().ReadValidatorSnapshot(addr) snapshot, err := node.Beaconchain().ReadValidatorSnapshot(addr)
if err != nil { if err != nil {
return return err
} }
computed := availability.ComputeCurrentSigning( computed := availability.ComputeCurrentSigning(
snapshot.Validator, wrapper, snapshot.Validator, wrapper,
@ -435,6 +427,7 @@ func (node *Node) PostConsensusProcessing(
} }
} }
} }
return nil
} }
// BootstrapConsensus is a goroutine to check number of peers and start the consensus // BootstrapConsensus is a goroutine to check number of peers and start the consensus

Loading…
Cancel
Save