package consensus
import (
"bytes"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
// PbftPhase PBFT phases: pre-prepare, prepare and commit
type PbftPhase int
// Enum for PbftPhase
const (
Announce PbftPhase = iota
Prepare
Commit
)
// Mode determines whether a node is in normal or viewchanging mode
type Mode int
// Enum for node Mode
const (
Normal Mode = iota
ViewChanging
)
// PbftMode contains mode and viewID of viewchanging
type PbftMode struct {
mode Mode
viewID uint32
mux sync . Mutex
}
// Mode return the current node mode
func ( pm * PbftMode ) Mode ( ) Mode {
return pm . mode
}
// SetMode set the node mode as required
func ( pm * PbftMode ) SetMode ( m Mode ) {
pm . mux . Lock ( )
defer pm . mux . Unlock ( )
pm . mode = m
}
// ViewID return the current viewchanging id
func ( pm * PbftMode ) ViewID ( ) uint32 {
return pm . viewID
}
// SetViewID sets the viewchanging id accordingly
func ( pm * PbftMode ) SetViewID ( viewID uint32 ) {
pm . mux . Lock ( )
defer pm . mux . Unlock ( )
pm . viewID = viewID
}
// GetViewID returns the current viewchange viewID
func ( pm * PbftMode ) GetViewID ( ) uint32 {
return pm . viewID
}
// switchPhase will switch PbftPhase to nextPhase if the desirePhase equals the nextPhase
func ( consensus * Consensus ) switchPhase ( desirePhase PbftPhase ) {
utils . GetLogInstance ( ) . Debug ( "switchPhase: " , "desirePhase" , desirePhase , "myPhase" , consensus . phase )
var nextPhase PbftPhase
switch consensus . phase {
case Announce :
nextPhase = Prepare
case Prepare :
nextPhase = Commit
case Commit :
nextPhase = Announce
}
if nextPhase == desirePhase {
consensus . phase = nextPhase
}
}
// GetNextLeaderKey uniquely determine who is the leader for given viewID
func ( consensus * Consensus ) GetNextLeaderKey ( ) * bls . PublicKey {
idx := consensus . getIndexOfPubKey ( consensus . LeaderPubKey )
if idx == - 1 {
utils . GetLogInstance ( ) . Warn ( "GetNextLeaderKey: currentLeaderKey not found" , "key" , consensus . LeaderPubKey . GetHexString ( ) )
}
idx = ( idx + 1 ) % len ( consensus . PublicKeys )
return consensus . PublicKeys [ idx ]
}
func ( consensus * Consensus ) getIndexOfPubKey ( pubKey * bls . PublicKey ) int {
for k , v := range consensus . PublicKeys {
if v . IsEqual ( pubKey ) {
return k
}
}
return - 1
}
// ResetViewChangeState reset the state for viewchange
func ( consensus * Consensus ) ResetViewChangeState ( ) {
consensus . mode . SetMode ( Normal )
bhpBitmap , _ := bls_cosi . NewMask ( consensus . PublicKeys , nil )
nilBitmap , _ := bls_cosi . NewMask ( consensus . PublicKeys , nil )
consensus . bhpBitmap = bhpBitmap
consensus . nilBitmap = nilBitmap
consensus . bhpSigs = map [ common . Address ] * bls . Sign { }
consensus . nilSigs = map [ common . Address ] * bls . Sign { }
consensus . aggregatedBHPSig = nil
consensus . aggregatedNILSig = nil
}
func createTimeout ( ) map [ string ] * utils . Timeout {
timeouts := make ( map [ string ] * utils . Timeout )
strs := [ ] string { "announce" , "prepare" , "commit" }
for _ , s := range strs {
timeouts [ s ] = utils . NewTimeout ( phaseDuration )
}
timeouts [ "bootstrap" ] = utils . NewTimeout ( bootstrapDuration )
timeouts [ "viewchange" ] = utils . NewTimeout ( viewChangeDuration )
return timeouts
}
// startViewChange send a new view change
func ( consensus * Consensus ) startViewChange ( viewID uint32 ) {
for k := range consensus . consensusTimeout {
if k != "viewchange" {
consensus . consensusTimeout [ k ] . Stop ( )
}
}
consensus . mode . SetMode ( ViewChanging )
consensus . mode . SetViewID ( viewID )
nextLeaderKey := consensus . GetNextLeaderKey ( )
consensus . LeaderPubKey = consensus . GetNextLeaderKey ( )
if nextLeaderKey . IsEqual ( consensus . PubKey ) {
return
}
diff := viewID - consensus . viewID
duration := time . Duration ( int64 ( diff ) * int64 ( viewChangeDuration ) )
utils . GetLogInstance ( ) . Info ( "startViewChange" , "viewID" , viewID , "timeoutDuration" , duration , "nextLeader" , consensus . LeaderPubKey . GetHexString ( ) [ : 10 ] )
msgToSend := consensus . constructViewChangeMessage ( )
consensus . host . SendMessageToGroups ( [ ] p2p . GroupID { p2p . NewGroupIDByShardID ( p2p . ShardID ( consensus . ShardID ) ) } , host . ConstructP2pMessage ( byte ( 17 ) , msgToSend ) )
consensus . consensusTimeout [ "viewchange" ] . SetDuration ( duration )
consensus . consensusTimeout [ "viewchange" ] . Start ( )
}
// new leader send new view message
func ( consensus * Consensus ) startNewView ( ) {
utils . GetLogInstance ( ) . Info ( "startNewView" , "viewID" , consensus . mode . GetViewID ( ) )
consensus . mode . SetMode ( Normal )
consensus . switchPhase ( Announce )
msgToSend := consensus . constructNewViewMessage ( )
consensus . host . SendMessageToGroups ( [ ] p2p . GroupID { p2p . NewGroupIDByShardID ( p2p . ShardID ( consensus . ShardID ) ) } , host . ConstructP2pMessage ( byte ( 17 ) , msgToSend ) )
}
func ( consensus * Consensus ) onViewChange ( msg * msg_pb . Message ) {
senderKey , validatorAddress , err := consensus . verifyViewChangeSenderKey ( msg )
if err != nil {
utils . GetLogInstance ( ) . Debug ( "onViewChange verifySenderKey failed" , "error" , err )
return
}
recvMsg , err := ParseViewChangeMessage ( msg )
if err != nil {
utils . GetLogInstance ( ) . Warn ( "onViewChange unable to parse viewchange message" )
return
}
newLeaderKey := recvMsg . LeaderPubkey
if ! consensus . PubKey . IsEqual ( newLeaderKey ) {
return
}
utils . GetLogInstance ( ) . Warn ( "onViewChange received" , "viewChangeID" , recvMsg . ViewID , "myCurrentID" , consensus . viewID , "ValidatorAddress" , consensus . SelfAddress )
if consensus . blockNum > recvMsg . BlockNum {
return
}
if consensus . mode . Mode ( ) == ViewChanging && consensus . mode . GetViewID ( ) > recvMsg . ViewID {
return
}
if err = verifyMessageSig ( senderKey , msg ) ; err != nil {
utils . GetLogInstance ( ) . Debug ( "onViewChange Failed to verify sender's signature" , "error" , err )
return
}
consensus . vcLock . Lock ( )
defer consensus . vcLock . Unlock ( )
consensus . mode . SetMode ( ViewChanging )
consensus . mode . SetViewID ( recvMsg . ViewID )
_ , ok1 := consensus . nilSigs [ consensus . SelfAddress ]
_ , ok2 := consensus . bhpSigs [ consensus . SelfAddress ]
if ! ( ok1 || ok2 ) {
// add own signature for newview message
preparedMsgs := consensus . pbftLog . GetMessagesByTypeSeq ( msg_pb . MessageType_PREPARED , recvMsg . BlockNum )
if len ( preparedMsgs ) == 0 {
sign := consensus . priKey . SignHash ( NIL )
consensus . nilSigs [ consensus . SelfAddress ] = sign
consensus . nilBitmap . SetKey ( consensus . PubKey , true )
} else {
if len ( preparedMsgs ) > 1 {
utils . GetLogInstance ( ) . Debug ( "onViewChange more than 1 prepared message for new leader" )
}
msgToSign := append ( preparedMsgs [ 0 ] . BlockHash [ : ] , preparedMsgs [ 0 ] . Payload ... )
consensus . bhpSigs [ consensus . SelfAddress ] = consensus . priKey . SignHash ( msgToSign )
consensus . bhpBitmap . SetKey ( consensus . PubKey , true )
}
}
if ( len ( consensus . bhpSigs ) + len ( consensus . nilSigs ) ) >= ( ( len ( consensus . PublicKeys ) * 2 ) / 3 + 1 ) {
return
}
// m2 type message
if len ( recvMsg . Payload ) == 0 {
_ , ok := consensus . nilSigs [ validatorAddress ]
if ok {
utils . GetLogInstance ( ) . Debug ( "onViewChange already received m2 message from the validator" , "validatorAddress" , validatorAddress )
return
}
if ! recvMsg . ViewchangeSig . VerifyHash ( senderKey , NIL ) {
utils . GetLogInstance ( ) . Warn ( "onViewChange failed to verify signature for m2 type viewchange message" )
return
}
consensus . nilSigs [ validatorAddress ] = recvMsg . ViewchangeSig
consensus . nilBitmap . SetKey ( recvMsg . SenderPubkey , true ) // Set the bitmap indicating that this validator signed.
} else { // m1 type message
_ , ok := consensus . bhpSigs [ validatorAddress ]
if ok {
utils . GetLogInstance ( ) . Debug ( "onViewChange already received m1 message from the validator" , "validatorAddress" , validatorAddress )
return
}
if ! recvMsg . ViewchangeSig . VerifyHash ( recvMsg . SenderPubkey , recvMsg . Payload ) {
utils . GetLogInstance ( ) . Warn ( "onViewChange failed to verify signature for m1 type viewchange message" )
return
}
// first time receive m1 type message, need verify validity of prepared message
if len ( consensus . m1Payload ) == 0 {
//#### Read payload data
offset := 0
blockHash := recvMsg . Payload [ offset : offset + 32 ]
offset += 32
// 48 byte of multi-sig
multiSig := recvMsg . Payload [ offset : offset + 48 ]
offset += 48
// bitmap
bitmap := recvMsg . Payload [ offset : ]
//#### END Read payload data
// Verify the multi-sig for prepare phase
deserializedMultiSig := bls . Sign { }
err = deserializedMultiSig . Deserialize ( multiSig )
if err != nil {
utils . GetLogInstance ( ) . Warn ( "onViewChange failed to deserialize the multi signature for prepared payload" , "error" , err )
return
}
mask , err := bls_cosi . NewMask ( consensus . PublicKeys , nil )
mask . SetMask ( bitmap )
// TODO: add 2f+1 signature checking
if ! deserializedMultiSig . VerifyHash ( mask . AggregatePublic , blockHash [ : ] ) || err != nil {
utils . GetLogInstance ( ) . Warn ( "onViewChange failed to verify multi signature for m1 prepared payload" , "error" , err , "blockHash" , blockHash )
return
}
consensus . m1Payload = append ( recvMsg . Payload [ : 0 : 0 ] , recvMsg . Payload ... )
}
// consensus.m1Payload already verified
if ! bytes . Equal ( consensus . m1Payload , recvMsg . Payload ) {
utils . GetLogInstance ( ) . Warn ( "onViewChange m1 message payload not equal, hence invalid" )
return
}
consensus . bhpSigs [ validatorAddress ] = recvMsg . ViewchangeSig
consensus . bhpBitmap . SetKey ( recvMsg . SenderPubkey , true ) // Set the bitmap indicating that this validator signed.
}
if ( len ( consensus . bhpSigs ) + len ( consensus . nilSigs ) ) >= ( ( len ( consensus . PublicKeys ) * 2 ) / 3 + 1 ) {
consensus . mode . SetMode ( Normal )
consensus . LeaderPubKey = consensus . PubKey
if len ( consensus . m1Payload ) == 0 {
consensus . phase = Announce
go func ( ) {
consensus . ReadySignal <- struct { } { }
} ( )
} else {
consensus . phase = Commit
copy ( consensus . blockHash [ : ] , consensus . m1Payload [ : 32 ] )
//#### Read payload data
offset := 32
// 48 byte of multi-sig
multiSig := recvMsg . Payload [ offset : offset + 48 ]
offset += 48
// bitmap
bitmap := recvMsg . Payload [ offset : ]
//#### END Read payload data
aggSig := bls . Sign { }
_ = aggSig . Deserialize ( multiSig )
mask , _ := bls_cosi . NewMask ( consensus . PublicKeys , nil )
mask . SetMask ( bitmap )
consensus . aggregatedPrepareSig = & aggSig
consensus . prepareBitmap = mask
// Leader sign the multi-sig and bitmap (for commit phase)
consensus . commitSigs [ consensus . SelfAddress ] = consensus . priKey . SignHash ( consensus . m1Payload [ 32 : ] )
}
msgToSend := consensus . constructNewViewMessage ( )
utils . GetLogInstance ( ) . Warn ( "onViewChange" , "sent newview message" , len ( msgToSend ) )
consensus . host . SendMessageToGroups ( [ ] p2p . GroupID { p2p . NewGroupIDByShardID ( p2p . ShardID ( consensus . ShardID ) ) } , host . ConstructP2pMessage ( byte ( 17 ) , msgToSend ) )
consensus . viewID = consensus . mode . GetViewID ( )
consensus . ResetViewChangeState ( )
consensus . ResetState ( )
consensus . consensusTimeout [ "viewchange" ] . Stop ( )
}
utils . GetLogInstance ( ) . Debug ( "onViewChange" , "numSigs" , len ( consensus . bhpSigs ) + len ( consensus . nilSigs ) , "needed" , ( len ( consensus . PublicKeys ) * 2 ) / 3 + 1 )
}
// TODO: move to consensus_leader.go later
func ( consensus * Consensus ) onNewView ( msg * msg_pb . Message ) {
utils . GetLogInstance ( ) . Info ( "onNewView received new view message" )
senderKey , _ , err := consensus . verifyViewChangeSenderKey ( msg )
if err != nil {
utils . GetLogInstance ( ) . Debug ( "onNewView verifySenderKey failed" , "error" , err )
return
}
recvMsg , err := consensus . ParseNewViewMessage ( msg )
if err != nil {
utils . GetLogInstance ( ) . Warn ( "onViewChange unable to parse viewchange message" )
return
}
if ! consensus . LeaderPubKey . IsEqual ( senderKey ) {
utils . GetLogInstance ( ) . Warn ( "onNewView key not match" , "senderKey" , senderKey . GetHexString ( ) [ : 10 ] , "newLeaderKey" , consensus . LeaderPubKey . GetHexString ( ) [ : 10 ] )
return
}
if consensus . blockNum > recvMsg . BlockNum {
return
}
if err = verifyMessageSig ( senderKey , msg ) ; err != nil {
utils . GetLogInstance ( ) . Debug ( "onNewView failed to verify new leader's signature" , "error" , err )
return
}
consensus . vcLock . Lock ( )
defer consensus . vcLock . Unlock ( )
// TODO check total number of sigs > 2f+1
if recvMsg . M1AggSig != nil {
m1Sig := recvMsg . M1AggSig
m1Mask := recvMsg . M1Bitmap
consensus . aggregatedBHPSig = m1Sig
consensus . bhpBitmap = m1Mask
if ! m1Sig . VerifyHash ( m1Mask . AggregatePublic , recvMsg . Payload ) {
utils . GetLogInstance ( ) . Warn ( "onNewView unable to verify aggregated signature of m1 payload" )
return
}
}
if recvMsg . M2AggSig != nil {
m2Sig := recvMsg . M2AggSig
m2Mask := recvMsg . M2Bitmap
if ! m2Sig . VerifyHash ( m2Mask . AggregatePublic , NIL ) {
utils . GetLogInstance ( ) . Warn ( "onNewView unable to verify aggregated signature of m2 payload" )
return
}
}
if len ( recvMsg . Payload ) > 0 && recvMsg . M1AggSig != nil {
//#### Read payload data
blockHash := recvMsg . Payload [ : 32 ]
offset := 32
// 48 byte of multi-sig
multiSig := recvMsg . Payload [ offset : offset + 48 ]
offset += 48
// bitmap
bitmap := recvMsg . Payload [ offset : ]
//#### END Read payload data
aggSig := bls . Sign { }
err := aggSig . Deserialize ( multiSig )
if err != nil {
utils . GetLogInstance ( ) . Warn ( "onNewView unable to deserialize prepared message agg sig" , "err" , err )
return
}
mask , err := bls_cosi . NewMask ( consensus . PublicKeys , nil )
if err != nil {
utils . GetLogInstance ( ) . Warn ( "onNewView unable to setup mask for prepared message" , "err" , err )
return
}
mask . SetMask ( bitmap )
if ! aggSig . VerifyHash ( mask . AggregatePublic , blockHash ) {
utils . GetLogInstance ( ) . Warn ( "onNewView failed to verify signature for prepared message" )
return
}
copy ( consensus . blockHash [ : ] , blockHash )
consensus . aggregatedPrepareSig = & aggSig
consensus . prepareBitmap = mask
//create prepared message?: consensus.pbftLog.AddMessage(recvMsg)
if recvMsg . BlockNum > consensus . blockNum {
return
}
// Construct and send the commit message
multiSigAndBitmap := append ( multiSig , bitmap ... )
msgToSend := consensus . constructCommitMessage ( multiSigAndBitmap )
utils . GetLogInstance ( ) . Info ( "onNewView === commit" , "sent commit message" , len ( msgToSend ) )
consensus . host . SendMessageToGroups ( [ ] p2p . GroupID { p2p . NewGroupIDByShardID ( p2p . ShardID ( consensus . ShardID ) ) } , host . ConstructP2pMessage ( byte ( 17 ) , msgToSend ) )
consensus . phase = Commit
consensus . consensusTimeout [ "commit" ] . Start ( )
} else {
consensus . phase = Announce
consensus . consensusTimeout [ "announce" ] . Start ( )
utils . GetLogInstance ( ) . Info ( "onNewView === announce" )
}
consensus . viewID = consensus . mode . GetViewID ( )
consensus . ResetViewChangeState ( )
consensus . ResetState ( )
consensus . consensusTimeout [ "viewchange" ] . Stop ( )
}