merge with latest main

pull/3374/head
Rongjian Lan 4 years ago
commit 5b993107e6
  1. cmd/harmony/main.go (2 lines changed)
  2. consensus/checks.go (25 lines changed)
  3. consensus/config.go (13 lines changed)
  4. consensus/consensus.go (35 lines changed)
  5. consensus/consensus_msg_sender.go (2 lines changed)
  6. consensus/consensus_service.go (84 lines changed)
  7. consensus/consensus_v2.go (6 lines changed)
  8. consensus/fbft_log.go (127 lines changed)
  9. consensus/leader.go (8 lines changed)
  10. consensus/quorum/quorum.go (7 lines changed)
  11. consensus/validator.go (12 lines changed)
  12. consensus/view_change.go (463 lines changed)
  13. consensus/view_change_construct.go (475 lines changed)
  14. consensus/view_change_msg.go (153 lines changed)
  15. consensus/view_change_test.go (38 lines changed)
  16. node/node.go (1 line changed)
  17. rosetta/infra/Dockerfile (29 lines changed)
  18. rosetta/infra/README.md (127 lines changed)
  19. rosetta/infra/run.sh (11 lines changed)

@ -632,7 +632,7 @@ func setupConsensusAndNode(hc harmonyConfig, nodeConfig *nodeconfig.ConfigType)
Msg("Init Blockchain")
// Assign closure functions to the consensus object
currentConsensus.BlockVerifier = currentNode.VerifyNewBlock
currentConsensus.SetBlockVerifier(currentNode.VerifyNewBlock)
currentConsensus.OnConsensusDone = currentNode.PostConsensusProcessing
// update consensus information based on the blockchain
currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation())

@ -2,6 +2,7 @@ package consensus
import (
"bytes"
"encoding/binary"
protobuf "github.com/golang/protobuf/proto"
libbls "github.com/harmony-one/bls/ffi/go/bls"
@ -171,7 +172,8 @@ func (consensus *Consensus) onViewChangeSanityCheck(recvMsg *FBFTMessage) bool {
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MyViewChangingID", consensus.GetViewChangingID()).
Uint64("MsgViewChangingID", recvMsg.ViewID).
Msg("onViewChange")
Interface("SendPubKeys", recvMsg.SenderPubkeys).
Msg("[onViewChangeSanityCheck]")
if consensus.blockNum > recvMsg.BlockNum {
consensus.getLogger().Debug().
@ -180,22 +182,33 @@ func (consensus *Consensus) onViewChangeSanityCheck(recvMsg *FBFTMessage) bool {
}
if consensus.blockNum < recvMsg.BlockNum {
consensus.getLogger().Warn().
Msg("[onViewChange] New Leader Has Lower Blocknum")
Msg("[onViewChangeSanityCheck] MsgBlockNum is different from my BlockNumber")
return false
}
if consensus.IsViewChangingMode() &&
consensus.GetViewChangingID() > recvMsg.ViewID {
consensus.GetCurBlockViewID() > recvMsg.ViewID {
consensus.getLogger().Warn().
Msg("[onViewChange] ViewChanging ID Is Low")
Msg("[onViewChangeSanityCheck] ViewChanging ID Is Low")
return false
}
if recvMsg.ViewID-consensus.GetViewChangingID() > MaxViewIDDiff {
consensus.getLogger().Debug().
Msg("Received viewID that is MaxViewIDDiff (100) further from the current viewID!")
Msg("[onViewChangeSanityCheck] Received viewID that is MaxViewIDDiff (249) further from the current viewID!")
return false
}
if len(recvMsg.SenderPubkeys) != 1 {
consensus.getLogger().Error().Msg("[onViewChange] multiple signers in view change message.")
consensus.getLogger().Error().Msg("[onViewChange] zero or multiple signers in view change message.")
return false
}
senderKey := recvMsg.SenderPubkeys[0]
viewIDHash := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDHash, recvMsg.ViewID)
if !recvMsg.ViewidSig.VerifyHash(senderKey.Object, viewIDHash) {
consensus.getLogger().Warn().
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[onViewChangeSanityCheck] Failed to Verify viewID Signature")
return false
}
return true
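The sanity check above now also verifies the sender's signature over the view ID itself, serialized as 8 little-endian bytes. A minimal sketch of that check, assuming the harmony-one/bls Go binding (the helper name and package are illustrative, not part of the PR):

```go
package vcsketch

import (
	"encoding/binary"
	"fmt"

	bls_core "github.com/harmony-one/bls/ffi/go/bls"
)

// verifyViewIDSig mirrors the check added in onViewChangeSanityCheck: the view
// ID is encoded as 8 little-endian bytes and the sender's ViewidSig must
// verify against that hash.
func verifyViewIDSig(sig *bls_core.Sign, senderPub *bls_core.PublicKey, viewID uint64) error {
	viewIDBytes := make([]byte, 8)
	binary.LittleEndian.PutUint64(viewIDBytes, viewID)
	if !sig.VerifyHash(senderPub, viewIDBytes) {
		return fmt.Errorf("viewID signature failed to verify for viewID %d", viewID)
	}
	return nil
}
```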

@ -4,14 +4,15 @@ import "time"
// timeout constant
const (
// The duration of viewChangeTimeout; when a view change is initialized with v+1
// timeout will be equal to viewChangeDuration; if view change failed and start v+2
// timeout will be 2*viewChangeDuration; timeout of view change v+n is n*viewChangeDuration
viewChangeDuration time.Duration = 60 * time.Second
// default timeout configuration is shorten to 45 seconds as the consensus is 5s
viewChangeTimeout = 45
// The duration of viewChangeTimeout for each view change
viewChangeDuration time.Duration = viewChangeTimeout * time.Second
// timeout duration for announce/prepare/commit
phaseDuration time.Duration = 60 * time.Second
bootstrapDuration time.Duration = 600 * time.Second
// shorten the duration from 60 to 45 seconds as the consensus is 5s
phaseDuration time.Duration = viewChangeTimeout * time.Second
bootstrapDuration time.Duration = 120 * time.Second
maxLogSize uint32 = 1000
// threshold between received consensus message blockNum and my blockNum
consensusBlockNumBuffer uint64 = 2
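Both the view-change and the phase timeouts are now derived from the single viewChangeTimeout constant (45s). The per-attempt view-change timeout itself grows with the square of the view gap, per GetViewChangeDuraion in the view_change.go hunk further down; a hedged sketch of that arithmetic, with an illustrative helper name:

```go
package timeoutsketch

import "time"

const (
	viewChangeTimeout                = 45 // seconds, as in the hunk above
	viewChangeDuration time.Duration = viewChangeTimeout * time.Second
)

// nthViewChangeTimeout follows the diff*diff*viewChangeDuration formula used
// by GetViewChangeDuraion: the n-th consecutive view change waits n*n times
// the 45s base duration (45s, 180s, 405s, ...).
func nthViewChangeTimeout(n int64) time.Duration {
	return time.Duration(n * n * int64(viewChangeDuration))
}
```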

@ -27,6 +27,9 @@ const (
var errLeaderPriKeyNotFound = errors.New("getting leader private key from consensus public keys failed")
// BlockVerifierFunc is a function used to verify the block
type BlockVerifierFunc func(*types.Block) error
// Consensus is the main struct with all states and data related to consensus process.
type Consensus struct {
Decider quorum.Decider
@ -47,23 +50,10 @@ type Consensus struct {
aggregatedCommitSig *bls_core.Sign
prepareBitmap *bls_cosi.Mask
commitBitmap *bls_cosi.Mask
multiSigBitmap *bls_cosi.Mask // Bitmap for parsing multisig bitmap from validators
multiSigMutex sync.RWMutex
// Commits collected from view change
// for each viewID, we need keep track of corresponding sigs and bitmap
// until one of the viewID has enough votes (>=2f+1)
// after one of viewID has enough votes, we can reset and clean the map
// honest nodes will never double votes on different viewID
// bhpSigs: blockHashPreparedSigs is the signature on m1 type message
bhpSigs map[uint64]map[string]*bls_core.Sign
// nilSigs: there is no prepared message when view change,
// it's signature on m2 type (i.e. nil) messages
nilSigs map[uint64]map[string]*bls_core.Sign
viewIDSigs map[uint64]map[string]*bls_core.Sign
bhpBitmap map[uint64]*bls_cosi.Mask
nilBitmap map[uint64]*bls_cosi.Mask
viewIDBitmap map[uint64]*bls_cosi.Mask
m1Payload []byte // message payload for type m1 := |vcBlockHash|prepared_agg_sigs|prepared_bitmap|, new leader only need one
// The chain reader for the blockchain this consensus is working on
ChainReader *core.BlockChain
// Minimal number of peers in the shard
@ -87,15 +77,15 @@ type Consensus struct {
IgnoreViewIDCheck *abool.AtomicBool
// consensus mutex
mutex sync.Mutex
// mutex for view change
vcLock sync.Mutex
// ViewChange struct
vc *viewChange
// Signal channel for starting a new consensus process
ReadySignal chan struct{}
// The post-consensus processing func passed from Node object
// Called when consensus on a new block is done
OnConsensusDone func(*types.Block)
// The verifier func passed from Node object
BlockVerifier func(*types.Block) error
BlockVerifier BlockVerifierFunc
// verified block to state sync broadcast
VerifiedNewBlock chan *types.Block
// will trigger state syncing when blockNum is low
@ -175,6 +165,12 @@ func (consensus *Consensus) GetConsensusLeaderPrivateKey() (*bls.PrivateKeyWrapp
return consensus.GetLeaderPrivateKey(consensus.LeaderPubKey.Object)
}
// SetBlockVerifier sets the block verifier
func (consensus *Consensus) SetBlockVerifier(verifier BlockVerifierFunc) {
consensus.BlockVerifier = verifier
consensus.vc.SetBlockVerifier(verifier)
}
// New create a new Consensus record
func New(
host p2p.Host, shard uint32, leader p2p.Peer, multiBLSPriKey multibls.PrivateKeys,
@ -214,5 +210,8 @@ func New(
// channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
consensus.IgnoreViewIDCheck = abool.NewBool(false)
// Make Sure Verifier is not null
consensus.vc = newViewChange()
return &consensus, nil
}
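With the view-change state factored into consensus.vc, the node wires its block verifier through SetBlockVerifier so both the Consensus object and the embedded viewChange see the same function (see the main.go hunk at the top). A sketch of that wiring, where the verify closure is a stand-in for the node's real VerifyNewBlock:

```go
package wiringsketch

import (
	"github.com/harmony-one/harmony/consensus"
	"github.com/harmony-one/harmony/core/types"
)

// wireVerifier shows the new call path: instead of assigning the
// BlockVerifier field directly, callers use SetBlockVerifier so the verifier
// is also forwarded to the internal view-change state (consensus.vc).
func wireVerifier(c *consensus.Consensus) {
	verify := func(b *types.Block) error {
		// the real check lives on the Node object (VerifyNewBlock)
		return nil
	}
	c.SetBlockVerifier(verify)
}
```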

@ -13,7 +13,7 @@ import (
const (
// RetryIntervalInSec is the interval for message retry
RetryIntervalInSec = 10
RetryIntervalInSec = 7
)
// MessageSender is the wrapper object that controls how a consensus message is sent

@ -98,9 +98,7 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys []bls_cosi.PublicKeyWrapper
consensus.UpdateBitmaps()
consensus.ResetState()
consensus.vcLock.Lock()
consensus.ResetViewChangeState()
consensus.vcLock.Unlock()
return consensus.Decider.ParticipantsCount()
}
@ -130,24 +128,6 @@ func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message,
return nil
}
// GetViewIDSigsArray returns the signatures for viewID in viewchange
func (consensus *Consensus) GetViewIDSigsArray(viewID uint64) []*bls_core.Sign {
sigs := []*bls_core.Sign{}
for _, sig := range consensus.viewIDSigs[viewID] {
sigs = append(sigs, sig)
}
return sigs
}
// GetNilSigsArray returns the signatures for nil prepared message in viewchange
func (consensus *Consensus) GetNilSigsArray(viewID uint64) []*bls_core.Sign {
sigs := []*bls_core.Sign{}
for _, sig := range consensus.nilSigs[viewID] {
sigs = append(sigs, sig)
}
return sigs
}
// UpdateBitmaps update the bitmaps for prepare and commit phase
func (consensus *Consensus) UpdateBitmaps() {
consensus.getLogger().Debug().
@ -166,10 +146,8 @@ func (consensus *Consensus) UpdateBitmaps() {
// ResetState resets the state of the consensus
func (consensus *Consensus) ResetState() {
consensus.getLogger().Debug().
Str("MessageType", consensus.phase.String()).
Msg("[ResetState] Resetting consensus state")
consensus.switchPhase(FBFTAnnounce, true)
consensus.switchPhase("ResetState", FBFTAnnounce)
consensus.blockHash = [32]byte{}
consensus.block = []byte{}
consensus.Decider.ResetPrepareAndCommitVotes()
@ -260,17 +238,6 @@ func (consensus *Consensus) ReadSignatureBitmapPayload(
)
}
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := utils.Logger().With().
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.GetCurBlockViewID()).
Interface("phase", consensus.phase).
Str("mode", consensus.current.Mode().String()).
Logger()
return &logger
}
// retrieve corresponding blsPublicKey from Coinbase Address
func (consensus *Consensus) getLeaderPubKeyFromCoinbase(
header *block.Header,
@ -509,31 +476,6 @@ func (consensus *Consensus) NeedsRandomNumberGeneration(epoch *big.Int) bool {
return false
}
func (consensus *Consensus) addViewIDKeyIfNotExist(viewID uint64) {
members := consensus.Decider.Participants()
if _, ok := consensus.bhpSigs[viewID]; !ok {
consensus.bhpSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := consensus.nilSigs[viewID]; !ok {
consensus.nilSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := consensus.viewIDSigs[viewID]; !ok {
consensus.viewIDSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := consensus.bhpBitmap[viewID]; !ok {
bhpBitmap, _ := bls_cosi.NewMask(members, nil)
consensus.bhpBitmap[viewID] = bhpBitmap
}
if _, ok := consensus.nilBitmap[viewID]; !ok {
nilBitmap, _ := bls_cosi.NewMask(members, nil)
consensus.nilBitmap[viewID] = nilBitmap
}
if _, ok := consensus.viewIDBitmap[viewID]; !ok {
viewIDBitmap, _ := bls_cosi.NewMask(members, nil)
consensus.viewIDBitmap[viewID] = viewIDBitmap
}
}
// SetViewIDs set both current view ID and view changing ID to the height
// of the blockchain. It is used during client startup to recover the state
func (consensus *Consensus) SetViewIDs(height uint64) {
@ -566,3 +508,25 @@ func (consensus *Consensus) FinishFinalityCount() {
func (consensus *Consensus) GetFinality() int64 {
return consensus.finality
}
// switchPhase will switch FBFTPhase to nextPhase if the desirePhase equals the nextPhase
func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) {
consensus.getLogger().Info().
Str("from:", consensus.phase.String()).
Str("to:", desired.String()).
Str("switchPhase:", subject)
consensus.phase = desired
return
}
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := utils.Logger().With().
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.GetCurBlockViewID()).
Str("phase", consensus.phase.String()).
Str("mode", consensus.current.Mode().String()).
Logger()
return &logger
}
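switchPhase moves from view_change.go into consensus_service.go and drops the override flag: the transition is now unconditional, and callers pass a subject tag naming the code path that requested the switch. A standalone sketch of that simplified state machine (FBFTPhase and the struct here are stand-ins for the real types):

```go
package phasesketch

import "log"

type FBFTPhase int

const (
	FBFTAnnounce FBFTPhase = iota
	FBFTPrepare
	FBFTCommit
)

type consensusSketch struct {
	phase FBFTPhase
}

// switchPhase always jumps to the desired phase; subject records which caller
// (e.g. "Announce", "onPrepared", "TryCatchup") requested the transition so
// the log line shows where the switch came from.
func (c *consensusSketch) switchPhase(subject string, desired FBFTPhase) {
	log.Printf("[%s] switching phase %v -> %v", subject, c.phase, desired)
	c.phase = desired
}
```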

@ -304,11 +304,7 @@ func (consensus *Consensus) tryCatchup() {
break
}
if currentBlockNum < consensus.blockNum {
consensus.getLogger().Info().
Uint64("From", currentBlockNum).
Uint64("To", consensus.blockNum).
Msg("[TryCatchup] Caught up!")
consensus.switchPhase(FBFTAnnounce, true)
consensus.switchPhase("TryCatchup", FBFTAnnounce)
}
// catch up and skip from view change trap
if currentBlockNum < consensus.blockNum &&


@ -11,7 +11,6 @@ import (
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
)
// FBFTLog represents the log stored by a node during FBFT process
@ -160,7 +159,10 @@ func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNu
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType == typ && msg.(*FBFTMessage).BlockNum == blockNum && msg.(*FBFTMessage).ViewID == viewID && msg.(*FBFTMessage).BlockHash == blockHash {
if msg.(*FBFTMessage).MessageType == typ &&
msg.(*FBFTMessage).BlockNum == blockNum &&
msg.(*FBFTMessage).ViewID == viewID &&
msg.(*FBFTMessage).BlockHash == blockHash {
found = append(found, msg.(*FBFTMessage))
}
}
@ -172,7 +174,8 @@ func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType == typ && msg.(*FBFTMessage).BlockNum == blockNum {
if msg.(*FBFTMessage).MessageType == typ &&
msg.(*FBFTMessage).BlockNum == blockNum {
found = append(found, msg.(*FBFTMessage))
}
}
@ -184,7 +187,9 @@ func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum ui
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*FBFTMessage).MessageType == typ && msg.(*FBFTMessage).BlockNum == blockNum && msg.(*FBFTMessage).BlockHash == blockHash {
if msg.(*FBFTMessage).MessageType == typ &&
msg.(*FBFTMessage).BlockNum == blockNum &&
msg.(*FBFTMessage).BlockHash == blockHash {
found = append(found, msg.(*FBFTMessage))
}
}
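The fbft_log.go changes above are mostly reformatting of the long filter conditions, but these log queries are what the new view-change code relies on: InitPayload fetches every PREPARED message for a block number and keeps the one with the highest view ID. A small illustrative wrapper around the helpers named in this diff:

```go
package fbftlogsketch

import (
	msg_pb "github.com/harmony-one/harmony/api/proto/message"
	"github.com/harmony-one/harmony/consensus"
)

// latestPreparedMsg shows the query pattern the view-change code uses: fetch
// all PREPARED messages logged for blockNum and keep the one with the highest
// view ID. The wrapper itself is illustrative; the two helpers are from the diff.
func latestPreparedMsg(log *consensus.FBFTLog, blockNum uint64) *consensus.FBFTMessage {
	msgs := log.GetMessagesByTypeSeq(msg_pb.MessageType_PREPARED, blockNum)
	return log.FindMessageByMaxViewID(msgs)
}
```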
@ -281,117 +286,3 @@ func (consensus *Consensus) ParseFBFTMessage(msg *msg_pb.Message) (*FBFTMessage,
return &pbftMsg, nil
}
// ParseViewChangeMessage parses view change message into FBFTMessage structure
func ParseViewChangeMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
pbftMsg := FBFTMessage{}
pbftMsg.MessageType = msg.GetType()
if pbftMsg.MessageType != msg_pb.MessageType_VIEWCHANGE {
return nil, fmt.Errorf("ParseViewChangeMessage: incorrect message type %s", pbftMsg.MessageType)
}
vcMsg := msg.GetViewchange()
pbftMsg.ViewID = vcMsg.ViewId
pbftMsg.BlockNum = vcMsg.BlockNum
pbftMsg.Block = make([]byte, len(vcMsg.PreparedBlock))
copy(pbftMsg.Block[:], vcMsg.PreparedBlock[:])
pbftMsg.Payload = make([]byte, len(vcMsg.Payload))
copy(pbftMsg.Payload[:], vcMsg.Payload[:])
pubKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.SenderPubkey)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse senderpubkey")
return nil, err
}
leaderKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.LeaderPubkey)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse leaderpubkey")
return nil, err
}
vcSig := bls_core.Sign{}
err = vcSig.Deserialize(vcMsg.ViewchangeSig)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the viewchange signature")
return nil, err
}
vcSig1 := bls_core.Sign{}
err = vcSig1.Deserialize(vcMsg.ViewidSig)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the viewid signature")
return nil, err
}
pbftMsg.SenderPubkeys = []*bls.PublicKeyWrapper{{Object: pubKey}}
copy(pbftMsg.SenderPubkeys[0].Bytes[:], vcMsg.SenderPubkey[:])
pbftMsg.LeaderPubkey = &bls.PublicKeyWrapper{Object: leaderKey}
copy(pbftMsg.LeaderPubkey.Bytes[:], vcMsg.LeaderPubkey[:])
pbftMsg.ViewchangeSig = &vcSig
pbftMsg.ViewidSig = &vcSig1
return &pbftMsg, nil
}
// ParseNewViewMessage parses new view message into FBFTMessage structure
func (consensus *Consensus) ParseNewViewMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
FBFTMsg := FBFTMessage{}
FBFTMsg.MessageType = msg.GetType()
if FBFTMsg.MessageType != msg_pb.MessageType_NEWVIEW {
return nil, fmt.Errorf("ParseNewViewMessage: incorrect message type %s", FBFTMsg.MessageType)
}
vcMsg := msg.GetViewchange()
FBFTMsg.ViewID = vcMsg.ViewId
FBFTMsg.BlockNum = vcMsg.BlockNum
FBFTMsg.Payload = make([]byte, len(vcMsg.Payload))
copy(FBFTMsg.Payload[:], vcMsg.Payload[:])
FBFTMsg.Block = make([]byte, len(vcMsg.PreparedBlock))
copy(FBFTMsg.Block[:], vcMsg.PreparedBlock[:])
pubKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.SenderPubkey)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse senderpubkey")
return nil, err
}
FBFTMsg.SenderPubkeys = []*bls.PublicKeyWrapper{{Object: pubKey}}
copy(FBFTMsg.SenderPubkeys[0].Bytes[:], vcMsg.SenderPubkey[:])
members := consensus.Decider.Participants()
if len(vcMsg.M3Aggsigs) > 0 {
m3Sig := bls_core.Sign{}
err = m3Sig.Deserialize(vcMsg.M3Aggsigs)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the multi signature for M3 viewID signature")
return nil, err
}
m3mask, err := bls_cosi.NewMask(members, nil)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to create mask for multi signature")
return nil, err
}
m3mask.SetMask(vcMsg.M3Bitmap)
FBFTMsg.M3AggSig = &m3Sig
FBFTMsg.M3Bitmap = m3mask
}
if len(vcMsg.M2Aggsigs) > 0 {
m2Sig := bls_core.Sign{}
err = m2Sig.Deserialize(vcMsg.M2Aggsigs)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the multi signature for M2 aggregated signature")
return nil, err
}
m2mask, err := bls_cosi.NewMask(members, nil)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to create mask for multi signature")
return nil, err
}
m2mask.SetMask(vcMsg.M2Bitmap)
FBFTMsg.M2AggSig = &m2Sig
FBFTMsg.M2Bitmap = m2mask
}
return &FBFTMsg, nil
}

@ -94,11 +94,7 @@ func (consensus *Consensus) announce(block *types.Block) {
Msg("[Announce] Sent Announce Message!!")
}
consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", FBFTPrepare.String()).
Msg("[Announce] Switching phase")
consensus.switchPhase(FBFTPrepare, true)
consensus.switchPhase("Announce", FBFTPrepare)
}
func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
@ -198,7 +194,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
if err := consensus.didReachPrepareQuorum(); err != nil {
return
}
consensus.switchPhase(FBFTCommit, true)
consensus.switchPhase("onPrepare", FBFTCommit)
}
//// Read - End
}

@ -72,7 +72,7 @@ type ParticipantTracker interface {
Participants() multibls.PublicKeys
IndexOf(bls.SerializedPublicKey) int
ParticipantsCount() int64
NextAfter(*bls.PublicKeyWrapper) (bool, *bls.PublicKeyWrapper)
NthNext(*bls.PublicKeyWrapper, int) (bool, *bls.PublicKeyWrapper)
UpdateParticipants(pubKeys []bls.PublicKeyWrapper)
}
@ -206,14 +206,15 @@ func (s *cIdentities) IndexOf(pubKey bls.SerializedPublicKey) int {
return -1
}
func (s *cIdentities) NextAfter(pubKey *bls.PublicKeyWrapper) (bool, *bls.PublicKeyWrapper) {
// NthNext return the Nth next pubkey, next can be negative number
func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
found := false
idx := s.IndexOf(pubKey.Bytes)
if idx != -1 {
found = true
}
idx = (idx + 1) % int(s.ParticipantsCount())
idx = (idx + next) % int(s.ParticipantsCount())
return found, &s.publicKeys[idx]
}
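NextAfter is generalized to NthNext, which walks `next` positions around the participant ring instead of always one. The diff's comment says next may be negative; since Go's % operator can return a negative remainder, a sketch that honors negative offsets needs an extra normalization step (an assumption about the intended semantics, not the committed code):

```go
package ringsketch

// nthNextIndex returns the participant index `next` steps after idx in a ring
// of size count, wrapping in both directions. The committed code uses
// (idx + next) % count directly; the normalization below only matters if
// negative offsets are actually passed.
func nthNextIndex(idx, next, count int) int {
	i := (idx + next) % count
	if i < 0 {
		i += count
	}
	return i
}
```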

@ -73,11 +73,7 @@ func (consensus *Consensus) prepare() {
Msg("[OnAnnounce] Sent Prepare Message!!")
}
consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", FBFTPrepare.String()).
Msg("[Announce] Switching MessageType")
consensus.switchPhase(FBFTPrepare, true)
consensus.switchPhase("Announce", FBFTPrepare)
}
// if onPrepared accepts the prepared message from the leader, then
@ -210,11 +206,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
Msg("[OnPrepared] Sent Commit Message!!")
}
consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", FBFTCommit.String()).
Msg("[OnPrepared] Switching phase")
consensus.switchPhase(FBFTCommit, true)
consensus.switchPhase("onPrepared", FBFTCommit)
}
func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {

@ -1,30 +1,22 @@
package consensus
import (
"bytes"
"encoding/binary"
"encoding/hex"
"sync"
"time"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/ethereum/go-ethereum/common"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/consensus/signature"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
)
// MaxViewIDDiff limits the received view ID to only 100 further from the current view ID
const MaxViewIDDiff = 100
// MaxViewIDDiff limits the received view ID to only 249 further from the current view ID
const MaxViewIDDiff = 249
// State contains current mode and current viewID
type State struct {
@ -98,30 +90,15 @@ func (pm *State) GetViewChangeDuraion() time.Duration {
return time.Duration(diff * diff * int64(viewChangeDuration))
}
// switchPhase will switch FBFTPhase to nextPhase if the desirePhase equals the nextPhase
func (consensus *Consensus) switchPhase(desired FBFTPhase, override bool) {
if override {
consensus.phase = desired
return
}
var nextPhase FBFTPhase
switch consensus.phase {
case FBFTAnnounce:
nextPhase = FBFTPrepare
case FBFTPrepare:
nextPhase = FBFTCommit
case FBFTCommit:
nextPhase = FBFTAnnounce
}
if nextPhase == desired {
consensus.phase = nextPhase
}
}
// GetNextLeaderKey uniquely determine who is the leader for given viewID
func (consensus *Consensus) GetNextLeaderKey() *bls.PublicKeyWrapper {
wasFound, next := consensus.Decider.NextAfter(consensus.LeaderPubKey)
func (consensus *Consensus) GetNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper {
gap := 1
consensus.getLogger().Info().
Str("leaderPubKey", consensus.LeaderPubKey.Bytes.Hex()).
Uint64("newViewID", viewID).
Uint64("myCurBlockViewID", consensus.GetCurBlockViewID()).
Msg("[GetNextLeaderKey] got leaderPubKey from coinbase")
wasFound, next := consensus.Decider.NthNext(consensus.LeaderPubKey, gap)
if !wasFound {
consensus.getLogger().Warn().
Str("key", consensus.LeaderPubKey.Bytes.Hex()).
@ -130,22 +107,6 @@ func (consensus *Consensus) GetNextLeaderKey() *bls.PublicKeyWrapper {
return next
}
// ResetViewChangeState reset the state for viewchange
func (consensus *Consensus) ResetViewChangeState() {
consensus.getLogger().Debug().
Str("Phase", consensus.phase.String()).
Msg("[ResetViewChangeState] Resetting view change state")
consensus.current.SetMode(Normal)
consensus.m1Payload = []byte{}
consensus.bhpSigs = map[uint64]map[string]*bls_core.Sign{}
consensus.nilSigs = map[uint64]map[string]*bls_core.Sign{}
consensus.viewIDSigs = map[uint64]map[string]*bls_core.Sign{}
consensus.bhpBitmap = map[uint64]*bls_cosi.Mask{}
consensus.nilBitmap = map[uint64]*bls_cosi.Mask{}
consensus.viewIDBitmap = map[uint64]*bls_cosi.Mask{}
consensus.Decider.ResetViewChangeVotes()
}
func createTimeout() map[TimeoutType]*utils.Timeout {
timeouts := make(map[TimeoutType]*utils.Timeout)
timeouts[timeoutConsensus] = utils.NewTimeout(phaseDuration)
@ -155,6 +116,7 @@ func createTimeout() map[TimeoutType]*utils.Timeout {
}
// startViewChange send a new view change
// the viewID is the current viewID
func (consensus *Consensus) startViewChange(viewID uint64) {
if consensus.disableViewChange {
return
@ -163,15 +125,33 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.current.SetMode(ViewChanging)
consensus.SetViewChangingID(viewID)
consensus.LeaderPubKey = consensus.GetNextLeaderKey()
consensus.LeaderPubKey = consensus.GetNextLeaderKey(viewID)
duration := consensus.current.GetViewChangeDuraion()
consensus.getLogger().Warn().
Uint64("viewID", viewID).
Uint64("viewChangingID", consensus.GetViewChangingID()).
Dur("timeoutDuration", duration).
Str("NextLeader", consensus.LeaderPubKey.Bytes.Hex()).
Msg("[startViewChange]")
consensus.consensusTimeout[timeoutViewChange].SetDuration(duration)
defer consensus.consensusTimeout[timeoutViewChange].Start()
// update the dictionary key if the viewID is first time received
consensus.vc.AddViewIDKeyIfNotExist(viewID, consensus.Decider.Participants())
// init my own payload
if err := consensus.vc.InitPayload(
consensus.FBFTLog,
viewID,
consensus.blockNum,
consensus.priKey); err != nil {
consensus.getLogger().Error().Err(err).Msg("Init Payload Error")
}
// for view change, send separate view change per public key
// do not do multi-sign of view change message
for _, key := range consensus.priKey {
if !consensus.IsValidatorInCommittee(key.Pub.Bytes) {
continue
@ -183,21 +163,26 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
p2p.ConstructMessage(msgToSend),
)
}
consensus.consensusTimeout[timeoutViewChange].SetDuration(duration)
consensus.consensusTimeout[timeoutViewChange].Start()
}
// onViewChange is called when the view change message is received.
func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.getLogger().Info().Msg("[onViewChange] Received ViewChange Message")
recvMsg, err := ParseViewChangeMessage(msg)
if err != nil {
consensus.getLogger().Warn().Msg("[onViewChange] Unable To Parse Viewchange Message")
consensus.getLogger().Warn().Err(err).Msg("[onViewChange] Unable To Parse Viewchange Message")
return
}
// if not leader, noop
newLeaderKey := recvMsg.LeaderPubkey
newLeaderPriKey, err := consensus.GetLeaderPrivateKey(newLeaderKey.Object)
if err != nil {
consensus.getLogger().Info().
Err(err).
Interface("SenderPubkeys", recvMsg.SenderPubkeys).
Str("NextLeader", recvMsg.LeaderPubkey.Bytes.Hex()).
Str("myBLSPubKey", consensus.priKey.GetPublicKeys().SerializeToHexStr()).
Msg("[onViewChange] I am not the Leader")
return
}
@ -205,7 +190,8 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.getLogger().Info().
Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)).
Int64("need", consensus.Decider.TwoThirdsSignersCount()).
Interface("validatorPubKeys", recvMsg.SenderPubkeys).
Interface("SenderPubkeys", recvMsg.SenderPubkeys).
Str("newLeaderKey", newLeaderKey.Bytes.Hex()).
Msg("[onViewChange] Received Enough View Change Messages")
return
}
@ -217,212 +203,36 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
// already checked the length of SenderPubkeys in onViewChangeSanityCheck
senderKey := recvMsg.SenderPubkeys[0]
consensus.vcLock.Lock()
defer consensus.vcLock.Unlock()
// update the dictionary key if the viewID is first time received
consensus.addViewIDKeyIfNotExist(recvMsg.ViewID)
// TODO: remove NIL type message
// add self m1 or m2 type message signature and bitmap
_, ok1 := consensus.nilSigs[recvMsg.ViewID][newLeaderKey.Bytes.Hex()]
_, ok2 := consensus.bhpSigs[recvMsg.ViewID][newLeaderKey.Bytes.Hex()]
if !(ok1 || ok2) {
// add own signature for newview message
preparedMsgs := consensus.FBFTLog.GetMessagesByTypeSeq(
msg_pb.MessageType_PREPARED, recvMsg.BlockNum,
)
preparedMsg := consensus.FBFTLog.FindMessageByMaxViewID(preparedMsgs)
hasBlock := false
if preparedMsg != nil {
if preparedBlock := consensus.FBFTLog.GetBlockByHash(
preparedMsg.BlockHash,
); preparedBlock != nil {
if consensus.BlockVerifier(preparedBlock); err != nil {
consensus.getLogger().Error().Err(err).Msg("[onViewChange] My own prepared block verification failed")
} else {
hasBlock = true
}
}
}
if hasBlock {
consensus.getLogger().Info().Msg("[onViewChange] add my M1 type messaage")
msgToSign := append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
for i, key := range consensus.priKey {
if err := consensus.bhpBitmap[recvMsg.ViewID].SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Warn().Msgf("[onViewChange] bhpBitmap setkey failed for key at index %d", i)
continue
}
consensus.bhpSigs[recvMsg.ViewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(msgToSign)
}
// if m1Payload is empty, we just add one
if len(consensus.m1Payload) == 0 {
consensus.m1Payload = append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
}
} else {
consensus.getLogger().Info().Msg("[onViewChange] add my M2(NIL) type messaage")
for i, key := range consensus.priKey {
if err := consensus.nilBitmap[recvMsg.ViewID].SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Warn().Msgf("[onViewChange] nilBitmap setkey failed for key at index %d", i)
continue
}
consensus.nilSigs[recvMsg.ViewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(NIL)
}
}
}
// add self m3 type message signature and bitmap
_, ok3 := consensus.viewIDSigs[recvMsg.ViewID][newLeaderKey.Bytes.Hex()]
if !ok3 {
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
for i, key := range consensus.priKey {
if err := consensus.viewIDBitmap[recvMsg.ViewID].SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Warn().Msgf("[onViewChange] viewIDBitmap setkey failed for key at index %d", i)
continue
}
consensus.viewIDSigs[recvMsg.ViewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(viewIDBytes)
}
}
preparedBlock := &types.Block{}
hasBlock := false
if len(recvMsg.Payload) != 0 && len(recvMsg.Block) != 0 {
if err := rlp.DecodeBytes(recvMsg.Block, preparedBlock); err != nil {
consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[onViewChange] Unparseable prepared block data")
return
}
hasBlock = true
}
// m2 type message
if !hasBlock {
_, ok := consensus.nilSigs[recvMsg.ViewID][senderKey.Bytes.Hex()]
if ok {
consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Already Received M2 message from validator")
return
}
members := consensus.Decider.Participants()
consensus.vc.AddViewIDKeyIfNotExist(recvMsg.ViewID, members)
if !recvMsg.ViewchangeSig.VerifyHash(senderKey.Object, NIL) {
consensus.getLogger().Warn().Msg("[onViewChange] Failed To Verify Signature For M2 Type Viewchange Message")
return
}
consensus.getLogger().Info().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Add M2 (NIL) type message")
consensus.nilSigs[recvMsg.ViewID][senderKey.Bytes.Hex()] = recvMsg.ViewchangeSig
consensus.nilBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true) // Set the bitmap indicating that this validator signed.
} else { // m1 type message
if consensus.BlockVerifier(preparedBlock); err != nil {
consensus.getLogger().Error().Err(err).Msg("[onViewChange] Prepared block verification failed")
return
}
_, ok := consensus.bhpSigs[recvMsg.ViewID][senderKey.Bytes.Hex()]
if ok {
consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Already Received M1 Message From the Validator")
return
}
if !recvMsg.ViewchangeSig.VerifyHash(senderKey.Object, recvMsg.Payload) {
consensus.getLogger().Warn().Msg("[onViewChange] Failed to Verify Signature for M1 Type Viewchange Message")
// do it once only per viewID/Leader
if err := consensus.vc.InitPayload(consensus.FBFTLog,
recvMsg.ViewID,
recvMsg.BlockNum,
consensus.priKey); err != nil {
consensus.getLogger().Error().Err(err).Msg("Init Payload Error")
return
}
// first time receive m1 type message, need verify validity of prepared message
if len(consensus.m1Payload) == 0 || !bytes.Equal(consensus.m1Payload, recvMsg.Payload) {
if len(recvMsg.Payload) <= 32 {
consensus.getLogger().Warn().
Int("len", len(recvMsg.Payload)).
Msg("[onViewChange] M1 RecvMsg Payload Not Enough Length")
return
}
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
err = consensus.vc.ProcessViewChangeMsg(consensus.FBFTLog, consensus.Decider, recvMsg)
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[onViewChange] M1 RecvMsg Payload Read Error")
return
}
if !consensus.Decider.IsQuorumAchievedByMask(mask) {
consensus.getLogger().Warn().
Msgf("[onViewChange] Quorum Not achieved")
return
}
// Verify the multi-sig for prepare phase
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) {
consensus.getLogger().Warn().
Hex("blockHash", blockHash).
Msg("[onViewChange] failed to verify multi signature for m1 prepared payload")
return
}
// if m1Payload is empty, we just add one
if len(consensus.m1Payload) == 0 {
consensus.m1Payload = append(recvMsg.Payload[:0:0], recvMsg.Payload...)
// create prepared message for new leader
preparedMsg := FBFTMessage{
MessageType: msg_pb.MessageType_PREPARED,
ViewID: recvMsg.ViewID,
BlockNum: recvMsg.BlockNum,
}
preparedMsg.BlockHash = common.Hash{}
copy(preparedMsg.BlockHash[:], recvMsg.Payload[:32])
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkeys = []*bls.PublicKeyWrapper{newLeaderKey}
consensus.getLogger().Info().Msg("[onViewChange] New Leader Prepared Message Added")
consensus.FBFTLog.AddMessage(&preparedMsg)
consensus.FBFTLog.AddBlock(preparedBlock)
}
}
consensus.getLogger().Info().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Add M1 (prepared) type message")
consensus.bhpSigs[recvMsg.ViewID][senderKey.Bytes.Hex()] = recvMsg.ViewchangeSig
consensus.bhpBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true) // Set the bitmap indicating that this validator signed.
}
// check and add viewID (m3 type) message signature
if _, ok := consensus.viewIDSigs[recvMsg.ViewID][senderKey.Bytes.Hex()]; ok {
consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Already Received M3(ViewID) message from the validator")
return
}
viewIDHash := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDHash, recvMsg.ViewID)
if !recvMsg.ViewidSig.VerifyHash(senderKey.Object, viewIDHash) {
consensus.getLogger().Warn().
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[onViewChange] Failed to Verify M3 Message Signature")
consensus.getLogger().Error().Err(err).
Uint64("viewID", recvMsg.ViewID).
Uint64("blockNum", recvMsg.BlockNum).
Str("msgSender", senderKey.Bytes.Hex()).
Msg("Verify View Change Message Error")
return
}
consensus.getLogger().Info().
Str("validatorPubKey", senderKey.Bytes.Hex()).
Msg("[onViewChange] Add M3 (ViewID) type message")
consensus.viewIDSigs[recvMsg.ViewID][senderKey.Bytes.Hex()] = recvMsg.ViewidSig
// Set the bitmap indicating that this validator signed.
consensus.viewIDBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true)
consensus.getLogger().Info().
Int("have", len(consensus.viewIDSigs[recvMsg.ViewID])).
Int64("total", consensus.Decider.ParticipantsCount()).
Msg("[onViewChange]")
// received enough view change messages, change state to normal consensus
if consensus.Decider.IsQuorumAchievedByMask(consensus.viewIDBitmap[recvMsg.ViewID]) {
if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) {
consensus.getLogger().Info().Msg("[onViewChange] View Change Message Quorum Reached")
consensus.current.SetMode(Normal)
consensus.LeaderPubKey = newLeaderKey
consensus.ResetState()
if len(consensus.m1Payload) == 0 {
if consensus.vc.IsM1PayloadEmpty() {
// TODO(Chao): explain why ReadySignal is sent only in this case but not the other case.
// Make sure the newly proposed block have the correct view ID
consensus.SetCurBlockViewID(recvMsg.ViewID)
@ -430,13 +240,10 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.ReadySignal <- struct{}{}
}()
} else {
consensus.getLogger().Info().
Str("From", consensus.phase.String()).
Str("To", FBFTCommit.String()).
Msg("[OnViewChange] Switching phase")
consensus.switchPhase(FBFTCommit, true)
copy(consensus.blockHash[:], consensus.m1Payload[:32])
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(consensus.m1Payload, 32)
consensus.switchPhase("onViewChange", FBFTCommit)
payload := consensus.vc.GetM1Payload()
copy(consensus.blockHash[:], payload[:32])
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(payload, 32)
if err != nil {
consensus.getLogger().Error().Err(err).
@ -456,7 +263,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64())
for i, key := range consensus.priKey {
if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Warn().
consensus.getLogger().Warn().Err(err).
Msgf("[OnViewChange] New Leader commit bitmap set failed for key at index %d", i)
continue
}
@ -469,22 +276,18 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
block.NumberU64(),
block.Header().ViewID().Uint64(),
); err != nil {
consensus.getLogger().Warn().Msg("submit vote on viewchange commit failed")
consensus.getLogger().Warn().Err(err).Msg("submit vote on viewchange commit failed")
return
}
}
}
consensus.SetViewChangingID(recvMsg.ViewID)
consensus.SetViewIDs(recvMsg.ViewID)
msgToSend := consensus.constructNewViewMessage(
recvMsg.ViewID, newLeaderPriKey,
)
consensus.getLogger().Warn().
Int("payloadSize", len(consensus.m1Payload)).
Hex("M1Payload", consensus.m1Payload).
Msg("[onViewChange] Sent NewView Message")
if err := consensus.msgSender.SendWithRetry(
consensus.blockNum,
msg_pb.MessageType_NEWVIEW,
@ -495,25 +298,42 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.getLogger().Err(err).
Msg("could not send out the NEWVIEW message")
}
consensus.getLogger().Info().
Str("myKey", newLeaderKey.Bytes.Hex()).
Hex("M1Payload", consensus.vc.GetM1Payload()).
Msg("[onViewChange] Sent NewView Messge")
consensus.SetCurBlockViewID(recvMsg.ViewID)
consensus.ResetViewChangeState()
consensus.consensusTimeout[timeoutViewChange].Stop()
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().Str("myKey", newLeaderKey.Bytes.Hex()).Msg("[onViewChange] I am the New Leader")
consensus.getLogger().Info().
Str("myKey", newLeaderKey.Bytes.Hex()).
Msg("[onViewChange] I am the New Leader")
}
}
// TODO: move to consensus_leader.go later
// onNewView is called when validators received newView message from the new leader
// the validator needs to check the m3bitmap to see if the quorum is reached
// If the new view message contains payload (block), and at least one m1 message was
// collected by the new leader (m3count > m2count), the validator will create a new
// prepared message from the payload and commit it to the block
// Or the validator will enter announce phase to wait for the new block proposed
// from the new leader
func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
consensus.getLogger().Info().Msg("[onNewView] Received NewView Message")
recvMsg, err := consensus.ParseNewViewMessage(msg)
members := consensus.Decider.Participants()
recvMsg, err := ParseNewViewMessage(msg, members)
if err != nil {
consensus.getLogger().Warn().Err(err).Msg("[onNewView] Unable to Parse NewView Message")
return
}
if !consensus.onNewViewSanityCheck(recvMsg) {
// change view and leaderKey to keep in sync with network
if consensus.blockNum != recvMsg.BlockNum {
consensus.getLogger().Warn().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("myBlockNum", consensus.blockNum).
Msg("[onNewView] Invalid block number")
return
}
@ -523,82 +343,26 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
}
senderKey := recvMsg.SenderPubkeys[0]
consensus.vcLock.Lock()
defer consensus.vcLock.Unlock()
if recvMsg.M3AggSig == nil || recvMsg.M3Bitmap == nil {
consensus.getLogger().Error().Msg("[onNewView] M3AggSig or M3Bitmap is nil")
if !consensus.onNewViewSanityCheck(recvMsg) {
return
}
m3Sig := recvMsg.M3AggSig
m3Mask := recvMsg.M3Bitmap
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
if !consensus.Decider.IsQuorumAchievedByMask(m3Mask) {
consensus.getLogger().Warn().
Msgf("[onNewView] Quorum Not achieved")
preparedBlock, err := consensus.vc.VerifyNewViewMsg(recvMsg)
if err != nil {
consensus.getLogger().Warn().Err(err).Msg("[onNewView] Verify New View Msg Failed")
return
}
if !m3Sig.VerifyHash(m3Mask.AggregatePublic, viewIDBytes) {
m3Mask := recvMsg.M3Bitmap
if !consensus.Decider.IsQuorumAchievedByMask(m3Mask) {
consensus.getLogger().Warn().
Str("m3Sig", m3Sig.SerializeToHexStr()).
Hex("m3Mask", m3Mask.Bitmap).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[onNewView] Unable to Verify Aggregated Signature of M3 (ViewID) payload")
Msgf("[onNewView] Quorum Not achieved")
return
}
m2Mask := recvMsg.M2Bitmap
if recvMsg.M2AggSig != nil {
consensus.getLogger().Info().Msg("[onNewView] M2AggSig (NIL) is Not Empty")
m2Sig := recvMsg.M2AggSig
if !m2Sig.VerifyHash(m2Mask.AggregatePublic, NIL) {
consensus.getLogger().Warn().
Msg("[onNewView] Unable to Verify Aggregated Signature of M2 (NIL) payload")
return
}
}
// check when M3 sigs > M2 sigs, then M1 (recvMsg.Payload) should not be empty
preparedBlock := &types.Block{}
hasBlock := false
if len(recvMsg.Payload) != 0 && len(recvMsg.Block) != 0 {
if err := rlp.DecodeBytes(recvMsg.Block, preparedBlock); err != nil {
consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[onNewView] Unparseable prepared block data")
return
}
blockHash := recvMsg.Payload[:32]
preparedBlockHash := preparedBlock.Hash()
if !bytes.Equal(preparedBlockHash[:], blockHash) {
consensus.getLogger().Warn().
Err(err).
Str("blockHash", preparedBlock.Hash().Hex()).
Str("payloadBlockHash", hex.EncodeToString(blockHash)).
Msg("[onNewView] Prepared block hash doesn't match msg block hash.")
return
}
hasBlock = true
if consensus.BlockVerifier(preparedBlock); err != nil {
consensus.getLogger().Error().Err(err).Msg("[onNewView] Prepared block verification failed")
return
}
}
if m2Mask == nil || m2Mask.Bitmap == nil ||
(m2Mask != nil && m2Mask.Bitmap != nil &&
utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) {
if len(recvMsg.Payload) <= 32 {
consensus.getLogger().Info().
Msg("[onNewView] M1 (prepared) Type Payload Not Have Enough Length")
return
}
// m1 is not empty, check it's valid
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
@ -625,30 +389,27 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
copy(preparedMsg.BlockHash[:], blockHash[:])
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkeys = []*bls.PublicKeyWrapper{senderKey}
consensus.FBFTLog.AddMessage(&preparedMsg)
if hasBlock {
if preparedBlock != nil {
consensus.FBFTLog.AddBlock(preparedBlock)
}
}
if !consensus.IsViewChangingMode() {
consensus.getLogger().Info().Msg("Not in ViewChanging Mode.")
return
}
// newView message verified success, override my state
consensus.SetViewIDs(recvMsg.ViewID)
consensus.LeaderPubKey = senderKey
consensus.ResetViewChangeState()
// change view and leaderKey to keep in sync with network
if consensus.blockNum != recvMsg.BlockNum {
consensus.getLogger().Info().
Str("newLeaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[onNewView] New Leader Changed")
return
}
// NewView message is verified, change state to normal consensus
if hasBlock {
if preparedBlock != nil {
// Construct and send the commit message
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader,
preparedBlock.Epoch(), preparedBlock.Hash(), preparedBlock.NumberU64(), preparedBlock.Header().ViewID().Uint64())
@ -666,7 +427,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
)
if err != nil {
consensus.getLogger().Err(err).Msg("could not create commit message")
return
continue
}
consensus.getLogger().Info().Msg("onNewView === commit")
@ -675,11 +436,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
p2p.ConstructMessage(p2pMsg.Bytes),
)
}
consensus.getLogger().Info().
Str("From", consensus.phase.String()).
Str("To", FBFTCommit.String()).
Msg("[OnViewChange] Switching phase")
consensus.switchPhase(FBFTCommit, true)
consensus.switchPhase("onNewView", FBFTCommit)
} else {
consensus.ResetState()
consensus.getLogger().Info().Msg("onNewView === announce")
@ -687,8 +444,16 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
consensus.getLogger().Info().
Str("newLeaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Msg("new leader changed")
consensus.getLogger().Info().
Msg("validator start consensus timer and stop view change timer")
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.consensusTimeout[timeoutViewChange].Stop()
}
// ResetViewChangeState resets the view change structure
func (consensus *Consensus) ResetViewChangeState() {
consensus.getLogger().Info().
Str("Phase", consensus.phase.String()).
Msg("[ResetViewChangeState] Resetting view change state")
consensus.current.SetMode(Normal)
consensus.vc.Reset()
consensus.Decider.ResetViewChangeVotes()
}
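Taken together, the leader-side handling in onViewChange now delegates all signature bookkeeping to the viewChange helper. A condensed, illustrative sketch of that call sequence (the types and methods are from the hunks above; the helper function itself is hypothetical, and locking, error logging, and the NEWVIEW broadcast are elided):

```go
package consensus

import (
	"github.com/harmony-one/harmony/consensus/quorum"
	"github.com/harmony-one/harmony/multibls"
)

// handleViewChangeSketch condenses the new onViewChange flow: create the
// per-viewID maps, add the node's own M1/M2 and M3 signatures once, record
// the sender's signatures, then check quorum on the M3 (viewID) bitmap.
func handleViewChangeSketch(vc *viewChange, decider quorum.Decider, fbftLog *FBFTLog,
	recvMsg *FBFTMessage, priKeys multibls.PrivateKeys) bool {
	vc.AddViewIDKeyIfNotExist(recvMsg.ViewID, decider.Participants())
	if err := vc.InitPayload(fbftLog, recvMsg.ViewID, recvMsg.BlockNum, priKeys); err != nil {
		return false
	}
	if err := vc.ProcessViewChangeMsg(fbftLog, decider, recvMsg); err != nil {
		return false
	}
	return decider.IsQuorumAchievedByMask(vc.GetViewIDBitmap(recvMsg.ViewID))
}
```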

@ -0,0 +1,475 @@
package consensus
import (
"bytes"
"encoding/binary"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/types"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/crypto/bls"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
const (
// ValidPayloadLength is the valid length for viewchange payload
ValidPayloadLength = 32 + bls.BLSSignatureSizeInBytes
)
// viewChange encapsulate all the view change related data structure and functions
type viewChange struct {
// Commits collected from view change
// for each viewID, we need keep track of corresponding sigs and bitmap
// until one of the viewID has enough votes (>=2f+1)
// after one of viewID has enough votes, we can reset and clean the map
// honest nodes will never double votes on different viewID
// bhpSigs: blockHashPreparedSigs is the signature on m1 type message
bhpSigs map[uint64]map[string]*bls_core.Sign
// nilSigs: there is no prepared message when view change,
// it's signature on m2 type (i.e. nil) messages
nilSigs map[uint64]map[string]*bls_core.Sign
viewIDSigs map[uint64]map[string]*bls_core.Sign
bhpBitmap map[uint64]*bls_cosi.Mask
nilBitmap map[uint64]*bls_cosi.Mask
viewIDBitmap map[uint64]*bls_cosi.Mask
newViewMsg map[uint64]map[string]uint64
m1Payload []byte // message payload for type m1 := |vcBlockHash|prepared_agg_sigs|prepared_bitmap|, new leader only need one
blockVerifier BlockVerifierFunc
viewChangeDuration time.Duration
// mutex for view change
vcLock sync.RWMutex
}
// newViewChange returns a new viewChange object
func newViewChange() *viewChange {
vc := viewChange{}
vc.Reset()
return &vc
}
// SetBlockVerifier ..
func (vc *viewChange) SetBlockVerifier(verifier BlockVerifierFunc) {
vc.blockVerifier = verifier
}
// AddViewIDKeyIfNotExist ..
func (vc *viewChange) AddViewIDKeyIfNotExist(viewID uint64, members multibls.PublicKeys) {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
if _, ok := vc.bhpSigs[viewID]; !ok {
vc.bhpSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := vc.nilSigs[viewID]; !ok {
vc.nilSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := vc.viewIDSigs[viewID]; !ok {
vc.viewIDSigs[viewID] = map[string]*bls_core.Sign{}
}
if _, ok := vc.bhpBitmap[viewID]; !ok {
bhpBitmap, _ := bls_cosi.NewMask(members, nil)
vc.bhpBitmap[viewID] = bhpBitmap
}
if _, ok := vc.nilBitmap[viewID]; !ok {
nilBitmap, _ := bls_cosi.NewMask(members, nil)
vc.nilBitmap[viewID] = nilBitmap
}
if _, ok := vc.viewIDBitmap[viewID]; !ok {
viewIDBitmap, _ := bls_cosi.NewMask(members, nil)
vc.viewIDBitmap[viewID] = viewIDBitmap
}
}
// Reset reset the state for viewchange
func (vc *viewChange) Reset() {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
vc.m1Payload = []byte{}
vc.bhpSigs = map[uint64]map[string]*bls_core.Sign{}
vc.nilSigs = map[uint64]map[string]*bls_core.Sign{}
vc.viewIDSigs = map[uint64]map[string]*bls_core.Sign{}
vc.bhpBitmap = map[uint64]*bls_cosi.Mask{}
vc.nilBitmap = map[uint64]*bls_cosi.Mask{}
vc.viewIDBitmap = map[uint64]*bls_cosi.Mask{}
vc.newViewMsg = map[uint64]map[string]uint64{}
}
// GetPreparedBlock returns the prepared block or nil if not found
func (vc *viewChange) GetPreparedBlock(fbftlog *FBFTLog, hash [32]byte) ([]byte, []byte) {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
if !vc.isM1PayloadEmpty() {
block := fbftlog.GetBlockByHash(hash)
if block != nil {
encodedBlock, err := rlp.EncodeToBytes(block)
if err != nil || len(encodedBlock) == 0 {
vc.getLogger().Err(err).Msg("[GetPreparedBlock] Failed encoding prepared block")
return vc.m1Payload, nil
}
return vc.m1Payload, encodedBlock
}
}
return vc.m1Payload, nil
}
// GetM2Bitmap returns the nilBitmap as M2Bitmap
func (vc *viewChange) GetM2Bitmap(viewID uint64) ([]byte, []byte) {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
sig2arr := []*bls_core.Sign{}
for _, sig := range vc.nilSigs[viewID] {
sig2arr = append(sig2arr, sig)
}
if len(sig2arr) > 0 {
m2Sig := bls_cosi.AggregateSig(sig2arr)
vc.getLogger().Info().Int("len", len(sig2arr)).Msg("[GetM2Bitmap] M2 (NIL) type signatures")
return m2Sig.Serialize(), vc.nilBitmap[viewID].Bitmap
}
vc.getLogger().Info().Uint64("viewID", viewID).Msg("[GetM2Bitmap] No M2 (NIL) type signatures")
return nil, nil
}
// GetM3Bitmap returns the viewIDBitmap as M3Bitmap
func (vc *viewChange) GetM3Bitmap(viewID uint64) ([]byte, []byte) {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
sig3arr := []*bls_core.Sign{}
for _, sig := range vc.viewIDSigs[viewID] {
sig3arr = append(sig3arr, sig)
}
// even though we check here for safety, m3 type signatures must be >= 2f+1
if len(sig3arr) > 0 {
m3Sig := bls_cosi.AggregateSig(sig3arr)
vc.getLogger().Info().Int("len", len(sig3arr)).Msg("[GetM3Bitmap] M3 (ViewID) type signatures")
return m3Sig.Serialize(), vc.viewIDBitmap[viewID].Bitmap
}
vc.getLogger().Info().Uint64("viewID", viewID).Msg("[GetM3Bitmap] No M3 (ViewID) type signatures")
return nil, nil
}
// VerifyNewViewMsg returns true if the new view message is valid
func (vc *viewChange) VerifyNewViewMsg(recvMsg *FBFTMessage) (*types.Block, error) {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
if recvMsg.M3AggSig == nil || recvMsg.M3Bitmap == nil {
return nil, errors.New("[VerifyNewViewMsg] M3AggSig or M3Bitmap is nil")
}
senderKey := recvMsg.SenderPubkeys[0]
senderKeyStr := senderKey.Bytes.Hex()
// first time received the new view message
_, ok := vc.newViewMsg[recvMsg.ViewID]
if !ok {
newViewMap := map[string]uint64{}
vc.newViewMsg[recvMsg.ViewID] = newViewMap
}
_, ok = vc.newViewMsg[recvMsg.ViewID][senderKeyStr]
if ok {
vc.getLogger().Warn().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MyBlockNum", vc.newViewMsg[recvMsg.ViewID][senderKeyStr]).
Msg("[VerifyNewViewMsg] redundant NewView Message")
}
vc.newViewMsg[recvMsg.ViewID][senderKeyStr] = recvMsg.BlockNum
m3Sig := recvMsg.M3AggSig
m3Mask := recvMsg.M3Bitmap
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
if !m3Sig.VerifyHash(m3Mask.AggregatePublic, viewIDBytes) {
return nil, errors.New("[VerifyNewViewMsg] Unable to Verify Aggregated Signature of M3 (ViewID) payload")
}
m2Mask := recvMsg.M2Bitmap
if recvMsg.M2AggSig != nil {
m2Sig := recvMsg.M2AggSig
if !m2Sig.VerifyHash(m2Mask.AggregatePublic, NIL) {
return nil, errors.New("[VerifyNewViewMsg] Unable to Verify Aggregated Signature of M2 (NIL) payload")
}
}
// check when M3 sigs > M2 sigs, then M1 (recvMsg.Payload) should not be empty
preparedBlock := &types.Block{}
if len(recvMsg.Payload) >= ValidPayloadLength && len(recvMsg.Block) != 0 {
if err := rlp.DecodeBytes(recvMsg.Block, preparedBlock); err != nil {
return nil, errors.New("[VerifyNewViewMsg] Unparseable prepared block data")
}
blockHash := recvMsg.Payload[:32]
preparedBlockHash := preparedBlock.Hash()
if !bytes.Equal(preparedBlockHash[:], blockHash) {
return nil, errors.New("[VerifyNewViewMsg] Prepared block hash doesn't match msg block hash")
}
if err := vc.blockVerifier(preparedBlock); err != nil {
return nil, errors.New("[VerifyNewViewMsg] Prepared block verification failed")
}
return preparedBlock, nil
}
return nil, nil
}
var (
errDupM1 = errors.New("received M1 (prepared) message already")
errDupM2 = errors.New("received M2 (NIL) message already")
errDupM3 = errors.New("received M3 (ViewID) message already")
errVerifyM1 = errors.New("failed to verfiy signature for M1 message")
errVerifyM2 = errors.New("failed to verfiy signature for M2 message")
errM1Payload = errors.New("failed to verify multi signature for M1 prepared payload")
errNoQuorum = errors.New("no quorum on M1 (prepared) payload")
errIncorrectSender = errors.New("view change msg must have only one sender")
)
// ProcessViewChangeMsg process the view change message after verification
func (vc *viewChange) ProcessViewChangeMsg(
fbftlog *FBFTLog,
decider quorum.Decider,
recvMsg *FBFTMessage,
) error {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
preparedBlock := &types.Block{}
if len(recvMsg.SenderPubkeys) != 1 {
return errNoQuorum
}
senderKey := recvMsg.SenderPubkeys[0]
senderKeyStr := senderKey.Bytes.Hex()
// check and add viewID (m3 type) message signature
if _, ok := vc.viewIDSigs[recvMsg.ViewID][senderKeyStr]; ok {
return errDupM3
}
if len(recvMsg.Payload) >= ValidPayloadLength && len(recvMsg.Block) != 0 {
if err := rlp.DecodeBytes(recvMsg.Block, preparedBlock); err != nil {
return err
}
if err := vc.blockVerifier(preparedBlock); err != nil {
return err
}
_, ok := vc.bhpSigs[recvMsg.ViewID][senderKeyStr]
if ok {
return errDupM1
}
if !recvMsg.ViewchangeSig.VerifyHash(senderKey.Object, recvMsg.Payload) {
return errVerifyM1
}
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := chain.ReadSignatureBitmapByPublicKeys(recvMsg.Payload[32:], decider.Participants())
if err != nil {
return err
}
if !decider.IsQuorumAchievedByMask(mask) {
return errNoQuorum
}
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) {
return errM1Payload
}
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M1 (prepared) type message")
vc.bhpSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewchangeSig
vc.bhpBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true) // Set the bitmap indicating that this validator signed.
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M3 (ViewID) type message")
vc.viewIDSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewidSig
// Set the bitmap indicating that this validator signed.
vc.viewIDBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true)
if vc.isM1PayloadEmpty() {
vc.m1Payload = append(recvMsg.Payload[:0:0], recvMsg.Payload...)
// create prepared message for new leader
preparedMsg := FBFTMessage{
MessageType: msg_pb.MessageType_PREPARED,
ViewID: recvMsg.ViewID,
BlockNum: recvMsg.BlockNum,
}
preparedMsg.BlockHash = common.Hash{}
copy(preparedMsg.BlockHash[:], recvMsg.Payload[:32])
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkeys = []*bls.PublicKeyWrapper{recvMsg.LeaderPubkey}
vc.getLogger().Info().Msg("[ProcessViewChangeMsg] New Leader Prepared Message Added")
fbftlog.AddMessage(&preparedMsg)
fbftlog.AddBlock(preparedBlock)
}
return nil
}
_, ok := vc.nilSigs[recvMsg.ViewID][senderKeyStr]
if ok {
return errDupM2
}
if !recvMsg.ViewchangeSig.VerifyHash(senderKey.Object, NIL) {
return errVerifyM2
}
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M2 (NIL) type message")
vc.nilSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewchangeSig
vc.nilBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true) // Set the bitmap indicating that this validator signed.
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M3 (ViewID) type message")
vc.viewIDSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewidSig
// Set the bitmap indicating that this validator signed.
vc.viewIDBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true)
return nil
}
// InitPayload do it once when validator received view change message
func (vc *viewChange) InitPayload(
fbftlog *FBFTLog,
viewID uint64,
blockNum uint64,
privKeys multibls.PrivateKeys,
) error {
vc.vcLock.Lock()
defer vc.vcLock.Unlock()
// m1 or m2 init once per viewID/key.
// m1 and m2 are mutually exclusive.
// If the node has valid prepared block, it will add m1 signature.
// If not, the node will add m2 signature.
// Honest node should have only one kind of m1/m2 signature.
inited := false
for _, key := range privKeys {
_, ok1 := vc.bhpSigs[viewID][key.Pub.Bytes.Hex()]
_, ok2 := vc.nilSigs[viewID][key.Pub.Bytes.Hex()]
if ok1 || ok2 {
inited = true
break
}
}
// add my own M1/M2 type message signature and bitmap
if !inited {
preparedMsgs := fbftlog.GetMessagesByTypeSeq(msg_pb.MessageType_PREPARED, blockNum)
preparedMsg := fbftlog.FindMessageByMaxViewID(preparedMsgs)
hasBlock := false
if preparedMsg != nil {
if preparedBlock := fbftlog.GetBlockByHash(preparedMsg.BlockHash); preparedBlock != nil {
if err := vc.blockVerifier(preparedBlock); err == nil {
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M1 (prepared) type message")
msgToSign := append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
for _, key := range privKeys {
if err := vc.bhpBitmap[viewID].SetKey(key.Pub.Bytes, true); err != nil {
vc.getLogger().Warn().Str("key", key.Pub.Bytes.Hex()).Msg("[InitPayload] bhpBitmap setkey failed")
continue
}
vc.bhpSigs[viewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(msgToSign)
}
hasBlock = true
// if m1Payload is empty, we just add one
if vc.isM1PayloadEmpty() {
vc.m1Payload = append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
}
}
}
}
if !hasBlock {
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M2 (NIL) type message")
for _, key := range privKeys {
if err := vc.nilBitmap[viewID].SetKey(key.Pub.Bytes, true); err != nil {
vc.getLogger().Warn().Str("key", key.Pub.Bytes.Hex()).Msg("[InitPayload] nilBitmap setkey failed")
continue
}
vc.nilSigs[viewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(NIL)
}
}
}
inited = false
for _, key := range privKeys {
_, ok3 := vc.viewIDSigs[viewID][key.Pub.Bytes.Hex()]
if ok3 {
inited = true
break
}
}
// add my own M3 type message signature and bitmap
if !inited {
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, viewID)
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M3 (ViewID) type message")
for _, key := range privKeys {
if err := vc.viewIDBitmap[viewID].SetKey(key.Pub.Bytes, true); err != nil {
vc.getLogger().Warn().Str("key", key.Pub.Bytes.Hex()).Msg("[InitPayload] viewIDBitmap setkey failed")
continue
}
vc.viewIDSigs[viewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(viewIDBytes)
}
}
return nil
}
// isM1PayloadEmpty returns true if m1Payload is not set
func (vc *viewChange) isM1PayloadEmpty() bool {
return len(vc.m1Payload) == 0
}
// IsM1PayloadEmpty returns true if m1Payload is not set
func (vc *viewChange) IsM1PayloadEmpty() bool {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
return vc.isM1PayloadEmpty()
}
// GetViewIDBitmap returns the viewIDBitmap
func (vc *viewChange) GetViewIDBitmap(viewID uint64) *bls_cosi.Mask {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
return vc.viewIDBitmap[viewID]
}
// GetM1Payload returns the m1Payload
func (vc *viewChange) GetM1Payload() []byte {
vc.vcLock.RLock()
defer vc.vcLock.RUnlock()
return vc.m1Payload
}
// getLogger returns the logger with the view change context attached
func (vc *viewChange) getLogger() *zerolog.Logger {
logger := utils.Logger().With().
Str("context", "ViewChange").
Logger()
return &logger
}

@ -7,9 +7,13 @@ import (
"github.com/ethereum/go-ethereum/rlp"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/multibls"
"github.com/pkg/errors"
)
// construct the view change message
@ -36,7 +40,7 @@ func (consensus *Consensus) constructViewChangeMessage(priKey *bls.PrivateKeyWra
var encodedBlock []byte
if preparedMsg != nil {
block := consensus.FBFTLog.GetBlockByHash(preparedMsg.BlockHash)
consensus.getLogger().Debug().
consensus.getLogger().Info().
Interface("Block", block).
Interface("preparedMsg", preparedMsg).
Msg("[constructViewChangeMessage] found prepared msg")
@ -65,9 +69,10 @@ func (consensus *Consensus) constructViewChangeMessage(priKey *bls.PrivateKeyWra
vcMsg.PreparedBlock = encodedBlock
}
consensus.getLogger().Debug().
consensus.getLogger().Info().
Hex("m1Payload", vcMsg.Payload).
Str("pubKey", consensus.GetPublicKeys().SerializeToHexStr()).
Str("NextLeader", consensus.LeaderPubKey.Bytes.Hex()).
Str("SenderPubKey", priKey.Pub.Bytes.Hex()).
Msg("[constructViewChangeMessage]")
sign := priKey.Pri.SignHash(msgToSign)
@ -110,41 +115,133 @@ func (consensus *Consensus) constructNewViewMessage(viewID uint64, priKey *bls.P
}
vcMsg := message.GetViewchange()
vcMsg.Payload = consensus.m1Payload
if len(consensus.m1Payload) != 0 {
block := consensus.FBFTLog.GetBlockByHash(consensus.blockHash)
if block != nil {
encodedBlock, err := rlp.EncodeToBytes(block)
vcMsg.Payload, vcMsg.PreparedBlock = consensus.vc.GetPreparedBlock(consensus.FBFTLog, consensus.blockHash)
vcMsg.M2Aggsigs, vcMsg.M2Bitmap = consensus.vc.GetM2Bitmap(viewID)
vcMsg.M3Aggsigs, vcMsg.M3Bitmap = consensus.vc.GetM3Bitmap(viewID)
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message, priKey.Pri)
if err != nil {
consensus.getLogger().Err(err).Msg("[constructNewViewMessage] Failed encoding prepared block")
consensus.getLogger().Err(err).
Msg("[constructNewViewMessage] failed to sign and marshal the new view message")
}
if len(encodedBlock) != 0 {
vcMsg.PreparedBlock = encodedBlock
return proto.ConstructConsensusMessage(marshaledMessage)
}
var (
errNilMessage = errors.New("Nil protobuf message")
errIncorrectMessageType = errors.New("Incorrect message type")
)
// ParseViewChangeMessage parses view change message into FBFTMessage structure
func ParseViewChangeMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
if msg == nil {
return nil, errNilMessage
}
vcMsg := msg.GetViewchange()
FBFTMsg := FBFTMessage{
BlockNum: vcMsg.BlockNum,
ViewID: vcMsg.ViewId,
MessageType: msg.GetType(),
Payload: make([]byte, len(vcMsg.Payload)),
Block: make([]byte, len(vcMsg.PreparedBlock)),
}
if FBFTMsg.MessageType != msg_pb.MessageType_VIEWCHANGE {
return nil, errIncorrectMessageType
}
sig2arr := consensus.GetNilSigsArray(viewID)
consensus.getLogger().Debug().Int("len", len(sig2arr)).Msg("[constructNewViewMessage] M2 (NIL) type signatures")
if len(sig2arr) > 0 {
m2Sig := bls_cosi.AggregateSig(sig2arr)
vcMsg.M2Aggsigs = m2Sig.Serialize()
vcMsg.M2Bitmap = consensus.nilBitmap[viewID].Bitmap
copy(FBFTMsg.Block[:], vcMsg.PreparedBlock[:])
copy(FBFTMsg.Payload[:], vcMsg.Payload[:])
pubKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.SenderPubkey)
if err != nil {
return nil, err
}
leaderKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.LeaderPubkey)
if err != nil {
return nil, err
}
sig3arr := consensus.GetViewIDSigsArray(viewID)
consensus.getLogger().Debug().Int("len", len(sig3arr)).Msg("[constructNewViewMessage] M3 (ViewID) type signatures")
// even though we check here for safety, the number of m3 type signatures must be >= 2f+1
if len(sig3arr) > 0 {
m3Sig := bls_cosi.AggregateSig(sig3arr)
vcMsg.M3Aggsigs = m3Sig.Serialize()
vcMsg.M3Bitmap = consensus.viewIDBitmap[viewID].Bitmap
vcSig := bls_core.Sign{}
err = vcSig.Deserialize(vcMsg.ViewchangeSig)
if err != nil {
return nil, err
}
FBFTMsg.ViewchangeSig = &vcSig
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message, priKey.Pri)
vcSig1 := bls_core.Sign{}
err = vcSig1.Deserialize(vcMsg.ViewidSig)
if err != nil {
consensus.getLogger().Err(err).
Msg("[constructNewViewMessage] failed to sign and marshal the new view message")
return nil, err
}
return proto.ConstructConsensusMessage(marshaledMessage)
FBFTMsg.ViewidSig = &vcSig1
FBFTMsg.SenderPubkeys = []*bls.PublicKeyWrapper{{Object: pubKey}}
copy(FBFTMsg.SenderPubkeys[0].Bytes[:], vcMsg.SenderPubkey[:])
FBFTMsg.LeaderPubkey = &bls.PublicKeyWrapper{Object: leaderKey}
copy(FBFTMsg.LeaderPubkey.Bytes[:], vcMsg.LeaderPubkey[:])
return &FBFTMsg, nil
}
// ParseNewViewMessage parses new view message into FBFTMessage structure
func ParseNewViewMessage(msg *msg_pb.Message, members multibls.PublicKeys) (*FBFTMessage, error) {
if msg == nil {
return nil, errNilMessage
}
vcMsg := msg.GetViewchange()
FBFTMsg := FBFTMessage{
BlockNum: vcMsg.BlockNum,
ViewID: vcMsg.ViewId,
MessageType: msg.GetType(),
Payload: make([]byte, len(vcMsg.Payload)),
Block: make([]byte, len(vcMsg.PreparedBlock)),
}
if FBFTMsg.MessageType != msg_pb.MessageType_NEWVIEW {
return nil, errIncorrectMessageType
}
copy(FBFTMsg.Payload[:], vcMsg.Payload[:])
copy(FBFTMsg.Block[:], vcMsg.PreparedBlock[:])
pubKey, err := bls_cosi.BytesToBLSPublicKey(vcMsg.SenderPubkey)
if err != nil {
return nil, err
}
FBFTMsg.SenderPubkeys = []*bls.PublicKeyWrapper{{Object: pubKey}}
copy(FBFTMsg.SenderPubkeys[0].Bytes[:], vcMsg.SenderPubkey[:])
if len(vcMsg.M3Aggsigs) > 0 {
m3Sig := bls_core.Sign{}
err = m3Sig.Deserialize(vcMsg.M3Aggsigs)
if err != nil {
return nil, err
}
m3mask, err := bls_cosi.NewMask(members, nil)
if err != nil {
return nil, err
}
m3mask.SetMask(vcMsg.M3Bitmap)
FBFTMsg.M3AggSig = &m3Sig
FBFTMsg.M3Bitmap = m3mask
}
if len(vcMsg.M2Aggsigs) > 0 {
m2Sig := bls_core.Sign{}
err = m2Sig.Deserialize(vcMsg.M2Aggsigs)
if err != nil {
return nil, err
}
m2mask, err := bls_cosi.NewMask(members, nil)
if err != nil {
return nil, err
}
m2mask.SetMask(vcMsg.M2Bitmap)
FBFTMsg.M2AggSig = &m2Sig
FBFTMsg.M2Bitmap = m2mask
}
return &FBFTMsg, nil
}

@ -45,8 +45,6 @@ func TestPhaseSwitching(t *testing.T) {
assert.Equal(t, FBFTAnnounce, consensus.phase) // It's a new consensus, we should be at the FBFTAnnounce phase
override := false
switches := []phaseSwitch{
{start: FBFTAnnounce, end: FBFTPrepare},
{start: FBFTPrepare, end: FBFTCommit},
@ -54,13 +52,11 @@ func TestPhaseSwitching(t *testing.T) {
}
for _, sw := range switches {
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end, override)
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end)
}
override = true
for _, sw := range switches {
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end, override)
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end)
}
switches = []phaseSwitch{
@ -70,39 +66,19 @@ func TestPhaseSwitching(t *testing.T) {
}
for _, sw := range switches {
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end, override)
testPhaseGroupSwitching(t, consensus, phases, sw.start, sw.end)
}
}
func testPhaseGroupSwitching(t *testing.T, consensus *Consensus, phases []FBFTPhase, startPhase FBFTPhase, desiredPhase FBFTPhase, override bool) {
phaseMapping := make(map[FBFTPhase]bool)
if override {
func testPhaseGroupSwitching(t *testing.T, consensus *Consensus, phases []FBFTPhase, startPhase FBFTPhase, desiredPhase FBFTPhase) {
for range phases {
consensus.switchPhase(desiredPhase, override)
consensus.switchPhase("test", desiredPhase)
assert.Equal(t, desiredPhase, consensus.phase)
}
assert.Equal(t, desiredPhase, consensus.phase)
return
}
phaseMapping[FBFTAnnounce] = false
phaseMapping[FBFTPrepare] = false
phaseMapping[FBFTCommit] = false
phaseMapping[startPhase] = false
phaseMapping[desiredPhase] = true
assert.Equal(t, startPhase, consensus.phase)
for _, phase := range phases {
consensus.switchPhase(desiredPhase, override)
expected := phaseMapping[phase]
assert.Equal(t, expected, (phase == consensus.phase))
}
assert.Equal(t, desiredPhase, consensus.phase)
}
func TestGetNextLeaderKeyShouldFailForStandardGeneratedConsensus(t *testing.T) {
@ -111,7 +87,7 @@ func TestGetNextLeaderKeyShouldFailForStandardGeneratedConsensus(t *testing.T) {
// The below results in: "panic: runtime error: integer divide by zero"
// This happens because there's no check for if there are any participants or not in https://github.com/harmony-one/harmony/blob/main/consensus/quorum/quorum.go#L188-L197
assert.Panics(t, func() { consensus.GetNextLeaderKey() })
assert.Panics(t, func() { consensus.GetNextLeaderKey(uint64(1)) })
}
func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
@ -139,7 +115,7 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
assert.Equal(t, keyCount, consensus.Decider.ParticipantsCount())
consensus.LeaderPubKey = &wrappedBLSKeys[0]
nextKey := consensus.GetNextLeaderKey()
nextKey := consensus.GetNextLeaderKey(uint64(1))
assert.Equal(t, nextKey, &wrappedBLSKeys[1])
}

@ -922,6 +922,7 @@ func New(
node.BeaconBlockChannel = make(chan *types.Block)
txPoolConfig := core.DefaultTxPoolConfig
txPoolConfig.Blacklist = blacklist
txPoolConfig.Journal = fmt.Sprintf("%v/%v", node.NodeConfig.DBDir, txPoolConfig.Journal)
node.TxPool = core.NewTxPool(txPoolConfig, node.Blockchain().Config(), blockchain, node.TransactionErrorSink)
node.CxPool = core.NewCxPool(core.CxPoolSize)
node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine)

@ -0,0 +1,29 @@
FROM golang:1.14
RUN apt update -y && apt upgrade -y
RUN apt install libgmp-dev libssl-dev curl git -y
ENV GOPATH=/root/go
ENV GO111MODULE=on
ENV HMY_PATH=${GOPATH}/src/github.com/harmony-one
RUN mkdir -p $HMY_PATH
WORKDIR $HMY_PATH
RUN git clone https://github.com/harmony-one/harmony.git
RUN git clone https://github.com/harmony-one/bls.git
RUN git clone https://github.com/harmony-one/mcl.git
WORKDIR $HMY_PATH/harmony
RUN make linux_static
RUN cp ./bin/harmony /root && chmod +x /root/harmony
WORKDIR $GOPATH
RUN rm -rf *
WORKDIR /root
COPY run.sh run.sh
ENTRYPOINT ["/bin/bash","/root/run.sh"]

@ -0,0 +1,127 @@
# Docker deployment of a Rosetta enabled Harmony node
## Docker Image
You can choose to build the docker image using the included Dockerfile with the following command:
```bash
docker build -t harmony-rosetta .
```
Or you can download/pull the image from dockerhub with the following command:
```bash
docker pull harmony-rosetta:latest
```
## Starting the node
You can start the node with the following command:
```bash
docker run -d -p 9700:9700 -v "$(pwd)/data:/root/data" harmony-rosetta --run.shard=0
```
> This command will create a container running the harmony node on shard 0 in detached mode,
> binding port 9700 (the rosetta port) on the container to the host and mounting the shared
> `./data` directory on the host to `/root/data` on the container. Note that the container
> uses `/root/data` for all data storage (this is where the `harmony_db_*` directories will be stored).
You can view your container with the following command:
```bash
docker ps
```
You can ensure that your node is running with the following curl command:
```bash
curl -X POST --data '{
"network_identifier": {
"blockchain": "Harmony",
"network": "Mainnet",
"sub_network_identifier": {
"network": "shard 0",
"metadata": {
"is_beacon": true
}
}
}}' http://localhost:9700/network/status
```
One can start the node in offline mode with the following command:
```bash
docker run -d -p 9700:9700 -v "$(pwd)/data:/root/data" harmony-rosetta --run.shard=0 --run.offline
```
> The offline mode implies that the node will not connect to any p2p peer or sync.
## Stopping the node
First get your `CONTAINER ID` using the following command:
```bash
docker ps
```
> Note that if you do not see your node in the list, then your node is not running.
> You can verify this with the `docker ps -a` command.
Once you have your `CONTAINER ID`, you can stop it with the following command:
```bash
docker stop [CONTAINER ID]
```
## Details
**Note that all the arguments provided when running the docker image are immediately forwarded to the harmony node binary.**
> Note that the following args are **appended** to the provided args when running the image:
> `--http.ip "0.0.0.0" --ws.ip "0.0.0.0" --http.rosetta --node_type "explorer" --datadir "./data" --log.dir "./data/logs"`.
> This effectively makes them args that you cannot change.
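As a rough illustration (the in-container paths are whatever `run.sh` resolves), passing `--run.shard=0 -n testnet` to the image results in the node binary being invoked approximately as follows, with the fixed base args from the note above appended after your own:
```bash
# User-supplied args come first; run.sh appends the fixed base args.
./harmony --run.shard=0 -n testnet \
  --http.ip "0.0.0.0" --ws.ip "0.0.0.0" --http.rosetta \
  --node_type "explorer" --datadir "./data" --log.dir "./data/logs"
```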
### Running the node on testnet
All args given to the docker run command are forwarded to the harmony node binary. Therefore, you can simply add `-n testnet` to
run the node on testnet. For example:
```bash
docker run -d -p 9700:9700 -v "$(pwd)/data:/root/data" harmony-rosetta --run.shard=0 -n testnet
```
### Running the node with the http RPC capabilities
Similar to running a node on testnet, one can simply add `--http` to enable the HTTP RPC server. Then you have to forward
the host port to the container's rpc server port.
```bash
docker run -d -p 9700:9700 -p 9500:9500 -v "$(pwd)/data:/root/data" harmony-rosetta --run.shard=0 -n testnet --http
```
### Running the node with the web socket RPC capabilities
Similar to running a node on testnet, one can simply add `--ws` to enable the WebSocket RPC server. Then you have to forward
the host port to the container's rpc server port.
```bash
docker run -d -p 9700:9700 -p 9800:9800 -v "$(pwd)/data:/root/data" harmony-rosetta --run.shard=0 -n testnet --ws
```
### Running the node in non-archival mode
One can append `--run.archive=false` to the docker run command to run the node in non-archival mode. For example:
```bash
docker run -d -p 9700:9700 -v "$(pwd)/data:/root/data" harmony-rosetta --run.shard=0 -n testnet --run.archive=false
```
### Running a node with an rclone'd DB
Note that all node data will be stored in the `/root/data` directory within the container. Therefore, you can rclone
the `harmony_db_*` directories to some directory (e.g. `./data`) and mount that volume on the docker run; a sketch of such an rclone download follows the example below.
This way, the node will use the DB in the volume that is shared between the container and host. For example:
```bash
docker run -d -p 9700:9700 -v "$(pwd)/data:/root/data" harmony-rosetta --run.shard=0
```
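If you still need to download a DB snapshot first, a minimal sketch is shown below; the rclone remote name (`snapshots`) and source path are placeholders rather than a real Harmony endpoint, so substitute whatever remote you actually have configured:
```bash
# Hypothetical remote and path; replace with the rclone remote/path you actually use.
mkdir -p ./data
rclone sync -P snapshots:harmony-db/mainnet/harmony_db_0 ./data/harmony_db_0
```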
Note that the directory structure for `/root/data` (== `./data`) should look something like:
```
.
├── explorer_storage_127.0.0.1_9000
├── harmony_db_0
├── harmony_db_1
├── logs
│ ├── node_execution.log
│ └── zerolog-harmony.log
└── transactions.rlp
```
### Inspecting Logs
If you mount `./data` on the host to `/root/data` in the container, you can view the harmony node logs at
`./data/logs/` on your host machine.
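For example, assuming the default file names shown in the directory listing above, you can follow the main node log from the host with:
```bash
# Follow the node log on the host (file name taken from the listing above)
tail -f ./data/logs/zerolog-harmony.log
```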
### View rosetta request logs
You can view all the rosetta endpoint requests with the following command:
```bash
docker logs [CONTAINER ID]
```
> The `[CONTAINER ID]` can be found with this command: `docker ps`

@ -0,0 +1,11 @@
#!/usr/bin/env bash
set -e
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
DATA="$DIR/data"
LOGS="$DATA/logs"
BASE_ARGS=(--http.ip "0.0.0.0" --ws.ip "0.0.0.0" --http.rosetta --node_type "explorer" --datadir "$DATA" --log.dir "$LOGS")
mkdir -p "$LOGS"
echo -e NODE ARGS: \" "$@" "${BASE_ARGS[@]}" \"
"$DIR/harmony" "$@" "${BASE_ARGS[@]}"