Compare commits

...

2 Commits

Author SHA1 Message Date
frozen e0412ae06a
refactored. 9 months ago
frozen f8071179ab
Refactored consensus. 9 months ago
  1. 10
      cmd/harmony/main.go
  2. 7
      consensus/consensus.go
  3. 32
      consensus/consensus_service.go
  4. 7
      consensus/consensus_test.go
  5. 15
      consensus/consensus_v2.go
  6. 58
      consensus/view_change.go
  7. 2
      go.mod
  8. 1
      go.sum

@ -812,7 +812,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
registry = setupChain(hc, nodeConfig, registry) registry = setupChain(hc, nodeConfig, registry)
if registry.GetShardChainCollection() == nil { if registry.GetShardChainCollection() == nil {
panic("shard chain collection is nil1111111") panic("shard chain collection is nil")
} }
registry.SetWebHooks(nodeConfig.WebHooks.Hooks) registry.SetWebHooks(nodeConfig.WebHooks.Hooks)
cxPool := core.NewCxPool(core.CxPoolSize) cxPool := core.NewCxPool(core.CxPoolSize)
@ -862,14 +862,6 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(shard.BeaconChainShardID)) currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(shard.BeaconChainShardID))
currentNode.NodeConfig.ConsensusPriKey = nodeConfig.ConsensusPriKey currentNode.NodeConfig.ConsensusPriKey = nodeConfig.ConsensusPriKey
// This needs to be executed after consensus setup
if err := currentConsensus.InitConsensusWithValidators(); err != nil {
utils.Logger().Warn().
Int("shardID", hc.General.ShardID).
Err(err).
Msg("InitConsensusWithMembers failed")
}
// Set the consensus ID to be the current block number // Set the consensus ID to be the current block number
viewID := currentNode.Blockchain().CurrentBlock().Header().ViewID().Uint64() viewID := currentNode.Blockchain().CurrentBlock().Header().ViewID().Uint64()
currentConsensus.SetViewIDs(viewID + 1) currentConsensus.SetViewIDs(viewID + 1)

@ -300,7 +300,7 @@ func New(
// viewID has to be initialized as the height of // viewID has to be initialized as the height of
// the blockchain during initialization as it was // the blockchain during initialization as it was
// displayed on explorer as Height right now // displayed on explorer as Height right now
consensus.setCurBlockViewID(0) consensus.current.setCurBlockViewID(0)
consensus.SlashChan = make(chan slash.Record) consensus.SlashChan = make(chan slash.Record)
consensus.readySignal = make(chan ProposalType) consensus.readySignal = make(chan ProposalType)
consensus.commitSigChannel = make(chan []byte) consensus.commitSigChannel = make(chan []byte)
@ -313,6 +313,11 @@ func New(
initMetrics() initMetrics()
consensus.AddPubkeyMetrics() consensus.AddPubkeyMetrics()
err := consensus.InitConsensusWithValidators()
if err != nil {
return nil, errors.WithMessage(err, "failed to init consensus with validators")
}
return &consensus, nil return &consensus, nil
} }

@ -5,6 +5,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/bls"
@ -301,11 +302,10 @@ func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offse
func (consensus *Consensus) UpdateConsensusInformation() Mode { func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
return consensus.updateConsensusInformation() return consensus.updateConsensusInformation(consensus.Blockchain().CurrentHeader())
} }
func (consensus *Consensus) updateConsensusInformation() Mode { func (consensus *Consensus) updateConsensusInformation(curHeader *block.Header) Mode {
curHeader := consensus.Blockchain().CurrentHeader()
curEpoch := curHeader.Epoch() curEpoch := curHeader.Epoch()
nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1) nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1)
@ -508,30 +508,8 @@ func (consensus *Consensus) SetViewIDs(height uint64) {
// SetViewIDs set both current view ID and view changing ID to the height // SetViewIDs set both current view ID and view changing ID to the height
// of the blockchain. It is used during client startup to recover the state // of the blockchain. It is used during client startup to recover the state
func (consensus *Consensus) setViewIDs(height uint64) { func (consensus *Consensus) setViewIDs(height uint64) {
consensus.setCurBlockViewID(height) consensus.current.setCurBlockViewID(height)
consensus.setViewChangingID(height) consensus.current.setViewChangingID(height)
}
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.setCurBlockViewID(viewID)
}
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
}
// SetViewChangingID set the current view change ID
func (consensus *Consensus) SetViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
}
// SetViewChangingID set the current view change ID
func (consensus *Consensus) setViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
} }
// StartFinalityCount set the finality counter to current time // StartFinalityCount set the finality counter to current time

@ -17,6 +17,13 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.current.setCurBlockViewID(viewID)
}
func TestConsensusInitialization(t *testing.T) { func TestConsensusInitialization(t *testing.T) {
host, multiBLSPrivateKey, consensus, _, err := GenerateConsensusForTesting() host, multiBLSPrivateKey, consensus, _, err := GenerateConsensusForTesting()
assert.NoError(t, err) assert.NoError(t, err)

@ -342,10 +342,11 @@ func (consensus *Consensus) StartChannel() {
func (consensus *Consensus) syncReadyChan() { func (consensus *Consensus) syncReadyChan() {
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
if consensus.getBlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 { h := consensus.Blockchain().CurrentHeader()
consensus.setBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) if consensus.getBlockNum() < h.Number().Uint64()+1 {
consensus.setViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1) consensus.setBlockNum(h.Number().Uint64() + 1)
mode := consensus.updateConsensusInformation() consensus.setViewIDs(h.ViewID().Uint64() + 1)
mode := consensus.updateConsensusInformation(h)
consensus.current.SetMode(mode) consensus.current.SetMode(mode)
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer") consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
@ -354,7 +355,7 @@ func (consensus *Consensus) syncReadyChan() {
} else if consensus.mode() == Syncing { } else if consensus.mode() == Syncing {
// Corner case where sync is triggered before `onCommitted` and there is a race // Corner case where sync is triggered before `onCommitted` and there is a race
// for block insertion between consensus and downloader. // for block insertion between consensus and downloader.
mode := consensus.updateConsensusInformation() mode := consensus.updateConsensusInformation(consensus.Blockchain().CurrentHeader())
consensus.setMode(mode) consensus.setMode(mode)
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer") consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
@ -793,7 +794,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicK
// SetupForNewConsensus sets the state for new consensus // SetupForNewConsensus sets the state for new consensus
func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1)
consensus.setCurBlockViewID(committedMsg.ViewID + 1) consensus.current.setCurBlockViewID(committedMsg.ViewID + 1)
var epoch *big.Int var epoch *big.Int
if blk.IsLastBlockInEpoch() { if blk.IsLastBlockInEpoch() {
epoch = new(big.Int).Add(blk.Epoch(), common.Big1) epoch = new(big.Int).Add(blk.Epoch(), common.Big1)
@ -822,7 +823,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
// Update consensus keys at last so the change of leader status doesn't mess up normal flow // Update consensus keys at last so the change of leader status doesn't mess up normal flow
if blk.IsLastBlockInEpoch() { if blk.IsLastBlockInEpoch() {
consensus.setMode(consensus.updateConsensusInformation()) consensus.setMode(consensus.updateConsensusInformation(consensus.Blockchain().CurrentHeader()))
} }
consensus.fBFTLog.PruneCacheBeforeBlock(blk.NumberU64()) consensus.fBFTLog.PruneCacheBeforeBlock(blk.NumberU64())
consensus.resetState() consensus.resetState()

@ -4,7 +4,9 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/internal/chain" "github.com/harmony-one/harmony/internal/chain"
"github.com/rs/zerolog"
"github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/bls"
@ -53,25 +55,46 @@ func (pm *State) SetMode(s Mode) {
// GetCurBlockViewID return the current view id // GetCurBlockViewID return the current view id
func (pm *State) GetCurBlockViewID() uint64 { func (pm *State) GetCurBlockViewID() uint64 {
return pm.getCurBlockViewID()
}
// GetCurBlockViewID return the current view id
func (pm *State) getCurBlockViewID() uint64 {
return pm.blockViewID return pm.blockViewID
} }
// SetCurBlockViewID sets the current view id // SetCurBlockViewID set the current view ID
func (pm *State) SetCurBlockViewID(viewID uint64) uint64 { func (pm *State) setCurBlockViewID(viewID uint64) uint64 {
pm.blockViewID = viewID pm.blockViewID = viewID
return pm.blockViewID return pm.blockViewID
} }
// SetCurBlockViewID sets the current view id
func (pm *State) SetCurBlockViewID(viewID uint64) uint64 {
return pm.setCurBlockViewID(viewID)
}
// GetViewChangingID return the current view changing id // GetViewChangingID return the current view changing id
// It is meaningful during view change mode // It is meaningful during view change mode
func (pm *State) GetViewChangingID() uint64 { func (pm *State) GetViewChangingID() uint64 {
return pm.getViewChangingID()
}
// getViewChangingID return the current view changing id
// It is meaningful during view change mode
func (pm *State) getViewChangingID() uint64 {
return pm.viewChangingID return pm.viewChangingID
} }
// SetViewChangingID set the current view change ID
func (consensus *State) setViewChangingID(viewID uint64) {
consensus.viewChangingID = viewID
}
// SetViewChangingID set the current view changing id // SetViewChangingID set the current view changing id
// It is meaningful during view change mode // It is meaningful during view change mode
func (pm *State) SetViewChangingID(id uint64) { func (pm *State) SetViewChangingID(id uint64) {
pm.viewChangingID = id pm.setViewChangingID(id)
} }
// GetViewChangeDuraion return the duration of the current view change // GetViewChangeDuraion return the duration of the current view change
@ -87,12 +110,12 @@ func (pm *State) SetIsBackup(isBackup bool) {
// fallbackNextViewID return the next view ID and duration when there is an exception // fallbackNextViewID return the next view ID and duration when there is an exception
// to calculate the time-based viewId // to calculate the time-based viewId
func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) { func (consensus *State) fallbackNextViewID(logger *zerolog.Logger) (uint64, time.Duration) {
diff := int64(consensus.getViewChangingID() + 1 - consensus.getCurBlockViewID()) diff := int64(consensus.getViewChangingID() + 1 - consensus.getCurBlockViewID())
if diff <= 0 { if diff <= 0 {
diff = int64(1) diff = int64(1)
} }
consensus.getLogger().Error(). logger.Error().
Int64("diff", diff). Int64("diff", diff).
Msg("[fallbackNextViewID] use legacy viewID algorithm") Msg("[fallbackNextViewID] use legacy viewID algorithm")
return consensus.getViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration)) return consensus.getViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration))
@ -108,14 +131,9 @@ func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) {
// The view change duration is a fixed duration now to avoid stuck into offline nodes during // The view change duration is a fixed duration now to avoid stuck into offline nodes during
// the view change. // the view change.
// viewID is only used as the fallback mechansim to determine the nextViewID // viewID is only used as the fallback mechansim to determine the nextViewID
func (consensus *Consensus) getNextViewID() (uint64, time.Duration) { func (consensus *State) getNextViewID(curHeader *block.Header, logger *zerolog.Logger) (uint64, time.Duration) {
// handle corner case at first
if consensus.Blockchain() == nil {
return consensus.fallbackNextViewID()
}
curHeader := consensus.Blockchain().CurrentHeader()
if curHeader == nil { if curHeader == nil {
return consensus.fallbackNextViewID() return consensus.fallbackNextViewID(logger)
} }
blockTimestamp := curHeader.Time().Int64() blockTimestamp := curHeader.Time().Int64()
stuckBlockViewID := curHeader.ViewID().Uint64() + 1 stuckBlockViewID := curHeader.ViewID().Uint64() + 1
@ -123,18 +141,18 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
// timestamp messed up in current validator node // timestamp messed up in current validator node
if curTimestamp <= blockTimestamp { if curTimestamp <= blockTimestamp {
consensus.getLogger().Error(). logger.Error().
Int64("curTimestamp", curTimestamp). Int64("curTimestamp", curTimestamp).
Int64("blockTimestamp", blockTimestamp). Int64("blockTimestamp", blockTimestamp).
Msg("[getNextViewID] timestamp of block too high") Msg("[getNextViewID] timestamp of block too high")
return consensus.fallbackNextViewID() return consensus.fallbackNextViewID(logger)
} }
// diff only increases, since view change timeout is shorter than // diff only increases, since view change timeout is shorter than
// view change slot now, we want to make sure diff is always greater than 0 // view change slot now, we want to make sure diff is always greater than 0
diff := uint64((curTimestamp-blockTimestamp)/viewChangeSlot + 1) diff := uint64((curTimestamp-blockTimestamp)/viewChangeSlot + 1)
nextViewID := diff + stuckBlockViewID nextViewID := diff + stuckBlockViewID
consensus.getLogger().Info(). logger.Info().
Int64("curTimestamp", curTimestamp). Int64("curTimestamp", curTimestamp).
Int64("blockTimestamp", blockTimestamp). Int64("blockTimestamp", blockTimestamp).
Uint64("nextViewID", nextViewID). Uint64("nextViewID", nextViewID).
@ -248,10 +266,12 @@ func (consensus *Consensus) startViewChange() {
consensus.consensusTimeout[timeoutConsensus].Stop() consensus.consensusTimeout[timeoutConsensus].Stop()
consensus.consensusTimeout[timeoutBootstrap].Stop() consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.current.SetMode(ViewChanging) consensus.current.SetMode(ViewChanging)
nextViewID, duration := consensus.getNextViewID() // handle corner case at first
consensus.setViewChangingID(nextViewID) bc := consensus.Blockchain()
epoch := consensus.Blockchain().CurrentHeader().Epoch() nextViewID, duration := consensus.current.getNextViewID(bc.CurrentHeader(), consensus.getLogger())
ss, err := consensus.Blockchain().ReadShardState(epoch) consensus.current.setViewChangingID(nextViewID)
epoch := bc.CurrentHeader().Epoch()
ss, err := bc.ReadShardState(epoch)
if err != nil { if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to read shard state") utils.Logger().Error().Err(err).Msg("Failed to read shard state")
return return

@ -70,6 +70,7 @@ require (
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b
github.com/grafana/pyroscope-go v1.0.4 github.com/grafana/pyroscope-go v1.0.4
github.com/holiman/bloomfilter/v2 v2.0.3 github.com/holiman/bloomfilter/v2 v2.0.3
github.com/holiman/uint256 v1.2.3
github.com/ledgerwatch/erigon-lib v0.0.0-20230607152933-42c9c28cac68 github.com/ledgerwatch/erigon-lib v0.0.0-20230607152933-42c9c28cac68
github.com/ledgerwatch/log/v3 v3.8.0 github.com/ledgerwatch/log/v3 v3.8.0
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
@ -155,7 +156,6 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e // indirect github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e // indirect
github.com/holiman/uint256 v1.2.3 // indirect
github.com/huin/goupnp v1.3.0 // indirect github.com/huin/goupnp v1.3.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-cid v0.4.1 // indirect

@ -1433,6 +1433,7 @@ github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q
github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=

Loading…
Cancel
Save