Improve staged stream sync, fix the devnet node stuck issue (#4467)

* add error desc in validateNewBlock to help identify validation issues

* let stream sync downloader continue its loop even if an error occurred

* pass consensus to stream sync through downloader

* add last mile functions to stream sync

* add if-check for invalid block revert

* add last mile stage to stream sync

* goimports

* improve stream sync downloader loop to block redundant calls

* move startSyncing out of the shortrange loop

* goimports

* fix sync loop go routine

* remove extra log

* add debug mode to stream sync to help debug syncing issues

* fix stream sync loop channels

* add streamFailed function to short range helper to avoid removing healthy streams

* remove execution of stage bodies, stage lastmile, stage short range and stage state for epoch chain

* refactor stage epoch

* add debug logs

* goimports

* add a few debug log to stage short range

* doSync returns estimated height as well

* only switch to short range if the current block number is very close to the chain current height

* stream sync gets UseMemDb from config file

* goimports

* only flag failed streams rather than removing them in stream sync

* if stage blocks progress behind current head, remove block cache

* add rollback to short range

* refactor stage last mile blocks, add roll back to this stage

* improve addConsensusLastMile

* goimports

* fix log spelling error

* improve revert function, no need to revert if hashes are empty

* fix switch to short range by removing extra condition

* add donC chan size

* refactor downloader loop mechanism

* use atomic flag rather than done channel in downloader loop

* no need to fail streams in epoch sync

* ignore context timeout

* add mux lock to get access to last mile blocks

* remove atomic flag for downloader loop

* a few improvements on staged sync; check addedBN before switching to short range

* goimports

* fix consensus catchup issue

* fix panic issue from running sync loop while stream sync is running

* goimports

* add two more logs to staged sync

* remove extra debug logs, add more file logs for stream sync

* add comment for DebugMode

* improve the byte comparison for getBlockFromLastMileBlocksByParentHash function
Author: Gheis Mohammadi (committed by GitHub)
parent 764474acae
commit b093fea169
Changed files:
  1. api/service/stagedstreamsync/const.go (12 lines changed)
  2. api/service/stagedstreamsync/default_stages.go (10 lines changed)
  3. api/service/stagedstreamsync/downloader.go (39 lines changed)
  4. api/service/stagedstreamsync/downloaders.go (5 lines changed)
  5. api/service/stagedstreamsync/service.go (5 lines changed)
  6. api/service/stagedstreamsync/short_range_helper.go (6 lines changed)
  7. api/service/stagedstreamsync/stage_bodies.go (10 lines changed)
  8. api/service/stagedstreamsync/stage_epoch.go (36 lines changed)
  9. api/service/stagedstreamsync/stage_lastmile.go (109 lines changed)
  10. api/service/stagedstreamsync/stage_short_range.go (35 lines changed)
  11. api/service/stagedstreamsync/stage_state.go (12 lines changed)
  12. api/service/stagedstreamsync/staged_stream_sync.go (200 lines changed)
  13. api/service/stagedstreamsync/stages.go (1 line changed)
  14. api/service/stagedstreamsync/syncing.go (139 lines changed)
  15. api/service/stagedsync/stagedsync.go (7 lines changed)
  16. api/service/stagedsync/syncing.go (2 lines changed)
  17. cmd/harmony/default.go (1 line changed)
  18. cmd/harmony/main.go (7 lines changed)
  19. consensus/validator.go (2 lines changed)
  20. internal/configs/harmony/harmony.go (1 line changed)
  21. internal/configs/node/config.go (1 line changed)
  22. node/node_syncing.go (3 lines changed)
  23. p2p/stream/protocols/sync/protocol.go (4 lines changed)

@ -14,7 +14,10 @@ const (
BlockByHashesUpperCap int = 10 // number of get blocks by hashes upper cap
BlockByHashesLowerCap int = 3 // number of get blocks by hashes lower cap
LastMileBlocksThreshold int = 10
LastMileBlocksThreshold int = 10
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
VerifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now)
LastMileBlocksSize = 50
// SoftQueueCap is the soft cap of size in resultQueue. When the queue size is larger than this limit,
// no more request will be assigned to workers to wait for InsertChain to finish.
@ -50,8 +53,15 @@ type (
// config for beacon config
BHConfig *BeaconHelperConfig
// use memory db
UseMemDB bool
// log the stage progress
LogProgress bool
// logs every single process and error to help debug stream sync
// DebugMode is not accessible to the end user and is only an aid for development
DebugMode bool
}
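Illustrative only: how the new flags might look when a stream-sync Config is built (field names are taken from the struct above; all other fields are omitted and the values are hypothetical).
// Hypothetical construction of the stream-sync Config showing the new fields.
cfg := Config{
	UseMemDB:    true,  // keep the staging databases in memory
	LogProgress: false, // print stage progress to the console
	DebugMode:   false, // verbose per-step logging; development aid only
}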
// BeaconHelperConfig is the extra config used for beaconHelper which uses

@ -15,11 +15,13 @@ var DefaultForwardOrder = ForwardOrder{
BlockBodies,
// Stages below don't use Internet
States,
LastMile,
Finish,
}
var DefaultRevertOrder = RevertOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
@ -29,6 +31,7 @@ var DefaultRevertOrder = RevertOrder{
var DefaultCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
@ -42,6 +45,7 @@ func DefaultStages(ctx context.Context,
srCfg StageShortRangeCfg,
bodiesCfg StageBodiesCfg,
statesCfg StageStatesCfg,
lastMileCfg StageLastMileCfg,
finishCfg StageFinishCfg,
) []*Stage {
@ -50,6 +54,7 @@ func DefaultStages(ctx context.Context,
handlerStageEpochSync := NewStageEpoch(seCfg)
handlerStageBodies := NewStageBodies(bodiesCfg)
handlerStageStates := NewStageStates(statesCfg)
handlerStageLastMile := NewStageLastMile(lastMileCfg)
handlerStageFinish := NewStageFinish(finishCfg)
return []*Stage{
@ -78,6 +83,11 @@ func DefaultStages(ctx context.Context,
Description: "Update Blockchain State",
Handler: handlerStageStates,
},
{
ID: LastMile,
Description: "update status for blocks after sync and update last mile blocks as well",
Handler: handlerStageLastMile,
},
{
ID: Finish,
Description: "Finalize Changes",

@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/rs/zerolog"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
@ -37,7 +38,7 @@ type (
)
// NewDownloader creates a new downloader
func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode bool, config Config) *Downloader {
func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
config.fixValues()
sp := sync.NewProtocol(sync.Config{
@ -67,8 +68,8 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode
ctx, cancel := context.WithCancel(context.Background())
//TODO: use mem db should be in config file
stagedSyncInstance, err := CreateStagedSync(ctx, bc, dbDir, false, isBeaconNode, sp, config, logger, config.LogProgress)
// create an instance of staged sync for the downloader
stagedSyncInstance, err := CreateStagedSync(ctx, bc, consensus, dbDir, isBeaconNode, sp, config, logger)
if err != nil {
cancel()
return nil
@ -189,6 +190,7 @@ func (d *Downloader) waitForBootFinish() {
func (d *Downloader) loop() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
// for shard chain and beacon chain nodes, we first start with initSync=true to
// make sure they go through the long range sync first.
// for the epoch chain, we only need to go through the epoch sync process
@ -208,7 +210,8 @@ func (d *Downloader) loop() {
go trigger()
case <-d.downloadC:
addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
bnBeforeSync := d.bc.CurrentBlock().NumberU64()
estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
if err != nil {
//TODO: if there is a bad block which can't be resolved
if d.stagedSyncInstance.invalidBlock.Active {
@ -216,13 +219,14 @@ func (d *Downloader) loop() {
// if many streams couldn't solve it, then that's an unresolvable bad block
if numTriedStreams >= d.config.InitStreams {
if !d.stagedSyncInstance.invalidBlock.IsLogged {
fmt.Println("unresolvable bad block:", d.stagedSyncInstance.invalidBlock.Number)
d.logger.Error().
Uint64("bad block number", d.stagedSyncInstance.invalidBlock.Number).
Msg(WrapStagedSyncMsg("unresolvable bad block"))
d.stagedSyncInstance.invalidBlock.IsLogged = true
}
//TODO: if we don't have any new or untried stream in the list, sleep or panic
}
}
// If any error happens, sleep 5 seconds and retry
d.logger.Error().
Err(err).
@ -242,16 +246,27 @@ func (d *Downloader) loop() {
Uint32("shard", d.bc.ShardID()).
Msg(WrapStagedSyncMsg("sync finished"))
}
// If block number has been changed, trigger another sync
if addedBN != 0 {
// If block number has been changed, trigger another sync
go trigger()
// try to add last mile from pub-sub (blocking)
if d.bh != nil {
d.bh.insertSync()
}
}
// try to add last mile from pub-sub (blocking)
if d.bh != nil {
d.bh.insertSync()
// if the last doSync only needed to add a few blocks (fewer than LastMileBlocksThreshold) and
// the node is fully synced now, then switch to short range.
// distanceBeforeSync is checked because, if the distance was long, a couple of new blocks have
// very likely been added to the other nodes in the meantime, so we should stay in long range
// and fetch them as well.
bnAfterSync := d.bc.CurrentBlock().NumberU64()
distanceBeforeSync := estimatedHeight - bnBeforeSync
distanceAfterSync := estimatedHeight - bnAfterSync
if estimatedHeight > 0 && addedBN > 0 &&
distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
distanceAfterSync <= uint64(LastMileBlocksThreshold) {
initSync = false
}
initSync = false
case <-d.closeC:
return
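
A minimal sketch (not the PR's literal code) of the decision the refactored loop makes after each doSync: stay in long-range mode unless both the pre-sync and post-sync distances to the estimated chain height fall within LastMileBlocksThreshold. The helper name is hypothetical; the variable names mirror the diff above.
// Sketch of the short-range switch decision made at the end of a download cycle.
func shouldSwitchToShortRange(estimatedHeight, bnBeforeSync, bnAfterSync uint64, addedBN int) bool {
	if estimatedHeight == 0 || addedBN <= 0 {
		return false
	}
	distanceBeforeSync := estimatedHeight - bnBeforeSync
	distanceAfterSync := estimatedHeight - bnAfterSync
	return distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
		distanceAfterSync <= uint64(LastMileBlocksThreshold)
}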

@ -2,6 +2,7 @@ package stagedstreamsync
import (
"github.com/harmony-one/abool"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/p2p"
)
@ -15,7 +16,7 @@ type Downloaders struct {
}
// NewDownloaders creates Downloaders for sync of multiple blockchains
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config Config) *Downloaders {
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
ds := make(map[uint32]*Downloader)
isBeaconNode := len(bcs) == 1
for _, bc := range bcs {
@ -25,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config C
if _, ok := ds[bc.ShardID()]; ok {
continue
}
ds[bc.ShardID()] = NewDownloader(host, bc, dbDir, isBeaconNode, config)
ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config)
}
return &Downloaders{
ds: ds,

@ -1,6 +1,7 @@
package stagedstreamsync
import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/p2p"
)
@ -11,9 +12,9 @@ type StagedStreamSyncService struct {
}
// NewService creates a new downloader service
func NewService(host p2p.Host, bcs []core.BlockChain, config Config, dbDir string) *StagedStreamSyncService {
func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
return &StagedStreamSyncService{
Downloaders: NewDownloaders(host, bcs, dbDir, config),
Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config),
}
}

@ -207,6 +207,12 @@ func (sh *srHelper) removeStreams(sts []sttypes.StreamID) {
}
}
func (sh *srHelper) streamsFailed(sts []sttypes.StreamID, reason string) {
for _, st := range sts {
sh.syncProtocol.StreamFailed(st, reason)
}
}
// blameAllStreams only not to blame all whitelisted streams when the it's not the last block signature verification failed.
func (sh *srHelper) blameAllStreams(blocks types.Blocks, errIndex int, err error) bool {
if errors.As(err, &emptySigVerifyErr) && errIndex == len(blocks)-1 {

@ -10,6 +10,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
)
@ -60,6 +61,11 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
return nil
}
// shouldn't execute for epoch chain
if b.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}
maxHeight := s.state.status.targetBN
currentHead := b.configs.bc.CurrentBlock().NumberU64()
if currentHead >= maxHeight {
@ -77,7 +83,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
return errV
}
if currProgress == 0 {
if currProgress <= currentHead {
if err := b.cleanAllBlockDBs(ctx); err != nil {
return err
}
@ -209,7 +215,7 @@ func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) err
isOneOfTheBadStreams := false
for _, id := range s.state.invalidBlock.StreamID {
if id == stid {
b.configs.protocol.RemoveStream(stid)
b.configs.protocol.StreamFailed(stid, "re-download bad block from this stream failed")
isOneOfTheBadStreams = true
break
}

@ -5,6 +5,7 @@ import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
@ -51,9 +52,12 @@ func (sr *StageEpoch) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
n, err := sr.doShortRangeSyncForEpochSync(ctx, s)
s.state.inserted = n
if err != nil {
utils.Logger().Info().Err(err).Msg("short range for epoch sync failed")
return err
}
if n > 0 {
utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("epoch sync short range blocks inserted successfully")
}
useInternalTx := tx == nil
if useInternalTx {
var err error
@ -108,30 +112,13 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage
return 0, nil
}
////////////////////////////////////////////////////////
hashChain, whitelist, err := sh.getHashChain(ctx, bns)
blocks, streamID, err := sh.getBlocksChain(ctx, bns)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return 0, nil
}
return 0, errors.Wrap(err, "getHashChain")
}
if len(hashChain) == 0 {
// short circuit for no sync is needed
return 0, nil
}
blocks, streamID, err := sh.getBlocksByHashes(ctx, hashChain, whitelist)
if err != nil {
utils.Logger().Warn().Err(err).Msg("epoch sync getBlocksByHashes failed")
if !errors.Is(err, context.Canceled) {
sh.removeStreams(whitelist) // Remote nodes cannot provide blocks with target hashes
}
return 0, errors.Wrap(err, "epoch sync getBlocksByHashes")
}
///////////////////////////////////////////////////////
// TODO: check this
// blocks, streamID, err := sh.getBlocksChain(bns)
// if err != nil {
// return 0, errors.Wrap(err, "getHashChain")
// }
///////////////////////////////////////////////////////
if len(blocks) == 0 {
// short circuit for no sync is needed
return 0, nil
@ -141,12 +128,9 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage
numBlocksInsertedShortRangeHistogramVec.With(s.state.promLabels()).Observe(float64(n))
if err != nil {
utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("Insert block failed")
sh.removeStreams(streamID) // Data provided by remote nodes is corrupted
sh.streamsFailed([]sttypes.StreamID{streamID}, "corrupted data")
return n, err
}
if n > 0 {
utils.Logger().Info().Int("blocks inserted", n).Msg("Insert block success")
}
return n, nil
}

@ -0,0 +1,109 @@
package stagedstreamsync
import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
)
type StageLastMile struct {
configs StageLastMileCfg
}
type StageLastMileCfg struct {
ctx context.Context
bc core.BlockChain
db kv.RwDB
}
func NewStageLastMile(cfg StageLastMileCfg) *StageLastMile {
return &StageLastMile{
configs: cfg,
}
}
func NewStageLastMileCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB) StageLastMileCfg {
return StageLastMileCfg{
ctx: ctx,
bc: bc,
db: db,
}
}
func (lm *StageLastMile) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// no need to download the last mile blocks if we are redoing the stages because of bad block
if invalidBlockRevert {
return nil
}
// shouldn't execute for epoch chain
if lm.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}
bc := lm.configs.bc
// update last mile blocks if any
parentHash := bc.CurrentBlock().Hash()
var hashes []common.Hash
for {
block := s.state.getBlockFromLastMileBlocksByParentHash(parentHash)
if block == nil {
break
}
err = s.state.UpdateBlockAndStatus(block, bc, false)
if err != nil {
s.state.RollbackLastMileBlocks(ctx, hashes)
return err
}
hashes = append(hashes, block.Hash())
parentHash = block.Hash()
}
s.state.purgeLastMileBlocksFromCache()
return nil
}
func (lm *StageLastMile) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = lm.configs.db.BeginRw(lm.configs.ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if err = u.Done(tx); err != nil {
return err
}
if useInternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}
func (lm *StageLastMile) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = lm.configs.db.BeginRw(lm.configs.ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if useInternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}

@ -44,6 +44,7 @@ func (sr *StageShortRange) Exec(ctx context.Context, firstCycle bool, invalidBlo
return nil
}
// shouldn't execute for epoch chain
if sr.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}
@ -52,8 +53,12 @@ func (sr *StageShortRange) Exec(ctx context.Context, firstCycle bool, invalidBlo
n, err := sr.doShortRangeSync(ctx, s)
s.state.inserted = n
if err != nil {
utils.Logger().Info().Err(err).Msg("short range sync failed")
return err
}
if n > 0 {
utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("short range blocks inserted successfully")
}
useInternalTx := tx == nil
if useInternalTx {
@ -98,6 +103,9 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
blkNums := sh.prepareBlockHashNumbers(curBN)
hashChain, whitelist, err := sh.getHashChain(ctx, blkNums)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return 0, nil
}
return 0, errors.Wrap(err, "getHashChain")
}
@ -114,37 +122,34 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
s.state.status.setTargetBN(expEndBN)
s.state.status.startSyncing()
defer func() {
utils.Logger().Info().Msg("short range finished syncing")
s.state.status.finishSyncing()
}()
blocks, stids, err := sh.getBlocksByHashes(ctx, hashChain, whitelist)
if err != nil {
utils.Logger().Warn().Err(err).Msg("getBlocksByHashes failed")
if !errors.Is(err, context.Canceled) {
sh.removeStreams(whitelist) // Remote nodes cannot provide blocks with target hashes
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return 0, errors.Wrap(err, "getBlocksByHashes")
}
return 0, errors.Wrap(err, "getBlocksByHashes")
sh.streamsFailed(whitelist, "remote nodes cannot provide blocks with target hashes")
}
utils.Logger().Info().Int("num blocks", len(blocks)).Msg("getBlockByHashes result")
n, err := verifyAndInsertBlocks(sr.configs.bc, blocks)
numBlocksInsertedShortRangeHistogramVec.With(s.state.promLabels()).Observe(float64(n))
if err != nil {
utils.Logger().Warn().Err(err).Int("blocks inserted", n).Msg("Insert block failed")
// rollback all added new blocks
if rbErr := sr.configs.bc.Rollback(hashChain); rbErr != nil {
utils.Logger().Error().Err(rbErr).Msg("short range failed to rollback")
return 0, rbErr
}
// fail streams
if sh.blameAllStreams(blocks, n, err) {
sh.removeStreams(whitelist) // Data provided by remote nodes is corrupted
sh.streamsFailed(whitelist, "data provided by remote nodes is corrupted")
} else {
// It is the last block gives a wrong commit sig. Blame the provider of the last block.
st2Blame := stids[len(stids)-1]
sh.removeStreams([]sttypes.StreamID{st2Blame})
sh.streamsFailed([]sttypes.StreamID{st2Blame}, "the last block provided by stream gives a wrong commit sig")
}
return n, err
return 0, err
}
utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("Insert block success")
return n, nil
}

@ -10,6 +10,7 @@ import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog"
)
@ -57,6 +58,11 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
return nil
}
// shouldn't execute for epoch chain
if stg.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}
maxHeight := s.state.status.targetBN
currentHead := stg.configs.bc.CurrentBlock().NumberU64()
if currentHead >= maxHeight {
@ -144,8 +150,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
if block.NumberU64() != i {
s.state.protocol.StreamFailed(streamID, "invalid block with unmatched number is received from stream")
invalidBlockHash := block.Hash()
reverter.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), i, invalidBlockHash, streamID)
if !invalidBlockRevert {
invalidBlockHash := block.Hash()
reverter.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), i, invalidBlockHash, streamID)
}
return ErrInvalidBlockNumber
}

@ -8,11 +8,16 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/utils"
syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
)
@ -54,19 +59,22 @@ func (ib *InvalidBlock) addBadStream(bsID sttypes.StreamID) {
}
type StagedStreamSync struct {
bc core.BlockChain
isBeacon bool
isExplorer bool
db kv.RwDB
protocol syncProtocol
isBeaconNode bool
gbm *blockDownloadManager // initialized when finished get block number
inserted int
config Config
logger zerolog.Logger
status *status //TODO: merge this with currentSyncCycle
initSync bool // if sets to true, node start long range syncing
UseMemDB bool
bc core.BlockChain
consensus *consensus.Consensus
isBeacon bool
isExplorer bool
db kv.RwDB
protocol syncProtocol
isBeaconNode bool
gbm *blockDownloadManager // initialized when finished get block number
lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus
lastMileMux sync.Mutex
inserted int
config Config
logger zerolog.Logger
status *status //TODO: merge this with currentSyncCycle
initSync bool // if sets to true, node start long range syncing
UseMemDB bool
revertPoint *uint64 // used to run stages
prevRevertPoint *uint64 // used to get value from outside of staged sync after cycle (for example to notify RPCDaemon)
@ -249,12 +257,12 @@ func (s *StagedStreamSync) cleanUp(ctx context.Context, fromStage int, db kv.RwD
// New creates a new StagedStreamSync instance
func New(
bc core.BlockChain,
consensus *consensus.Consensus,
db kv.RwDB,
stagesList []*Stage,
isBeacon bool,
protocol syncProtocol,
isBeaconNode bool,
useMemDB bool,
config Config,
logger zerolog.Logger,
) *StagedStreamSync {
@ -286,22 +294,24 @@ func New(
status := newStatus()
return &StagedStreamSync{
bc: bc,
isBeacon: isBeacon,
db: db,
protocol: protocol,
isBeaconNode: isBeaconNode,
gbm: nil,
status: &status,
inserted: 0,
config: config,
logger: logger,
stages: stagesList,
currentStage: 0,
revertOrder: revertStages,
pruningOrder: pruneStages,
logPrefixes: logPrefixes,
UseMemDB: useMemDB,
bc: bc,
consensus: consensus,
isBeacon: isBeacon,
db: db,
protocol: protocol,
isBeaconNode: isBeaconNode,
lastMileBlocks: []*types.Block{},
gbm: nil,
status: &status,
inserted: 0,
config: config,
logger: logger,
stages: stagesList,
currentStage: 0,
revertOrder: revertStages,
pruningOrder: pruneStages,
logPrefixes: logPrefixes,
UseMemDB: config.UseMemDB,
}
}
@ -583,3 +593,133 @@ func (s *StagedStreamSync) EnableStages(ids ...SyncStageID) {
}
}
}
func (ss *StagedStreamSync) purgeLastMileBlocksFromCache() {
ss.lastMileMux.Lock()
ss.lastMileBlocks = nil
ss.lastMileMux.Unlock()
}
// AddLastMileBlock adds the latest few blocks into the queue for syncing
// only the latest blocks are kept, with size capped by LastMileBlocksSize
func (ss *StagedStreamSync) AddLastMileBlock(block *types.Block) {
ss.lastMileMux.Lock()
defer ss.lastMileMux.Unlock()
if ss.lastMileBlocks != nil {
if len(ss.lastMileBlocks) >= LastMileBlocksSize {
ss.lastMileBlocks = ss.lastMileBlocks[1:]
}
ss.lastMileBlocks = append(ss.lastMileBlocks, block)
}
}
func (ss *StagedStreamSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Hash) *types.Block {
ss.lastMileMux.Lock()
defer ss.lastMileMux.Unlock()
for _, block := range ss.lastMileBlocks {
ph := block.ParentHash()
if ph == parentHash {
return block
}
}
return nil
}
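A simplified, self-contained sketch of the last-mile cache behavior added above, using local placeholder types instead of the harmony ones and omitting the lastMileMux locking: appends are capped by dropping the oldest entry, and lookups walk the cache for a block whose parent hash matches the current head.
// Placeholder types for illustration only; the real code uses *types.Block
// and common.Hash, and guards the slice with lastMileMux.
type miniBlock struct {
	hash       string
	parentHash string
}

const lastMileCap = 50 // mirrors LastMileBlocksSize

func addLastMile(cache []miniBlock, b miniBlock) []miniBlock {
	if len(cache) >= lastMileCap {
		cache = cache[1:] // drop the oldest cached block
	}
	return append(cache, b)
}

func byParentHash(cache []miniBlock, parent string) *miniBlock {
	for i := range cache {
		if cache[i].parentHash == parent {
			return &cache[i]
		}
	}
	return nil
}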
func (ss *StagedStreamSync) addConsensusLastMile(bc core.BlockChain, cs *consensus.Consensus) ([]common.Hash, error) {
curNumber := bc.CurrentBlock().NumberU64()
var hashes []common.Hash
err := cs.GetLastMileBlockIter(curNumber+1, func(blockIter *consensus.LastMileBlockIter) error {
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
hashes = append(hashes, block.Header().Hash())
}
return nil
})
return hashes, err
}
func (ss *StagedStreamSync) RollbackLastMileBlocks(ctx context.Context, hashes []common.Hash) error {
if len(hashes) == 0 {
return nil
}
utils.Logger().Info().
Interface("block", ss.bc.CurrentBlock()).
Msg("[STAGED_STREAM_SYNC] Rolling back last mile blocks")
if err := ss.bc.Rollback(hashes); err != nil {
utils.Logger().Error().Err(err).
Msg("[STAGED_STREAM_SYNC] failed to rollback last mile blocks")
return err
}
return nil
}
// UpdateBlockAndStatus updates block and its status in db
func (ss *StagedStreamSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error {
if block.NumberU64() != bc.CurrentBlock().NumberU64()+1 {
utils.Logger().Debug().
Uint64("curBlockNum", bc.CurrentBlock().NumberU64()).
Uint64("receivedBlockNum", block.NumberU64()).
Msg("[STAGED_STREAM_SYNC] Inappropriate block number, ignore!")
return nil
}
haveCurrentSig := len(block.GetCurrentCommitSig()) != 0
// Verify block signatures
if block.NumberU64() > 1 {
// Verify signature every N blocks (where N is VerifyHeaderBatchSize and can be adjusted in configs)
verifySeal := block.NumberU64()%VerifyHeaderBatchSize == 0 || verifyAllSig
verifyCurrentSig := verifyAllSig && haveCurrentSig
if verifyCurrentSig {
sig, bitmap, err := chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig())
if err != nil {
return errors.Wrap(err, "parse commitSigAndBitmap")
}
startTime := time.Now()
if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sig, bitmap); err != nil {
return errors.Wrapf(err, "verify header signature %v", block.Hash().String())
}
utils.Logger().Debug().
Int64("elapsed time", time.Now().Sub(startTime).Milliseconds()).
Msg("[STAGED_STREAM_SYNC] VerifyHeaderSignature")
}
err := bc.Engine().VerifyHeader(bc, block.Header(), verifySeal)
if err == engine.ErrUnknownAncestor {
return nil
} else if err != nil {
utils.Logger().Error().
Err(err).
Uint64("block number", block.NumberU64()).
Msgf("[STAGED_STREAM_SYNC] UpdateBlockAndStatus: failed verifying signatures for new block")
return err
}
}
_, err := bc.InsertChain([]*types.Block{block}, false /* verifyHeaders */)
if err != nil {
utils.Logger().Error().
Err(err).
Uint64("block number", block.NumberU64()).
Uint32("shard", block.ShardID()).
Msgf("[STAGED_STREAM_SYNC] UpdateBlockAndStatus: Error adding new block to blockchain")
return err
}
utils.Logger().Info().
Uint64("blockHeight", block.NumberU64()).
Uint64("blockEpoch", block.Epoch().Uint64()).
Str("blockHex", block.Hash().Hex()).
Uint32("ShardID", block.ShardID()).
Msg("[STAGED_STREAM_SYNC] UpdateBlockAndStatus: New Block Added to Blockchain")
for i, tx := range block.StakingTransactions() {
utils.Logger().Info().Msgf("StakingTxn %d: %s, %v", i, tx.StakingType().String(), tx.StakingMessage())
}
return nil
}
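For reference, with VerifyHeaderBatchSize set to 100 in const.go, the verifySeal cadence in UpdateBlockAndStatus means full seal verification happens only on every 100th block, unless verifyAllSig forces it on every block:
// block 200: 200 % VerifyHeaderBatchSize == 0 -> verifySeal = true
// block 201: 201 % VerifyHeaderBatchSize != 0 -> verifySeal = false (unless verifyAllSig)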

@ -13,6 +13,7 @@ const (
SyncEpoch SyncStageID = "SyncEpoch" // epoch sync
BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified
States SyncStageID = "States" // will construct most recent state from downloaded blocks
LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well
Finish SyncStageID = "Finish" // Nominal stage after all other stages
)

@ -4,13 +4,15 @@ import (
"context"
"fmt"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
@ -38,41 +40,53 @@ var Buckets = []string{
// CreateStagedSync creates an instance of staged sync
func CreateStagedSync(ctx context.Context,
bc core.BlockChain,
consensus *consensus.Consensus,
dbDir string,
UseMemDB bool,
isBeaconNode bool,
protocol syncProtocol,
config Config,
logger zerolog.Logger,
logProgress bool,
) (*StagedStreamSync, error) {
isBeacon := bc.ShardID() == shard.BeaconChainShardID
logger.Info().
Uint32("shard", bc.ShardID()).
Bool("beaconNode", isBeaconNode).
Bool("memdb", config.UseMemDB).
Str("dbDir", dbDir).
Bool("serverOnly", config.ServerOnly).
Int("minStreams", config.MinStreams).
Msg(WrapStagedSyncMsg("creating staged sync"))
var mainDB kv.RwDB
dbs := make([]kv.RwDB, config.Concurrency)
if UseMemDB {
mainDB = memdb.New(getMemDbTempPath(dbDir, -1))
if config.UseMemDB {
mainDB = memdb.New(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir))
for i := 0; i < config.Concurrency; i++ {
dbs[i] = memdb.New(getMemDbTempPath(dbDir, i))
dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir)
dbs[i] = memdb.New(dbPath)
}
} else {
mainDB = mdbx.NewMDBX(log.New()).Path(getBlockDbPath(isBeacon, -1, dbDir)).MustOpen()
logger.Info().
Str("path", getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)).
Msg(WrapStagedSyncMsg("creating main db"))
mainDB = mdbx.NewMDBX(log.New()).Path(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)).MustOpen()
for i := 0; i < config.Concurrency; i++ {
dbPath := getBlockDbPath(isBeacon, i, dbDir)
dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir)
dbs[i] = mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen()
}
}
if errInitDB := initDB(ctx, mainDB, dbs, config.Concurrency); errInitDB != nil {
logger.Error().Err(errInitDB).Msg("create staged sync instance failed")
return nil, errInitDB
}
stageHeadsCfg := NewStageHeadersCfg(bc, mainDB)
stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB)
stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeacon, logProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, logProgress)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress)
lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB)
stageFinishCfg := NewStageFinishCfg(mainDB)
stages := DefaultStages(ctx,
@ -81,17 +95,27 @@ func CreateStagedSync(ctx context.Context,
stageShortRangeCfg,
stageBodiesCfg,
stageStatesCfg,
lastMileCfg,
stageFinishCfg,
)
logger.Info().
Uint32("shard", bc.ShardID()).
Bool("beaconNode", isBeaconNode).
Bool("memdb", config.UseMemDB).
Str("dbDir", dbDir).
Bool("serverOnly", config.ServerOnly).
Int("minStreams", config.MinStreams).
Msg(WrapStagedSyncMsg("staged sync created successfully"))
return New(
bc,
consensus,
mainDB,
stages,
isBeacon,
isBeaconNode,
protocol,
isBeaconNode,
UseMemDB,
config,
logger,
), nil
@ -147,7 +171,7 @@ func getMemDbTempPath(dbDir string, dbIndex int) string {
}
// getBlockDbPath returns the path of the cache database which stores blocks
func getBlockDbPath(beacon bool, loopID int, dbDir string) string {
func getBlockDbPath(shardID uint32, beacon bool, loopID int, dbDir string) string {
if beacon {
if loopID >= 0 {
return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/beacon_blocks_db"), loopID)
@ -156,30 +180,57 @@ func getBlockDbPath(beacon bool, loopID int, dbDir string) string {
}
} else {
if loopID >= 0 {
return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/blocks_db"), loopID)
return fmt.Sprintf("%s_%d_%d", filepath.Join(dbDir, "cache/blocks_db"), shardID, loopID)
} else {
return filepath.Join(dbDir, "cache/blocks_db_main")
return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/blocks_db_main"), shardID)
}
}
}
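For illustration, the paths the updated non-beacon branches above would produce for shard 1, assuming a hypothetical dbDir of "harmony_db":
// getBlockDbPath(1, false, 3, "harmony_db")  -> "harmony_db/cache/blocks_db_1_3"
// getBlockDbPath(1, false, -1, "harmony_db") -> "harmony_db/cache/blocks_db_main_1"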
func (s *StagedStreamSync) Debug(source string, msg interface{}) {
// only log the msg in debug mode
if !s.config.DebugMode {
return
}
pc, _, _, _ := runtime.Caller(1)
caller := runtime.FuncForPC(pc).Name()
callerParts := strings.Split(caller, "/")
if len(callerParts) > 0 {
caller = callerParts[len(callerParts)-1]
}
src := source
if src == "" {
src = "message"
}
// SSSD: STAGED STREAM SYNC DEBUG
if msg == nil {
fmt.Printf("[SSSD:%s] %s: nil or no error\n", caller, src)
} else if err, ok := msg.(error); ok {
fmt.Printf("[SSSD:%s] %s: %s\n", caller, src, err.Error())
} else if str, ok := msg.(string); ok {
fmt.Printf("[SSSD:%s] %s: %s\n", caller, src, str)
} else {
fmt.Printf("[SSSD:%s] %s: %v\n", caller, src, msg)
}
}
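Hypothetical call sites for the new Debug helper from within a stage (where s.state is the StagedStreamSync instance); nothing is printed unless Config.DebugMode is enabled, and the tag strings are arbitrary.
// Illustrative usage only; "doShortRangeSync/getHashChain" is an arbitrary tag.
s.state.Debug("doShortRangeSync/getHashChain", err)
s.state.Debug("doShortRangeSync/insert", "inserted blocks")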
// doSync does the long range sync.
// One LongRangeSync consists of several iterations.
// For each iteration, estimate the current block number, then fetch block & insert to blockchain
func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bool) (int, error) {
func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bool) (uint64, int, error) {
var totalInserted int
s.initSync = initSync
if err := s.checkPrerequisites(); err != nil {
return 0, err
return 0, 0, err
}
var estimatedHeight uint64
if initSync {
if h, err := s.estimateCurrentNumber(downloaderContext); err != nil {
return 0, err
return 0, 0, err
} else {
estimatedHeight = h
//TODO: use directly currentCycle var
@ -187,36 +238,70 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
}
if curBN := s.bc.CurrentBlock().NumberU64(); estimatedHeight <= curBN {
s.logger.Info().Uint64("current number", curBN).Uint64("target number", estimatedHeight).
Msg(WrapStagedSyncMsg("early return of long range sync"))
return 0, nil
Msg(WrapStagedSyncMsg("early return of long range sync (chain is already ahead of target height)"))
return estimatedHeight, 0, nil
}
s.startSyncing()
defer s.finishSyncing()
}
s.startSyncing()
defer s.finishSyncing()
for {
ctx, cancel := context.WithCancel(downloaderContext)
n, err := s.doSyncCycle(ctx, initSync)
if err != nil {
utils.Logger().Error().
Err(err).
Bool("initSync", s.initSync).
Bool("isBeacon", s.isBeacon).
Uint32("shard", s.bc.ShardID()).
Msg(WrapStagedSyncMsg("sync cycle failed"))
pl := s.promLabels()
pl["error"] = err.Error()
numFailedDownloadCounterVec.With(pl).Inc()
cancel()
return totalInserted + n, err
return estimatedHeight, totalInserted + n, err
}
cancel()
totalInserted += n
// if it's not long range sync, skip loop
if n < LastMileBlocksThreshold || !initSync {
return totalInserted, nil
if n == 0 || !initSync {
break
}
}
if totalInserted > 0 {
utils.Logger().Info().
Bool("initSync", s.initSync).
Bool("isBeacon", s.isBeacon).
Uint32("shard", s.bc.ShardID()).
Int("blocks", totalInserted).
Msg(WrapStagedSyncMsg("sync cycle blocks inserted successfully"))
}
// add consensus last mile blocks
if s.consensus != nil {
if hashes, err := s.addConsensusLastMile(s.Blockchain(), s.consensus); err != nil {
utils.Logger().Error().Err(err).
Msg("[STAGED_STREAM_SYNC] Add consensus last mile failed")
s.RollbackLastMileBlocks(downloaderContext, hashes)
return estimatedHeight, totalInserted, err
} else {
totalInserted += len(hashes)
}
// TODO: move this to explorer handler code.
if s.isExplorer {
s.consensus.UpdateConsensusInformation()
}
}
s.purgeLastMileBlocksFromCache()
return estimatedHeight, totalInserted, nil
}
func (s *StagedStreamSync) doSyncCycle(ctx context.Context, initSync bool) (int, error) {

@ -83,6 +83,9 @@ type StagedSync struct {
StagedSyncTurboMode bool
// log the full sync progress in console
LogProgress bool
// log every single process and error to help debug syncing
// DebugMode is not accessible to the end user and is only an aid for development
DebugMode bool
}
// BlockWithSig the serialization structure for request DownloaderRequest_BLOCKWITHSIG
@ -258,7 +261,8 @@ func New(ctx context.Context,
verifyAllSig bool,
verifyHeaderBatchSize uint64,
insertChainBatchSize int,
logProgress bool) *StagedSync {
logProgress bool,
debugMode bool) *StagedSync {
revertStages := make([]*Stage, len(stagesList))
for i, stageIndex := range revertOrder {
@ -312,6 +316,7 @@ func New(ctx context.Context,
VerifyHeaderBatchSize: verifyHeaderBatchSize,
InsertChainBatchSize: insertChainBatchSize,
LogProgress: logProgress,
DebugMode: debugMode,
}
}

@ -64,6 +64,7 @@ func CreateStagedSync(
verifyHeaderBatchSize uint64,
insertChainBatchSize int,
logProgress bool,
debugMode bool,
) (*StagedSync, error) {
ctx := context.Background()
@ -134,6 +135,7 @@ func CreateStagedSync(
verifyHeaderBatchSize,
insertChainBatchSize,
logProgress,
debugMode,
), nil
}

@ -178,6 +178,7 @@ var defaultStagedSyncConfig = harmonyconfig.StagedSyncConfig{
MaxMemSyncCycleSize: 1024, // max number of blocks to use a single transaction for staged sync
UseMemDB: true, // it uses memory by default. set it to false to use disk
LogProgress: false, // log the full sync progress in console
DebugMode: false, // log every single process and error to help debug syncing (DebugMode is not accessible to the end user and is only an aid for development)
}
var (

@ -606,6 +606,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
nodeConfig.VerifyHeaderBatchSize = hc.Sync.StagedSyncCfg.VerifyHeaderBatchSize
nodeConfig.InsertChainBatchSize = hc.Sync.StagedSyncCfg.InsertChainBatchSize
nodeConfig.LogProgress = hc.Sync.StagedSyncCfg.LogProgress
nodeConfig.DebugMode = hc.Sync.StagedSyncCfg.DebugMode
// P2P private key is used for secure message transfer between p2p nodes.
nodeConfig.P2PPriKey, _, err = utils.LoadKeyFromFile(hc.P2P.KeyFile)
if err != nil {
@ -942,7 +943,9 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
SmHardLowCap: hc.Sync.DiscHardLowCap,
SmHiCap: hc.Sync.DiscHighCap,
SmDiscBatch: hc.Sync.DiscBatch,
LogProgress: node.NodeConfig.LogProgress,
UseMemDB: hc.Sync.StagedSyncCfg.UseMemDB,
LogProgress: hc.Sync.StagedSyncCfg.LogProgress,
DebugMode: hc.Sync.StagedSyncCfg.DebugMode,
}
// If we are running side chain, we will need to do some extra works for beacon
@ -954,7 +957,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
}
}
//Setup stream sync service
s := stagedstreamsync.NewService(host, blockchains, sConfig, hc.General.DataDir)
s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir)
node.RegisterService(service.StagedStreamSync, s)

@ -132,7 +132,7 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block
if err := consensus.verifyBlock(&blockObj); err != nil {
consensus.getLogger().Error().Err(err).Msg("[validateNewBlock] Block verification failed")
return nil, errors.New("Block verification failed")
return nil, errors.Errorf("Block verification failed: %s", err.Error())
}
return &blockObj, nil
}

@ -343,6 +343,7 @@ type StagedSyncConfig struct {
VerifyHeaderBatchSize uint64 // batch size to verify header before insert to chain
UseMemDB bool // it uses memory by default. set it to false to use disk
LogProgress bool // log the full sync progress in console
DebugMode bool // log every single process and error to help debug syncing issues (DebugMode is not accessible to the end user and is only an aid for development)
}
type PriceLimit int64

@ -93,6 +93,7 @@ type ConfigType struct {
VerifyAllSig bool // verify signatures for all blocks regardless of height and batch size
VerifyHeaderBatchSize uint64 // batch size to verify header before insert to chain
LogProgress bool // log the full sync progress in console
DebugMode bool // log every single process and error to help debug syncing issues
NtpServer string
StringRole string
P2PPriKey p2p_crypto.PrivKey `json:"-"`

@ -122,7 +122,8 @@ func (node *Node) createStagedSync(bc core.BlockChain) *stagedsync.StagedSync {
node.NodeConfig.VerifyAllSig,
node.NodeConfig.VerifyHeaderBatchSize,
node.NodeConfig.InsertChainBatchSize,
node.NodeConfig.LogProgress); err != nil {
node.NodeConfig.LogProgress,
node.NodeConfig.DebugMode); err != nil {
return nil
} else {
return s

@ -2,7 +2,6 @@ package sync
import (
"context"
"fmt"
"strconv"
"time"
@ -180,7 +179,8 @@ func (p *Protocol) HandleStream(raw libp2p_network.Stream) {
Msg("failed to add new stream")
return
}
fmt.Println("Connected to", raw.Conn().RemotePeer().String(), "(", st.ProtoID(), ")", "my ID: ", raw.Conn().LocalPeer().String())
// to get my ID use raw.Conn().LocalPeer().String()
p.logger.Info().Msgf("Connected to %s (%s)", raw.Conn().RemotePeer().String(), st.ProtoID())
st.run()
}
