Complete Fast Sync codes (#4594)

* adjust full state sync request parameters, rename stage_state

* add full state stage to the list of stages in fast sync

* add RangeMode and ChainExecutionMode to handle execution of the stream sync stage

* fix block exists issue on stage_states in stream sync

* fix double insertion in stage states

* add count for state downloader to return number of tasks

* fix travis build issue by goimports

* switch to Full Sync on pivot block, fix checking nil length in HandleRequestError
pull/4602/head
Gheis Mohammadi 11 months ago committed by GitHub
parent 718286f622
commit e68b44fd98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 87
      api/service/stagedstreamsync/default_stages.go
  2. 1
      api/service/stagedstreamsync/downloader.go
  3. 12
      api/service/stagedstreamsync/sig_verify.go
  4. 23
      api/service/stagedstreamsync/stage.go
  5. 5
      api/service/stagedstreamsync/stage_finish.go
  6. 6
      api/service/stagedstreamsync/stage_receipts.go
  7. 4
      api/service/stagedstreamsync/stage_states.go
  8. 9
      api/service/stagedstreamsync/stage_statesync.go
  9. 70
      api/service/stagedstreamsync/stage_statesync_full.go
  10. 55
      api/service/stagedstreamsync/staged_stream_sync.go
  11. 19
      api/service/stagedstreamsync/stages.go
  12. 147
      api/service/stagedstreamsync/state_sync_full.go
  13. 2
      api/service/stagedstreamsync/syncing.go

@ -64,7 +64,7 @@ func initFastSyncStagesOrder() {
ShortRange,
BlockBodies,
Receipts,
StateSync,
FullStateSync,
States,
LastMile,
Finish,
@ -74,7 +74,7 @@ func initFastSyncStagesOrder() {
Finish,
LastMile,
States,
StateSync,
FullStateSync,
Receipts,
BlockBodies,
ShortRange,
@ -86,7 +86,7 @@ func initFastSyncStagesOrder() {
Finish,
LastMile,
States,
StateSync,
FullStateSync,
Receipts,
BlockBodies,
ShortRange,
@ -101,6 +101,7 @@ func DefaultStages(ctx context.Context,
srCfg StageShortRangeCfg,
bodiesCfg StageBodiesCfg,
stateSyncCfg StageStateSyncCfg,
fullStateSyncCfg StageFullStateSyncCfg,
statesCfg StageStatesCfg,
receiptsCfg StageReceiptsCfg,
lastMileCfg StageLastMileCfg,
@ -113,55 +114,81 @@ func DefaultStages(ctx context.Context,
handlerStageBodies := NewStageBodies(bodiesCfg)
handlerStageStates := NewStageStates(statesCfg)
handlerStageStateSync := NewStageStateSync(stateSyncCfg)
handlerStageFullStateSync := NewStageFullStateSync(fullStateSyncCfg)
handlerStageReceipts := NewStageReceipts(receiptsCfg)
handlerStageLastMile := NewStageLastMile(lastMileCfg)
handlerStageFinish := NewStageFinish(finishCfg)
return []*Stage{
{
ID: Heads,
Description: "Retrieve Chain Heads",
Handler: handlerStageHeads,
ID: Heads,
Description: "Retrieve Chain Heads",
Handler: handlerStageHeads,
RangeMode: OnlyLongRange,
ChainExecutionMode: AllChains,
},
{
ID: SyncEpoch,
Description: "Sync only Last Block of Epoch",
Handler: handlerStageEpochSync,
ID: SyncEpoch,
Description: "Sync only Last Block of Epoch",
Handler: handlerStageEpochSync,
RangeMode: OnlyShortRange,
ChainExecutionMode: OnlyEpochChain,
},
{
ID: ShortRange,
Description: "Short Range Sync",
Handler: handlerStageShortRange,
ID: ShortRange,
Description: "Short Range Sync",
Handler: handlerStageShortRange,
RangeMode: OnlyShortRange,
ChainExecutionMode: AllChainsExceptEpochChain,
},
{
ID: BlockBodies,
Description: "Retrieve Block Bodies",
Handler: handlerStageBodies,
ID: BlockBodies,
Description: "Retrieve Block Bodies",
Handler: handlerStageBodies,
RangeMode: OnlyLongRange,
ChainExecutionMode: AllChainsExceptEpochChain,
},
{
ID: States,
Description: "Update Blockchain State",
Handler: handlerStageStates,
ID: States,
Description: "Update Blockchain State",
Handler: handlerStageStates,
RangeMode: OnlyLongRange,
ChainExecutionMode: AllChainsExceptEpochChain,
},
{
ID: StateSync,
Description: "Retrieve States",
Handler: handlerStageStateSync,
ID: StateSync,
Description: "Retrieve States",
Handler: handlerStageStateSync,
RangeMode: OnlyLongRange,
ChainExecutionMode: AllChainsExceptEpochChain,
},
{
ID: Receipts,
Description: "Retrieve Receipts",
Handler: handlerStageReceipts,
ID: FullStateSync,
Description: "Retrieve Full States",
Handler: handlerStageFullStateSync,
RangeMode: OnlyLongRange,
ChainExecutionMode: AllChainsExceptEpochChain,
},
{
ID: LastMile,
Description: "update status for blocks after sync and update last mile blocks as well",
Handler: handlerStageLastMile,
ID: Receipts,
Description: "Retrieve Receipts",
Handler: handlerStageReceipts,
RangeMode: OnlyLongRange,
ChainExecutionMode: AllChainsExceptEpochChain,
},
{
ID: Finish,
Description: "Finalize Changes",
Handler: handlerStageFinish,
ID: LastMile,
Description: "update status for blocks after sync and update last mile blocks as well",
Handler: handlerStageLastMile,
RangeMode: LongRangeAndShortRange,
ChainExecutionMode: AllChainsExceptEpochChain,
},
{
ID: Finish,
Description: "Finalize Changes",
Handler: handlerStageFinish,
RangeMode: LongRangeAndShortRange,
ChainExecutionMode: AllChains,
},
}
}

@ -285,4 +285,5 @@ func (d *Downloader) loop() {
return
}
}
}

@ -54,14 +54,7 @@ func verifyBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block)
if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil {
return errors.Wrap(err, "[VerifyHeader]")
}
_, err = bc.InsertChain(types.Blocks{block}, false)
switch {
case errors.Is(err, core.ErrKnownBlock):
return nil
case err != nil:
return errors.Wrap(err, "[InsertChain]")
default:
}
return nil
}
@ -72,6 +65,9 @@ func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*type
}
// insert block
if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil {
if errors.Is(err, core.ErrKnownBlock) {
return nil
}
return errors.Wrap(err, "[InsertChain]")
}
return nil

@ -30,6 +30,25 @@ type StageHandler interface {
CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) error
}
type RangeExecution uint32
const (
LongRangeAndShortRange RangeExecution = iota // Both short range and long range
OnlyShortRange // only short range
OnlyLongRange // only long range
//OnlyEpochSync // only epoch sync
)
type ChainExecution uint32
const (
AllChains ChainExecution = iota // Can execute for any shard
AllChainsExceptEpochChain // Can execute for any shard except epoch chain
OnlyBeaconNode // only for beacon node
OnlyEpochChain // only for epoch chain
OnlyShardChain // only for shard node (exclude beacon node and epoch chain)
)
// Stage is a single sync stage in staged sync.
type Stage struct {
// ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`).
@ -42,6 +61,10 @@ type Stage struct {
DisabledDescription string
// Disabled defines if the stage is disabled. It sets up when the stage is build by its `StageBuilder`.
Disabled bool
// Range defines whether stage has to be executed for either long range or short range
RangeMode RangeExecution
// ShardExecution defines this stage has to be executed for which shards
ChainExecutionMode ChainExecution
}
// StageState is the state of the stage.

@ -39,6 +39,11 @@ func (finish *StageFinish) Exec(ctx context.Context, firstCycle bool, invalidBlo
// TODO: prepare indices (useful for RPC) and finalize
// switch to Full Sync Mode if the states are synced
if s.state.status.statesSynced {
s.state.status.cycleSyncMode = FullSync
}
if useInternalTx {
if err := tx.Commit(); err != nil {
return err

@ -12,6 +12,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
)
@ -56,6 +57,11 @@ func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockR
return nil
}
// shouldn't execute for epoch chain
if r.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}
useInternalTx := tx == nil
if invalidBlockRevert {

@ -165,6 +165,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
return ErrInvalidBlockNumber
}
if stg.configs.bc.HasBlock(block.Hash(), block.NumberU64()) {
continue
}
if err := verifyAndInsertBlock(stg.configs.bc, block); err != nil {
stg.configs.logger.Warn().Err(err).Uint64("cycle target block", targetHeight).
Uint64("block number", block.NumberU64()).

@ -10,6 +10,7 @@ import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -58,8 +59,14 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo
// for short range sync, skip this step
if !s.state.initSync {
return nil
} // only execute this stage in fast/snap sync mode and once we reach to pivot
}
// shouldn't execute for epoch chain
if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}
// only execute this stage in fast/snap sync mode and once we reach to pivot
if s.state.status.pivotBlock == nil ||
s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() ||
s.state.status.statesSynced {

@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/pkg/errors"
//sttypes "github.com/harmony-one/harmony/p2p/stream/types"
@ -59,8 +60,19 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever
// for short range sync, skip this step
if !s.state.initSync {
return nil
} // only execute this stage in fast/snap sync mode and once we reach to pivot
}
// shouldn't execute for epoch chain
if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}
// if states are already synced, don't execute this stage
if s.state.status.statesSynced {
return
}
// only execute this stage in fast/snap sync mode and once we reach to pivot
if s.state.status.pivotBlock == nil ||
s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() ||
s.state.status.statesSynced {
@ -72,21 +84,21 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever
// if currentHead >= maxHeight {
// return nil
// }
// currProgress := s.state.CurrentBlockNumber()
// targetHeight := s.state.currentCycle.TargetHeight
// if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error {
// if currProgress, err = s.CurrentStageProgress(etx); err != nil {
// return err
// }
// return nil
// }); errV != nil {
// return errV
// }
currProgress := uint64(0)
if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error {
if currProgress, err = s.CurrentStageProgress(etx); err != nil {
return err
}
return nil
}); errV != nil {
return errV
}
if currProgress >= s.state.status.pivotBlock.NumberU64() {
return nil
}
// if currProgress >= targetHeight {
// return nil
// }
useInternalTx := tx == nil
if useInternalTx {
var err error
@ -109,6 +121,8 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever
scheme := sss.configs.bc.TrieDB().Scheme()
sdm := newFullStateDownloadManager(sss.configs.bc.ChainDb(), scheme, tx, sss.configs.bc, sss.configs.concurrency, s.state.logger)
sdm.setRootHash(currentBlockRootHash)
sdm.SyncStarted()
var wg sync.WaitGroup
for i := 0; i < s.state.config.Concurrency; i++ {
wg.Add(1)
@ -128,6 +142,12 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever
// states should be fully synced in this stage
s.state.status.statesSynced = true
if err := sss.saveProgress(s, tx); err != nil {
sss.configs.logger.Warn().Err(err).
Uint64("pivot block number", s.state.status.pivotBlock.NumberU64()).
Msg(WrapStagedSyncMsg("save progress for statesync stage failed"))
}
/*
gbm := s.state.gbm
@ -169,8 +189,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
return
default:
}
accountTasks, codes, storages, healtask, codetask, err := sdm.GetNextBatch()
if len(accountTasks)+len(codes)+len(storages.accounts)+len(healtask.hashes)+len(codetask.hashes) == 0 || err != nil {
accountTasks, codes, storages, healtask, codetask, nTasks, err := sdm.GetNextBatch()
if nTasks == 0 || err != nil {
select {
case <-ctx.Done():
return
@ -184,8 +204,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
task := accountTasks[0]
origin := task.Next
limit := task.Last
root := sdm.root
cap := maxRequestSize
root := task.root
cap := task.cap
retAccounts, proof, stid, err := sss.configs.protocol.GetAccountRange(ctx, root, origin, limit, uint64(cap))
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
@ -234,10 +254,10 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
} else if len(storages.accounts) > 0 {
root := sdm.root
root := storages.root
roots := storages.roots
accounts := storages.accounts
cap := maxRequestSize
cap := storages.cap
origin := storages.origin
limit := storages.limit
mainTask := storages.mainTask
@ -276,13 +296,14 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
} else {
// assign trie node Heal Tasks
if len(healtask.hashes) > 0 {
root := sdm.root
root := healtask.root
task := healtask.task
hashes := healtask.hashes
pathsets := healtask.pathsets
paths := healtask.paths
bytes := healtask.bytes
nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, maxRequestSize)
nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, uint64(bytes))
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
sss.configs.protocol.StreamFailed(stid, "GetTrieNodes failed")
@ -316,7 +337,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full
if len(codetask.hashes) > 0 {
task := codetask.task
hashes := codetask.hashes
retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, maxRequestSize)
bytes := codetask.bytes
retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, uint64(bytes))
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
sss.configs.protocol.StreamFailed(stid, "GetByteCodes failed")
@ -354,7 +376,7 @@ func (sss *StageFullStateSync) downloadByteCodes(ctx context.Context, sdm *FullS
for _, codeTask := range codeTasks {
// try to get byte codes from remote peer
// if any of them failed, the stid will be the id of the failed stream
retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, maxRequestSize)
retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, uint64(codeTask.cap))
if err != nil {
return stid, err
}
@ -413,7 +435,7 @@ func (stg *StageFullStateSync) saveProgress(s *StageState, tx kv.RwTx) (err erro
}
// save progress
if err = s.Update(tx, s.state.CurrentBlockNumber()); err != nil {
if err = s.Update(tx, s.state.status.pivotBlock.NumberU64()); err != nil {
utils.Logger().Error().
Err(err).
Msgf("[STAGED_SYNC] saving progress for block States stage failed")

@ -16,6 +16,7 @@ import (
"github.com/harmony-one/harmony/internal/utils"
syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -405,6 +406,11 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs
continue
}
// TODO: enable this part after make sure all works well
// if !s.canExecute(stage) {
// continue
// }
if err := s.runStage(ctx, stage, db, tx, firstCycle, s.invalidBlock.Active); err != nil {
utils.Logger().Error().
Err(err).
@ -431,6 +437,55 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs
return nil
}
func (s *StagedStreamSync) canExecute(stage *Stage) bool {
// check range mode
if stage.RangeMode != LongRangeAndShortRange {
isLongRange := s.initSync
switch stage.RangeMode {
case OnlyLongRange:
if !isLongRange {
return false
}
case OnlyShortRange:
if isLongRange {
return false
}
default:
return false
}
}
// check chain execution
if stage.ChainExecutionMode != AllChains {
shardID := s.bc.ShardID()
isBeaconNode := s.isBeaconNode
isShardChain := shardID != shard.BeaconChainShardID
isEpochChain := shardID == shard.BeaconChainShardID && !isBeaconNode
switch stage.ChainExecutionMode {
case AllChainsExceptEpochChain:
if isEpochChain {
return false
}
case OnlyBeaconNode:
if !isBeaconNode {
return false
}
case OnlyShardChain:
if !isShardChain {
return false
}
case OnlyEpochChain:
if !isEpochChain {
return false
}
default:
return false
}
}
return true
}
// CreateView creates a view for a given db
func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) error) error {
if tx != nil {

@ -8,15 +8,16 @@ import (
type SyncStageID string
const (
Heads SyncStageID = "Heads" // Heads are downloaded
ShortRange SyncStageID = "ShortRange" // short range
SyncEpoch SyncStageID = "SyncEpoch" // epoch sync
BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified
States SyncStageID = "States" // will construct most recent state from downloaded blocks
StateSync SyncStageID = "StateSync" // State sync
Receipts SyncStageID = "Receipts" // Receipts
LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well
Finish SyncStageID = "Finish" // Nominal stage after all other stages
Heads SyncStageID = "Heads" // Heads are downloaded
ShortRange SyncStageID = "ShortRange" // short range
SyncEpoch SyncStageID = "SyncEpoch" // epoch sync
BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified
States SyncStageID = "States" // will construct most recent state from downloaded blocks
StateSync SyncStageID = "StateSync" // State sync
FullStateSync SyncStageID = "FullStateSync" // Full State Sync
Receipts SyncStageID = "Receipts" // Receipts
LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well
Finish SyncStageID = "Finish" // Nominal stage after all other stages
)
// GetStageName returns the stage name in string

@ -108,6 +108,11 @@ var (
type accountTask struct {
id uint64 //unique id for account task
root common.Hash
origin common.Hash
limit common.Hash
cap int
// These fields get serialized to leveldb on shutdown
Next common.Hash // Next account to sync in this interval
Last common.Hash // Last account to sync in this interval
@ -229,16 +234,19 @@ type byteCodeTasksBundle struct {
id uint64 //unique id for bytecode task bundle
task *accountTask
hashes []common.Hash
cap int
}
type storageTaskBundle struct {
id uint64 //unique id for storage task bundle
root common.Hash
accounts []common.Hash
roots []common.Hash
mainTask *accountTask
subtask *storageTask
origin common.Hash
limit common.Hash
cap int
}
// healTask represents the sync task for healing the snap-synced chunk boundaries.
@ -251,6 +259,7 @@ type healTask struct {
pathsets []*message.TrieNodePathSet
task *healTask
root common.Hash
bytes int
byteCodeReq bool
}
@ -259,7 +268,6 @@ type tasks struct {
storageTasks map[uint64]*storageTaskBundle // Set of trie node tasks currently queued for retrieval, indexed by path
codeTasks map[uint64]*byteCodeTasksBundle // Set of byte code tasks currently queued for retrieval, indexed by hash
healer map[uint64]*healTask
snapped bool // Flag to signal that snap phase is done
}
func newTasks() *tasks {
@ -268,7 +276,6 @@ func newTasks() *tasks {
storageTasks: make(map[uint64]*storageTaskBundle, 0),
codeTasks: make(map[uint64]*byteCodeTasksBundle),
healer: make(map[uint64]*healTask, 0),
snapped: false,
}
}
@ -399,8 +406,6 @@ type FullStateDownloadManager struct {
storageSynced uint64 // Number of storage slots downloaded
storageBytes common.StorageSize // Number of storage trie bytes persisted to disk
pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
stateWriter ethdb.Batch // Shared batch writer used for persisting raw states
accountHealed uint64 // Number of accounts downloaded during the healing stage
accountHealedBytes common.StorageSize // Number of raw account bytes persisted to disk during the healing stage
@ -420,6 +425,9 @@ type FullStateDownloadManager struct {
bytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk
bytecodeHealDups uint64 // Number of bytecodes already processed
bytecodeHealNops uint64 // Number of bytecodes not requested
startTime time.Time // Time instance when snapshot sync started
logTime time.Time // Time instance when status was last reported
}
func newFullStateDownloadManager(db ethdb.KeyValueStore,
@ -430,18 +438,19 @@ func newFullStateDownloadManager(db ethdb.KeyValueStore,
logger zerolog.Logger) *FullStateDownloadManager {
return &FullStateDownloadManager{
db: db,
scheme: scheme,
bc: bc,
stateWriter: db.NewBatch(),
tx: tx,
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
concurrency: concurrency,
logger: logger,
tasks: newTasks(),
requesting: newTasks(),
processing: newTasks(),
retries: newTasks(),
db: db,
scheme: scheme,
bc: bc,
stateWriter: db.NewBatch(),
tx: tx,
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
concurrency: concurrency,
logger: logger,
tasks: newTasks(),
requesting: newTasks(),
processing: newTasks(),
retries: newTasks(),
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
}
}
@ -531,6 +540,12 @@ func (s *FullStateDownloadManager) commitHealer(force bool) {
utils.Logger().Debug().Str("type", "trienodes").Interface("bytes", common.StorageSize(batch.ValueSize())).Msg("Persisted set of healing data")
}
func (s *FullStateDownloadManager) SyncStarted() {
if s.startTime == (time.Time{}) {
s.startTime = time.Now()
}
}
func (s *FullStateDownloadManager) SyncCompleted() {
defer func() { // Persist any progress, independent of failure
for _, task := range s.tasks.accountTasks {
@ -556,7 +571,8 @@ func (s *FullStateDownloadManager) SyncCompleted() {
utils.Logger().Debug().Interface("root", s.root).Msg("Terminating snapshot sync cycle")
}()
utils.Logger().Debug().Msg("Snapshot sync already completed")
elapsed := time.Since(s.startTime)
utils.Logger().Debug().Interface("elapsed", elapsed).Msg("Snapshot sync already completed")
}
// getNextBatch returns objects with a maximum of n state download
@ -566,38 +582,30 @@ func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask,
storages *storageTaskBundle,
healtask *healTask,
codetask *healTask,
nItems int,
err error) {
s.lock.Lock()
defer s.lock.Unlock()
accounts, codes, storages, healtask, codetask = s.getBatchFromRetries()
nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes)
accounts, codes, storages, healtask, codetask, nItems = s.getBatchFromRetries()
if nItems > 0 {
return
}
if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 {
if nItems == 0 {
s.SyncCompleted()
}
s.SyncCompleted()
return
}
// Refill available tasks from the scheduler.
withHealTasks := true
if healtask != nil || codetask != nil {
withHealTasks = false
}
newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask := s.getBatchFromUnprocessed(withHealTasks)
newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask, nItems := s.getBatchFromUnprocessed()
accounts = append(accounts, newAccounts...)
codes = append(codes, newCodes...)
storages = newStorageTaskBundle
if withHealTasks {
healtask = newHealTask
codetask = newCodeTask
}
healtask = newHealTask
codetask = newCodeTask
return
}
@ -714,7 +722,7 @@ func (s *FullStateDownloadManager) loadSyncStatus() {
// Either we've failed to decode the previous state, or there was none.
// Start a fresh sync by chunking up the account range and scheduling
// them for retrieval.
s.tasks.accountTasks = nil
s.tasks = newTasks()
s.accountSynced, s.accountBytes = 0, 0
s.bytecodeSynced, s.bytecodeBytes = 0, 0
s.storageSynced, s.storageBytes = 0, 0
@ -921,16 +929,18 @@ func (s *FullStateDownloadManager) updateStats(written, duplicate, unexpected in
// getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download
// tasks to send to the remote peer.
func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
accounts []*accountTask,
codes []*byteCodeTasksBundle,
storages *storageTaskBundle,
healtask *healTask,
codetask *healTask) {
codetask *healTask,
count int) {
// over trie nodes as those can be written to disk and forgotten about.
codes = make([]*byteCodeTasksBundle, 0)
accounts = make([]*accountTask, 0)
count = 0
for i, task := range s.tasks.accountTasks {
// Stop when we've gathered enough requests
@ -956,12 +966,18 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
break
}
task.root = s.root
task.origin = task.Next
task.limit = task.Last
task.cap = maxRequestSize
task.requested = true
s.tasks.accountTasks[i].requested = true
accounts = append(accounts, task)
s.requesting.addAccountTask(task.id, task)
s.tasks.addAccountTask(task.id, task)
// one task account is enough for an stream
count = len(accounts)
return
}
@ -997,6 +1013,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
id: taskID,
hashes: hashes,
task: task,
cap: maxRequestSize,
}
codes = append(codes, bytecodeTask)
@ -1005,12 +1022,14 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
// Stop when we've gathered enough requests
if totalHashes >= maxCodeRequestCount {
count = totalHashes
return
}
}
// if we found some codes, can assign it to node
if totalHashes > 0 {
count = totalHashes
return
}
@ -1020,14 +1039,8 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
continue
}
// TODO: check cap calculations (shouldn't give us big chunk)
// if cap > maxRequestSize {
// cap = maxRequestSize
// }
// if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
// cap = minRequestSize
// }
storageSets := maxRequestSize / 1024
cap := maxRequestSize
storageSets := cap / 1024
storages = &storageTaskBundle{
accounts: make([]common.Hash, 0, storageSets),
@ -1089,23 +1102,21 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
storages.origin = storages.subtask.Next
storages.limit = storages.subtask.Last
}
storages.root = s.root
storages.cap = cap
s.tasks.addStorageTaskBundle(taskID, storages)
s.requesting.addStorageTaskBundle(taskID, storages)
count = len(storages.accounts)
return
}
if len(storages.accounts) > 0 {
return
}
if !withHealTasks {
count = len(storages.accounts)
return
}
// Sync phase done, run heal phase
// Iterate over pending tasks and try to find a peer to retrieve with
// Iterate over pending tasks
for (len(s.tasks.healer) > 0 && len(s.tasks.healer[0].hashes) > 0) || s.scheduler.Pending() > 0 {
// If there are not enough trie tasks queued to fully assign, fill the
// queue from the state sync scheduler. The trie synced schedules these
@ -1129,7 +1140,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
// If all the heal tasks are bytecodes or already downloading, bail
if len(s.tasks.healer[0].trieTasks) == 0 {
return
break
}
// Generate the network query and send it to the peer
// if cap > maxTrieRequestCount {
@ -1177,6 +1188,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
pathsets: pathsets,
root: s.root,
task: s.tasks.healer[0],
bytes: maxRequestSize,
byteCodeReq: false,
}
@ -1184,6 +1196,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
s.requesting.addHealerTask(taskID, healtask)
if len(hashes) > 0 {
count = len(hashes)
return
}
}
@ -1205,7 +1218,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
// If all the heal tasks are trienodes or already downloading, bail
if len(s.tasks.healer[0].codeTasks) == 0 {
return
break
}
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
@ -1243,9 +1256,10 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
id: taskID,
hashes: hashes,
task: s.tasks.healer[0],
bytes: maxRequestSize,
byteCodeReq: true,
}
count = len(hashes)
s.tasks.healer[taskID] = codetask
s.requesting.addHealerTask(taskID, healtask)
}
@ -1272,7 +1286,8 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
codes []*byteCodeTasksBundle,
storages *storageTaskBundle,
healtask *healTask,
codetask *healTask) {
codetask *healTask,
count int) {
// over trie nodes as those can be written to disk and forgotten about.
accounts = make([]*accountTask, 0)
@ -1290,6 +1305,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
if len(accounts) > 0 {
count = len(accounts)
return
}
@ -1301,6 +1317,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
if len(codes) > 0 {
count = len(codes)
return
}
@ -1316,10 +1333,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
s.requesting.addStorageTaskBundle(storages.id, storages)
s.retries.deleteStorageTaskBundle(storages.id)
return
}
if len(storages.accounts) > 0 {
count = len(storages.accounts)
return
}
@ -1338,6 +1352,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
s.requesting.addHealerTask(id, task)
s.retries.deleteHealerTask(id)
count = len(task.hashes)
return
}
if task.byteCodeReq {
@ -1352,11 +1367,13 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
s.requesting.addHealerTask(id, task)
s.retries.deleteHealerTask(id)
count = len(task.hashes)
return
}
}
}
count = 0
return
}
@ -1371,14 +1388,18 @@ func (s *FullStateDownloadManager) HandleRequestError(accounts []*accountTask,
s.lock.Lock()
defer s.lock.Unlock()
for _, task := range accounts {
s.requesting.deleteAccountTask(task.id)
s.retries.addAccountTask(task.id, task)
if accounts != nil && len(accounts) > 0 {
for _, task := range accounts {
s.requesting.deleteAccountTask(task.id)
s.retries.addAccountTask(task.id, task)
}
}
for _, code := range codes {
s.requesting.deleteCodeTask(code.id)
s.retries.addCodeTask(code.id, code)
if codes != nil && len(codes) > 0 {
for _, code := range codes {
s.requesting.deleteCodeTask(code.id)
s.retries.addCodeTask(code.id, code)
}
}
if storages != nil {

@ -90,6 +90,7 @@ func CreateStagedSync(ctx context.Context,
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, extractReceiptHashes, config.LogProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress)
stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress)
stageFullStateSyncCfg := NewStageFullStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress)
stageReceiptsCfg := NewStageReceiptsCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB)
stageFinishCfg := NewStageFinishCfg(mainDB)
@ -103,6 +104,7 @@ func CreateStagedSync(ctx context.Context,
stageShortRangeCfg,
stageBodiesCfg,
stageStateSyncCfg,
stageFullStateSyncCfg,
stageStatesCfg,
stageReceiptsCfg,
lastMileCfg,

Loading…
Cancel
Save