diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index a28ce68fd..425df7fdb 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -2,7 +2,6 @@ package consensus import ( "math/big" - "sync" "sync/atomic" "time" @@ -30,11 +29,6 @@ import ( "github.com/rs/zerolog" ) -var ( - logOnce sync.Once - logger zerolog.Logger -) - // WaitForNewRandomness listens to the RndChannel to receive new VDF randomness. func (consensus *Consensus) WaitForNewRandomness() { go func() { @@ -621,13 +615,11 @@ func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uin // getLogger returns logger for consensus contexts added func (consensus *Consensus) getLogger() *zerolog.Logger { - logOnce.Do(func() { - logger = utils.Logger().With(). - Uint64("myBlock", consensus.blockNum). - Uint64("myViewID", consensus.GetCurBlockViewID()). - Str("phase", consensus.phase.String()). - Str("mode", consensus.current.Mode().String()). - Logger() - }) + logger := utils.Logger().With(). + Uint64("myBlock", consensus.blockNum). + Uint64("myViewID", consensus.GetCurBlockViewID()). + Str("phase", consensus.phase.String()). + Str("mode", consensus.current.Mode().String()). + Logger() return &logger } diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 06d6c8c94..a2ad31dcb 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/rs/zerolog" @@ -17,7 +18,6 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/bls" vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/shard" "github.com/harmony-one/vdf/src/vdf_go" @@ -28,6 +28,7 @@ import ( var ( errSenderPubKeyNotLeader = errors.New("sender pubkey doesn't match leader") errVerifyMessageSignature = errors.New("verify message signature failed") + errParsingFBFTMessage = errors.New("failed parsing FBFT message") ) // timeout constant @@ -56,57 +57,63 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb return nil } - intendedForValidator, intendedForLeader := - !consensus.IsLeader(), - consensus.IsLeader() - - switch t := msg.Type; true { - // Handle validator intended messages first - case t == msg_pb.MessageType_ANNOUNCE && intendedForValidator: + // Do easier check before signature check + if msg.Type == msg_pb.MessageType_ANNOUNCE || msg.Type == msg_pb.MessageType_PREPARED || msg.Type == msg_pb.MessageType_COMMITTED { + // Only validator needs to check whether the message is from the correct leader if !bytes.Equal(senderKey[:], consensus.LeaderPubKey.Bytes[:]) && consensus.current.Mode() == Normal && !consensus.IgnoreViewIDCheck.IsSet() { return errSenderPubKeyNotLeader } + } + + if msg.Type != msg_pb.MessageType_PREPARE && msg.Type != msg_pb.MessageType_COMMIT { + // Leader doesn't need to check validator's message signature since the consensus signature will be checked if !consensus.senderKeySanityChecks(msg, senderKey) { return errVerifyMessageSignature } + } + + // Parse FBFT message + var fbftMsg *FBFTMessage + var err error + switch t := msg.Type; true { + case t == msg_pb.MessageType_VIEWCHANGE: + fbftMsg, err = ParseViewChangeMessage(msg) + case t == msg_pb.MessageType_NEWVIEW: + members := consensus.Decider.Participants() + fbftMsg, err = ParseNewViewMessage(msg, members) + default: + fbftMsg, err = consensus.ParseFBFTMessage(msg) + } + if err != nil || fbftMsg == nil { + return errors.Wrapf(err, "unable to parse consensus msg with type: %s", msg.Type) + } + + intendedForValidator, intendedForLeader := + !consensus.IsLeader(), + consensus.IsLeader() + + // Route message to handler + switch t := msg.Type; true { + // Handle validator intended messages first + case t == msg_pb.MessageType_ANNOUNCE && intendedForValidator: consensus.onAnnounce(msg) case t == msg_pb.MessageType_PREPARED && intendedForValidator: - if !bytes.Equal(senderKey[:], consensus.LeaderPubKey.Bytes[:]) && - consensus.current.Mode() == Normal && !consensus.IgnoreViewIDCheck.IsSet() { - return errSenderPubKeyNotLeader - } - if !consensus.senderKeySanityChecks(msg, senderKey) { - return errVerifyMessageSignature - } - consensus.onPrepared(msg) + consensus.onPrepared(fbftMsg) case t == msg_pb.MessageType_COMMITTED && intendedForValidator: - if !bytes.Equal(senderKey[:], consensus.LeaderPubKey.Bytes[:]) && - consensus.current.Mode() == Normal && !consensus.IgnoreViewIDCheck.IsSet() { - return errSenderPubKeyNotLeader - } - if !consensus.senderKeySanityChecks(msg, senderKey) { - return errVerifyMessageSignature - } - consensus.onCommitted(msg) + consensus.onCommitted(fbftMsg) // Handle leader intended messages now case t == msg_pb.MessageType_PREPARE && intendedForLeader: - consensus.onPrepare(msg) + consensus.onPrepare(fbftMsg) case t == msg_pb.MessageType_COMMIT && intendedForLeader: - consensus.onCommit(msg) + consensus.onCommit(fbftMsg) - // Handle view change messages + // Handle view change messages case t == msg_pb.MessageType_VIEWCHANGE: - if !consensus.senderKeySanityChecks(msg, senderKey) { - return errVerifyMessageSignature - } - consensus.onViewChange(msg) + consensus.onViewChange(fbftMsg) case t == msg_pb.MessageType_NEWVIEW: - if !consensus.senderKeySanityChecks(msg, senderKey) { - return errVerifyMessageSignature - } - consensus.onNewView(msg) + consensus.onNewView(fbftMsg) } return nil @@ -136,7 +143,7 @@ func (consensus *Consensus) finalCommit() { network.Bytes, network.FBFTMsg commitSigAndBitmap := FBFTMsg.Payload - consensus.FBFTLog.AddMessage(FBFTMsg) + consensus.FBFTLog.AddVerifiedMessage(FBFTMsg) // find correct block content curBlockHash := consensus.blockHash block := consensus.FBFTLog.GetBlockByHash(curBlockHash) @@ -540,7 +547,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { network.Bytes, network.FBFTMsg bareMinimumCommit := FBFTMsg.Payload - consensus.FBFTLog.AddMessage(FBFTMsg) + consensus.FBFTLog.AddVerifiedMessage(FBFTMsg) blk.SetCurrentCommitSig(bareMinimumCommit) diff --git a/consensus/fbft_log.go b/consensus/fbft_log.go index d578030c4..c028c88ef 100644 --- a/consensus/fbft_log.go +++ b/consensus/fbft_log.go @@ -33,6 +33,7 @@ type FBFTMessage struct { M2Bitmap *bls_cosi.Mask M3AggSig *bls_core.Sign M3Bitmap *bls_cosi.Mask + Verified bool } // String .. @@ -202,14 +203,40 @@ func (log *FBFTLog) DeleteMessagesLessThan(number uint64) { } } -// AddMessage adds a pbft message into the log -func (log *FBFTLog) AddMessage(msg *FBFTMessage) { +// AddVerifiedMessage adds a signature verified pbft message into the log +func (log *FBFTLog) AddVerifiedMessage(msg *FBFTMessage) { log.msgLock.Lock() defer log.msgLock.Unlock() + msg.Verified = true + + log.messages[msg.id()] = msg +} + +// AddNotVerifiedMessage adds a not signature verified pbft message into the log +func (log *FBFTLog) AddNotVerifiedMessage(msg *FBFTMessage) { + log.msgLock.Lock() + defer log.msgLock.Unlock() + + msg.Verified = false + log.messages[msg.id()] = msg } +// GetNotVerifiedCommittedMessages returns not verified committed pbft messages with matching blockNum, viewID and blockHash +func (log *FBFTLog) GetNotVerifiedCommittedMessages(blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage { + log.msgLock.RLock() + defer log.msgLock.RUnlock() + + var found []*FBFTMessage + for _, msg := range log.messages { + if msg.MessageType == msg_pb.MessageType_COMMITTED && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash && !msg.Verified { + found = append(found, msg) + } + } + return found +} + // GetMessagesByTypeSeqViewHash returns pbft messages with matching type, blockNum, viewID and blockHash func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage { log.msgLock.RLock() @@ -217,7 +244,7 @@ func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNu var found []*FBFTMessage for _, msg := range log.messages { - if msg.MessageType == typ && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash { + if msg.MessageType == typ && msg.BlockNum == blockNum && msg.ViewID == viewID && msg.BlockHash == blockHash && msg.Verified { found = append(found, msg) } } @@ -231,7 +258,7 @@ func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64 var found []*FBFTMessage for _, msg := range log.messages { - if msg.MessageType == typ && msg.BlockNum == blockNum { + if msg.MessageType == typ && msg.BlockNum == blockNum && msg.Verified { found = append(found, msg) } } @@ -245,7 +272,7 @@ func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum ui var found []*FBFTMessage for _, msg := range log.messages { - if msg.MessageType == typ && msg.BlockNum == blockNum && msg.BlockHash == blockHash { + if msg.MessageType == typ && msg.BlockNum == blockNum && msg.BlockHash == blockHash && msg.Verified { found = append(found, msg) } } @@ -283,7 +310,7 @@ func (log *FBFTLog) GetMessagesByTypeSeqView(typ msg_pb.MessageType, blockNum ui var found []*FBFTMessage for _, msg := range log.messages { - if msg.MessageType != typ || msg.BlockNum != blockNum || msg.ViewID != viewID { + if msg.MessageType != typ || msg.BlockNum != blockNum || msg.ViewID != viewID && msg.Verified { continue } found = append(found, msg) diff --git a/consensus/fbft_log_test.go b/consensus/fbft_log_test.go index 28b23572f..420effff4 100644 --- a/consensus/fbft_log_test.go +++ b/consensus/fbft_log_test.go @@ -66,7 +66,7 @@ func TestGetMessagesByTypeSeqViewHash(t *testing.T) { BlockHash: [32]byte{01, 02}, } log := NewFBFTLog() - log.AddMessage(&pbftMsg) + log.AddVerifiedMessage(&pbftMsg) found := log.GetMessagesByTypeSeqViewHash( msg_pb.MessageType_ANNOUNCE, 2, 3, [32]byte{01, 02}, @@ -91,7 +91,7 @@ func TestHasMatchingAnnounce(t *testing.T) { BlockHash: [32]byte{01, 02}, } log := NewFBFTLog() - log.AddMessage(&pbftMsg) + log.AddVerifiedMessage(&pbftMsg) found := log.HasMatchingViewAnnounce(2, 3, [32]byte{01, 02}) if !found { t.Error("found should be true") diff --git a/consensus/leader.go b/consensus/leader.go index ea331f8b1..3e8646131 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -48,7 +48,7 @@ func (consensus *Consensus) announce(block *types.Block) { } msgToSend, FPBTMsg := networkMessage.Bytes, networkMessage.FBFTMsg - consensus.FBFTLog.AddMessage(FPBTMsg) + consensus.FBFTLog.AddVerifiedMessage(FPBTMsg) consensus.getLogger().Debug(). Str("MsgBlockHash", FPBTMsg.BlockHash.Hex()). Uint64("MsgViewID", FPBTMsg.ViewID). @@ -96,13 +96,7 @@ func (consensus *Consensus) announce(block *types.Block) { consensus.switchPhase("Announce", FBFTPrepare) } -func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { - recvMsg, err := consensus.ParseFBFTMessage(msg) - if err != nil { - consensus.getLogger().Error().Err(err).Msg("[OnPrepare] Unparseable validator message") - return - } - +func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { // TODO(audit): make FBFT lookup using map instead of looping through all items. if !consensus.FBFTLog.HasMatchingViewAnnounce( consensus.blockNum, consensus.GetCurBlockViewID(), recvMsg.BlockHash, @@ -147,7 +141,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { // Check BLS signature for the multi-sig prepareSig := recvMsg.Payload var sign bls_core.Sign - err = sign.Deserialize(prepareSig) + err := sign.Deserialize(prepareSig) if err != nil { consensus.getLogger().Error().Err(err). Msg("[OnPrepare] Failed to deserialize bls signature") @@ -198,13 +192,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { //// Read - End } -func (consensus *Consensus) onCommit(msg *msg_pb.Message) { - recvMsg, err := consensus.ParseFBFTMessage(msg) - if err != nil { - consensus.getLogger().Debug().Err(err).Msg("[OnCommit] Parse pbft message failed") - return - } - +func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { consensus.mutex.Lock() defer consensus.mutex.Unlock() //// Read - Start diff --git a/consensus/threshold.go b/consensus/threshold.go index 503185f3b..e1867a757 100644 --- a/consensus/threshold.go +++ b/consensus/threshold.go @@ -36,7 +36,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error { networkMessage.OptionalAggregateSignature consensus.aggregatedPrepareSig = aggSig - consensus.FBFTLog.AddMessage(FBFTMsg) + consensus.FBFTLog.AddVerifiedMessage(FBFTMsg) // Leader add commit phase signature var blockObj types.Block if err := rlp.DecodeBytes(consensus.block, &blockObj); err != nil { diff --git a/consensus/validator.go b/consensus/validator.go index 3786b61f0..2eb501d25 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -35,7 +35,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgBlockNum", recvMsg.BlockNum). Msg("[OnAnnounce] Announce message Added") - consensus.FBFTLog.AddMessage(recvMsg) + consensus.FBFTLog.AddVerifiedMessage(recvMsg) consensus.mutex.Lock() defer consensus.mutex.Unlock() consensus.blockHash = recvMsg.BlockHash @@ -97,12 +97,7 @@ func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) { // if onPrepared accepts the prepared message from the leader, then // it will send a COMMIT message for the leader to receive on the network. -func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { - recvMsg, err := consensus.ParseFBFTMessage(msg) - if err != nil { - consensus.getLogger().Info().Err(err).Msg("[OnPrepared] Unparseable validator message") - return - } +func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { consensus.mutex.Lock() defer consensus.mutex.Unlock() @@ -167,7 +162,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { copy(blockPayload[:], recvMsg.Block[:]) consensus.block = blockPayload recvMsg.Block = []byte{} // save memory space - consensus.FBFTLog.AddMessage(recvMsg) + consensus.FBFTLog.AddVerifiedMessage(recvMsg) consensus.getLogger().Debug(). Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgBlockNum", recvMsg.BlockNum). @@ -214,22 +209,30 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { copy(consensus.blockHash[:], blockHash[:]) // tryCatchup is also run in onCommitted(), so need to lock with commitMutex. - if consensus.current.Mode() != Normal { + if consensus.current.Mode() == Normal { + consensus.sendCommitMessages(&blockObj) + consensus.switchPhase("onPrepared", FBFTCommit) + } else { // don't sign the block that is not verified consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!") - return } - consensus.sendCommitMessages(&blockObj) - consensus.switchPhase("onPrepared", FBFTCommit) + go func() { + // Try process future committed messages and process them in case of receiving committed before prepared + curBlockNum := consensus.blockNum + for _, committedMsg := range consensus.FBFTLog.GetNotVerifiedCommittedMessages(blockObj.NumberU64(), blockObj.Header().ViewID().Uint64(), blockObj.Hash()) { + if committedMsg != nil { + consensus.onCommitted(committedMsg) + } + if curBlockNum < consensus.blockNum { + consensus.getLogger().Info().Msg("[OnPrepared] Successfully caught up with committed message") + break + } + } + }() } -func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { - recvMsg, err := consensus.ParseFBFTMessage(msg) - if err != nil { - consensus.getLogger().Warn().Msg("[OnCommitted] unable to parse msg") - return - } +func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { consensus.mutex.Lock() defer consensus.mutex.Unlock() @@ -256,6 +259,9 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.spinUpStateSync() } + // Optimistically add committedMessage in case of receiving committed before prepared + consensus.FBFTLog.AddNotVerifiedMessage(recvMsg) + aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) if err != nil { consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed") @@ -285,8 +291,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { return } - consensus.FBFTLog.AddMessage(recvMsg) - + consensus.FBFTLog.AddVerifiedMessage(recvMsg) consensus.aggregatedCommitSig = aggSig consensus.commitBitmap = mask diff --git a/consensus/view_change.go b/consensus/view_change.go index c932c81cb..10f64d72b 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -331,12 +331,7 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri } // onViewChange is called when the view change message is received. -func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { - recvMsg, err := ParseViewChangeMessage(msg) - if err != nil { - consensus.getLogger().Warn().Err(err).Msg("[onViewChange] Unable To Parse Viewchange Message") - return - } +func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { consensus.getLogger().Info(). Uint64("viewID", recvMsg.ViewID). Uint64("blockNum", recvMsg.BlockNum). @@ -431,13 +426,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { // prepared message from the payload and commit it to the block // Or the validator will enter announce phase to wait for the new block proposed // from the new leader -func (consensus *Consensus) onNewView(msg *msg_pb.Message) { - members := consensus.Decider.Participants() - recvMsg, err := ParseNewViewMessage(msg, members) - if err != nil { - consensus.getLogger().Warn().Err(err).Msg("[onNewView] Unable to Parse NewView Message") - return - } +func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { consensus.getLogger().Info(). Uint64("viewID", recvMsg.ViewID). Uint64("blockNum", recvMsg.BlockNum). @@ -510,7 +499,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) { copy(preparedMsg.Payload[:], recvMsg.Payload[32:]) preparedMsg.SenderPubkeys = []*bls.PublicKeyWrapper{senderKey} - consensus.FBFTLog.AddMessage(&preparedMsg) + consensus.FBFTLog.AddVerifiedMessage(&preparedMsg) if preparedBlock != nil { consensus.FBFTLog.AddBlock(preparedBlock) diff --git a/consensus/view_change_construct.go b/consensus/view_change_construct.go index 4beb22306..446a039ca 100644 --- a/consensus/view_change_construct.go +++ b/consensus/view_change_construct.go @@ -335,7 +335,7 @@ func (vc *viewChange) ProcessViewChangeMsg( preparedMsg.SenderPubkeys = []*bls.PublicKeyWrapper{recvMsg.LeaderPubkey} vc.getLogger().Info().Msg("[ProcessViewChangeMsg] New Leader Prepared Message Added") - fbftlog.AddMessage(&preparedMsg) + fbftlog.AddVerifiedMessage(&preparedMsg) fbftlog.AddBlock(preparedBlock) } return nil diff --git a/node/node_explorer.go b/node/node_explorer.go index 0b467e941..d90556a11 100644 --- a/node/node_explorer.go +++ b/node/node_explorer.go @@ -60,7 +60,7 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag utils.Logger().Info(). Uint64("msgBlock", recvMsg.BlockNum). Msg("[Explorer] Haven't received the block before the committed msg") - node.Consensus.FBFTLog.AddMessage(recvMsg) + node.Consensus.FBFTLog.AddVerifiedMessage(recvMsg) return errBlockBeforeCommit }