[viewchange] time-based synchronous view change

During a view change, the view change ID determines which node becomes the next
leader. In our original algorithm, the view ID always increased one step at a time,
and the view change period grew quadratically, as the square of the gap between the
current view changing ID and the view ID of the last known block.
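
For reference, the old back-off can be summarized by the following standalone sketch
(the constant value and helper name here are illustrative, not the exact ones in
consensus/view_change.go):

package main

import (
    "fmt"
    "time"
)

// viewChangeDuration is an illustrative base period; the real constant
// lives in the consensus package.
const viewChangeDuration = 30 * time.Second

// legacyViewChangeDuration mirrors the old quadratic back-off: the wait
// grows with the square of the gap between the proposed view ID and the
// view ID of the last known block.
func legacyViewChangeDuration(proposedViewID, curBlockViewID uint64) time.Duration {
    diff := int64(proposedViewID - curBlockViewID)
    return time.Duration(diff * diff * int64(viewChangeDuration))
}

func main() {
    // A gap of 4 already means waiting 16x the base duration (8m0s here).
    fmt.Println(legacyViewChangeDuration(14, 10))
}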

However, the view change converges very slowly when multiple nodes are offline and
agreement on the view change cannot be reached. In particular, during a shard-down
event, multiple nodes are offline while the remaining nodes have already advanced
their current view changing IDs.

The new time-based synchronous view change algorithm uses the local timestamp and
the timestamp of the latest block to calculate the expected view changing ID. A node
that comes back online can immediately catch up with the latest view changing ID, as
long as it has synced the latest block and its local clock is reasonably accurate.
With this algorithm, the view change converges within one or two view change
durations.
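
The essence of the calculation, as implemented in getNextViewID in the diff below,
is a modular function of the time elapsed since the last block. A minimal standalone
sketch, with illustrative constants and names:

package main

import (
    "fmt"
    "time"
)

// Illustrative value; the real constant lives in the consensus package.
const viewChangeSlotSeconds int64 = 60

// expectedViewID derives the next view ID from the last block's view ID,
// the last block's timestamp, and the local clock only, so every honest
// node with a synced chain and a roughly accurate clock computes the
// same value without exchanging messages.
func expectedViewID(curBlockViewID uint64, blockTimestamp, now, totalNodes int64) uint64 {
    diff := uint64(((now - blockTimestamp) / viewChangeSlotSeconds) % totalNodes)
    if diff == 0 {
        diff = 1 // always move at least one view ahead
    }
    return curBlockViewID + diff
}

func main() {
    now := time.Now().Unix()
    blockTime := now - 150 // 150s since the last block
    // With a 60s slot and 100 nodes: diff = (150/60) % 100 = 2, so 10 -> 12.
    fmt.Println(expectedViewID(10, blockTime, now, 100))
}

Because every input is either on-chain (block view ID and timestamp) or locally
observable (wall clock), nodes converge on the same expected view ID without
exchanging extra messages.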

Signed-off-by: Leo Chen <leo@harmony.one>
Branch: pull/3386/head
Author: Leo Chen (4 years ago)
Parent: 046e87e9f8
Commit: 234f202656

Changed files:
  consensus/checks.go (2 changed lines)
  consensus/consensus_v2.go (4 changed lines)
  consensus/view_change.go (112 changed lines)
  consensus/view_change_test.go (4 changed lines)

consensus/checks.go
@@ -92,7 +92,7 @@ func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool {
                 "[OnAnnounce] Already in ViewChanging mode, conflicing announce, doing noop",
             )
         } else {
-            consensus.startViewChange(consensus.GetCurBlockViewID() + 1)
+            consensus.startViewChange(consensus.GetCurBlockViewID())
         }
     }
     consensus.getLogger().Debug().

consensus/consensus_v2.go
@@ -265,12 +265,12 @@ func (consensus *Consensus) Start(
                 if k != timeoutViewChange {
                     consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops Consensus Timeout!!!")
                     viewID := consensus.GetCurBlockViewID()
-                    consensus.startViewChange(viewID + 1)
+                    consensus.startViewChange(viewID)
                     break
                 } else {
                     consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!")
                     viewID := consensus.GetViewChangingID()
-                    consensus.startViewChange(viewID + 1)
+                    consensus.startViewChange(viewID)
                     break
                 }
             }

consensus/view_change.go
@@ -90,20 +90,111 @@ func (pm *State) GetViewChangeDuraion() time.Duration {
     return time.Duration(diff * diff * int64(viewChangeDuration))
 }
 
-// GetNextLeaderKey uniquely determine who is the leader for given viewID
-func (consensus *Consensus) GetNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper {
+// fallbackNextViewID return the next view ID and duration when there is an exception
+// to calculate the time-based viewId
+func (consensus *Consensus) fallbackNextViewID(viewID uint64) (uint64, time.Duration) {
+    consensus.getLogger().Error().
+        Uint64("viewID", viewID).
+        Msg("[fallbackNextViewID] use legacy viewID algorithm")
+    if viewID < consensus.current.GetCurBlockViewID() {
+        return consensus.current.GetCurBlockViewID() + 1, time.Duration(int64(viewChangeDuration))
+    }
+    diff := int64(viewID + 1 - consensus.current.GetCurBlockViewID())
+    return viewID + 1, time.Duration(diff * diff * int64(viewChangeDuration))
+}
+
+// getNextViewID return the next view ID based on the timestamp
+// The next view ID is calculated based on the difference of validator's timestamp
+// and the block's timestamp. So that it can be deterministic to return the next view ID
+// only based on the blockchain block and the validator's current timestamp.
+// The next view ID is the single factor used to determine
+// the next leader, so it is mod the number of nodes per shard.
+// It returns the next viewID and duration of the view change
+// The view change duration is a fixed duration now to avoid stuck into offline nodes during
+// the view change.
+// viewID is only used as the fallback mechansim to determine the nextViewID
+func (consensus *Consensus) getNextViewID(viewID uint64) (uint64, time.Duration) {
+    // handle corner case at first
+    if consensus.ChainReader == nil {
+        return consensus.fallbackNextViewID(viewID)
+    }
+    curHeader := consensus.ChainReader.CurrentHeader()
+    if curHeader == nil {
+        return consensus.fallbackNextViewID(viewID)
+    }
+    blockTimestamp := curHeader.Time().Int64()
+    curTimestamp := time.Now().Unix()
+    // timestamp messed up in current validator node
+    if curTimestamp < blockTimestamp {
+        return consensus.fallbackNextViewID(viewID)
+    }
+    totalNode := consensus.Decider.ParticipantsCount()
+    // diff is at least 1, and it won't exceeded the totalNode
+    diff := uint64(((curTimestamp - blockTimestamp) / viewChangeTimeout) % int64(totalNode))
+    if diff == 0 {
+        diff = 1
+    }
+    nextViewID := diff + consensus.current.GetCurBlockViewID()
+    consensus.getLogger().Info().
+        Int64("curTimestamp", curTimestamp).
+        Int64("blockTimestamp", blockTimestamp).
+        Uint64("nextViewID", nextViewID).
+        Uint64("curViewID", consensus.current.GetCurBlockViewID()).
+        Msg("[getNextViewID]")
+    // duration is always the fixed view change duration for synchronous view change
+    return nextViewID, viewChangeDuration
+}
+
+// getNextLeaderKey uniquely determine who is the leader for given viewID
+// It reads the current leader's pubkey based on the blockchain data and returns
+// the next leader based on the gap of the viewID of the view change and the last
+// know view id of the block.
+func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper {
     gap := 1
+    if viewID > consensus.GetCurBlockViewID() {
+        gap = int(viewID - consensus.GetCurBlockViewID())
+    }
+    var lastLeaderPubKey *bls.PublicKeyWrapper
+    var err error
+    if consensus.ChainReader == nil {
+        consensus.getLogger().Error().Msg("[getNextLeaderKey] ChainReader is nil. Use consensus.LeaderPubKey")
+        lastLeaderPubKey = consensus.LeaderPubKey
+    } else {
+        curHeader := consensus.ChainReader.CurrentHeader()
+        if curHeader == nil {
+            consensus.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain")
+            lastLeaderPubKey = consensus.LeaderPubKey
+        } else {
+            // this is the truth of the leader based on blockchain blocks
+            lastLeaderPubKey, err = consensus.getLeaderPubKeyFromCoinbase(curHeader)
+            if err != nil || lastLeaderPubKey == nil {
+                consensus.getLogger().Error().Err(err).
+                    Msg("[getNextLeaderKey] Unable to get leaderPubKey from coinbase. Set it to consensus.LeaderPubKey")
+                lastLeaderPubKey = consensus.LeaderPubKey
+            }
+        }
+    }
     consensus.getLogger().Info().
+        Str("lastLeaderPubKey", lastLeaderPubKey.Bytes.Hex()).
         Str("leaderPubKey", consensus.LeaderPubKey.Bytes.Hex()).
+        Int("gap", gap).
         Uint64("newViewID", viewID).
         Uint64("myCurBlockViewID", consensus.GetCurBlockViewID()).
-        Msg("[GetNextLeaderKey] got leaderPubKey from coinbase")
-    wasFound, next := consensus.Decider.NthNext(consensus.LeaderPubKey, gap)
+        Msg("[getNextLeaderKey] got leaderPubKey from coinbase")
+    wasFound, next := consensus.Decider.NthNext(lastLeaderPubKey, gap)
     if !wasFound {
         consensus.getLogger().Warn().
             Str("key", consensus.LeaderPubKey.Bytes.Hex()).
-            Msg("GetNextLeaderKey: currentLeaderKey not found")
+            Msg("[getNextLeaderKey] currentLeaderKey not found")
     }
+    consensus.getLogger().Info().
+        Str("nextLeader", next.Bytes.Hex()).
+        Msg("[getNextLeaderKey] next Leader")
     return next
 }
@@ -124,12 +215,13 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
     consensus.consensusTimeout[timeoutConsensus].Stop()
     consensus.consensusTimeout[timeoutBootstrap].Stop()
     consensus.current.SetMode(ViewChanging)
-    consensus.SetViewChangingID(viewID)
-    consensus.LeaderPubKey = consensus.GetNextLeaderKey(viewID)
-    duration := consensus.current.GetViewChangeDuraion()
+    nextViewID, duration := consensus.getNextViewID(viewID)
+    consensus.SetViewChangingID(nextViewID)
+    consensus.LeaderPubKey = consensus.getNextLeaderKey(nextViewID)
     consensus.getLogger().Warn().
         Uint64("viewID", viewID).
+        Uint64("nextViewID", nextViewID).
         Uint64("viewChangingID", consensus.GetViewChangingID()).
         Dur("timeoutDuration", duration).
         Str("NextLeader", consensus.LeaderPubKey.Bytes.Hex()).
@@ -139,12 +231,12 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
     defer consensus.consensusTimeout[timeoutViewChange].Start()
     // update the dictionary key if the viewID is first time received
-    consensus.vc.AddViewIDKeyIfNotExist(viewID, consensus.Decider.Participants())
+    consensus.vc.AddViewIDKeyIfNotExist(nextViewID, consensus.Decider.Participants())
     // init my own payload
     if err := consensus.vc.InitPayload(
         consensus.FBFTLog,
-        viewID,
+        nextViewID,
         consensus.blockNum,
         consensus.priKey); err != nil {
         consensus.getLogger().Error().Err(err).Msg("Init Payload Error")

consensus/view_change_test.go
@@ -87,7 +87,7 @@ func TestGetNextLeaderKeyShouldFailForStandardGeneratedConsensus(t *testing.T) {
     // The below results in: "panic: runtime error: integer divide by zero"
     // This happens because there's no check for if there are any participants or not in https://github.com/harmony-one/harmony/blob/main/consensus/quorum/quorum.go#L188-L197
-    assert.Panics(t, func() { consensus.GetNextLeaderKey(uint64(1)) })
+    assert.Panics(t, func() { consensus.getNextLeaderKey(uint64(1)) })
 }
 
 func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
@@ -115,7 +115,7 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
     assert.Equal(t, keyCount, consensus.Decider.ParticipantsCount())
     consensus.LeaderPubKey = &wrappedBLSKeys[0]
-    nextKey := consensus.GetNextLeaderKey(uint64(1))
+    nextKey := consensus.getNextLeaderKey(uint64(1))
     assert.Equal(t, nextKey, &wrappedBLSKeys[1])
 }
