diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 6a6da898e..ff9a07bd2 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -14,6 +14,7 @@ import ( "github.com/harmony-one/harmony/block" consensus_engine "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/consensus/signature" bls_cosi "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/hash" "github.com/harmony-one/harmony/internal/chain" @@ -504,6 +505,69 @@ func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) { return } +var ( + errGetPreparedBlock = errors.New("failed to get prepared block for self commit") + errReadBitmapPayload = errors.New("failed to read signature bitmap payload") +) + +// selfCommit will create a commit message and commit it locally +// it is used by the new leadder of the view change routine +// when view change is succeeded and the new leader +// received prepared payload from other validators or from local +func (consensus *Consensus) selfCommit(payload []byte) error { + var blockHash [32]byte + copy(blockHash[:], payload[:32]) + + // Leader sign and add commit message + block := consensus.FBFTLog.GetBlockByHash(blockHash) + if block == nil { + return errGetPreparedBlock + } + + aggSig, mask, err := consensus.ReadSignatureBitmapPayload(payload, 32) + if err != nil { + return errReadBitmapPayload + } + + // update consensus structure when succeeded + // protect consensus data update logic + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + + copy(consensus.blockHash[:], blockHash[:]) + consensus.switchPhase("selfCommit", FBFTCommit) + consensus.aggregatedPrepareSig = aggSig + consensus.prepareBitmap = mask + commitPayload := signature.ConstructCommitPayload(consensus.ChainReader, + block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64()) + for i, key := range consensus.priKey { + if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil { + consensus.getLogger().Error(). + Err(err). + Int("Index", i). + Str("Key", key.Pub.Bytes.Hex()). + Msg("[selfCommit] New Leader commit bitmap set failed") + continue + } + + if _, err := consensus.Decider.SubmitVote( + quorum.Commit, + key.Pub.Bytes, + key.Pri.SignHash(commitPayload), + common.BytesToHash(consensus.blockHash[:]), + block.NumberU64(), + block.Header().ViewID().Uint64(), + ); err != nil { + consensus.getLogger().Warn(). + Err(err). + Int("Index", i). + Str("Key", key.Pub.Bytes.Hex()). + Msg("[selfCommit] submit vote on viewchange commit failed") + } + } + return nil +} + // getLogger returns logger for consensus contexts added func (consensus *Consensus) getLogger() *zerolog.Logger { logger := utils.Logger().With(). diff --git a/consensus/view_change.go b/consensus/view_change.go index 8e5fa759e..9553e31a4 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -13,6 +13,7 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" + "github.com/pkg/errors" ) // MaxViewIDDiff limits the received view ID to only 249 further from the current view ID @@ -170,6 +171,39 @@ func (consensus *Consensus) startViewChange(viewID uint64) { } } +// stopViewChange stops the current view change +func (consensus *Consensus) stopViewChange(viewID uint64, newLeaderPriKey *bls.PrivateKeyWrapper) error { + msgToSend := consensus.constructNewViewMessage( + viewID, newLeaderPriKey, + ) + if err := consensus.msgSender.SendWithRetry( + consensus.blockNum, + msg_pb.MessageType_NEWVIEW, + []nodeconfig.GroupID{ + nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, + p2p.ConstructMessage(msgToSend), + ); err != nil { + return errors.New("failed to send out the NewView message") + } + consensus.getLogger().Info(). + Str("myKey", newLeaderPriKey.Pub.Bytes.Hex()). + Hex("M1Payload", consensus.vc.GetM1Payload()). + Msg("[stopViewChange] Sent NewView Messge") + + consensus.current.SetMode(Normal) + consensus.consensusTimeout[timeoutViewChange].Stop() + consensus.SetViewIDs(viewID) + consensus.ResetViewChangeState() + consensus.consensusTimeout[timeoutConsensus].Start() + + consensus.getLogger().Info(). + Uint64("viewID", viewID). + Str("myKey", newLeaderPriKey.Pub.Bytes.Hex()). + Msg("[stopViewChange] viewChange stopped. I am the New Leader") + + return nil +} + // onViewChange is called when the view change message is received. func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { consensus.getLogger().Info().Msg("[onViewChange] Received ViewChange Message") @@ -229,88 +263,34 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { } // received enough view change messages, change state to normal consensus - if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) { - consensus.current.SetMode(Normal) - consensus.getLogger().Info().Msg("[onViewChange] View Change Message Quorum Reached") - consensus.LeaderPubKey = newLeaderKey - consensus.ResetState() + if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.IsViewChangingMode() { + // no previous prepared message, go straight to normal mode + // and start proposing new block if consensus.vc.IsM1PayloadEmpty() { - // TODO(Chao): explain why ReadySignal is sent only in this case but not the other case. - // Make sure the newly proposed block have the correct view ID - consensus.SetCurBlockViewID(recvMsg.ViewID) - go func() { - consensus.ReadySignal <- struct{}{} - }() - } else { - consensus.switchPhase("onViewChange", FBFTCommit) - payload := consensus.vc.GetM1Payload() - copy(consensus.blockHash[:], payload[:32]) - aggSig, mask, err := consensus.ReadSignatureBitmapPayload(payload, 32) - - if err != nil { - consensus.getLogger().Error().Err(err). - Msg("[onViewChange] ReadSignatureBitmapPayload Fail") - return - } - - consensus.aggregatedPrepareSig = aggSig - consensus.prepareBitmap = mask - // Leader sign and add commit message - block := consensus.FBFTLog.GetBlockByHash(consensus.blockHash) - if block == nil { - consensus.getLogger().Warn().Msg("[onViewChange] failed to get prepared block for self commit") + if err := consensus.stopViewChange(recvMsg.ViewID, newLeaderPriKey); err != nil { + consensus.getLogger().Error().Err(err).Msg("[onViewChange] stopViewChange failed") return } - commitPayload := signature.ConstructCommitPayload(consensus.ChainReader, - block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64()) - for i, key := range consensus.priKey { - if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil { - consensus.getLogger().Warn().Err(err). - Msgf("[OnViewChange] New Leader commit bitmap set failed for key at index %d", i) - continue - } - - if _, err := consensus.Decider.SubmitVote( - quorum.Commit, - key.Pub.Bytes, - key.Pri.SignHash(commitPayload), - common.BytesToHash(consensus.blockHash[:]), - block.NumberU64(), - block.Header().ViewID().Uint64(), - ); err != nil { - consensus.getLogger().Warn().Err(err).Msg("submit vote on viewchange commit failed") - return - } + consensus.ResetState() + consensus.LeaderPubKey = newLeaderKey - } + go func() { + consensus.ReadySignal <- struct{}{} + }() + return } - consensus.SetViewIDs(recvMsg.ViewID) - msgToSend := consensus.constructNewViewMessage( - recvMsg.ViewID, newLeaderPriKey, - ) - - if err := consensus.msgSender.SendWithRetry( - consensus.blockNum, - msg_pb.MessageType_NEWVIEW, - []nodeconfig.GroupID{ - nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, - p2p.ConstructMessage(msgToSend), - ); err != nil { - consensus.getLogger().Err(err). - Msg("could not send out the NEWVIEW message") + payload := consensus.vc.GetM1Payload() + if err := consensus.selfCommit(payload); err != nil { + consensus.getLogger().Error().Err(err).Msg("[onViewChange] self commit failed") + return } - consensus.getLogger().Info(). - Str("myKey", newLeaderKey.Bytes.Hex()). - Hex("M1Payload", consensus.vc.GetM1Payload()). - Msg("[onViewChange] Sent NewView Messge") - - consensus.ResetViewChangeState() - consensus.consensusTimeout[timeoutViewChange].Stop() - consensus.consensusTimeout[timeoutConsensus].Start() - consensus.getLogger().Info(). - Str("myKey", newLeaderKey.Bytes.Hex()). - Msg("[onViewChange] I am the New Leader") + if err := consensus.stopViewChange(recvMsg.ViewID, newLeaderPriKey); err != nil { + consensus.getLogger().Error().Err(err).Msg("[onViewChange] stopViewChange failed") + return + } + consensus.ResetState() + consensus.LeaderPubKey = newLeaderKey } } @@ -372,9 +352,12 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) { Msg("[onNewView] Failed to Verify Signature for M1 (prepare) message") return } + consensus.mutex.Lock() copy(consensus.blockHash[:], blockHash) consensus.aggregatedPrepareSig = aggSig consensus.prepareBitmap = mask + consensus.mutex.Unlock() + // create prepared message from newview preparedMsg := FBFTMessage{ MessageType: msg_pb.MessageType_PREPARED,