Refactored consensus.

feature/refactor-consensus-view
frozen 9 months ago
parent ca91cb22b6
commit f8071179ab
No known key found for this signature in database
GPG Key ID: 5391C63E79B03EDE
  1. 2
      consensus/consensus.go
  2. 32
      consensus/consensus_service.go
  3. 7
      consensus/consensus_test.go
  4. 15
      consensus/consensus_v2.go
  5. 58
      consensus/view_change.go
  6. 2
      go.mod
  7. 1
      go.sum

@ -300,7 +300,7 @@ func New(
// viewID has to be initialized as the height of
// the blockchain during initialization as it was
// displayed on explorer as Height right now
consensus.setCurBlockViewID(0)
consensus.current.setCurBlockViewID(0)
consensus.SlashChan = make(chan slash.Record)
consensus.readySignal = make(chan ProposalType)
consensus.commitSigChannel = make(chan []byte)

@ -5,6 +5,7 @@ import (
"sync/atomic"
"time"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
@ -301,11 +302,10 @@ func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offse
func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.updateConsensusInformation()
return consensus.updateConsensusInformation(consensus.Blockchain().CurrentHeader())
}
func (consensus *Consensus) updateConsensusInformation() Mode {
curHeader := consensus.Blockchain().CurrentHeader()
func (consensus *Consensus) updateConsensusInformation(curHeader *block.Header) Mode {
curEpoch := curHeader.Epoch()
nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1)
@ -508,30 +508,8 @@ func (consensus *Consensus) SetViewIDs(height uint64) {
// SetViewIDs set both current view ID and view changing ID to the height
// of the blockchain. It is used during client startup to recover the state
func (consensus *Consensus) setViewIDs(height uint64) {
consensus.setCurBlockViewID(height)
consensus.setViewChangingID(height)
}
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.setCurBlockViewID(viewID)
}
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
}
// SetViewChangingID set the current view change ID
func (consensus *Consensus) SetViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
}
// SetViewChangingID set the current view change ID
func (consensus *Consensus) setViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
consensus.current.setCurBlockViewID(height)
consensus.current.setViewChangingID(height)
}
// StartFinalityCount set the finality counter to current time

@ -17,6 +17,13 @@ import (
"github.com/stretchr/testify/assert"
)
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.current.setCurBlockViewID(viewID)
}
func TestConsensusInitialization(t *testing.T) {
host, multiBLSPrivateKey, consensus, _, err := GenerateConsensusForTesting()
assert.NoError(t, err)

@ -342,10 +342,11 @@ func (consensus *Consensus) StartChannel() {
func (consensus *Consensus) syncReadyChan() {
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
if consensus.getBlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 {
consensus.setBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1)
consensus.setViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1)
mode := consensus.updateConsensusInformation()
h := consensus.Blockchain().CurrentHeader()
if consensus.getBlockNum() < h.Number().Uint64()+1 {
consensus.setBlockNum(h.Number().Uint64() + 1)
consensus.setViewIDs(h.ViewID().Uint64() + 1)
mode := consensus.updateConsensusInformation(h)
consensus.current.SetMode(mode)
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
@ -354,7 +355,7 @@ func (consensus *Consensus) syncReadyChan() {
} else if consensus.mode() == Syncing {
// Corner case where sync is triggered before `onCommitted` and there is a race
// for block insertion between consensus and downloader.
mode := consensus.updateConsensusInformation()
mode := consensus.updateConsensusInformation(consensus.Blockchain().CurrentHeader())
consensus.setMode(mode)
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
@ -793,7 +794,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicK
// SetupForNewConsensus sets the state for new consensus
func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1)
consensus.setCurBlockViewID(committedMsg.ViewID + 1)
consensus.current.setCurBlockViewID(committedMsg.ViewID + 1)
var epoch *big.Int
if blk.IsLastBlockInEpoch() {
epoch = new(big.Int).Add(blk.Epoch(), common.Big1)
@ -822,7 +823,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
// Update consensus keys at last so the change of leader status doesn't mess up normal flow
if blk.IsLastBlockInEpoch() {
consensus.setMode(consensus.updateConsensusInformation())
consensus.setMode(consensus.updateConsensusInformation(consensus.Blockchain().CurrentHeader()))
}
consensus.fBFTLog.PruneCacheBeforeBlock(blk.NumberU64())
consensus.resetState()

@ -4,7 +4,9 @@ import (
"math/big"
"time"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/internal/chain"
"github.com/rs/zerolog"
"github.com/harmony-one/harmony/crypto/bls"
@ -53,25 +55,46 @@ func (pm *State) SetMode(s Mode) {
// GetCurBlockViewID return the current view id
func (pm *State) GetCurBlockViewID() uint64 {
return pm.getCurBlockViewID()
}
// GetCurBlockViewID return the current view id
func (pm *State) getCurBlockViewID() uint64 {
return pm.blockViewID
}
// SetCurBlockViewID sets the current view id
func (pm *State) SetCurBlockViewID(viewID uint64) uint64 {
// SetCurBlockViewID set the current view ID
func (pm *State) setCurBlockViewID(viewID uint64) uint64 {
pm.blockViewID = viewID
return pm.blockViewID
}
// SetCurBlockViewID sets the current view id
func (pm *State) SetCurBlockViewID(viewID uint64) uint64 {
return pm.setCurBlockViewID(viewID)
}
// GetViewChangingID return the current view changing id
// It is meaningful during view change mode
func (pm *State) GetViewChangingID() uint64 {
return pm.getViewChangingID()
}
// getViewChangingID return the current view changing id
// It is meaningful during view change mode
func (pm *State) getViewChangingID() uint64 {
return pm.viewChangingID
}
// SetViewChangingID set the current view change ID
func (consensus *State) setViewChangingID(viewID uint64) {
consensus.viewChangingID = viewID
}
// SetViewChangingID set the current view changing id
// It is meaningful during view change mode
func (pm *State) SetViewChangingID(id uint64) {
pm.viewChangingID = id
pm.setViewChangingID(id)
}
// GetViewChangeDuraion return the duration of the current view change
@ -87,12 +110,12 @@ func (pm *State) SetIsBackup(isBackup bool) {
// fallbackNextViewID return the next view ID and duration when there is an exception
// to calculate the time-based viewId
func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) {
func (consensus *State) fallbackNextViewID(logger *zerolog.Logger) (uint64, time.Duration) {
diff := int64(consensus.getViewChangingID() + 1 - consensus.getCurBlockViewID())
if diff <= 0 {
diff = int64(1)
}
consensus.getLogger().Error().
logger.Error().
Int64("diff", diff).
Msg("[fallbackNextViewID] use legacy viewID algorithm")
return consensus.getViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration))
@ -108,14 +131,9 @@ func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) {
// The view change duration is a fixed duration now to avoid stuck into offline nodes during
// the view change.
// viewID is only used as the fallback mechansim to determine the nextViewID
func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
// handle corner case at first
if consensus.Blockchain() == nil {
return consensus.fallbackNextViewID()
}
curHeader := consensus.Blockchain().CurrentHeader()
func (consensus *State) getNextViewID(curHeader *block.Header, logger *zerolog.Logger) (uint64, time.Duration) {
if curHeader == nil {
return consensus.fallbackNextViewID()
return consensus.fallbackNextViewID(logger)
}
blockTimestamp := curHeader.Time().Int64()
stuckBlockViewID := curHeader.ViewID().Uint64() + 1
@ -123,18 +141,18 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
// timestamp messed up in current validator node
if curTimestamp <= blockTimestamp {
consensus.getLogger().Error().
logger.Error().
Int64("curTimestamp", curTimestamp).
Int64("blockTimestamp", blockTimestamp).
Msg("[getNextViewID] timestamp of block too high")
return consensus.fallbackNextViewID()
return consensus.fallbackNextViewID(logger)
}
// diff only increases, since view change timeout is shorter than
// view change slot now, we want to make sure diff is always greater than 0
diff := uint64((curTimestamp-blockTimestamp)/viewChangeSlot + 1)
nextViewID := diff + stuckBlockViewID
consensus.getLogger().Info().
logger.Info().
Int64("curTimestamp", curTimestamp).
Int64("blockTimestamp", blockTimestamp).
Uint64("nextViewID", nextViewID).
@ -248,10 +266,12 @@ func (consensus *Consensus) startViewChange() {
consensus.consensusTimeout[timeoutConsensus].Stop()
consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.current.SetMode(ViewChanging)
nextViewID, duration := consensus.getNextViewID()
consensus.setViewChangingID(nextViewID)
epoch := consensus.Blockchain().CurrentHeader().Epoch()
ss, err := consensus.Blockchain().ReadShardState(epoch)
// handle corner case at first
bc := consensus.Blockchain()
nextViewID, duration := consensus.current.getNextViewID(bc.CurrentHeader(), consensus.getLogger())
consensus.current.setViewChangingID(nextViewID)
epoch := bc.CurrentHeader().Epoch()
ss, err := bc.ReadShardState(epoch)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to read shard state")
return

@ -70,6 +70,7 @@ require (
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b
github.com/grafana/pyroscope-go v1.0.4
github.com/holiman/bloomfilter/v2 v2.0.3
github.com/holiman/uint256 v1.2.3
github.com/ledgerwatch/erigon-lib v0.0.0-20230607152933-42c9c28cac68
github.com/ledgerwatch/log/v3 v3.8.0
github.com/olekukonko/tablewriter v0.0.5
@ -155,7 +156,6 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e // indirect
github.com/holiman/uint256 v1.2.3 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect

@ -1433,6 +1433,7 @@ github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q
github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=

Loading…
Cancel
Save