@ -2,21 +2,15 @@ package consensus
import (
import (
"bytes"
"bytes"
"encoding/binary"
"encoding/hex"
"encoding/hex"
"time"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
protobuf "github.com/golang/protobuf/proto"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/types"
vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls"
vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/host"
@ -24,14 +18,13 @@ import (
"github.com/harmony-one/vdf/src/vdf_go"
"github.com/harmony-one/vdf/src/vdf_go"
)
)
// handleMessageU pdate will update the consensus state according to received message
// handlemessageu pdate will update the consensus state according to received message
func ( consensus * Consensus ) handleMessageUpdate ( payload [ ] byte ) {
func ( consensus * Consensus ) handleMessageUpdate ( payload [ ] byte ) {
if len ( payload ) == 0 {
if len ( payload ) == 0 {
return
return
}
}
msg := & msg_pb . Message { }
msg := & msg_pb . Message { }
err := protobuf . Unmarshal ( payload , msg )
if err := protobuf . Unmarshal ( payload , msg ) ; err != nil {
if err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "Failed to unmarshal message payload." )
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "Failed to unmarshal message payload." )
return
return
}
}
@ -41,14 +34,17 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
// which are message types specifically for a node acting as leader
// which are message types specifically for a node acting as leader
switch {
switch {
case ( consensus . current . Mode ( ) == ViewChanging ) &&
case ( consensus . current . Mode ( ) == ViewChanging ) &&
( msg . Type == msg_pb . MessageType_PREPARE || msg . Type == msg_pb . MessageType_COMMIT ) :
( msg . Type == msg_pb . MessageType_PREPARE ||
msg . Type == msg_pb . MessageType_COMMIT ) :
return
return
case consensus . current . Mode ( ) == Listening :
case consensus . current . Mode ( ) == Listening :
return
return
}
}
if msg . Type == msg_pb . MessageType_VIEWCHANGE || msg . Type == msg_pb . MessageType_NEWVIEW {
if msg . Type == msg_pb . MessageType_VIEWCHANGE ||
if msg . GetViewchange ( ) != nil && msg . GetViewchange ( ) . ShardId != consensus . ShardID {
msg . Type == msg_pb . MessageType_NEWVIEW {
if msg . GetViewchange ( ) != nil &&
msg . GetViewchange ( ) . ShardId != consensus . ShardID {
consensus . getLogger ( ) . Warn ( ) .
consensus . getLogger ( ) . Warn ( ) .
Uint32 ( "myShardId" , consensus . ShardID ) .
Uint32 ( "myShardId" , consensus . ShardID ) .
Uint32 ( "receivedShardId" , msg . GetViewchange ( ) . ShardId ) .
Uint32 ( "receivedShardId" , msg . GetViewchange ( ) . ShardId ) .
@ -56,7 +52,8 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
return
return
}
}
} else {
} else {
if msg . GetConsensus ( ) != nil && msg . GetConsensus ( ) . ShardId != consensus . ShardID {
if msg . GetConsensus ( ) != nil &&
msg . GetConsensus ( ) . ShardId != consensus . ShardID {
consensus . getLogger ( ) . Warn ( ) .
consensus . getLogger ( ) . Warn ( ) .
Uint32 ( "myShardId" , consensus . ShardID ) .
Uint32 ( "myShardId" , consensus . ShardID ) .
Uint32 ( "receivedShardId" , msg . GetConsensus ( ) . ShardId ) .
Uint32 ( "receivedShardId" , msg . GetConsensus ( ) . ShardId ) .
@ -73,745 +70,58 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
return
return
}
}
switch msg . Type {
intendedForValidator , intendedForLeader :=
case msg_pb . MessageType_ANNOUNCE :
! ( consensus . IsLeader ( ) && consensus . current . Mode ( ) == Normal ) ,
consensus . IsLeader ( )
switch t := msg . Type ; true {
// Handle validator intended messages first
case t == msg_pb . MessageType_ANNOUNCE &&
intendedForValidator &&
consensus . validatorSanityChecks ( msg ) :
consensus . onAnnounce ( msg )
consensus . onAnnounce ( msg )
case msg_pb . MessageType_PREPARE :
case t == msg_pb . MessageType_PREPARED &&
consensus . onPrepare ( msg )
intendedForValidator &&
case msg_pb . MessageType_PREPARED :
consensus . validatorSanityChecks ( msg ) :
consensus . onPrepared ( msg )
consensus . onPrepared ( msg )
case msg_pb . MessageType_COMMIT :
case t == msg_pb . MessageType_COMMITTED &&
consensus . onCommit ( msg )
intendedForValidator &&
case msg_pb . MessageType_COMMITTED :
consensus . validatorSanityChecks ( msg ) :
consensus . onCommitted ( msg )
consensus . onCommitted ( msg )
case msg_pb . MessageType_VIEWCHANGE :
// Handle leader intended messages now
case t == msg_pb . MessageType_PREPARE &&
intendedForLeader &&
consensus . leaderSanityChecks ( msg ) :
consensus . onPrepare ( msg )
case t == msg_pb . MessageType_COMMIT &&
intendedForLeader &&
consensus . leaderSanityChecks ( msg ) :
consensus . onCommit ( msg )
case t == msg_pb . MessageType_VIEWCHANGE :
consensus . onViewChange ( msg )
consensus . onViewChange ( msg )
case msg_pb . MessageType_NEWVIEW :
case t == msg_pb . MessageType_NEWVIEW :
consensus . onNewView ( msg )
consensus . onNewView ( msg )
}
}
}
}
// TODO: move to consensus_leader.go later
func ( consensus * Consensus ) announce ( block * types . Block ) {
blockHash := block . Hash ( )
copy ( consensus . blockHash [ : ] , blockHash [ : ] )
// prepare message and broadcast to validators
encodedBlock , err := rlp . EncodeToBytes ( block )
if err != nil {
consensus . getLogger ( ) . Debug ( ) . Msg ( "[Announce] Failed encoding block" )
return
}
encodedBlockHeader , err := rlp . EncodeToBytes ( block . Header ( ) )
if err != nil {
consensus . getLogger ( ) . Debug ( ) . Msg ( "[Announce] Failed encoding block header" )
return
}
consensus . block = encodedBlock
consensus . blockHeader = encodedBlockHeader
msgToSend := consensus . constructAnnounceMessage ( )
// TODO Finish using this refactored way
// msgToSend := consensus.construct(quorum.Announce)
// save announce message to FBFTLog
// msgPayload, _ := proto.GetConsensusMessagePayload(msgToSend.Bytes)
msgPayload , _ := proto . GetConsensusMessagePayload ( msgToSend )
// TODO(chao): don't unmarshall the message here and direclty pass the original object.
msg := & msg_pb . Message { }
_ = protobuf . Unmarshal ( msgPayload , msg )
FPBTMsg , err := ParseFBFTMessage ( msg )
if err != nil {
consensus . getLogger ( ) . Warn ( ) . Err ( err ) . Msg ( "[Announce] Unable to parse FPBT message" )
return
}
// TODO(chao): review FPBT log data structure
consensus . FBFTLog . AddMessage ( FPBTMsg )
consensus . getLogger ( ) . Debug ( ) .
Str ( "MsgBlockHash" , FPBTMsg . BlockHash . Hex ( ) ) .
Uint64 ( "MsgViewID" , FPBTMsg . ViewID ) .
Uint64 ( "MsgBlockNum" , FPBTMsg . BlockNum ) .
Msg ( "[Announce] Added Announce message in FPBT" )
consensus . FBFTLog . AddBlock ( block )
// Leader sign the block hash itself
consensus . Decider . AddSignature (
quorum . Prepare ,
consensus . PubKey ,
consensus . priKey . SignHash ( consensus . blockHash [ : ] ) ,
consensus . LeaderPubKey ,
consensus . blockNum ,
)
if err := consensus . prepareBitmap . SetKey ( consensus . PubKey , true ) ; err != nil {
consensus . getLogger ( ) . Warn ( ) . Err ( err ) . Msg ( "[Announce] Leader prepareBitmap SetKey failed" )
return
}
// Construct broadcast p2p message
if err := consensus . msgSender . SendWithRetry (
consensus . blockNum , msg_pb . MessageType_ANNOUNCE , [ ] nodeconfig . GroupID {
nodeconfig . NewGroupIDByShardID ( nodeconfig . ShardID ( consensus . ShardID ) ) ,
} , host . ConstructP2pMessage ( byte ( 17 ) , msgToSend ) ) ; err != nil {
consensus . getLogger ( ) . Warn ( ) .
Str ( "groupID" , string ( nodeconfig . NewGroupIDByShardID (
nodeconfig . ShardID ( consensus . ShardID ) ,
) ) ) .
Msg ( "[Announce] Cannot send announce message" )
} else {
consensus . getLogger ( ) . Info ( ) .
Str ( "blockHash" , block . Hash ( ) . Hex ( ) ) .
Uint64 ( "blockNum" , block . NumberU64 ( ) ) .
Msg ( "[Announce] Sent Announce Message!!" )
}
consensus . getLogger ( ) . Debug ( ) .
Str ( "From" , consensus . phase . String ( ) ) .
Str ( "To" , FBFTPrepare . String ( ) ) .
Msg ( "[Announce] Switching phase" )
consensus . switchPhase ( FBFTPrepare , true )
}
func ( consensus * Consensus ) onAnnounce ( msg * msg_pb . Message ) {
consensus . getLogger ( ) . Debug ( ) . Msg ( "[OnAnnounce] Receive announce message" )
if consensus . IsLeader ( ) && consensus . current . Mode ( ) == Normal {
return
}
senderKey , err := consensus . verifySenderKey ( msg )
if err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "[OnAnnounce] VerifySenderKey failed" )
return
}
if ! senderKey . IsEqual ( consensus . LeaderPubKey ) &&
consensus . current . Mode ( ) == Normal && ! consensus . ignoreViewIDCheck {
consensus . getLogger ( ) . Warn ( ) .
Str ( "senderKey" , senderKey . SerializeToHexStr ( ) ) .
Str ( "leaderKey" , consensus . LeaderPubKey . SerializeToHexStr ( ) ) .
Msg ( "[OnAnnounce] SenderKey does not match leader PubKey" )
return
}
if err = verifyMessageSig ( senderKey , msg ) ; err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "[OnAnnounce] Failed to verify leader signature" )
return
}
recvMsg , err := ParseFBFTMessage ( msg )
if err != nil {
consensus . getLogger ( ) . Error ( ) .
Err ( err ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Msg ( "[OnAnnounce] Unparseable leader message" )
return
}
// verify validity of block header object
// TODO: think about just sending the block hash instead of the header.
encodedHeader := recvMsg . Payload
header := new ( block . Header )
err = rlp . DecodeBytes ( encodedHeader , header )
if err != nil {
consensus . getLogger ( ) . Warn ( ) .
Err ( err ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Msg ( "[OnAnnounce] Unparseable block header data" )
return
}
if recvMsg . BlockNum < consensus . blockNum || recvMsg . BlockNum != header . Number ( ) . Uint64 ( ) {
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "hdrBlockNum" , header . Number ( ) . Uint64 ( ) ) .
Uint64 ( "consensuBlockNum" , consensus . blockNum ) .
Msg ( "[OnAnnounce] BlockNum does not match" )
return
}
if consensus . current . Mode ( ) == Normal {
if err = chain . Engine . VerifyHeader ( consensus . ChainReader , header , true ) ; err != nil {
consensus . getLogger ( ) . Warn ( ) .
Err ( err ) .
Str ( "inChain" , consensus . ChainReader . CurrentHeader ( ) . Number ( ) . String ( ) ) .
Str ( "MsgBlockNum" , header . Number ( ) . String ( ) ) .
Msg ( "[OnAnnounce] Block content is not verified successfully" )
return
}
//VRF/VDF is only generated in the beach chain
if consensus . NeedsRandomNumberGeneration ( header . Epoch ( ) ) {
//validate the VRF with proof if a non zero VRF is found in header
if len ( header . Vrf ( ) ) > 0 {
if ! consensus . ValidateVrfAndProof ( header ) {
return
}
}
//validate the VDF with proof if a non zero VDF is found in header
if len ( header . Vdf ( ) ) > 0 {
if ! consensus . ValidateVdfAndProof ( header ) {
return
}
}
}
}
logMsgs := consensus . FBFTLog . GetMessagesByTypeSeqView (
msg_pb . MessageType_ANNOUNCE , recvMsg . BlockNum , recvMsg . ViewID ,
)
if len ( logMsgs ) > 0 {
if logMsgs [ 0 ] . BlockHash != recvMsg . BlockHash &&
logMsgs [ 0 ] . SenderPubkey . IsEqual ( recvMsg . SenderPubkey ) {
consensus . getLogger ( ) . Debug ( ) .
Str ( "logMsgSenderKey" , logMsgs [ 0 ] . SenderPubkey . SerializeToHexStr ( ) ) .
Str ( "logMsgBlockHash" , logMsgs [ 0 ] . BlockHash . Hex ( ) ) .
Str ( "recvMsg.SenderPubkey" , recvMsg . SenderPubkey . SerializeToHexStr ( ) ) .
Uint64 ( "recvMsg.BlockNum" , recvMsg . BlockNum ) .
Uint64 ( "recvMsg.ViewID" , recvMsg . ViewID ) .
Str ( "recvMsgBlockHash" , recvMsg . BlockHash . Hex ( ) ) .
Str ( "LeaderKey" , consensus . LeaderPubKey . SerializeToHexStr ( ) ) .
Msg ( "[OnAnnounce] Leader is malicious" )
if consensus . current . Mode ( ) == ViewChanging {
viewID := consensus . current . ViewID ( )
consensus . startViewChange ( viewID + 1 )
} else {
consensus . startViewChange ( consensus . viewID + 1 )
}
}
consensus . getLogger ( ) . Debug ( ) .
Str ( "leaderKey" , consensus . LeaderPubKey . SerializeToHexStr ( ) ) .
Msg ( "[OnAnnounce] Announce message received again" )
//return
}
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Msg ( "[OnAnnounce] Announce message Added" )
consensus . FBFTLog . AddMessage ( recvMsg )
consensus . mutex . Lock ( )
defer consensus . mutex . Unlock ( )
consensus . blockHash = recvMsg . BlockHash
// we have already added message and block, skip check viewID and send prepare message if is in ViewChanging mode
if consensus . current . Mode ( ) == ViewChanging {
consensus . getLogger ( ) . Debug ( ) . Msg ( "[OnAnnounce] Still in ViewChanging Mode, Exiting !!" )
return
}
if consensus . checkViewID ( recvMsg ) != nil {
if consensus . current . Mode ( ) == Normal {
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Msg ( "[OnAnnounce] ViewID check failed" )
}
return
}
consensus . prepare ( )
}
// tryPrepare will try to send prepare message
func ( consensus * Consensus ) prepare ( ) {
// Construct and send prepare message
msgToSend := consensus . constructPrepareMessage ( )
// TODO: this will not return immediatey, may block
if err := consensus . msgSender . SendWithoutRetry ( [ ] nodeconfig . GroupID { nodeconfig . NewGroupIDByShardID ( nodeconfig . ShardID ( consensus . ShardID ) ) } , host . ConstructP2pMessage ( byte ( 17 ) , msgToSend ) ) ; err != nil {
consensus . getLogger ( ) . Warn ( ) . Err ( err ) . Msg ( "[OnAnnounce] Cannot send prepare message" )
} else {
consensus . getLogger ( ) . Info ( ) .
Str ( "blockHash" , hex . EncodeToString ( consensus . blockHash [ : ] ) ) .
Msg ( "[OnAnnounce] Sent Prepare Message!!" )
}
consensus . getLogger ( ) . Debug ( ) .
Str ( "From" , consensus . phase . String ( ) ) .
Str ( "To" , FBFTPrepare . String ( ) ) .
Msg ( "[Announce] Switching Phase" )
consensus . switchPhase ( FBFTPrepare , true )
}
// TODO: move to consensus_leader.go later
func ( consensus * Consensus ) onPrepare ( msg * msg_pb . Message ) {
if ! consensus . IsLeader ( ) {
return
}
senderKey , err := consensus . verifySenderKey ( msg )
if err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "[OnPrepare] VerifySenderKey failed" )
return
}
if err = verifyMessageSig ( senderKey , msg ) ; err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "[OnPrepare] Failed to verify sender's signature" )
return
}
recvMsg , err := ParseFBFTMessage ( msg )
if err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "[OnPrepare] Unparseable validator message" )
return
}
if recvMsg . ViewID != consensus . viewID || recvMsg . BlockNum != consensus . blockNum {
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "blockNum" , consensus . blockNum ) .
Msg ( "[OnPrepare] Message ViewId or BlockNum not match" )
return
}
if ! consensus . FBFTLog . HasMatchingViewAnnounce (
consensus . blockNum , consensus . viewID , recvMsg . BlockHash ,
) {
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "blockNum" , consensus . blockNum ) .
Msg ( "[OnPrepare] No Matching Announce message" )
//return
}
validatorPubKey := recvMsg . SenderPubkey
prepareSig := recvMsg . Payload
prepareBitmap := consensus . prepareBitmap
consensus . mutex . Lock ( )
defer consensus . mutex . Unlock ( )
logger := consensus . getLogger ( ) . With ( ) .
Str ( "validatorPubKey" , validatorPubKey . SerializeToHexStr ( ) ) . Logger ( )
// proceed only when the message is not received before
signed := consensus . Decider . ReadSignature ( quorum . Prepare , validatorPubKey )
if signed != nil {
logger . Debug ( ) .
Msg ( "[OnPrepare] Already Received prepare message from the validator" )
return
}
if consensus . Decider . IsQuorumAchieved ( quorum . Prepare ) {
// already have enough signatures
logger . Debug ( ) . Msg ( "[OnPrepare] Received Additional Prepare Message" )
return
}
// Check BLS signature for the multi-sig
var sign bls . Sign
err = sign . Deserialize ( prepareSig )
if err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) .
Msg ( "[OnPrepare] Failed to deserialize bls signature" )
return
}
if ! sign . VerifyHash ( recvMsg . SenderPubkey , consensus . blockHash [ : ] ) {
consensus . getLogger ( ) . Error ( ) . Msg ( "[OnPrepare] Received invalid BLS signature" )
return
}
logger = logger . With ( ) .
Int64 ( "NumReceivedSoFar" , consensus . Decider . SignersCount ( quorum . Prepare ) ) .
Int64 ( "PublicKeys" , consensus . Decider . ParticipantsCount ( ) ) . Logger ( )
logger . Info ( ) . Msg ( "[OnPrepare] Received New Prepare Signature" )
consensus . Decider . AddSignature (
quorum . Prepare , validatorPubKey , & sign , consensus . LeaderPubKey , consensus . blockNum ,
)
// Set the bitmap indicating that this validator signed.
if err := prepareBitmap . SetKey ( recvMsg . SenderPubkey , true ) ; err != nil {
consensus . getLogger ( ) . Warn ( ) . Err ( err ) . Msg ( "[OnPrepare] prepareBitmap.SetKey failed" )
return
}
if consensus . Decider . IsQuorumAchieved ( quorum . Prepare ) {
logger . Debug ( ) . Msg ( "[OnPrepare] Received Enough Prepare Signatures" )
// Construct and broadcast prepared message
msgToSend , aggSig := consensus . constructPreparedMessage ( )
consensus . aggregatedPrepareSig = aggSig
//leader adds prepared message to log
// TODO(chao): don't unmarshall the payload again
msgPayload , _ := proto . GetConsensusMessagePayload ( msgToSend )
msg := & msg_pb . Message { }
_ = protobuf . Unmarshal ( msgPayload , msg )
FBFTMsg , err := ParseFBFTMessage ( msg )
if err != nil {
consensus . getLogger ( ) . Warn ( ) . Err ( err ) . Msg ( "[OnPrepare] Unable to parse pbft message" )
return
}
consensus . FBFTLog . AddMessage ( FBFTMsg )
// Leader add commit phase signature
blockNumHash := make ( [ ] byte , 8 )
binary . LittleEndian . PutUint64 ( blockNumHash , consensus . blockNum )
commitPayload := append ( blockNumHash , consensus . blockHash [ : ] ... )
consensus . Decider . AddSignature (
quorum . Commit ,
consensus . PubKey ,
consensus . priKey . SignHash ( commitPayload ) ,
consensus . LeaderPubKey ,
consensus . blockNum ,
)
if err := consensus . commitBitmap . SetKey ( consensus . PubKey , true ) ; err != nil {
consensus . getLogger ( ) . Debug ( ) . Msg ( "[OnPrepare] Leader commit bitmap set failed" )
return
}
if err := consensus . msgSender . SendWithRetry (
consensus . blockNum ,
msg_pb . MessageType_PREPARED , [ ] nodeconfig . GroupID {
nodeconfig . NewGroupIDByShardID ( nodeconfig . ShardID ( consensus . ShardID ) ) ,
} ,
host . ConstructP2pMessage ( byte ( 17 ) , msgToSend ) ,
) ; err != nil {
consensus . getLogger ( ) . Warn ( ) . Msg ( "[OnPrepare] Cannot send prepared message" )
} else {
consensus . getLogger ( ) . Debug ( ) .
Hex ( "blockHash" , consensus . blockHash [ : ] ) .
Uint64 ( "blockNum" , consensus . blockNum ) .
Msg ( "[OnPrepare] Sent Prepared Message!!" )
}
consensus . msgSender . StopRetry ( msg_pb . MessageType_ANNOUNCE )
// Stop retry committed msg of last consensus
consensus . msgSender . StopRetry ( msg_pb . MessageType_COMMITTED )
consensus . getLogger ( ) . Debug ( ) .
Str ( "From" , consensus . phase . String ( ) ) .
Str ( "To" , FBFTCommit . String ( ) ) .
Msg ( "[OnPrepare] Switching phase" )
consensus . switchPhase ( FBFTCommit , true )
}
}
func ( consensus * Consensus ) onPrepared ( msg * msg_pb . Message ) {
consensus . getLogger ( ) . Debug ( ) . Msg ( "[OnPrepared] Received Prepared message" )
if consensus . IsLeader ( ) && consensus . current . Mode ( ) == Normal {
return
}
senderKey , err := consensus . verifySenderKey ( msg )
if err != nil {
consensus . getLogger ( ) . Debug ( ) . Err ( err ) . Msg ( "[OnPrepared] VerifySenderKey failed" )
return
}
if ! senderKey . IsEqual ( consensus . LeaderPubKey ) &&
consensus . current . Mode ( ) == Normal && ! consensus . ignoreViewIDCheck {
consensus . getLogger ( ) . Warn ( ) . Msg ( "[OnPrepared] SenderKey not match leader PubKey" )
return
}
if err := verifyMessageSig ( senderKey , msg ) ; err != nil {
consensus . getLogger ( ) . Debug ( ) . Err ( err ) . Msg ( "[OnPrepared] Failed to verify sender's signature" )
return
}
recvMsg , err := ParseFBFTMessage ( msg )
if err != nil {
consensus . getLogger ( ) . Debug ( ) . Err ( err ) . Msg ( "[OnPrepared] Unparseable validator message" )
return
}
consensus . getLogger ( ) . Info ( ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Msg ( "[OnPrepared] Received prepared message" )
if recvMsg . BlockNum < consensus . blockNum {
consensus . getLogger ( ) . Debug ( ) . Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) . Msg ( "Old Block Received, ignoring!!" )
return
}
// check validity of prepared signature
blockHash := recvMsg . BlockHash
aggSig , mask , err := consensus . ReadSignatureBitmapPayload ( recvMsg . Payload , 0 )
if err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "ReadSignatureBitmapPayload failed!!" )
return
}
if ! consensus . Decider . IsQuorumAchievedByMask ( mask , true ) {
consensus . getLogger ( ) . Warn ( ) .
Msgf ( "[OnPrepared] Quorum Not achieved" )
return
}
if ! aggSig . VerifyHash ( mask . AggregatePublic , blockHash [ : ] ) {
myBlockHash := common . Hash { }
myBlockHash . SetBytes ( consensus . blockHash [ : ] )
consensus . getLogger ( ) . Warn ( ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Msg ( "[OnPrepared] failed to verify multi signature for prepare phase" )
return
}
// check validity of block
block := recvMsg . Block
var blockObj types . Block
err = rlp . DecodeBytes ( block , & blockObj )
if err != nil {
consensus . getLogger ( ) . Warn ( ) .
Err ( err ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Msg ( "[OnPrepared] Unparseable block header data" )
return
}
if blockObj . NumberU64 ( ) != recvMsg . BlockNum || recvMsg . BlockNum < consensus . blockNum {
consensus . getLogger ( ) . Warn ( ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "blockNum" , blockObj . NumberU64 ( ) ) .
Msg ( "[OnPrepared] BlockNum not match" )
return
}
if blockObj . Header ( ) . Hash ( ) != recvMsg . BlockHash {
consensus . getLogger ( ) . Warn ( ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Hex ( "MsgBlockHash" , recvMsg . BlockHash [ : ] ) .
Str ( "blockObjHash" , blockObj . Header ( ) . Hash ( ) . Hex ( ) ) .
Msg ( "[OnPrepared] BlockHash not match" )
return
}
if consensus . current . Mode ( ) == Normal {
err := chain . Engine . VerifyHeader ( consensus . ChainReader , blockObj . Header ( ) , true )
if err != nil {
consensus . getLogger ( ) . Error ( ) .
Err ( err ) .
Str ( "inChain" , consensus . ChainReader . CurrentHeader ( ) . Number ( ) . String ( ) ) .
Str ( "MsgBlockNum" , blockObj . Header ( ) . Number ( ) . String ( ) ) .
Msg ( "[OnPrepared] Block header is not verified successfully" )
return
}
if consensus . BlockVerifier == nil {
// do nothing
} else if err := consensus . BlockVerifier ( & blockObj ) ; err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "[OnPrepared] Block verification failed" )
return
}
}
consensus . FBFTLog . AddBlock ( & blockObj )
recvMsg . Block = [ ] byte { } // save memory space
consensus . FBFTLog . AddMessage ( recvMsg )
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Hex ( "blockHash" , recvMsg . BlockHash [ : ] ) .
Msg ( "[OnPrepared] Prepared message and block added" )
consensus . mutex . Lock ( )
defer consensus . mutex . Unlock ( )
consensus . tryCatchup ( )
if consensus . current . Mode ( ) == ViewChanging {
consensus . getLogger ( ) . Debug ( ) . Msg ( "[OnPrepared] Still in ViewChanging mode, Exiting!!" )
return
}
if consensus . checkViewID ( recvMsg ) != nil {
if consensus . current . Mode ( ) == Normal {
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Msg ( "[OnPrepared] ViewID check failed" )
}
return
}
if recvMsg . BlockNum > consensus . blockNum {
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "blockNum" , consensus . blockNum ) .
Msg ( "[OnPrepared] Future Block Received, ignoring!!" )
return
}
// add block field
blockPayload := make ( [ ] byte , len ( block ) )
copy ( blockPayload [ : ] , block [ : ] )
consensus . block = blockPayload
// add preparedSig field
consensus . aggregatedPrepareSig = aggSig
consensus . prepareBitmap = mask
// Optimistically add blockhash field of prepare message
emptyHash := [ 32 ] byte { }
if bytes . Compare ( consensus . blockHash [ : ] , emptyHash [ : ] ) == 0 {
copy ( consensus . blockHash [ : ] , blockHash [ : ] )
}
// Construct and send the commit message
// TODO: should only sign on block hash
blockNumBytes := make ( [ ] byte , 8 )
binary . LittleEndian . PutUint64 ( blockNumBytes , consensus . blockNum )
commitPayload := append ( blockNumBytes , consensus . blockHash [ : ] ... )
msgToSend := consensus . constructCommitMessage ( commitPayload )
// TODO: genesis account node delay for 1 second, this is a temp fix for allows FN nodes to earning reward
if consensus . delayCommit > 0 {
time . Sleep ( consensus . delayCommit )
}
if err := consensus . msgSender . SendWithoutRetry ( [ ] nodeconfig . GroupID { nodeconfig . NewGroupIDByShardID ( nodeconfig . ShardID ( consensus . ShardID ) ) } , host . ConstructP2pMessage ( byte ( 17 ) , msgToSend ) ) ; err != nil {
consensus . getLogger ( ) . Warn ( ) . Msg ( "[OnPrepared] Cannot send commit message!!" )
} else {
consensus . getLogger ( ) . Info ( ) .
Uint64 ( "blockNum" , consensus . blockNum ) .
Hex ( "blockHash" , consensus . blockHash [ : ] ) .
Msg ( "[OnPrepared] Sent Commit Message!!" )
}
consensus . getLogger ( ) . Debug ( ) .
Str ( "From" , consensus . phase . String ( ) ) .
Str ( "To" , FBFTCommit . String ( ) ) .
Msg ( "[OnPrepared] Switching phase" )
consensus . switchPhase ( FBFTCommit , true )
return
}
// TODO: move it to consensus_leader.go later
func ( consensus * Consensus ) onCommit ( msg * msg_pb . Message ) {
if ! consensus . IsLeader ( ) {
return
}
senderKey , err := consensus . verifySenderKey ( msg )
if err != nil {
consensus . getLogger ( ) . Debug ( ) . Msgf ( "[OnCommit] VerifySenderKey Failed %s" , err . Error ( ) )
return
}
if err = verifyMessageSig ( senderKey , msg ) ; err != nil {
consensus . getLogger ( ) . Debug ( ) . Err ( err ) . Msg ( "[OnCommit] Failed to verify sender's signature" )
return
}
recvMsg , err := ParseFBFTMessage ( msg )
if err != nil {
consensus . getLogger ( ) . Debug ( ) . Err ( err ) . Msg ( "[OnCommit] Parse pbft message failed" )
return
}
if recvMsg . ViewID != consensus . viewID || recvMsg . BlockNum != consensus . blockNum {
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "blockNum" , consensus . blockNum ) .
Str ( "ValidatorPubKey" , recvMsg . SenderPubkey . SerializeToHexStr ( ) ) .
Msg ( "[OnCommit] BlockNum/viewID not match" )
return
}
if ! consensus . FBFTLog . HasMatchingAnnounce ( consensus . blockNum , recvMsg . BlockHash ) {
consensus . getLogger ( ) . Debug ( ) .
Hex ( "MsgBlockHash" , recvMsg . BlockHash [ : ] ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "blockNum" , consensus . blockNum ) .
Msg ( "[OnCommit] Cannot find matching blockhash" )
return
}
if ! consensus . FBFTLog . HasMatchingPrepared ( consensus . blockNum , recvMsg . BlockHash ) {
consensus . getLogger ( ) . Debug ( ) .
Hex ( "blockHash" , recvMsg . BlockHash [ : ] ) .
Uint64 ( "blockNum" , consensus . blockNum ) .
Msg ( "[OnCommit] Cannot find matching prepared message" )
return
}
validatorPubKey := recvMsg . SenderPubkey
commitSig := recvMsg . Payload
consensus . mutex . Lock ( )
defer consensus . mutex . Unlock ( )
logger := consensus . getLogger ( ) . With ( ) .
Str ( "validatorPubKey" , validatorPubKey . SerializeToHexStr ( ) ) . Logger ( )
if ! consensus . IsValidatorInCommittee ( recvMsg . SenderPubkey ) {
logger . Error ( ) . Msg ( "[OnCommit] Invalid validator" )
return
}
commitBitmap := consensus . commitBitmap
// proceed only when the message is not received before
signed := consensus . Decider . ReadSignature ( quorum . Commit , validatorPubKey )
if signed != nil {
logger . Debug ( ) .
Msg ( "[OnCommit] Already received commit message from the validator" )
return
}
// has to be called before verifying signature
quorumWasMet := consensus . Decider . IsQuorumAchieved ( quorum . Commit )
// Verify the signature on commitPayload is correct
var sign bls . Sign
err = sign . Deserialize ( commitSig )
if err != nil {
logger . Debug ( ) . Msg ( "[OnCommit] Failed to deserialize bls signature" )
return
}
blockNumHash := make ( [ ] byte , 8 )
binary . LittleEndian . PutUint64 ( blockNumHash , recvMsg . BlockNum )
commitPayload := append ( blockNumHash , recvMsg . BlockHash [ : ] ... )
logger = logger . With ( ) . Uint64 ( "MsgViewID" , recvMsg . ViewID ) . Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) . Logger ( )
if ! sign . VerifyHash ( recvMsg . SenderPubkey , commitPayload ) {
logger . Error ( ) . Msg ( "[OnCommit] Cannot verify commit message" )
return
}
logger = logger . With ( ) .
Int64 ( "numReceivedSoFar" , consensus . Decider . SignersCount ( quorum . Commit ) ) .
Logger ( )
logger . Info ( ) . Msg ( "[OnCommit] Received new commit message" )
consensus . Decider . AddSignature (
quorum . Commit , validatorPubKey , & sign , consensus . LeaderPubKey , consensus . blockNum ,
)
// Set the bitmap indicating that this validator signed.
if err := commitBitmap . SetKey ( recvMsg . SenderPubkey , true ) ; err != nil {
consensus . getLogger ( ) . Warn ( ) . Err ( err ) . Msg ( "[OnCommit] commitBitmap.SetKey failed" )
return
}
quorumIsMet := consensus . Decider . IsQuorumAchieved ( quorum . Commit )
if ! quorumWasMet && quorumIsMet {
logger . Info ( ) . Msg ( "[OnCommit] 2/3 Enough commits received" )
go func ( viewID uint64 ) {
time . Sleep ( 2 * time . Second )
logger . Debug ( ) . Msg ( "[OnCommit] Commit Grace Period Ended" )
consensus . commitFinishChan <- viewID
} ( consensus . viewID )
consensus . msgSender . StopRetry ( msg_pb . MessageType_PREPARED )
}
if consensus . Decider . IsRewardThresholdAchieved ( ) {
go func ( viewID uint64 ) {
consensus . commitFinishChan <- viewID
logger . Info ( ) . Msg ( "[OnCommit] 90% Enough commits received" )
} ( consensus . viewID )
}
}
func ( consensus * Consensus ) finalizeCommits ( ) {
func ( consensus * Consensus ) finalizeCommits ( ) {
consensus . getLogger ( ) . Info ( ) .
consensus . getLogger ( ) . Info ( ) .
Int64 ( "NumCommits" , consensus . Decider . SignersCount ( quorum . Commit ) ) .
Int64 ( "NumCommits" , consensus . Decider . SignersCount ( quorum . Commit ) ) .
Msg ( "[Finalizing] Finalizing Block" )
Msg ( "[Finalizing] Finalizing Block" )
beforeCatchupNum := consensus . blockNum
beforeCatchupNum := consensus . blockNum
// Construct committed message
// Construct committed message
msgToSend , aggSig := consensus . constructCommittedMessage ( )
network , err := consensus . construct ( msg_pb . MessageType_COMMITTED , nil )
consensus . aggregatedCommitSig = aggSig // this may not needed
// leader adds committed message to log
msgPayload , _ := proto . GetConsensusMessagePayload ( msgToSend )
msg := & msg_pb . Message { }
_ = protobuf . Unmarshal ( msgPayload , msg )
pbftMsg , err := ParseFBFTMessage ( msg )
if err != nil {
if err != nil {
consensus . getLogger ( ) . Warn ( ) . Err ( err ) . Msg ( "[FinalizeCommits] Unable to parse pbft message" )
consensus . getLogger ( ) . Warn ( ) . Err ( err ) .
Msg ( "[FinalizeCommits] Unable to construct Committed message" )
return
return
}
}
consensus . FBFTLog . AddMessage ( pbftMsg )
msgToSend , aggSig , FBFTMsg :=
network . Bytes ,
network . OptionalAggregateSignature ,
network . FBFTMsg
consensus . aggregatedCommitSig = aggSig // this may not needed
consensus . FBFTLog . AddMessage ( FBFTMsg )
// find correct block content
// find correct block content
curBlockHash := consensus . blockHash
curBlockHash := consensus . blockHash
block := consensus . FBFTLog . GetBlockByHash ( curBlockHash )
block := consensus . FBFTLog . GetBlockByHash ( curBlockHash )
@ -830,7 +140,7 @@ func (consensus *Consensus) finalizeCommits() {
return
return
}
}
consensus . ChainReader . WriteLastCommits ( pbft Msg. Payload )
consensus . ChainReader . WriteLastCommits ( FBFT Msg. Payload )
// if leader success finalize the block, send committed message to validators
// if leader success finalize the block, send committed message to validators
if err := consensus . msgSender . SendWithRetry (
if err := consensus . msgSender . SendWithRetry (
@ -876,114 +186,8 @@ func (consensus *Consensus) finalizeCommits() {
consensus . ReadySignal <- struct { } { }
consensus . ReadySignal <- struct { } { }
}
}
func ( consensus * Consensus ) onCommitted ( msg * msg_pb . Message ) {
// LastCommitSig returns the byte array of aggregated
consensus . getLogger ( ) . Debug ( ) . Msg ( "[OnCommitted] Receive committed message" )
// commit signature and bitmap of last block
if consensus . IsLeader ( ) && consensus . current . Mode ( ) == Normal {
return
}
senderKey , err := consensus . verifySenderKey ( msg )
if err != nil {
consensus . getLogger ( ) . Warn ( ) . Err ( err ) . Msg ( "[OnCommitted] verifySenderKey failed" )
return
}
if ! senderKey . IsEqual ( consensus . LeaderPubKey ) &&
consensus . current . Mode ( ) == Normal && ! consensus . ignoreViewIDCheck {
consensus . getLogger ( ) . Warn ( ) . Msg ( "[OnCommitted] senderKey not match leader PubKey" )
return
}
if err = verifyMessageSig ( senderKey , msg ) ; err != nil {
consensus . getLogger ( ) . Warn ( ) . Err ( err ) . Msg ( "[OnCommitted] Failed to verify sender's signature" )
return
}
recvMsg , err := ParseFBFTMessage ( msg )
if err != nil {
consensus . getLogger ( ) . Warn ( ) . Msg ( "[OnCommitted] unable to parse msg" )
return
}
if recvMsg . BlockNum < consensus . blockNum {
consensus . getLogger ( ) . Info ( ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Uint64 ( "blockNum" , consensus . blockNum ) .
Msg ( "[OnCommitted] Received Old Blocks!!" )
return
}
aggSig , mask , err := consensus . ReadSignatureBitmapPayload ( recvMsg . Payload , 0 )
if err != nil {
consensus . getLogger ( ) . Error ( ) . Err ( err ) . Msg ( "[OnCommitted] readSignatureBitmapPayload failed" )
return
}
if ! consensus . Decider . IsQuorumAchievedByMask ( mask , true ) {
consensus . getLogger ( ) . Warn ( ) .
Msgf ( "[OnCommitted] Quorum Not achieved" )
return
}
blockNumBytes := make ( [ ] byte , 8 )
binary . LittleEndian . PutUint64 ( blockNumBytes , recvMsg . BlockNum )
commitPayload := append ( blockNumBytes , recvMsg . BlockHash [ : ] ... )
if ! aggSig . VerifyHash ( mask . AggregatePublic , commitPayload ) {
consensus . getLogger ( ) . Error ( ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Msg ( "[OnCommitted] Failed to verify the multi signature for commit phase" )
return
}
consensus . FBFTLog . AddMessage ( recvMsg )
consensus . ChainReader . WriteLastCommits ( recvMsg . Payload )
consensus . getLogger ( ) . Debug ( ) .
Uint64 ( "MsgViewID" , recvMsg . ViewID ) .
Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) .
Msg ( "[OnCommitted] Committed message added" )
consensus . mutex . Lock ( )
defer consensus . mutex . Unlock ( )
consensus . aggregatedCommitSig = aggSig
consensus . commitBitmap = mask
if recvMsg . BlockNum - consensus . blockNum > consensusBlockNumBuffer {
consensus . getLogger ( ) . Debug ( ) . Uint64 ( "MsgBlockNum" , recvMsg . BlockNum ) . Msg ( "[OnCommitted] out of sync" )
go func ( ) {
select {
case consensus . blockNumLowChan <- struct { } { } :
consensus . current . SetMode ( Syncing )
for _ , v := range consensus . consensusTimeout {
v . Stop ( )
}
case <- time . After ( 1 * time . Second ) :
}
} ( )
return
}
// if consensus.checkViewID(recvMsg) != nil {
// consensus.getLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID)
// return
// }
consensus . tryCatchup ( )
if consensus . current . Mode ( ) == ViewChanging {
consensus . getLogger ( ) . Debug ( ) . Msg ( "[OnCommitted] Still in ViewChanging mode, Exiting!!" )
return
}
if consensus . consensusTimeout [ timeoutBootstrap ] . IsActive ( ) {
consensus . consensusTimeout [ timeoutBootstrap ] . Stop ( )
consensus . getLogger ( ) . Debug ( ) . Msg ( "[OnCommitted] Start consensus timer; stop bootstrap timer only once" )
} else {
consensus . getLogger ( ) . Debug ( ) . Msg ( "[OnCommitted] Start consensus timer" )
}
consensus . consensusTimeout [ timeoutConsensus ] . Start ( )
return
}
// LastCommitSig returns the byte array of aggregated commit signature and bitmap of last block
func ( consensus * Consensus ) LastCommitSig ( ) ( [ ] byte , [ ] byte , error ) {
func ( consensus * Consensus ) LastCommitSig ( ) ( [ ] byte , [ ] byte , error ) {
if consensus . blockNum <= 1 {
if consensus . blockNum <= 1 {
return nil , nil , nil
return nil , nil , nil
@ -1013,9 +217,6 @@ func (consensus *Consensus) LastCommitSig() ([]byte, []byte, error) {
// try to catch up if fall behind
// try to catch up if fall behind
func ( consensus * Consensus ) tryCatchup ( ) {
func ( consensus * Consensus ) tryCatchup ( ) {
consensus . getLogger ( ) . Info ( ) . Msg ( "[TryCatchup] commit new blocks" )
consensus . getLogger ( ) . Info ( ) . Msg ( "[TryCatchup] commit new blocks" )
// if consensus.phase != Commit && consensus.mode.Mode() == Normal {
// return
// }
currentBlockNum := consensus . blockNum
currentBlockNum := consensus . blockNum
for {
for {
msgs := consensus . FBFTLog . GetMessagesByTypeSeq ( msg_pb . MessageType_COMMITTED , consensus . blockNum )
msgs := consensus . FBFTLog . GetMessagesByTypeSeq ( msg_pb . MessageType_COMMITTED , consensus . blockNum )
@ -1047,7 +248,9 @@ func (consensus *Consensus) tryCatchup() {
}
}
consensus . getLogger ( ) . Info ( ) . Msg ( "[TryCatchup] block found to commit" )
consensus . getLogger ( ) . Info ( ) . Msg ( "[TryCatchup] block found to commit" )
preparedMsgs := consensus . FBFTLog . GetMessagesByTypeSeqHash ( msg_pb . MessageType_PREPARED , msgs [ 0 ] . BlockNum , msgs [ 0 ] . BlockHash )
preparedMsgs := consensus . FBFTLog . GetMessagesByTypeSeqHash (
msg_pb . MessageType_PREPARED , msgs [ 0 ] . BlockNum , msgs [ 0 ] . BlockHash ,
)
msg := consensus . FBFTLog . FindMessageByMaxViewID ( preparedMsgs )
msg := consensus . FBFTLog . FindMessageByMaxViewID ( preparedMsgs )
if msg == nil {
if msg == nil {
break
break