[viewchange] lock to process view change

Signed-off-by: Leo Chen <leo@harmony.one>
pull/3388/head
Leo Chen 4 years ago
parent 537ec78636
commit f8e2fd7120
  1. 64
      consensus/consensus_service.go
  2. 133
      consensus/view_change.go

@ -14,6 +14,7 @@ import (
"github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/block"
consensus_engine "github.com/harmony-one/harmony/consensus/engine" consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/consensus/quorum" "github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/consensus/signature"
bls_cosi "github.com/harmony-one/harmony/crypto/bls" bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash" "github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/chain" "github.com/harmony-one/harmony/internal/chain"
@ -504,6 +505,69 @@ func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) {
return return
} }
var (
errGetPreparedBlock = errors.New("failed to get prepared block for self commit")
errReadBitmapPayload = errors.New("failed to read signature bitmap payload")
)
// selfCommit will create a commit message and commit it locally
// it is used by the new leadder of the view change routine
// when view change is succeeded and the new leader
// received prepared payload from other validators or from local
func (consensus *Consensus) selfCommit(payload []byte) error {
var blockHash [32]byte
copy(blockHash[:], payload[:32])
// Leader sign and add commit message
block := consensus.FBFTLog.GetBlockByHash(blockHash)
if block == nil {
return errGetPreparedBlock
}
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(payload, 32)
if err != nil {
return errReadBitmapPayload
}
// update consensus structure when succeeded
// protect consensus data update logic
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
copy(consensus.blockHash[:], blockHash[:])
consensus.switchPhase("selfCommit", FBFTCommit)
consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader,
block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64())
for i, key := range consensus.priKey {
if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Error().
Err(err).
Int("Index", i).
Str("Key", key.Pub.Bytes.Hex()).
Msg("[selfCommit] New Leader commit bitmap set failed")
continue
}
if _, err := consensus.Decider.SubmitVote(
quorum.Commit,
key.Pub.Bytes,
key.Pri.SignHash(commitPayload),
common.BytesToHash(consensus.blockHash[:]),
block.NumberU64(),
block.Header().ViewID().Uint64(),
); err != nil {
consensus.getLogger().Warn().
Err(err).
Int("Index", i).
Str("Key", key.Pub.Bytes.Hex()).
Msg("[selfCommit] submit vote on viewchange commit failed")
}
}
return nil
}
// getLogger returns logger for consensus contexts added // getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger { func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := utils.Logger().With(). logger := utils.Logger().With().

@ -13,6 +13,7 @@ import (
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/pkg/errors"
) )
// MaxViewIDDiff limits the received view ID to only 249 further from the current view ID // MaxViewIDDiff limits the received view ID to only 249 further from the current view ID
@ -170,6 +171,39 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
} }
} }
// stopViewChange stops the current view change
func (consensus *Consensus) stopViewChange(viewID uint64, newLeaderPriKey *bls.PrivateKeyWrapper) error {
msgToSend := consensus.constructNewViewMessage(
viewID, newLeaderPriKey,
)
if err := consensus.msgSender.SendWithRetry(
consensus.blockNum,
msg_pb.MessageType_NEWVIEW,
[]nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))},
p2p.ConstructMessage(msgToSend),
); err != nil {
return errors.New("failed to send out the NewView message")
}
consensus.getLogger().Info().
Str("myKey", newLeaderPriKey.Pub.Bytes.Hex()).
Hex("M1Payload", consensus.vc.GetM1Payload()).
Msg("[stopViewChange] Sent NewView Messge")
consensus.current.SetMode(Normal)
consensus.consensusTimeout[timeoutViewChange].Stop()
consensus.SetViewIDs(viewID)
consensus.ResetViewChangeState()
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().
Uint64("viewID", viewID).
Str("myKey", newLeaderPriKey.Pub.Bytes.Hex()).
Msg("[stopViewChange] viewChange stopped. I am the New Leader")
return nil
}
// onViewChange is called when the view change message is received. // onViewChange is called when the view change message is received.
func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.getLogger().Info().Msg("[onViewChange] Received ViewChange Message") consensus.getLogger().Info().Msg("[onViewChange] Received ViewChange Message")
@ -229,88 +263,34 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
} }
// received enough view change messages, change state to normal consensus // received enough view change messages, change state to normal consensus
if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) { if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.IsViewChangingMode() {
consensus.current.SetMode(Normal) // no previous prepared message, go straight to normal mode
consensus.getLogger().Info().Msg("[onViewChange] View Change Message Quorum Reached") // and start proposing new block
consensus.LeaderPubKey = newLeaderKey
consensus.ResetState()
if consensus.vc.IsM1PayloadEmpty() { if consensus.vc.IsM1PayloadEmpty() {
// TODO(Chao): explain why ReadySignal is sent only in this case but not the other case. if err := consensus.stopViewChange(recvMsg.ViewID, newLeaderPriKey); err != nil {
// Make sure the newly proposed block have the correct view ID consensus.getLogger().Error().Err(err).Msg("[onViewChange] stopViewChange failed")
consensus.SetCurBlockViewID(recvMsg.ViewID)
go func() {
consensus.ReadySignal <- struct{}{}
}()
} else {
consensus.switchPhase("onViewChange", FBFTCommit)
payload := consensus.vc.GetM1Payload()
copy(consensus.blockHash[:], payload[:32])
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(payload, 32)
if err != nil {
consensus.getLogger().Error().Err(err).
Msg("[onViewChange] ReadSignatureBitmapPayload Fail")
return
}
consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask
// Leader sign and add commit message
block := consensus.FBFTLog.GetBlockByHash(consensus.blockHash)
if block == nil {
consensus.getLogger().Warn().Msg("[onViewChange] failed to get prepared block for self commit")
return return
} }
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader, consensus.ResetState()
block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64()) consensus.LeaderPubKey = newLeaderKey
for i, key := range consensus.priKey {
if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Warn().Err(err).
Msgf("[OnViewChange] New Leader commit bitmap set failed for key at index %d", i)
continue
}
if _, err := consensus.Decider.SubmitVote(
quorum.Commit,
key.Pub.Bytes,
key.Pri.SignHash(commitPayload),
common.BytesToHash(consensus.blockHash[:]),
block.NumberU64(),
block.Header().ViewID().Uint64(),
); err != nil {
consensus.getLogger().Warn().Err(err).Msg("submit vote on viewchange commit failed")
return
}
} go func() {
consensus.ReadySignal <- struct{}{}
}()
return
} }
consensus.SetViewIDs(recvMsg.ViewID) payload := consensus.vc.GetM1Payload()
msgToSend := consensus.constructNewViewMessage( if err := consensus.selfCommit(payload); err != nil {
recvMsg.ViewID, newLeaderPriKey, consensus.getLogger().Error().Err(err).Msg("[onViewChange] self commit failed")
) return
if err := consensus.msgSender.SendWithRetry(
consensus.blockNum,
msg_pb.MessageType_NEWVIEW,
[]nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))},
p2p.ConstructMessage(msgToSend),
); err != nil {
consensus.getLogger().Err(err).
Msg("could not send out the NEWVIEW message")
} }
consensus.getLogger().Info(). if err := consensus.stopViewChange(recvMsg.ViewID, newLeaderPriKey); err != nil {
Str("myKey", newLeaderKey.Bytes.Hex()). consensus.getLogger().Error().Err(err).Msg("[onViewChange] stopViewChange failed")
Hex("M1Payload", consensus.vc.GetM1Payload()). return
Msg("[onViewChange] Sent NewView Messge") }
consensus.ResetState()
consensus.ResetViewChangeState() consensus.LeaderPubKey = newLeaderKey
consensus.consensusTimeout[timeoutViewChange].Stop()
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().
Str("myKey", newLeaderKey.Bytes.Hex()).
Msg("[onViewChange] I am the New Leader")
} }
} }
@ -372,9 +352,12 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
Msg("[onNewView] Failed to Verify Signature for M1 (prepare) message") Msg("[onNewView] Failed to Verify Signature for M1 (prepare) message")
return return
} }
consensus.mutex.Lock()
copy(consensus.blockHash[:], blockHash) copy(consensus.blockHash[:], blockHash)
consensus.aggregatedPrepareSig = aggSig consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask consensus.prepareBitmap = mask
consensus.mutex.Unlock()
// create prepared message from newview // create prepared message from newview
preparedMsg := FBFTMessage{ preparedMsg := FBFTMessage{
MessageType: msg_pb.MessageType_PREPARED, MessageType: msg_pb.MessageType_PREPARED,

Loading…
Cancel
Save