Merge pull request #3379 from LeoHChen/vc-revamp-1

View Change Revamp 1
pull/3391/head
Leo Chen 4 years ago committed by GitHub
commit 4ce64b1387
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cmd/harmony/main.go
  2. 22
      consensus/checks.go
  3. 13
      consensus/config.go
  4. 33
      consensus/consensus.go
  5. 2
      consensus/consensus_msg_sender.go
  6. 83
      consensus/consensus_service.go
  7. 6
      consensus/consensus_v2.go
  8. 127
      consensus/fbft_log.go
  9. 8
      consensus/leader.go
  10. 7
      consensus/quorum/quorum.go
  11. 12
      consensus/validator.go
  12. 475
      consensus/view_change.go
  13. 470
      consensus/view_change_construct.go
  14. 161
      consensus/view_change_msg.go
  15. 44
      consensus/view_change_test.go

@ -632,7 +632,7 @@ func setupConsensusAndNode(hc harmonyConfig, nodeConfig *nodeconfig.ConfigType)
Msg("Init Blockchain")
// Assign closure functions to the consensus object
currentConsensus.BlockVerifier = currentNode.VerifyNewBlock
currentConsensus.SetBlockVerifier(currentNode.VerifyNewBlock)
currentConsensus.OnConsensusDone = currentNode.PostConsensusProcessing
// update consensus information based on the blockchain
currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation())

@ -2,6 +2,7 @@ package consensus
import (
"bytes"
"encoding/binary"
protobuf "github.com/golang/protobuf/proto"
libbls "github.com/harmony-one/bls/ffi/go/bls"
@ -165,7 +166,8 @@ func (consensus *Consensus) onViewChangeSanityCheck(recvMsg *FBFTMessage) bool {
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MyViewChangingID", consensus.GetViewChangingID()).
Uint64("MsgViewChangingID", recvMsg.ViewID).
Msg("onViewChange")
Str("SendPubKey", recvMsg.SenderPubkey.Bytes.Hex()).
Msg("[onViewChangeSanityCheck]")
if consensus.blockNum > recvMsg.BlockNum {
consensus.getLogger().Debug().
@ -174,18 +176,28 @@ func (consensus *Consensus) onViewChangeSanityCheck(recvMsg *FBFTMessage) bool {
}
if consensus.blockNum < recvMsg.BlockNum {
consensus.getLogger().Warn().
Msg("[onViewChange] New Leader Has Lower Blocknum")
Msg("[onViewChangeSanityCheck] MsgBlockNum is different from my BlockNumber")
return false
}
if consensus.IsViewChangingMode() &&
consensus.GetViewChangingID() > recvMsg.ViewID {
consensus.GetCurBlockViewID() > recvMsg.ViewID {
consensus.getLogger().Warn().
Msg("[onViewChange] ViewChanging ID Is Low")
Msg("[onViewChangeSanityCheck] ViewChanging ID Is Low")
return false
}
if recvMsg.ViewID-consensus.GetViewChangingID() > MaxViewIDDiff {
consensus.getLogger().Debug().
Msg("Received viewID that is MaxViewIDDiff (100) further from the current viewID!")
Msg("[onViewChangeSanityCheck] Received viewID that is MaxViewIDDiff (249) further from the current viewID!")
return false
}
senderKey := recvMsg.SenderPubkey
viewIDHash := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDHash, recvMsg.ViewID)
if !recvMsg.ViewidSig.VerifyHash(senderKey.Object, viewIDHash) {
consensus.getLogger().Warn().
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[onViewChangeSanityCheck] Failed to Verify viewID Signature")
return false
}
return true

@ -4,14 +4,15 @@ import "time"
// timeout constant
const (
// The duration of viewChangeTimeout; when a view change is initialized with v+1
// timeout will be equal to viewChangeDuration; if view change failed and start v+2
// timeout will be 2*viewChangeDuration; timeout of view change v+n is n*viewChangeDuration
viewChangeDuration time.Duration = 60 * time.Second
// default timeout configuration is shorten to 45 seconds as the consensus is 5s
viewChangeTimeout = 45
// The duration of viewChangeTimeout for each view change
viewChangeDuration time.Duration = viewChangeTimeout * time.Second
// timeout duration for announce/prepare/commit
phaseDuration time.Duration = 60 * time.Second
bootstrapDuration time.Duration = 600 * time.Second
// shorten the duration from 60 to 45 seconds as the consensus is 5s
phaseDuration time.Duration = viewChangeTimeout * time.Second
bootstrapDuration time.Duration = 120 * time.Second
maxLogSize uint32 = 1000
// threshold between received consensus message blockNum and my blockNum
consensusBlockNumBuffer uint64 = 2

@ -27,6 +27,9 @@ const (
var errLeaderPriKeyNotFound = errors.New("getting leader private key from consensus public keys failed")
// BlockVerifierFunc is a function used to verify the block
type BlockVerifierFunc func(*types.Block) error
// Consensus is the main struct with all states and data related to consensus process.
type Consensus struct {
Decider quorum.Decider
@ -47,21 +50,6 @@ type Consensus struct {
aggregatedCommitSig *bls_core.Sign
prepareBitmap *bls_cosi.Mask
commitBitmap *bls_cosi.Mask
// Commits collected from view change
// for each viewID, we need keep track of corresponding sigs and bitmap
// until one of the viewID has enough votes (>=2f+1)
// after one of viewID has enough votes, we can reset and clean the map
// honest nodes will never double votes on different viewID
// bhpSigs: blockHashPreparedSigs is the signature on m1 type message
bhpSigs map[uint64]map[string]*bls_core.Sign
// nilSigs: there is no prepared message when view change,
// it's signature on m2 type (i.e. nil) messages
nilSigs map[uint64]map[string]*bls_core.Sign
viewIDSigs map[uint64]map[string]*bls_core.Sign
bhpBitmap map[uint64]*bls_cosi.Mask
nilBitmap map[uint64]*bls_cosi.Mask
viewIDBitmap map[uint64]*bls_cosi.Mask
m1Payload []byte // message payload for type m1 := |vcBlockHash|prepared_agg_sigs|prepared_bitmap|, new leader only need one
// The chain reader for the blockchain this consensus is working on
ChainReader *core.BlockChain
// Minimal number of peers in the shard
@ -85,15 +73,15 @@ type Consensus struct {
IgnoreViewIDCheck *abool.AtomicBool
// consensus mutex
mutex sync.Mutex
// mutex for view change
vcLock sync.Mutex
// ViewChange struct
vc *viewChange
// Signal channel for starting a new consensus process
ReadySignal chan struct{}
// The post-consensus processing func passed from Node object
// Called when consensus on a new block is done
OnConsensusDone func(*types.Block)
// The verifier func passed from Node object
BlockVerifier func(*types.Block) error
BlockVerifier BlockVerifierFunc
// verified block to state sync broadcast
VerifiedNewBlock chan *types.Block
// will trigger state syncing when blockNum is low
@ -173,6 +161,12 @@ func (consensus *Consensus) GetConsensusLeaderPrivateKey() (*bls.PrivateKeyWrapp
return consensus.GetLeaderPrivateKey(consensus.LeaderPubKey.Object)
}
// SetBlockVerifier sets the block verifier
func (consensus *Consensus) SetBlockVerifier(verifier BlockVerifierFunc) {
consensus.BlockVerifier = verifier
consensus.vc.SetBlockVerifier(verifier)
}
// New create a new Consensus record
func New(
host p2p.Host, shard uint32, leader p2p.Peer, multiBLSPriKey multibls.PrivateKeys,
@ -213,5 +207,8 @@ func New(
// channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
consensus.IgnoreViewIDCheck = abool.NewBool(false)
// Make Sure Verifier is not null
consensus.vc = newViewChange()
return &consensus, nil
}

@ -13,7 +13,7 @@ import (
const (
// RetryIntervalInSec is the interval for message retry
RetryIntervalInSec = 10
RetryIntervalInSec = 7
)
// MessageSender is the wrapper object that controls how a consensus message is sent

@ -95,9 +95,7 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys []bls_cosi.PublicKeyWrapper
consensus.UpdateBitmaps()
consensus.ResetState()
consensus.vcLock.Lock()
consensus.ResetViewChangeState()
consensus.vcLock.Unlock()
return consensus.Decider.ParticipantsCount()
}
@ -127,24 +125,6 @@ func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message,
return nil
}
// GetViewIDSigsArray returns the signatures for viewID in viewchange
func (consensus *Consensus) GetViewIDSigsArray(viewID uint64) []*bls_core.Sign {
sigs := []*bls_core.Sign{}
for _, sig := range consensus.viewIDSigs[viewID] {
sigs = append(sigs, sig)
}
return sigs
}
// GetNilSigsArray returns the signatures for nil prepared message in viewchange
func (consensus *Consensus) GetNilSigsArray(viewID uint64) []*bls_core.Sign {
sigs := []*bls_core.Sign{}
for _, sig := range consensus.nilSigs[viewID] {
sigs = append(sigs, sig)
}
return sigs
}
// UpdateBitmaps update the bitmaps for prepare and commit phase
func (consensus *Consensus) UpdateBitmaps() {
consensus.getLogger().Debug().
@ -159,10 +139,7 @@ func (consensus *Consensus) UpdateBitmaps() {
// ResetState resets the state of the consensus
func (consensus *Consensus) ResetState() {
consensus.getLogger().Debug().
Str("Phase", consensus.phase.String()).
Msg("[ResetState] Resetting consensus state")
consensus.switchPhase(FBFTAnnounce, true)
consensus.switchPhase("ResetState", FBFTAnnounce)
consensus.blockHash = [32]byte{}
consensus.block = []byte{}
consensus.Decider.ResetPrepareAndCommitVotes()
@ -250,17 +227,6 @@ func (consensus *Consensus) ReadSignatureBitmapPayload(
)
}
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := utils.Logger().With().
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.GetCurBlockViewID()).
Interface("phase", consensus.phase).
Str("mode", consensus.current.Mode().String()).
Logger()
return &logger
}
// retrieve corresponding blsPublicKey from Coinbase Address
func (consensus *Consensus) getLeaderPubKeyFromCoinbase(
header *block.Header,
@ -494,31 +460,6 @@ func (consensus *Consensus) NeedsRandomNumberGeneration(epoch *big.Int) bool {
return false
}
func (consensus *Consensus) addViewIDKeyIfNotExist(viewID uint64) {
members := consensus.Decider.Participants()
if _, ok := consensus.bhpSigs[viewID]; !ok {
consensus.bhpSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := consensus.nilSigs[viewID]; !ok {
consensus.nilSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := consensus.viewIDSigs[viewID]; !ok {
consensus.viewIDSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := consensus.bhpBitmap[viewID]; !ok {
bhpBitmap, _ := bls_cosi.NewMask(members, nil)
consensus.bhpBitmap[viewID] = bhpBitmap
}
if _, ok := consensus.nilBitmap[viewID]; !ok {
nilBitmap, _ := bls_cosi.NewMask(members, nil)
consensus.nilBitmap[viewID] = nilBitmap
}
if _, ok := consensus.viewIDBitmap[viewID]; !ok {
viewIDBitmap, _ := bls_cosi.NewMask(members, nil)
consensus.viewIDBitmap[viewID] = viewIDBitmap
}
}
// SetViewIDs set both current view ID and view changing ID to the height
// of the blockchain. It is used during client startup to recover the state
func (consensus *Consensus) SetViewIDs(height uint64) {
@ -551,3 +492,25 @@ func (consensus *Consensus) FinishFinalityCount() {
func (consensus *Consensus) GetFinality() int64 {
return consensus.finality
}
// switchPhase will switch FBFTPhase to nextPhase if the desirePhase equals the nextPhase
func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) {
consensus.getLogger().Info().
Str("from:", consensus.phase.String()).
Str("to:", desired.String()).
Str("switchPhase:", subject)
consensus.phase = desired
return
}
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := utils.Logger().With().
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.GetCurBlockViewID()).
Str("phase", consensus.phase.String()).
Str("mode", consensus.current.Mode().String()).
Logger()
return &logger
}

@ -306,11 +306,7 @@ func (consensus *Consensus) tryCatchup() {
break
}
if currentBlockNum < consensus.blockNum {
consensus.getLogger().Info().
Uint64("From", currentBlockNum).
Uint64("To", consensus.blockNum).
Msg("[TryCatchup] Caught up!")
consensus.switchPhase(FBFTAnnounce, true)
consensus.switchPhase("TryCatchup", FBFTAnnounce)
}
// catup up and skip from view change trap
if currentBlockNum < consensus.blockNum &&

@ -11,7 +11,6 @@ import (
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
)
// FBFTLog represents the log stored by a node during FBFT process
@ -155,7 +154,10 @@ func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNu
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType == typ && msg.(*FBFTMessage).BlockNum == blockNum && msg.(*FBFTMessage).ViewID == viewID && msg.(*FBFTMessage).BlockHash == blockHash {
if msg.(*FBFTMessage).MessageType == typ &&
msg.(*FBFTMessage).BlockNum == blockNum &&
msg.(*FBFTMessage).ViewID == viewID &&
msg.(*FBFTMessage).BlockHash == blockHash {
found = append(found, msg.(*FBFTMessage))
}
}
@ -167,7 +169,8 @@ func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType == typ && msg.(*FBFTMessage).BlockNum == blockNum {
if msg.(*FBFTMessage).MessageType == typ &&
msg.(*FBFTMessage).BlockNum == blockNum {
found = append(found, msg.(*FBFTMessage))
}
}
@ -179,7 +182,9 @@ func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum ui
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType == typ && msg.(*FBFTMessage).BlockNum == blockNum && msg.(*FBFTMessage).BlockHash == blockHash {
if msg.(*FBFTMessage).MessageType == typ &&
msg.(*FBFTMessage).BlockNum == blockNum &&
msg.(*FBFTMessage).BlockHash == blockHash {
found = append(found, msg.(*FBFTMessage))
}
}
@ -262,117 +267,3 @@ func ParseFBFTMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
return &pbftMsg, nil
}
// ParseViewChangeMessage parses view change message into FBFTMessage structure
func ParseViewChangeMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
pbftMsg := FBFTMessage{}
pbftMsg.MessageType = msg.GetType()
if pbftMsg.MessageType != msg_pb.MessageType_VIEWCHANGE {
return nil, fmt.Errorf("ParseViewChangeMessage: incorrect message type %s", pbftMsg.MessageType)
}
vcMsg := msg.GetViewchange()
pbftMsg.ViewID = vcMsg.ViewId
pbftMsg.BlockNum = vcMsg.BlockNum
pbftMsg.Block = make([]byte, len(vcMsg.PreparedBlock))
copy(pbftMsg.Block[:], vcMsg.PreparedBlock[:])
pbftMsg.Payload = make([]byte, len(vcMsg.Payload))
copy(pbftMsg.Payload[:], vcMsg.Payload[:])
pubKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.SenderPubkey)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse senderpubkey")
return nil, err
}
leaderKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.LeaderPubkey)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse leaderpubkey")
return nil, err
}
vcSig := bls_core.Sign{}
err = vcSig.Deserialize(vcMsg.ViewchangeSig)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the viewchange signature")
return nil, err
}
vcSig1 := bls_core.Sign{}
err = vcSig1.Deserialize(vcMsg.ViewidSig)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the viewid signature")
return nil, err
}
pbftMsg.SenderPubkey = &bls.PublicKeyWrapper{Object: pubKey}
copy(pbftMsg.SenderPubkey.Bytes[:], vcMsg.SenderPubkey[:])
pbftMsg.LeaderPubkey = &bls.PublicKeyWrapper{Object: leaderKey}
copy(pbftMsg.LeaderPubkey.Bytes[:], vcMsg.LeaderPubkey[:])
pbftMsg.ViewchangeSig = &vcSig
pbftMsg.ViewidSig = &vcSig1
return &pbftMsg, nil
}
// ParseNewViewMessage parses new view message into FBFTMessage structure
func (consensus *Consensus) ParseNewViewMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
FBFTMsg := FBFTMessage{}
FBFTMsg.MessageType = msg.GetType()
if FBFTMsg.MessageType != msg_pb.MessageType_NEWVIEW {
return nil, fmt.Errorf("ParseNewViewMessage: incorrect message type %s", FBFTMsg.MessageType)
}
vcMsg := msg.GetViewchange()
FBFTMsg.ViewID = vcMsg.ViewId
FBFTMsg.BlockNum = vcMsg.BlockNum
FBFTMsg.Payload = make([]byte, len(vcMsg.Payload))
copy(FBFTMsg.Payload[:], vcMsg.Payload[:])
FBFTMsg.Block = make([]byte, len(vcMsg.PreparedBlock))
copy(FBFTMsg.Block[:], vcMsg.PreparedBlock[:])
pubKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.SenderPubkey)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse senderpubkey")
return nil, err
}
FBFTMsg.SenderPubkey = &bls.PublicKeyWrapper{Object: pubKey}
copy(FBFTMsg.SenderPubkey.Bytes[:], vcMsg.SenderPubkey[:])
members := consensus.Decider.Participants()
if len(vcMsg.M3Aggsigs) > 0 {
m3Sig := bls_core.Sign{}
err = m3Sig.Deserialize(vcMsg.M3Aggsigs)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the multi signature for M3 viewID signature")
return nil, err
}
m3mask, err := bls_cosi.NewMask(members, nil)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to create mask for multi signature")
return nil, err
}
m3mask.SetMask(vcMsg.M3Bitmap)
FBFTMsg.M3AggSig = &m3Sig
FBFTMsg.M3Bitmap = m3mask
}
if len(vcMsg.M2Aggsigs) > 0 {
m2Sig := bls_core.Sign{}
err = m2Sig.Deserialize(vcMsg.M2Aggsigs)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the multi signature for M2 aggregated signature")
return nil, err
}
m2mask, err := bls_cosi.NewMask(members, nil)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to create mask for multi signature")
return nil, err
}
m2mask.SetMask(vcMsg.M2Bitmap)
FBFTMsg.M2AggSig = &m2Sig
FBFTMsg.M2Bitmap = m2mask
}
return &FBFTMsg, nil
}

@ -92,11 +92,7 @@ func (consensus *Consensus) announce(block *types.Block) {
Msg("[Announce] Sent Announce Message!!")
}
consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", FBFTPrepare.String()).
Msg("[Announce] Switching phase")
consensus.switchPhase(FBFTPrepare, true)
consensus.switchPhase("Announce", FBFTPrepare)
}
func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
@ -186,7 +182,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
if err := consensus.didReachPrepareQuorum(); err != nil {
return
}
consensus.switchPhase(FBFTCommit, true)
consensus.switchPhase("onPrepare", FBFTCommit)
}
//// Read - End
}

@ -72,7 +72,7 @@ type ParticipantTracker interface {
Participants() multibls.PublicKeys
IndexOf(bls.SerializedPublicKey) int
ParticipantsCount() int64
NextAfter(*bls.PublicKeyWrapper) (bool, *bls.PublicKeyWrapper)
NthNext(*bls.PublicKeyWrapper, int) (bool, *bls.PublicKeyWrapper)
UpdateParticipants(pubKeys []bls.PublicKeyWrapper)
}
@ -187,14 +187,15 @@ func (s *cIdentities) IndexOf(pubKey bls.SerializedPublicKey) int {
return -1
}
func (s *cIdentities) NextAfter(pubKey *bls.PublicKeyWrapper) (bool, *bls.PublicKeyWrapper) {
// NthNext return the Nth next pubkey, next can be negative number
func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
found := false
idx := s.IndexOf(pubKey.Bytes)
if idx != -1 {
found = true
}
idx = (idx + 1) % int(s.ParticipantsCount())
idx = (idx + next) % int(s.ParticipantsCount())
return found, &s.publicKeys[idx]
}

@ -86,11 +86,7 @@ func (consensus *Consensus) prepare() {
}
}
}
consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", FBFTPrepare.String()).
Msg("[Announce] Switching Phase")
consensus.switchPhase(FBFTPrepare, true)
consensus.switchPhase("Announce", FBFTPrepare)
}
// if onPrepared accepts the prepared message from the leader, then
@ -238,11 +234,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
}
}
}
consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", FBFTCommit.String()).
Msg("[OnPrepared] Switching phase")
consensus.switchPhase(FBFTCommit, true)
consensus.switchPhase("onPrepared", FBFTCommit)
}
func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {

@ -1,30 +1,22 @@
package consensus
import (
"bytes"
"encoding/binary"
"encoding/hex"
"sync"
"time"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/ethereum/go-ethereum/common"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/consensus/signature"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
)
// MaxViewIDDiff limits the received view ID to only 100 further from the current view ID
const MaxViewIDDiff = 100
// MaxViewIDDiff limits the received view ID to only 249 further from the current view ID
const MaxViewIDDiff = 249
// State contains current mode and current viewID
type State struct {
@ -98,30 +90,15 @@ func (pm *State) GetViewChangeDuraion() time.Duration {
return time.Duration(diff * diff * int64(viewChangeDuration))
}
// switchPhase will switch FBFTPhase to nextPhase if the desirePhase equals the nextPhase
func (consensus *Consensus) switchPhase(desired FBFTPhase, override bool) {
if override {
consensus.phase = desired
return
}
var nextPhase FBFTPhase
switch consensus.phase {
case FBFTAnnounce:
nextPhase = FBFTPrepare
case FBFTPrepare:
nextPhase = FBFTCommit
case FBFTCommit:
nextPhase = FBFTAnnounce
}
if nextPhase == desired {
consensus.phase = nextPhase
}
}
// GetNextLeaderKey uniquely determine who is the leader for given viewID
func (consensus *Consensus) GetNextLeaderKey() *bls.PublicKeyWrapper {
wasFound, next := consensus.Decider.NextAfter(consensus.LeaderPubKey)
func (consensus *Consensus) GetNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper {
gap := 1
consensus.getLogger().Info().
Str("leaderPubKey", consensus.LeaderPubKey.Bytes.Hex()).
Uint64("newViewID", viewID).
Uint64("myCurBlockViewID", consensus.GetCurBlockViewID()).
Msg("[GetNextLeaderKey] got leaderPubKey from coinbase")
wasFound, next := consensus.Decider.NthNext(consensus.LeaderPubKey, gap)
if !wasFound {
consensus.getLogger().Warn().
Str("key", consensus.LeaderPubKey.Bytes.Hex()).
@ -130,22 +107,6 @@ func (consensus *Consensus) GetNextLeaderKey() *bls.PublicKeyWrapper {
return next
}
// ResetViewChangeState reset the state for viewchange
func (consensus *Consensus) ResetViewChangeState() {
consensus.getLogger().Debug().
Str("Phase", consensus.phase.String()).
Msg("[ResetViewChangeState] Resetting view change state")
consensus.current.SetMode(Normal)
consensus.m1Payload = []byte{}
consensus.bhpSigs = map[uint64]map[string]*bls_core.Sign{}
consensus.nilSigs = map[uint64]map[string]*bls_core.Sign{}
consensus.viewIDSigs = map[uint64]map[string]*bls_core.Sign{}
consensus.bhpBitmap = map[uint64]*bls_cosi.Mask{}
consensus.nilBitmap = map[uint64]*bls_cosi.Mask{}
consensus.viewIDBitmap = map[uint64]*bls_cosi.Mask{}
consensus.Decider.ResetViewChangeVotes()
}
func createTimeout() map[TimeoutType]*utils.Timeout {
timeouts := make(map[TimeoutType]*utils.Timeout)
timeouts[timeoutConsensus] = utils.NewTimeout(phaseDuration)
@ -154,7 +115,8 @@ func createTimeout() map[TimeoutType]*utils.Timeout {
return timeouts
}
// startViewChange send a new view change
// startViewChange send a new view change
// the viewID is the current viewID
func (consensus *Consensus) startViewChange(viewID uint64) {
if consensus.disableViewChange {
return
@ -163,15 +125,33 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.current.SetMode(ViewChanging)
consensus.SetViewChangingID(viewID)
consensus.LeaderPubKey = consensus.GetNextLeaderKey()
consensus.LeaderPubKey = consensus.GetNextLeaderKey(viewID)
duration := consensus.current.GetViewChangeDuraion()
consensus.getLogger().Warn().
Uint64("viewID", viewID).
Uint64("viewChangingID", consensus.GetViewChangingID()).
Dur("timeoutDuration", duration).
Str("NextLeader", consensus.LeaderPubKey.Bytes.Hex()).
Msg("[startViewChange]")
consensus.consensusTimeout[timeoutViewChange].SetDuration(duration)
defer consensus.consensusTimeout[timeoutViewChange].Start()
// update the dictionary key if the viewID is first time received
consensus.vc.AddViewIDKeyIfNotExist(viewID, consensus.Decider.Participants())
// init my own payload
if err := consensus.vc.InitPayload(
consensus.FBFTLog,
viewID,
consensus.blockNum,
consensus.priKey); err != nil {
consensus.getLogger().Error().Err(err).Msg("Init Payload Error")
}
// for view change, send separate view change per public key
// do not do multi-sign of view change message
for _, key := range consensus.priKey {
if !consensus.IsValidatorInCommittee(key.Pub.Bytes) {
continue
@ -183,21 +163,26 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
p2p.ConstructMessage(msgToSend),
)
}
consensus.consensusTimeout[timeoutViewChange].SetDuration(duration)
consensus.consensusTimeout[timeoutViewChange].Start()
}
// onViewChange is called when the view change message is received.
func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.getLogger().Info().Msg("[onViewChange] Received ViewChange Message")
recvMsg, err := ParseViewChangeMessage(msg)
if err != nil {
consensus.getLogger().Warn().Msg("[onViewChange] Unable To Parse Viewchange Message")
consensus.getLogger().Warn().Err(err).Msg("[onViewChange] Unable To Parse Viewchange Message")
return
}
// if not leader, noop
newLeaderKey := recvMsg.LeaderPubkey
newLeaderPriKey, err := consensus.GetLeaderPrivateKey(newLeaderKey.Object)
if err != nil {
consensus.getLogger().Info().
Err(err).
Str("Sender", recvMsg.SenderPubkey.Bytes.Hex()).
Str("NextLeader", recvMsg.LeaderPubkey.Bytes.Hex()).
Str("myBLSPubKey", consensus.priKey.GetPublicKeys().SerializeToHexStr()).
Msg("[onViewChange] I am not the Leader")
return
}
@ -206,6 +191,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)).
Int64("need", consensus.Decider.TwoThirdsSignersCount()).
Str("validatorPubKey", recvMsg.SenderPubkey.Bytes.Hex()).
Str("newLeaderKey", newLeaderKey.Bytes.Hex()).
Msg("[onViewChange] Received Enough View Change Messages")
return
}
@ -214,214 +200,36 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
return
}
senderKey := recvMsg.SenderPubkey
consensus.vcLock.Lock()
defer consensus.vcLock.Unlock()
// update the dictionary key if the viewID is first time received
consensus.addViewIDKeyIfNotExist(recvMsg.ViewID)
// TODO: remove NIL type message
// add self m1 or m2 type message signature and bitmap
_, ok1 := consensus.nilSigs[recvMsg.ViewID][newLeaderKey.Bytes.Hex()]
_, ok2 := consensus.bhpSigs[recvMsg.ViewID][newLeaderKey.Bytes.Hex()]
if !(ok1 || ok2) {
// add own signature for newview message
preparedMsgs := consensus.FBFTLog.GetMessagesByTypeSeq(
msg_pb.MessageType_PREPARED, recvMsg.BlockNum,
)
preparedMsg := consensus.FBFTLog.FindMessageByMaxViewID(preparedMsgs)
hasBlock := false
if preparedMsg != nil {
if preparedBlock := consensus.FBFTLog.GetBlockByHash(
preparedMsg.BlockHash,
); preparedBlock != nil {
if consensus.BlockVerifier(preparedBlock); err != nil {
consensus.getLogger().Error().Err(err).Msg("[onViewChange] My own prepared block verification failed")
} else {
hasBlock = true
}
}
}
if hasBlock {
consensus.getLogger().Info().Msg("[onViewChange] add my M1 type messaage")
msgToSign := append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
for i, key := range consensus.priKey {
if err := consensus.bhpBitmap[recvMsg.ViewID].SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Warn().Msgf("[onViewChange] bhpBitmap setkey failed for key at index %d", i)
continue
}
consensus.bhpSigs[recvMsg.ViewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(msgToSign)
}
// if m1Payload is empty, we just add one
if len(consensus.m1Payload) == 0 {
consensus.m1Payload = append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
}
} else {
consensus.getLogger().Info().Msg("[onViewChange] add my M2(NIL) type messaage")
for i, key := range consensus.priKey {
if err := consensus.nilBitmap[recvMsg.ViewID].SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Warn().Msgf("[onViewChange] nilBitmap setkey failed for key at index %d", i)
continue
}
consensus.nilSigs[recvMsg.ViewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(NIL)
}
}
}
// add self m3 type message signature and bitmap
_, ok3 := consensus.viewIDSigs[recvMsg.ViewID][newLeaderKey.Bytes.Hex()]
if !ok3 {
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
for i, key := range consensus.priKey {
if err := consensus.viewIDBitmap[recvMsg.ViewID].SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Warn().Msgf("[onViewChange] viewIDBitmap setkey failed for key at index %d", i)
continue
}
consensus.viewIDSigs[recvMsg.ViewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(viewIDBytes)
}
}
preparedBlock := &types.Block{}
hasBlock := false
if len(recvMsg.Payload) != 0 && len(recvMsg.Block) != 0 {
if err := rlp.DecodeBytes(recvMsg.Block, preparedBlock); err != nil {
consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[onViewChange] Unparseable prepared block data")
return
}
hasBlock = true
}
// m2 type message
if !hasBlock {
_, ok := consensus.nilSigs[recvMsg.ViewID][senderKey.Bytes.Hex()]
if ok {
consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Already Received M2 message from validator")
return
}
if !recvMsg.ViewchangeSig.VerifyHash(senderKey.Object, NIL) {
consensus.getLogger().Warn().Msg("[onViewChange] Failed To Verify Signature For M2 Type Viewchange Message")
return
}
consensus.getLogger().Info().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Add M2 (NIL) type message")
consensus.nilSigs[recvMsg.ViewID][senderKey.Bytes.Hex()] = recvMsg.ViewchangeSig
consensus.nilBitmap[recvMsg.ViewID].SetKey(recvMsg.SenderPubkey.Bytes, true) // Set the bitmap indicating that this validator signed.
} else { // m1 type message
if consensus.BlockVerifier(preparedBlock); err != nil {
consensus.getLogger().Error().Err(err).Msg("[onViewChange] Prepared block verification failed")
return
}
_, ok := consensus.bhpSigs[recvMsg.ViewID][senderKey.Bytes.Hex()]
if ok {
consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Already Received M1 Message From the Validator")
return
}
if !recvMsg.ViewchangeSig.VerifyHash(recvMsg.SenderPubkey.Object, recvMsg.Payload) {
consensus.getLogger().Warn().Msg("[onViewChange] Failed to Verify Signature for M1 Type Viewchange Message")
return
}
// first time receive m1 type message, need verify validity of prepared message
if len(consensus.m1Payload) == 0 || !bytes.Equal(consensus.m1Payload, recvMsg.Payload) {
if len(recvMsg.Payload) <= 32 {
consensus.getLogger().Warn().
Int("len", len(recvMsg.Payload)).
Msg("[onViewChange] M1 RecvMsg Payload Not Enough Length")
return
}
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[onViewChange] M1 RecvMsg Payload Read Error")
return
}
if !consensus.Decider.IsQuorumAchievedByMask(mask) {
consensus.getLogger().Warn().
Msgf("[onViewChange] Quorum Not achieved")
return
}
// Verify the multi-sig for prepare phase
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) {
consensus.getLogger().Warn().
Hex("blockHash", blockHash).
Msg("[onViewChange] failed to verify multi signature for m1 prepared payload")
return
}
// if m1Payload is empty, we just add one
if len(consensus.m1Payload) == 0 {
consensus.m1Payload = append(recvMsg.Payload[:0:0], recvMsg.Payload...)
// create prepared message for new leader
preparedMsg := FBFTMessage{
MessageType: msg_pb.MessageType_PREPARED,
ViewID: recvMsg.ViewID,
BlockNum: recvMsg.BlockNum,
}
preparedMsg.BlockHash = common.Hash{}
copy(preparedMsg.BlockHash[:], recvMsg.Payload[:32])
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkey = newLeaderKey
consensus.getLogger().Info().Msg("[onViewChange] New Leader Prepared Message Added")
consensus.FBFTLog.AddMessage(&preparedMsg)
consensus.FBFTLog.AddBlock(preparedBlock)
}
}
consensus.getLogger().Info().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Add M1 (prepared) type message")
consensus.bhpSigs[recvMsg.ViewID][senderKey.Bytes.Hex()] = recvMsg.ViewchangeSig
consensus.bhpBitmap[recvMsg.ViewID].SetKey(recvMsg.SenderPubkey.Bytes, true) // Set the bitmap indicating that this validator signed.
}
// check and add viewID (m3 type) message signature
if _, ok := consensus.viewIDSigs[recvMsg.ViewID][senderKey.Bytes.Hex()]; ok {
consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Already Received M3(ViewID) message from the validator")
members := consensus.Decider.Participants()
consensus.vc.AddViewIDKeyIfNotExist(recvMsg.ViewID, members)
// do it once only per viewID/Leader
if err := consensus.vc.InitPayload(consensus.FBFTLog,
recvMsg.ViewID,
recvMsg.BlockNum,
consensus.priKey); err != nil {
consensus.getLogger().Error().Err(err).Msg("Init Payload Error")
return
}
viewIDHash := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDHash, recvMsg.ViewID)
if !recvMsg.ViewidSig.VerifyHash(recvMsg.SenderPubkey.Object, viewIDHash) {
consensus.getLogger().Warn().
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[onViewChange] Failed to Verify M3 Message Signature")
err = consensus.vc.ProcessViewChangeMsg(consensus.FBFTLog, consensus.Decider, recvMsg)
if err != nil {
consensus.getLogger().Error().Err(err).
Uint64("viewID", recvMsg.ViewID).
Uint64("blockNum", recvMsg.BlockNum).
Str("msgSender", recvMsg.SenderPubkey.Bytes.Hex()).
Msg("Verify View Change Message Error")
return
}
consensus.getLogger().Info().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Add M3 (ViewID) type message")
consensus.viewIDSigs[recvMsg.ViewID][senderKey.Bytes.Hex()] = recvMsg.ViewidSig
// Set the bitmap indicating that this validator signed.
consensus.viewIDBitmap[recvMsg.ViewID].SetKey(recvMsg.SenderPubkey.Bytes, true)
consensus.getLogger().Info().
Int("have", len(consensus.viewIDSigs[recvMsg.ViewID])).
Int64("total", consensus.Decider.ParticipantsCount()).
Msg("[onViewChange]")
// received enough view change messages, change state to normal consensus
if consensus.Decider.IsQuorumAchievedByMask(consensus.viewIDBitmap[recvMsg.ViewID]) {
if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) {
consensus.getLogger().Info().Msg("[onViewChange] View Change Message Quorum Reached")
consensus.current.SetMode(Normal)
consensus.LeaderPubKey = newLeaderKey
consensus.ResetState()
if len(consensus.m1Payload) == 0 {
if consensus.vc.IsM1PayloadEmpty() {
// TODO(Chao): explain why ReadySignal is sent only in this case but not the other case.
// Make sure the newly proposed block have the correct view ID
consensus.SetCurBlockViewID(recvMsg.ViewID)
@ -429,13 +237,10 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.ReadySignal <- struct{}{}
}()
} else {
consensus.getLogger().Info().
Str("From", consensus.phase.String()).
Str("To", FBFTCommit.String()).
Msg("[OnViewChange] Switching phase")
consensus.switchPhase(FBFTCommit, true)
copy(consensus.blockHash[:], consensus.m1Payload[:32])
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(consensus.m1Payload, 32)
consensus.switchPhase("onViewChange", FBFTCommit)
payload := consensus.vc.GetM1Payload()
copy(consensus.blockHash[:], payload[:32])
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(payload, 32)
if err != nil {
consensus.getLogger().Error().Err(err).
@ -455,7 +260,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64())
for i, key := range consensus.priKey {
if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Warn().
consensus.getLogger().Warn().Err(err).
Msgf("[OnViewChange] New Leader commit bitmap set failed for key at index %d", i)
continue
}
@ -468,22 +273,18 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
block.NumberU64(),
block.Header().ViewID().Uint64(),
); err != nil {
consensus.getLogger().Warn().Msg("submit vote on viewchange commit failed")
consensus.getLogger().Warn().Err(err).Msg("submit vote on viewchange commit failed")
return
}
}
}
consensus.SetViewChangingID(recvMsg.ViewID)
consensus.SetViewIDs(recvMsg.ViewID)
msgToSend := consensus.constructNewViewMessage(
recvMsg.ViewID, newLeaderPriKey,
)
consensus.getLogger().Warn().
Int("payloadSize", len(consensus.m1Payload)).
Hex("M1Payload", consensus.m1Payload).
Msg("[onViewChange] Sent NewView Message")
if err := consensus.msgSender.SendWithRetry(
consensus.blockNum,
msg_pb.MessageType_NEWVIEW,
@ -494,105 +295,65 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.getLogger().Err(err).
Msg("could not send out the NEWVIEW message")
}
consensus.getLogger().Info().
Str("myKey", newLeaderKey.Bytes.Hex()).
Hex("M1Payload", consensus.vc.GetM1Payload()).
Msg("[onViewChange] Sent NewView Messge")
consensus.SetCurBlockViewID(recvMsg.ViewID)
consensus.ResetViewChangeState()
consensus.consensusTimeout[timeoutViewChange].Stop()
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().Str("myKey", newLeaderKey.Bytes.Hex()).Msg("[onViewChange] I am the New Leader")
consensus.getLogger().Info().
Str("myKey", newLeaderKey.Bytes.Hex()).
Msg("[onViewChange] I am the New Leader")
}
}
// TODO: move to consensus_leader.go later
// onNewView is called when validators received newView message from the new leader
// the validator needs to check the m3bitmap to see if the quorum is reached
// If the new view message contains payload (block), and at least one m1 message was
// collected by the new leader (m3count > m2count), the validator will create a new
// prepared message from the payload and commit it to the block
// Or the validator will enter announce phase to wait for the new block proposed
// from the new leader
func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
consensus.getLogger().Info().Msg("[onNewView] Received NewView Message")
recvMsg, err := consensus.ParseNewViewMessage(msg)
members := consensus.Decider.Participants()
recvMsg, err := ParseNewViewMessage(msg, members)
if err != nil {
consensus.getLogger().Warn().Err(err).Msg("[onNewView] Unable to Parse NewView Message")
return
}
if !consensus.onNewViewSanityCheck(recvMsg) {
// change view and leaderKey to keep in sync with network
if consensus.blockNum != recvMsg.BlockNum {
consensus.getLogger().Warn().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("myBlockNum", consensus.blockNum).
Msg("[onNewView] Invalid block number")
return
}
senderKey := recvMsg.SenderPubkey
consensus.vcLock.Lock()
defer consensus.vcLock.Unlock()
if recvMsg.M3AggSig == nil || recvMsg.M3Bitmap == nil {
consensus.getLogger().Error().Msg("[onNewView] M3AggSig or M3Bitmap is nil")
if !consensus.onNewViewSanityCheck(recvMsg) {
return
}
m3Sig := recvMsg.M3AggSig
m3Mask := recvMsg.M3Bitmap
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
if !consensus.Decider.IsQuorumAchievedByMask(m3Mask) {
consensus.getLogger().Warn().
Msgf("[onNewView] Quorum Not achieved")
preparedBlock, err := consensus.vc.VerifyNewViewMsg(recvMsg)
if err != nil {
consensus.getLogger().Warn().Err(err).Msg("[onNewView] Verify New View Msg Failed")
return
}
if !m3Sig.VerifyHash(m3Mask.AggregatePublic, viewIDBytes) {
m3Mask := recvMsg.M3Bitmap
if !consensus.Decider.IsQuorumAchievedByMask(m3Mask) {
consensus.getLogger().Warn().
Str("m3Sig", m3Sig.SerializeToHexStr()).
Hex("m3Mask", m3Mask.Bitmap).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[onNewView] Unable to Verify Aggregated Signature of M3 (ViewID) payload")
Msgf("[onNewView] Quorum Not achieved")
return
}
m2Mask := recvMsg.M2Bitmap
if recvMsg.M2AggSig != nil {
consensus.getLogger().Info().Msg("[onNewView] M2AggSig (NIL) is Not Empty")
m2Sig := recvMsg.M2AggSig
if !m2Sig.VerifyHash(m2Mask.AggregatePublic, NIL) {
consensus.getLogger().Warn().
Msg("[onNewView] Unable to Verify Aggregated Signature of M2 (NIL) payload")
return
}
}
// check when M3 sigs > M2 sigs, then M1 (recvMsg.Payload) should not be empty
preparedBlock := &types.Block{}
hasBlock := false
if len(recvMsg.Payload) != 0 && len(recvMsg.Block) != 0 {
if err := rlp.DecodeBytes(recvMsg.Block, preparedBlock); err != nil {
consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[onNewView] Unparseable prepared block data")
return
}
blockHash := recvMsg.Payload[:32]
preparedBlockHash := preparedBlock.Hash()
if !bytes.Equal(preparedBlockHash[:], blockHash) {
consensus.getLogger().Warn().
Err(err).
Str("blockHash", preparedBlock.Hash().Hex()).
Str("payloadBlockHash", hex.EncodeToString(blockHash)).
Msg("[onNewView] Prepared block hash doesn't match msg block hash.")
return
}
hasBlock = true
if consensus.BlockVerifier(preparedBlock); err != nil {
consensus.getLogger().Error().Err(err).Msg("[onNewView] Prepared block verification failed")
return
}
}
if m2Mask == nil || m2Mask.Bitmap == nil ||
(m2Mask != nil && m2Mask.Bitmap != nil &&
utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) {
if len(recvMsg.Payload) <= 32 {
consensus.getLogger().Info().
Msg("[onNewView] M1 (prepared) Type Payload Not Have Enough Length")
return
}
// m1 is not empty, check it's valid
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
@ -619,30 +380,26 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
copy(preparedMsg.BlockHash[:], blockHash[:])
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkey = senderKey
preparedMsg.SenderPubkey = recvMsg.SenderPubkey
consensus.FBFTLog.AddMessage(&preparedMsg)
if hasBlock {
if preparedBlock != nil {
consensus.FBFTLog.AddBlock(preparedBlock)
}
}
if !consensus.IsViewChangingMode() {
consensus.getLogger().Info().Msg("Not in ViewChanging Mode.")
return
}
// newView message verified success, override my state
consensus.SetViewIDs(recvMsg.ViewID)
consensus.LeaderPubKey = senderKey
consensus.LeaderPubKey = recvMsg.SenderPubkey
consensus.ResetViewChangeState()
// change view and leaderKey to keep in sync with network
if consensus.blockNum != recvMsg.BlockNum {
consensus.getLogger().Info().
Str("newLeaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[onNewView] New Leader Changed")
return
}
// NewView message is verified, change state to normal consensus
if hasBlock {
if preparedBlock != nil {
// Construct and send the commit message
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader,
preparedBlock.Epoch(), preparedBlock.Hash(), preparedBlock.NumberU64(), preparedBlock.Header().ViewID().Uint64())
@ -659,7 +416,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
)
if err != nil {
consensus.getLogger().Err(err).Msg("could not create commit message")
return
continue
}
msgToSend := network.Bytes
consensus.getLogger().Info().Msg("onNewView === commit")
@ -668,11 +425,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
p2p.ConstructMessage(msgToSend),
)
}
consensus.getLogger().Info().
Str("From", consensus.phase.String()).
Str("To", FBFTCommit.String()).
Msg("[OnViewChange] Switching phase")
consensus.switchPhase(FBFTCommit, true)
consensus.switchPhase("onNewView", FBFTCommit)
} else {
consensus.ResetState()
consensus.getLogger().Info().Msg("onNewView === announce")
@ -680,8 +433,16 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
consensus.getLogger().Info().
Str("newLeaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Msg("new leader changed")
consensus.getLogger().Info().
Msg("validator start consensus timer and stop view change timer")
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.consensusTimeout[timeoutViewChange].Stop()
}
// ResetViewChangeState resets the view change structure
func (consensus *Consensus) ResetViewChangeState() {
consensus.getLogger().Info().
Str("Phase", consensus.phase.String()).
Msg("[ResetViewChangeState] Resetting view change state")
consensus.current.SetMode(Normal)
consensus.vc.Reset()
consensus.Decider.ResetViewChangeVotes()
}

@ -0,0 +1,470 @@
package consensus
import (
"bytes"
"encoding/binary"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/types"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/crypto/bls"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
const (
// ValidPayloadLength is the valid length for viewchange payload
ValidPayloadLength = 32 + bls.BLSSignatureSizeInBytes
)
// viewChange encapsulate all the view change related data structure and functions
type viewChange struct {
// Commits collected from view change
// for each viewID, we need keep track of corresponding sigs and bitmap
// until one of the viewID has enough votes (>=2f+1)
// after one of viewID has enough votes, we can reset and clean the map
// honest nodes will never double votes on different viewID
// bhpSigs: blockHashPreparedSigs is the signature on m1 type message
bhpSigs map[uint64]map[string]*bls_core.Sign
// nilSigs: there is no prepared message when view change,
// it's signature on m2 type (i.e. nil) messages
nilSigs map[uint64]map[string]*bls_core.Sign
viewIDSigs map[uint64]map[string]*bls_core.Sign
bhpBitmap map[uint64]*bls_cosi.Mask
nilBitmap map[uint64]*bls_cosi.Mask
viewIDBitmap map[uint64]*bls_cosi.Mask
newViewMsg map[uint64]map[string]uint64
m1Payload []byte // message payload for type m1 := |vcBlockHash|prepared_agg_sigs|prepared_bitmap|, new leader only need one
blockVerifier BlockVerifierFunc
viewChangeDuration time.Duration
// mutex for view change
vcLock sync.RWMutex
}
// newViewChange returns a new viewChange object
func newViewChange() *viewChange {
vc := viewChange{}
vc.Reset()
return &vc
}
// SetBlockVerifier ..
func (vc *viewChange) SetBlockVerifier(verifier BlockVerifierFunc) {
vc.blockVerifier = verifier
}
// AddViewIDKeyIfNotExist ..
func (vc *viewChange) AddViewIDKeyIfNotExist(viewID uint64, members multibls.PublicKeys) {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
if _, ok := vc.bhpSigs[viewID]; !ok {
vc.bhpSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := vc.nilSigs[viewID]; !ok {
vc.nilSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := vc.viewIDSigs[viewID]; !ok {
vc.viewIDSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := vc.bhpBitmap[viewID]; !ok {
bhpBitmap, _ := bls_cosi.NewMask(members, nil)
vc.bhpBitmap[viewID] = bhpBitmap
}
if _, ok := vc.nilBitmap[viewID]; !ok {
nilBitmap, _ := bls_cosi.NewMask(members, nil)
vc.nilBitmap[viewID] = nilBitmap
}
if _, ok := vc.viewIDBitmap[viewID]; !ok {
viewIDBitmap, _ := bls_cosi.NewMask(members, nil)
vc.viewIDBitmap[viewID] = viewIDBitmap
}
}
// Reset reset the state for viewchange
func (vc *viewChange) Reset() {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
vc.m1Payload = []byte{}
vc.bhpSigs = map[uint64]map[string]*bls_core.Sign{}
vc.nilSigs = map[uint64]map[string]*bls_core.Sign{}
vc.viewIDSigs = map[uint64]map[string]*bls_core.Sign{}
vc.bhpBitmap = map[uint64]*bls_cosi.Mask{}
vc.nilBitmap = map[uint64]*bls_cosi.Mask{}
vc.viewIDBitmap = map[uint64]*bls_cosi.Mask{}
vc.newViewMsg = map[uint64]map[string]uint64{}
}
// GetPreparedBlock returns the prepared block or nil if not found
func (vc *viewChange) GetPreparedBlock(fbftlog *FBFTLog, hash [32]byte) ([]byte, []byte) {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
if !vc.isM1PayloadEmpty() {
block := fbftlog.GetBlockByHash(hash)
if block != nil {
encodedBlock, err := rlp.EncodeToBytes(block)
if err != nil || len(encodedBlock) == 0 {
vc.getLogger().Err(err).Msg("[GetPreparedBlock] Failed encoding prepared block")
return vc.m1Payload, nil
}
return vc.m1Payload, encodedBlock
}
}
return vc.m1Payload, nil
}
// GetM2Bitmap returns the nilBitmap as M2Bitmap
func (vc *viewChange) GetM2Bitmap(viewID uint64) ([]byte, []byte) {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
sig2arr := []*bls_core.Sign{}
for _, sig := range vc.nilSigs[viewID] {
sig2arr = append(sig2arr, sig)
}
if len(sig2arr) > 0 {
m2Sig := bls_cosi.AggregateSig(sig2arr)
vc.getLogger().Info().Int("len", len(sig2arr)).Msg("[GetM2Bitmap] M2 (NIL) type signatures")
return m2Sig.Serialize(), vc.nilBitmap[viewID].Bitmap
}
vc.getLogger().Info().Uint64("viewID", viewID).Msg("[GetM2Bitmap] No M2 (NIL) type signatures")
return nil, nil
}
// GetM3Bitmap returns the viewIDBitmap as M3Bitmap
func (vc *viewChange) GetM3Bitmap(viewID uint64) ([]byte, []byte) {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
sig3arr := []*bls_core.Sign{}
for _, sig := range vc.viewIDSigs[viewID] {
sig3arr = append(sig3arr, sig)
}
// even we check here for safty, m3 type signatures must >= 2f+1
if len(sig3arr) > 0 {
m3Sig := bls_cosi.AggregateSig(sig3arr)
vc.getLogger().Info().Int("len", len(sig3arr)).Msg("[GetM3Bitmap] M3 (ViewID) type signatures")
return m3Sig.Serialize(), vc.viewIDBitmap[viewID].Bitmap
}
vc.getLogger().Info().Uint64("viewID", viewID).Msg("[GetM3Bitmap] No M3 (ViewID) type signatures")
return nil, nil
}
// VerifyNewViewMsg returns true if the new view message is valid
func (vc *viewChange) VerifyNewViewMsg(recvMsg *FBFTMessage) (*types.Block, error) {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
if recvMsg.M3AggSig == nil || recvMsg.M3Bitmap == nil {
return nil, errors.New("[VerifyNewViewMsg] M3AggSig or M3Bitmap is nil")
}
senderKey := recvMsg.SenderPubkey
senderKeyStr := senderKey.Bytes.Hex()
// first time received the new view message
_, ok := vc.newViewMsg[recvMsg.ViewID]
if !ok {
newViewMap := map[string]uint64{}
vc.newViewMsg[recvMsg.ViewID] = newViewMap
}
_, ok = vc.newViewMsg[recvMsg.ViewID][senderKeyStr]
if ok {
vc.getLogger().Warn().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MyBlockNum", vc.newViewMsg[recvMsg.ViewID][senderKeyStr]).
Msg("[VerifyNewViewMsg] redundant NewView Message")
}
vc.newViewMsg[recvMsg.ViewID][senderKeyStr] = recvMsg.BlockNum
m3Sig := recvMsg.M3AggSig
m3Mask := recvMsg.M3Bitmap
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
if !m3Sig.VerifyHash(m3Mask.AggregatePublic, viewIDBytes) {
return nil, errors.New("[VerifyNewViewMsg] Unable to Verify Aggregated Signature of M3 (ViewID) payload")
}
m2Mask := recvMsg.M2Bitmap
if recvMsg.M2AggSig != nil {
m2Sig := recvMsg.M2AggSig
if !m2Sig.VerifyHash(m2Mask.AggregatePublic, NIL) {
return nil, errors.New("[VerifyNewViewMsg] Unable to Verify Aggregated Signature of M2 (NIL) payload")
}
}
// check when M3 sigs > M2 sigs, then M1 (recvMsg.Payload) should not be empty
preparedBlock := &types.Block{}
if len(recvMsg.Payload) >= ValidPayloadLength && len(recvMsg.Block) != 0 {
if err := rlp.DecodeBytes(recvMsg.Block, preparedBlock); err != nil {
return nil, errors.New("[VerifyNewViewMsg] Unparseable prepared block data")
}
blockHash := recvMsg.Payload[:32]
preparedBlockHash := preparedBlock.Hash()
if !bytes.Equal(preparedBlockHash[:], blockHash) {
return nil, errors.New("[VerifyNewViewMsg] Prepared block hash doesn't match msg block hash")
}
if err := vc.blockVerifier(preparedBlock); err != nil {
return nil, errors.New("[VerifyNewViewMsg] Prepared block verification failed")
}
return preparedBlock, nil
}
return nil, nil
}
var (
errDupM1 = errors.New("received M1 (prepared) message already")
errDupM2 = errors.New("received M2 (NIL) message already")
errDupM3 = errors.New("received M3 (ViewID) message already")
errVerifyM1 = errors.New("failed to verfiy signature for M1 message")
errVerifyM2 = errors.New("failed to verfiy signature for M2 message")
errM1Payload = errors.New("failed to verify multi signature for M1 prepared payload")
errNoQuorum = errors.New("no quorum on M1 (prepared) payload")
)
// ProcessViewChangeMsg process the view change message after verification
func (vc *viewChange) ProcessViewChangeMsg(
fbftlog *FBFTLog,
decider quorum.Decider,
recvMsg *FBFTMessage,
) error {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
preparedBlock := &types.Block{}
senderKey := recvMsg.SenderPubkey
senderKeyStr := senderKey.Bytes.Hex()
// check and add viewID (m3 type) message signature
if _, ok := vc.viewIDSigs[recvMsg.ViewID][senderKeyStr]; ok {
return errDupM3
}
if len(recvMsg.Payload) >= ValidPayloadLength && len(recvMsg.Block) != 0 {
if err := rlp.DecodeBytes(recvMsg.Block, preparedBlock); err != nil {
return err
}
if err := vc.blockVerifier(preparedBlock); err != nil {
return err
}
_, ok := vc.bhpSigs[recvMsg.ViewID][senderKeyStr]
if ok {
return errDupM1
}
if !recvMsg.ViewchangeSig.VerifyHash(senderKey.Object, recvMsg.Payload) {
return errVerifyM1
}
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := chain.ReadSignatureBitmapByPublicKeys(recvMsg.Payload[32:], decider.Participants())
if err != nil {
return err
}
if !decider.IsQuorumAchievedByMask(mask) {
return errNoQuorum
}
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) {
return errM1Payload
}
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M1 (prepared) type message")
vc.bhpSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewchangeSig
vc.bhpBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true) // Set the bitmap indicating that this validator signed.
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M3 (ViewID) type message")
vc.viewIDSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewidSig
// Set the bitmap indicating that this validator signed.
vc.viewIDBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true)
if vc.isM1PayloadEmpty() {
vc.m1Payload = append(recvMsg.Payload[:0:0], recvMsg.Payload...)
// create prepared message for new leader
preparedMsg := FBFTMessage{
MessageType: msg_pb.MessageType_PREPARED,
ViewID: recvMsg.ViewID,
BlockNum: recvMsg.BlockNum,
}
preparedMsg.BlockHash = common.Hash{}
copy(preparedMsg.BlockHash[:], recvMsg.Payload[:32])
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkey = recvMsg.LeaderPubkey
vc.getLogger().Info().Msg("[ProcessViewChangeMsg] New Leader Prepared Message Added")
fbftlog.AddMessage(&preparedMsg)
fbftlog.AddBlock(preparedBlock)
}
return nil
}
_, ok := vc.nilSigs[recvMsg.ViewID][senderKeyStr]
if ok {
return errDupM2
}
if !recvMsg.ViewchangeSig.VerifyHash(senderKey.Object, NIL) {
return errVerifyM2
}
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M2 (NIL) type message")
vc.nilSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewchangeSig
vc.nilBitmap[recvMsg.ViewID].SetKey(recvMsg.SenderPubkey.Bytes, true) // Set the bitmap indicating that this validator signed.
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M3 (ViewID) type message")
vc.viewIDSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewidSig
// Set the bitmap indicating that this validator signed.
vc.viewIDBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true)
return nil
}
// InitPayload do it once when validator received view change message
func (vc *viewChange) InitPayload(
fbftlog *FBFTLog,
viewID uint64,
blockNum uint64,
privKeys multibls.PrivateKeys,
) error {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
// m1 or m2 init once per viewID/key.
// m1 and m2 are mutually exclusive.
// If the node has valid prepared block, it will add m1 signature.
// If not, the node will add m2 signature.
// Honest node should have only one kind of m1/m2 signature.
inited := false
for _, key := range privKeys {
_, ok1 := vc.bhpSigs[viewID][key.Pub.Bytes.Hex()]
_, ok2 := vc.nilSigs[viewID][key.Pub.Bytes.Hex()]
if ok1 || ok2 {
inited = true
break
}
}
// add my own M1/M2 type message signature and bitmap
if !inited {
preparedMsgs := fbftlog.GetMessagesByTypeSeq(msg_pb.MessageType_PREPARED, blockNum)
preparedMsg := fbftlog.FindMessageByMaxViewID(preparedMsgs)
hasBlock := false
if preparedMsg != nil {
if preparedBlock := fbftlog.GetBlockByHash(preparedMsg.BlockHash); preparedBlock != nil {
if err := vc.blockVerifier(preparedBlock); err == nil {
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M1 (prepared) type messaage")
msgToSign := append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
for _, key := range privKeys {
if err := vc.bhpBitmap[viewID].SetKey(key.Pub.Bytes, true); err != nil {
vc.getLogger().Warn().Str("key", key.Pub.Bytes.Hex()).Msg("[InitPayload] bhpBitmap setkey failed")
continue
}
vc.bhpSigs[viewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(msgToSign)
}
hasBlock = true
// if m1Payload is empty, we just add one
if vc.isM1PayloadEmpty() {
vc.m1Payload = append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
}
}
}
}
if !hasBlock {
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M2 (NIL) type messaage")
for _, key := range privKeys {
if err := vc.nilBitmap[viewID].SetKey(key.Pub.Bytes, true); err != nil {
vc.getLogger().Warn().Str("key", key.Pub.Bytes.Hex()).Msg("[InitPayload] nilBitmap setkey failed")
continue
}
vc.nilSigs[viewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(NIL)
}
}
}
inited = false
for _, key := range privKeys {
_, ok3 := vc.viewIDSigs[viewID][key.Pub.Bytes.Hex()]
if ok3 {
inited = true
break
}
}
// add my own M3 type message signature and bitmap
if !inited {
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, viewID)
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M3 (ViewID) type messaage")
for _, key := range privKeys {
if err := vc.viewIDBitmap[viewID].SetKey(key.Pub.Bytes, true); err != nil {
vc.getLogger().Warn().Str("key", key.Pub.Bytes.Hex()).Msg("[InitPayload] viewIDBitmap setkey failed")
continue
}
vc.viewIDSigs[viewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(viewIDBytes)
}
}
return nil
}
// isM1PayloadEmpty returns true if m1Payload is not set
func (vc *viewChange) isM1PayloadEmpty() bool {
return len(vc.m1Payload) == 0
}
// IsM1PayloadEmpty returns true if m1Payload is not set
func (vc *viewChange) IsM1PayloadEmpty() bool {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
return vc.isM1PayloadEmpty()
}
// GetViewIDBitmap returns the viewIDBitmap
func (vc *viewChange) GetViewIDBitmap(viewID uint64) *bls_cosi.Mask {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
return vc.viewIDBitmap[viewID]
}
// GetM1Payload returns the m1Payload
func (vc *viewChange) GetM1Payload() []byte {
return vc.m1Payload
}
// getLogger returns logger for view change contexts added
func (vc *viewChange) getLogger() *zerolog.Logger {
logger := utils.Logger().With().
Str("context", "ViewChange").
Logger()
return &logger
}

@ -7,9 +7,13 @@ import (
"github.com/ethereum/go-ethereum/rlp"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/multibls"
"github.com/pkg/errors"
)
// construct the view change message
@ -36,7 +40,7 @@ func (consensus *Consensus) constructViewChangeMessage(priKey *bls.PrivateKeyWra
var encodedBlock []byte
if preparedMsg != nil {
block := consensus.FBFTLog.GetBlockByHash(preparedMsg.BlockHash)
consensus.getLogger().Debug().
consensus.getLogger().Info().
Interface("Block", block).
Interface("preparedMsg", preparedMsg).
Msg("[constructViewChangeMessage] found prepared msg")
@ -65,9 +69,10 @@ func (consensus *Consensus) constructViewChangeMessage(priKey *bls.PrivateKeyWra
vcMsg.PreparedBlock = encodedBlock
}
consensus.getLogger().Debug().
consensus.getLogger().Info().
Hex("m1Payload", vcMsg.Payload).
Str("pubKey", consensus.GetPublicKeys().SerializeToHexStr()).
Str("NextLeader", consensus.LeaderPubKey.Bytes.Hex()).
Str("SenderPubKey", priKey.Pub.Bytes.Hex()).
Msg("[constructViewChangeMessage]")
sign := priKey.Pri.SignHash(msgToSign)
@ -110,41 +115,133 @@ func (consensus *Consensus) constructNewViewMessage(viewID uint64, priKey *bls.P
}
vcMsg := message.GetViewchange()
vcMsg.Payload = consensus.m1Payload
if len(consensus.m1Payload) != 0 {
block := consensus.FBFTLog.GetBlockByHash(consensus.blockHash)
if block != nil {
encodedBlock, err := rlp.EncodeToBytes(block)
if err != nil {
consensus.getLogger().Err(err).Msg("[constructNewViewMessage] Failed encoding prepared block")
}
if len(encodedBlock) != 0 {
vcMsg.PreparedBlock = encodedBlock
}
}
vcMsg.Payload, vcMsg.PreparedBlock = consensus.vc.GetPreparedBlock(consensus.FBFTLog, consensus.blockHash)
vcMsg.M2Aggsigs, vcMsg.M2Bitmap = consensus.vc.GetM2Bitmap(viewID)
vcMsg.M3Aggsigs, vcMsg.M3Bitmap = consensus.vc.GetM3Bitmap(viewID)
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message, priKey.Pri)
if err != nil {
consensus.getLogger().Err(err).
Msg("[constructNewViewMessage] failed to sign and marshal the new view message")
}
return proto.ConstructConsensusMessage(marshaledMessage)
}
sig2arr := consensus.GetNilSigsArray(viewID)
consensus.getLogger().Debug().Int("len", len(sig2arr)).Msg("[constructNewViewMessage] M2 (NIL) type signatures")
if len(sig2arr) > 0 {
m2Sig := bls_cosi.AggregateSig(sig2arr)
vcMsg.M2Aggsigs = m2Sig.Serialize()
vcMsg.M2Bitmap = consensus.nilBitmap[viewID].Bitmap
var (
errNilMessage = errors.New("Nil protobuf message")
errIncorrectMessageType = errors.New("Incorrect message type")
)
// ParseViewChangeMessage parses view change message into FBFTMessage structure
func ParseViewChangeMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
if msg == nil {
return nil, errNilMessage
}
vcMsg := msg.GetViewchange()
FBFTMsg := FBFTMessage{
BlockNum: vcMsg.BlockNum,
ViewID: vcMsg.ViewId,
MessageType: msg.GetType(),
Payload: make([]byte, len(vcMsg.Payload)),
Block: make([]byte, len(vcMsg.PreparedBlock)),
}
sig3arr := consensus.GetViewIDSigsArray(viewID)
consensus.getLogger().Debug().Int("len", len(sig3arr)).Msg("[constructNewViewMessage] M3 (ViewID) type signatures")
// even we check here for safty, m3 type signatures must >= 2f+1
if len(sig3arr) > 0 {
m3Sig := bls_cosi.AggregateSig(sig3arr)
vcMsg.M3Aggsigs = m3Sig.Serialize()
vcMsg.M3Bitmap = consensus.viewIDBitmap[viewID].Bitmap
if FBFTMsg.MessageType != msg_pb.MessageType_VIEWCHANGE {
return nil, errIncorrectMessageType
}
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message, priKey.Pri)
copy(FBFTMsg.Block[:], vcMsg.PreparedBlock[:])
copy(FBFTMsg.Payload[:], vcMsg.Payload[:])
pubKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.SenderPubkey)
if err != nil {
consensus.getLogger().Err(err).
Msg("[constructNewViewMessage] failed to sign and marshal the new view message")
return nil, err
}
return proto.ConstructConsensusMessage(marshaledMessage)
leaderKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.LeaderPubkey)
if err != nil {
return nil, err
}
vcSig := bls_core.Sign{}
err = vcSig.Deserialize(vcMsg.ViewchangeSig)
if err != nil {
return nil, err
}
FBFTMsg.ViewchangeSig = &vcSig
vcSig1 := bls_core.Sign{}
err = vcSig1.Deserialize(vcMsg.ViewidSig)
if err != nil {
return nil, err
}
FBFTMsg.ViewidSig = &vcSig1
FBFTMsg.SenderPubkey = &bls.PublicKeyWrapper{Object: pubKey}
copy(FBFTMsg.SenderPubkey.Bytes[:], vcMsg.SenderPubkey[:])
FBFTMsg.LeaderPubkey = &bls.PublicKeyWrapper{Object: leaderKey}
copy(FBFTMsg.LeaderPubkey.Bytes[:], vcMsg.LeaderPubkey[:])
return &FBFTMsg, nil
}
// ParseNewViewMessage parses new view message into FBFTMessage structure
func ParseNewViewMessage(msg *msg_pb.Message, members multibls.PublicKeys) (*FBFTMessage, error) {
if msg == nil {
return nil, errNilMessage
}
vcMsg := msg.GetViewchange()
FBFTMsg := FBFTMessage{
BlockNum: vcMsg.BlockNum,
ViewID: vcMsg.ViewId,
MessageType: msg.GetType(),
Payload: make([]byte, len(vcMsg.Payload)),
Block: make([]byte, len(vcMsg.PreparedBlock)),
}
if FBFTMsg.MessageType != msg_pb.MessageType_NEWVIEW {
return nil, errIncorrectMessageType
}
copy(FBFTMsg.Payload[:], vcMsg.Payload[:])
copy(FBFTMsg.Block[:], vcMsg.PreparedBlock[:])
pubKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.SenderPubkey)
if err != nil {
return nil, err
}
FBFTMsg.SenderPubkey = &bls.PublicKeyWrapper{Object: pubKey}
copy(FBFTMsg.SenderPubkey.Bytes[:], vcMsg.SenderPubkey[:])
if len(vcMsg.M3Aggsigs) > 0 {
m3Sig := bls_core.Sign{}
err = m3Sig.Deserialize(vcMsg.M3Aggsigs)
if err != nil {
return nil, err
}
m3mask, err := bls_cosi.NewMask(members, nil)
if err != nil {
return nil, err
}
m3mask.SetMask(vcMsg.M3Bitmap)
FBFTMsg.M3AggSig = &m3Sig
FBFTMsg.M3Bitmap = m3mask
}
if len(vcMsg.M2Aggsigs) > 0 {
m2Sig := bls_core.Sign{}
err = m2Sig.Deserialize(vcMsg.M2Aggsigs)
if err != nil {
return nil, err
}
m2mask, err := bls_cosi.NewMask(members, nil)
if err != nil {
return nil, err
}
m2mask.SetMask(vcMsg.M2Bitmap)
FBFTMsg.M2AggSig = &m2Sig
FBFTMsg.M2Bitmap = m2mask
}
return &FBFTMsg, nil
}

@ -45,8 +45,6 @@ func TestPhaseSwitching(t *testing.T) {
assert.Equal(t, FBFTAnnounce, consensus.phase) // It's a new consensus, we should be at the FBFTAnnounce phase
override := false
switches := []phaseSwitch{
{start: FBFTAnnounce, end: FBFTPrepare},
{start: FBFTPrepare, end: FBFTCommit},
@ -54,13 +52,11 @@ func TestPhaseSwitching(t *testing.T) {
}
for _, sw := range switches {
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end, override)
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end)
}
override = true
for _, sw := range switches {
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end, override)
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end)
}
switches = []phaseSwitch{
@ -70,39 +66,19 @@ func TestPhaseSwitching(t *testing.T) {
}
for _, sw := range switches {
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end, override)
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end)
}
}
func testPhaseGroupSwitching(t *testing.T, consensus *Consensus, phases []FBFTPhase, startPhase FBFTPhase, desiredPhase FBFTPhase, override bool) {
phaseMapping := make(map[FBFTPhase]bool)
if override {
for range phases {
consensus.switchPhase(desiredPhase, override)
assert.Equal(t, desiredPhase, consensus.phase)
}
func testPhaseGroupSwitching(t *testing.T, consensus *Consensus, phases []FBFTPhase, startPhase FBFTPhase, desiredPhase FBFTPhase) {
for range phases {
consensus.switchPhase("test", desiredPhase)
assert.Equal(t, desiredPhase, consensus.phase)
return
}
phaseMapping[FBFTAnnounce] = false
phaseMapping[FBFTPrepare] = false
phaseMapping[FBFTCommit] = false
phaseMapping[startPhase] = false
phaseMapping[desiredPhase] = true
assert.Equal(t, startPhase, consensus.phase)
for _, phase := range phases {
consensus.switchPhase(desiredPhase, override)
expected := phaseMapping[phase]
assert.Equal(t, expected, (phase == consensus.phase))
}
assert.Equal(t, desiredPhase, consensus.phase)
return
}
func TestGetNextLeaderKeyShouldFailForStandardGeneratedConsensus(t *testing.T) {
@ -111,7 +87,7 @@ func TestGetNextLeaderKeyShouldFailForStandardGeneratedConsensus(t *testing.T) {
// The below results in: "panic: runtime error: integer divide by zero"
// This happens because there's no check for if there are any participants or not in https://github.com/harmony-one/harmony/blob/main/consensus/quorum/quorum.go#L188-L197
assert.Panics(t, func() { consensus.GetNextLeaderKey() })
assert.Panics(t, func() { consensus.GetNextLeaderKey(uint64(1)) })
}
func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
@ -139,7 +115,7 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
assert.Equal(t, keyCount, consensus.Decider.ParticipantsCount())
consensus.LeaderPubKey = &wrappedBLSKeys[0]
nextKey := consensus.GetNextLeaderKey()
nextKey := consensus.GetNextLeaderKey(uint64(1))
assert.Equal(t, nextKey, &wrappedBLSKeys[1])
}

Loading…
Cancel
Save