Add out of order message processing logic (#3468)

pull/3470/head
Rongjian Lan 4 years ago committed by GitHub
parent e5356675fc
commit aa643ca856
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      consensus/consensus_service.go
  2. 81
      consensus/consensus_v2.go
  3. 39
      consensus/fbft_log.go
  4. 4
      consensus/fbft_log_test.go
  5. 20
      consensus/leader.go
  6. 2
      consensus/threshold.go
  7. 45
      consensus/validator.go
  8. 17
      consensus/view_change.go
  9. 2
      consensus/view_change_construct.go
  10. 2
      node/node_explorer.go

@ -2,7 +2,6 @@ package consensus
import (
"math/big"
"sync"
"sync/atomic"
"time"
@ -30,11 +29,6 @@ import (
"github.com/rs/zerolog"
)
var (
logOnce sync.Once
logger zerolog.Logger
)
// WaitForNewRandomness listens to the RndChannel to receive new VDF randomness.
func (consensus *Consensus) WaitForNewRandomness() {
go func() {
@ -621,13 +615,11 @@ func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uin
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger {
logOnce.Do(func() {
logger = utils.Logger().With().
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.GetCurBlockViewID()).
Str("phase", consensus.phase.String()).
Str("mode", consensus.current.Mode().String()).
Logger()
})
logger := utils.Logger().With().
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.GetCurBlockViewID()).
Str("phase", consensus.phase.String()).
Str("mode", consensus.current.Mode().String()).
Logger()
return &logger
}

@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/rs/zerolog"
@ -17,7 +18,6 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/vdf/src/vdf_go"
@ -28,6 +28,7 @@ import (
var (
errSenderPubKeyNotLeader = errors.New("sender pubkey doesn't match leader")
errVerifyMessageSignature = errors.New("verify message signature failed")
errParsingFBFTMessage = errors.New("failed parsing FBFT message")
)
// timeout constant
@ -56,57 +57,63 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb
return nil
}
intendedForValidator, intendedForLeader :=
!consensus.IsLeader(),
consensus.IsLeader()
switch t := msg.Type; true {
// Handle validator intended messages first
case t == msg_pb.MessageType_ANNOUNCE && intendedForValidator:
// Do easier check before signature check
if msg.Type == msg_pb.MessageType_ANNOUNCE || msg.Type == msg_pb.MessageType_PREPARED || msg.Type == msg_pb.MessageType_COMMITTED {
// Only validator needs to check whether the message is from the correct leader
if !bytes.Equal(senderKey[:], consensus.LeaderPubKey.Bytes[:]) &&
consensus.current.Mode() == Normal && !consensus.IgnoreViewIDCheck.IsSet() {
return errSenderPubKeyNotLeader
}
}
if msg.Type != msg_pb.MessageType_PREPARE && msg.Type != msg_pb.MessageType_COMMIT {
// Leader doesn't need to check validator's message signature since the consensus signature will be checked
if !consensus.senderKeySanityChecks(msg, senderKey) {
return errVerifyMessageSignature
}
}
// Parse FBFT message
var fbftMsg *FBFTMessage
var err error
switch t := msg.Type; true {
case t == msg_pb.MessageType_VIEWCHANGE:
fbftMsg, err = ParseViewChangeMessage(msg)
case t == msg_pb.MessageType_NEWVIEW:
members := consensus.Decider.Participants()
fbftMsg, err = ParseNewViewMessage(msg, members)
default:
fbftMsg, err = consensus.ParseFBFTMessage(msg)
}
if err != nil || fbftMsg == nil {
return errors.Wrapf(err, "unable to parse consensus msg with type: %s", msg.Type)
}
intendedForValidator, intendedForLeader :=
!consensus.IsLeader(),
consensus.IsLeader()
// Route message to handler
switch t := msg.Type; true {
// Handle validator intended messages first
case t == msg_pb.MessageType_ANNOUNCE && intendedForValidator:
consensus.onAnnounce(msg)
case t == msg_pb.MessageType_PREPARED && intendedForValidator:
if !bytes.Equal(senderKey[:], consensus.LeaderPubKey.Bytes[:]) &&
consensus.current.Mode() == Normal && !consensus.IgnoreViewIDCheck.IsSet() {
return errSenderPubKeyNotLeader
}
if !consensus.senderKeySanityChecks(msg, senderKey) {
return errVerifyMessageSignature
}
consensus.onPrepared(msg)
consensus.onPrepared(fbftMsg)
case t == msg_pb.MessageType_COMMITTED && intendedForValidator:
if !bytes.Equal(senderKey[:], consensus.LeaderPubKey.Bytes[:]) &&
consensus.current.Mode() == Normal && !consensus.IgnoreViewIDCheck.IsSet() {
return errSenderPubKeyNotLeader
}
if !consensus.senderKeySanityChecks(msg, senderKey) {
return errVerifyMessageSignature
}
consensus.onCommitted(msg)
consensus.onCommitted(fbftMsg)
// Handle leader intended messages now
case t == msg_pb.MessageType_PREPARE && intendedForLeader:
consensus.onPrepare(msg)
consensus.onPrepare(fbftMsg)
case t == msg_pb.MessageType_COMMIT && intendedForLeader:
consensus.onCommit(msg)
consensus.onCommit(fbftMsg)
// Handle view change messages
// Handle view change messages
case t == msg_pb.MessageType_VIEWCHANGE:
if !consensus.senderKeySanityChecks(msg, senderKey) {
return errVerifyMessageSignature
}
consensus.onViewChange(msg)
consensus.onViewChange(fbftMsg)
case t == msg_pb.MessageType_NEWVIEW:
if !consensus.senderKeySanityChecks(msg, senderKey) {
return errVerifyMessageSignature
}
consensus.onNewView(msg)
consensus.onNewView(fbftMsg)
}
return nil
@ -136,7 +143,7 @@ func (consensus *Consensus) finalCommit() {
network.Bytes,
network.FBFTMsg
commitSigAndBitmap := FBFTMsg.Payload
consensus.FBFTLog.AddMessage(FBFTMsg)
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
// find correct block content
curBlockHash := consensus.blockHash
block := consensus.FBFTLog.GetBlockByHash(curBlockHash)
@ -540,7 +547,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
network.Bytes,
network.FBFTMsg
bareMinimumCommit := FBFTMsg.Payload
consensus.FBFTLog.AddMessage(FBFTMsg)
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
blk.SetCurrentCommitSig(bareMinimumCommit)

@ -33,6 +33,7 @@ type FBFTMessage struct {
M2Bitmap *bls_cosi.Mask
M3AggSig *bls_core.Sign
M3Bitmap *bls_cosi.Mask
Verified bool
}
// String ..
@ -202,14 +203,40 @@ func (log *FBFTLog) DeleteMessagesLessThan(number uint64) {
}
}
// AddMessage adds a pbft message into the log
func (log *FBFTLog) AddMessage(msg *FBFTMessage) {
// AddVerifiedMessage adds a signature verified pbft message into the log
func (log *FBFTLog) AddVerifiedMessage(msg *FBFTMessage) {
log.msgLock.Lock()
defer log.msgLock.Unlock()
msg.Verified = true
log.messages[msg.id()] = msg
}
// AddNotVerifiedMessage adds a not signature verified pbft message into the log
func (log *FBFTLog) AddNotVerifiedMessage(msg *FBFTMessage) {
log.msgLock.Lock()
defer log.msgLock.Unlock()
msg.Verified = false
log.messages[msg.id()] = msg
}
// GetNotVerifiedCommittedMessages returns not verified committed pbft messages with matching blockNum, viewID and blockHash
func (log *FBFTLog) GetNotVerifiedCommittedMessages(blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage {
log.msgLock.RLock()
defer log.msgLock.RUnlock()
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType == msg_pb.MessageType_COMMITTED && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash && !msg.Verified {
found = append(found, msg)
}
}
return found
}
// GetMessagesByTypeSeqViewHash returns pbft messages with matching type, blockNum, viewID and blockHash
func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage {
log.msgLock.RLock()
@ -217,7 +244,7 @@ func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNu
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType == typ && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash {
if msg.MessageType == typ && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash && msg.Verified {
found = append(found, msg)
}
}
@ -231,7 +258,7 @@ func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType == typ && msg.BlockNum == blockNum {
if msg.MessageType == typ && msg.BlockNum == blockNum && msg.Verified {
found = append(found, msg)
}
}
@ -245,7 +272,7 @@ func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum ui
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType == typ && msg.BlockNum == blockNum && msg.BlockHash == blockHash {
if msg.MessageType == typ && msg.BlockNum == blockNum && msg.BlockHash == blockHash && msg.Verified {
found = append(found, msg)
}
}
@ -283,7 +310,7 @@ func (log *FBFTLog) GetMessagesByTypeSeqView(typ msg_pb.MessageType, blockNum ui
var found []*FBFTMessage
for _, msg := range log.messages {
if msg.MessageType != typ || msg.BlockNum != blockNum || msg.ViewID != viewID {
if msg.MessageType != typ || msg.BlockNum != blockNum || msg.ViewID != viewID && msg.Verified {
continue
}
found = append(found, msg)

@ -66,7 +66,7 @@ func TestGetMessagesByTypeSeqViewHash(t *testing.T) {
BlockHash: [32]byte{01, 02},
}
log := NewFBFTLog()
log.AddMessage(&pbftMsg)
log.AddVerifiedMessage(&pbftMsg)
found := log.GetMessagesByTypeSeqViewHash(
msg_pb.MessageType_ANNOUNCE, 2, 3, [32]byte{01, 02},
@ -91,7 +91,7 @@ func TestHasMatchingAnnounce(t *testing.T) {
BlockHash: [32]byte{01, 02},
}
log := NewFBFTLog()
log.AddMessage(&pbftMsg)
log.AddVerifiedMessage(&pbftMsg)
found := log.HasMatchingViewAnnounce(2, 3, [32]byte{01, 02})
if !found {
t.Error("found should be true")

@ -48,7 +48,7 @@ func (consensus *Consensus) announce(block *types.Block) {
}
msgToSend, FPBTMsg := networkMessage.Bytes, networkMessage.FBFTMsg
consensus.FBFTLog.AddMessage(FPBTMsg)
consensus.FBFTLog.AddVerifiedMessage(FPBTMsg)
consensus.getLogger().Debug().
Str("MsgBlockHash", FPBTMsg.BlockHash.Hex()).
Uint64("MsgViewID", FPBTMsg.ViewID).
@ -96,13 +96,7 @@ func (consensus *Consensus) announce(block *types.Block) {
consensus.switchPhase("Announce", FBFTPrepare)
}
func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
recvMsg, err := consensus.ParseFBFTMessage(msg)
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[OnPrepare] Unparseable validator message")
return
}
func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
// TODO(audit): make FBFT lookup using map instead of looping through all items.
if !consensus.FBFTLog.HasMatchingViewAnnounce(
consensus.blockNum, consensus.GetCurBlockViewID(), recvMsg.BlockHash,
@ -147,7 +141,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
// Check BLS signature for the multi-sig
prepareSig := recvMsg.Payload
var sign bls_core.Sign
err = sign.Deserialize(prepareSig)
err := sign.Deserialize(prepareSig)
if err != nil {
consensus.getLogger().Error().Err(err).
Msg("[OnPrepare] Failed to deserialize bls signature")
@ -198,13 +192,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
//// Read - End
}
func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
recvMsg, err := consensus.ParseFBFTMessage(msg)
if err != nil {
consensus.getLogger().Debug().Err(err).Msg("[OnCommit] Parse pbft message failed")
return
}
func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
//// Read - Start

@ -36,7 +36,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error {
networkMessage.OptionalAggregateSignature
consensus.aggregatedPrepareSig = aggSig
consensus.FBFTLog.AddMessage(FBFTMsg)
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
// Leader add commit phase signature
var blockObj types.Block
if err := rlp.DecodeBytes(consensus.block, &blockObj); err != nil {

@ -35,7 +35,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnAnnounce] Announce message Added")
consensus.FBFTLog.AddMessage(recvMsg)
consensus.FBFTLog.AddVerifiedMessage(recvMsg)
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.blockHash = recvMsg.BlockHash
@ -97,12 +97,7 @@ func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) {
// if onPrepared accepts the prepared message from the leader, then
// it will send a COMMIT message for the leader to receive on the network.
func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
recvMsg, err := consensus.ParseFBFTMessage(msg)
if err != nil {
consensus.getLogger().Info().Err(err).Msg("[OnPrepared] Unparseable validator message")
return
}
func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
@ -167,7 +162,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
copy(blockPayload[:], recvMsg.Block[:])
consensus.block = blockPayload
recvMsg.Block = []byte{} // save memory space
consensus.FBFTLog.AddMessage(recvMsg)
consensus.FBFTLog.AddVerifiedMessage(recvMsg)
consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
@ -214,22 +209,30 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
copy(consensus.blockHash[:], blockHash[:])
// tryCatchup is also run in onCommitted(), so need to lock with commitMutex.
if consensus.current.Mode() != Normal {
if consensus.current.Mode() == Normal {
consensus.sendCommitMessages(&blockObj)
consensus.switchPhase("onPrepared", FBFTCommit)
} else {
// don't sign the block that is not verified
consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!")
return
}
consensus.sendCommitMessages(&blockObj)
consensus.switchPhase("onPrepared", FBFTCommit)
go func() {
// Try process future committed messages and process them in case of receiving committed before prepared
curBlockNum := consensus.blockNum
for _, committedMsg := range consensus.FBFTLog.GetNotVerifiedCommittedMessages(blockObj.NumberU64(), blockObj.Header().ViewID().Uint64(), blockObj.Hash()) {
if committedMsg != nil {
consensus.onCommitted(committedMsg)
}
if curBlockNum < consensus.blockNum {
consensus.getLogger().Info().Msg("[OnPrepared] Successfully caught up with committed message")
break
}
}
}()
}
func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
recvMsg, err := consensus.ParseFBFTMessage(msg)
if err != nil {
consensus.getLogger().Warn().Msg("[OnCommitted] unable to parse msg")
return
}
func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
@ -256,6 +259,9 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.spinUpStateSync()
}
// Optimistically add committedMessage in case of receiving committed before prepared
consensus.FBFTLog.AddNotVerifiedMessage(recvMsg)
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed")
@ -285,8 +291,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
return
}
consensus.FBFTLog.AddMessage(recvMsg)
consensus.FBFTLog.AddVerifiedMessage(recvMsg)
consensus.aggregatedCommitSig = aggSig
consensus.commitBitmap = mask

@ -331,12 +331,7 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri
}
// onViewChange is called when the view change message is received.
func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
recvMsg, err := ParseViewChangeMessage(msg)
if err != nil {
consensus.getLogger().Warn().Err(err).Msg("[onViewChange] Unable To Parse Viewchange Message")
return
}
func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
consensus.getLogger().Info().
Uint64("viewID", recvMsg.ViewID).
Uint64("blockNum", recvMsg.BlockNum).
@ -431,13 +426,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
// prepared message from the payload and commit it to the block
// Or the validator will enter announce phase to wait for the new block proposed
// from the new leader
func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
members := consensus.Decider.Participants()
recvMsg, err := ParseNewViewMessage(msg, members)
if err != nil {
consensus.getLogger().Warn().Err(err).Msg("[onNewView] Unable to Parse NewView Message")
return
}
func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) {
consensus.getLogger().Info().
Uint64("viewID", recvMsg.ViewID).
Uint64("blockNum", recvMsg.BlockNum).
@ -510,7 +499,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkeys = []*bls.PublicKeyWrapper{senderKey}
consensus.FBFTLog.AddMessage(&preparedMsg)
consensus.FBFTLog.AddVerifiedMessage(&preparedMsg)
if preparedBlock != nil {
consensus.FBFTLog.AddBlock(preparedBlock)

@ -335,7 +335,7 @@ func (vc *viewChange) ProcessViewChangeMsg(
preparedMsg.SenderPubkeys = []*bls.PublicKeyWrapper{recvMsg.LeaderPubkey}
vc.getLogger().Info().Msg("[ProcessViewChangeMsg] New Leader Prepared Message Added")
fbftlog.AddMessage(&preparedMsg)
fbftlog.AddVerifiedMessage(&preparedMsg)
fbftlog.AddBlock(preparedBlock)
}
return nil

@ -60,7 +60,7 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag
utils.Logger().Info().
Uint64("msgBlock", recvMsg.BlockNum).
Msg("[Explorer] Haven't received the block before the committed msg")
node.Consensus.FBFTLog.AddMessage(recvMsg)
node.Consensus.FBFTLog.AddVerifiedMessage(recvMsg)
return errBlockBeforeCommit
}

Loading…
Cancel
Save