diff --git a/api/service/stagedstreamsync/default_stages.go b/api/service/stagedstreamsync/default_stages.go index f869ee5fe..fe64e26d4 100644 --- a/api/service/stagedstreamsync/default_stages.go +++ b/api/service/stagedstreamsync/default_stages.go @@ -64,7 +64,7 @@ func initFastSyncStagesOrder() { ShortRange, BlockBodies, Receipts, - StateSync, + FullStateSync, States, LastMile, Finish, @@ -74,7 +74,7 @@ func initFastSyncStagesOrder() { Finish, LastMile, States, - StateSync, + FullStateSync, Receipts, BlockBodies, ShortRange, @@ -86,7 +86,7 @@ func initFastSyncStagesOrder() { Finish, LastMile, States, - StateSync, + FullStateSync, Receipts, BlockBodies, ShortRange, @@ -101,6 +101,7 @@ func DefaultStages(ctx context.Context, srCfg StageShortRangeCfg, bodiesCfg StageBodiesCfg, stateSyncCfg StageStateSyncCfg, + fullStateSyncCfg StageFullStateSyncCfg, statesCfg StageStatesCfg, receiptsCfg StageReceiptsCfg, lastMileCfg StageLastMileCfg, @@ -113,55 +114,81 @@ func DefaultStages(ctx context.Context, handlerStageBodies := NewStageBodies(bodiesCfg) handlerStageStates := NewStageStates(statesCfg) handlerStageStateSync := NewStageStateSync(stateSyncCfg) + handlerStageFullStateSync := NewStageFullStateSync(fullStateSyncCfg) handlerStageReceipts := NewStageReceipts(receiptsCfg) handlerStageLastMile := NewStageLastMile(lastMileCfg) handlerStageFinish := NewStageFinish(finishCfg) return []*Stage{ { - ID: Heads, - Description: "Retrieve Chain Heads", - Handler: handlerStageHeads, + ID: Heads, + Description: "Retrieve Chain Heads", + Handler: handlerStageHeads, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChains, }, { - ID: SyncEpoch, - Description: "Sync only Last Block of Epoch", - Handler: handlerStageEpochSync, + ID: SyncEpoch, + Description: "Sync only Last Block of Epoch", + Handler: handlerStageEpochSync, + RangeMode: OnlyShortRange, + ChainExecutionMode: OnlyEpochChain, }, { - ID: ShortRange, - Description: "Short Range Sync", - Handler: handlerStageShortRange, + ID: ShortRange, + Description: "Short Range Sync", + Handler: handlerStageShortRange, + RangeMode: OnlyShortRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: BlockBodies, - Description: "Retrieve Block Bodies", - Handler: handlerStageBodies, + ID: BlockBodies, + Description: "Retrieve Block Bodies", + Handler: handlerStageBodies, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: States, - Description: "Update Blockchain State", - Handler: handlerStageStates, + ID: States, + Description: "Update Blockchain State", + Handler: handlerStageStates, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: StateSync, - Description: "Retrieve States", - Handler: handlerStageStateSync, + ID: StateSync, + Description: "Retrieve States", + Handler: handlerStageStateSync, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: Receipts, - Description: "Retrieve Receipts", - Handler: handlerStageReceipts, + ID: FullStateSync, + Description: "Retrieve Full States", + Handler: handlerStageFullStateSync, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: LastMile, - Description: "update status for blocks after sync and update last mile blocks as well", - Handler: handlerStageLastMile, + ID: Receipts, + Description: "Retrieve Receipts", + Handler: handlerStageReceipts, + RangeMode: OnlyLongRange, + ChainExecutionMode: AllChainsExceptEpochChain, }, { - ID: Finish, - Description: "Finalize Changes", - Handler: handlerStageFinish, + ID: LastMile, + Description: "update status for blocks after sync and update last mile blocks as well", + Handler: handlerStageLastMile, + RangeMode: LongRangeAndShortRange, + ChainExecutionMode: AllChainsExceptEpochChain, + }, + { + ID: Finish, + Description: "Finalize Changes", + Handler: handlerStageFinish, + RangeMode: LongRangeAndShortRange, + ChainExecutionMode: AllChains, }, } } diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index 371104895..9d564b016 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -285,4 +285,5 @@ func (d *Downloader) loop() { return } } + } diff --git a/api/service/stagedstreamsync/sig_verify.go b/api/service/stagedstreamsync/sig_verify.go index bdf5a2107..cd7fc4f91 100644 --- a/api/service/stagedstreamsync/sig_verify.go +++ b/api/service/stagedstreamsync/sig_verify.go @@ -54,14 +54,7 @@ func verifyBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) if err := bc.Engine().VerifyHeader(bc, block.Header(), true); err != nil { return errors.Wrap(err, "[VerifyHeader]") } - _, err = bc.InsertChain(types.Blocks{block}, false) - switch { - case errors.Is(err, core.ErrKnownBlock): - return nil - case err != nil: - return errors.Wrap(err, "[InsertChain]") - default: - } + return nil } @@ -72,6 +65,9 @@ func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*type } // insert block if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil { + if errors.Is(err, core.ErrKnownBlock) { + return nil + } return errors.Wrap(err, "[InsertChain]") } return nil diff --git a/api/service/stagedstreamsync/stage.go b/api/service/stagedstreamsync/stage.go index 48334a5e5..59602fe81 100644 --- a/api/service/stagedstreamsync/stage.go +++ b/api/service/stagedstreamsync/stage.go @@ -30,6 +30,25 @@ type StageHandler interface { CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) error } +type RangeExecution uint32 + +const ( + LongRangeAndShortRange RangeExecution = iota // Both short range and long range + OnlyShortRange // only short range + OnlyLongRange // only long range + //OnlyEpochSync // only epoch sync +) + +type ChainExecution uint32 + +const ( + AllChains ChainExecution = iota // Can execute for any shard + AllChainsExceptEpochChain // Can execute for any shard except epoch chain + OnlyBeaconNode // only for beacon node + OnlyEpochChain // only for epoch chain + OnlyShardChain // only for shard node (exclude beacon node and epoch chain) +) + // Stage is a single sync stage in staged sync. type Stage struct { // ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`). @@ -42,6 +61,10 @@ type Stage struct { DisabledDescription string // Disabled defines if the stage is disabled. It sets up when the stage is build by its `StageBuilder`. Disabled bool + // Range defines whether stage has to be executed for either long range or short range + RangeMode RangeExecution + // ShardExecution defines this stage has to be executed for which shards + ChainExecutionMode ChainExecution } // StageState is the state of the stage. diff --git a/api/service/stagedstreamsync/stage_finish.go b/api/service/stagedstreamsync/stage_finish.go index 0dfae53ae..c94aa692b 100644 --- a/api/service/stagedstreamsync/stage_finish.go +++ b/api/service/stagedstreamsync/stage_finish.go @@ -39,6 +39,11 @@ func (finish *StageFinish) Exec(ctx context.Context, firstCycle bool, invalidBlo // TODO: prepare indices (useful for RPC) and finalize + // switch to Full Sync Mode if the states are synced + if s.state.status.statesSynced { + s.state.status.cycleSyncMode = FullSync + } + if useInternalTx { if err := tx.Commit(); err != nil { return err diff --git a/api/service/stagedstreamsync/stage_receipts.go b/api/service/stagedstreamsync/stage_receipts.go index 4445eb6ba..78e8e089c 100644 --- a/api/service/stagedstreamsync/stage_receipts.go +++ b/api/service/stagedstreamsync/stage_receipts.go @@ -12,6 +12,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" ) @@ -56,6 +57,11 @@ func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockR return nil } + // shouldn't execute for epoch chain + if r.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + useInternalTx := tx == nil if invalidBlockRevert { diff --git a/api/service/stagedstreamsync/stage_state.go b/api/service/stagedstreamsync/stage_states.go similarity index 98% rename from api/service/stagedstreamsync/stage_state.go rename to api/service/stagedstreamsync/stage_states.go index df864d63f..1b668786c 100644 --- a/api/service/stagedstreamsync/stage_state.go +++ b/api/service/stagedstreamsync/stage_states.go @@ -165,6 +165,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR return ErrInvalidBlockNumber } + if stg.configs.bc.HasBlock(block.Hash(), block.NumberU64()) { + continue + } + if err := verifyAndInsertBlock(stg.configs.bc, block); err != nil { stg.configs.logger.Warn().Err(err).Uint64("cycle target block", targetHeight). Uint64("block number", block.NumberU64()). diff --git a/api/service/stagedstreamsync/stage_statesync.go b/api/service/stagedstreamsync/stage_statesync.go index c4e66e10e..3ce733f41 100644 --- a/api/service/stagedstreamsync/stage_statesync.go +++ b/api/service/stagedstreamsync/stage_statesync.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -58,8 +59,14 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo // for short range sync, skip this step if !s.state.initSync { return nil - } // only execute this stage in fast/snap sync mode and once we reach to pivot + } + + // shouldn't execute for epoch chain + if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + // only execute this stage in fast/snap sync mode and once we reach to pivot if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() || s.state.status.statesSynced { diff --git a/api/service/stagedstreamsync/stage_statesync_full.go b/api/service/stagedstreamsync/stage_statesync_full.go index d304ca1c3..c1579114b 100644 --- a/api/service/stagedstreamsync/stage_statesync_full.go +++ b/api/service/stagedstreamsync/stage_statesync_full.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/pkg/errors" //sttypes "github.com/harmony-one/harmony/p2p/stream/types" @@ -59,8 +60,19 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever // for short range sync, skip this step if !s.state.initSync { return nil - } // only execute this stage in fast/snap sync mode and once we reach to pivot + } + + // shouldn't execute for epoch chain + if sss.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + + // if states are already synced, don't execute this stage + if s.state.status.statesSynced { + return + } + // only execute this stage in fast/snap sync mode and once we reach to pivot if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() || s.state.status.statesSynced { @@ -72,21 +84,21 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever // if currentHead >= maxHeight { // return nil // } - // currProgress := s.state.CurrentBlockNumber() // targetHeight := s.state.currentCycle.TargetHeight - // if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error { - // if currProgress, err = s.CurrentStageProgress(etx); err != nil { - // return err - // } - // return nil - // }); errV != nil { - // return errV - // } + currProgress := uint64(0) + if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error { + if currProgress, err = s.CurrentStageProgress(etx); err != nil { + return err + } + return nil + }); errV != nil { + return errV + } + if currProgress >= s.state.status.pivotBlock.NumberU64() { + return nil + } - // if currProgress >= targetHeight { - // return nil - // } useInternalTx := tx == nil if useInternalTx { var err error @@ -109,6 +121,8 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever scheme := sss.configs.bc.TrieDB().Scheme() sdm := newFullStateDownloadManager(sss.configs.bc.ChainDb(), scheme, tx, sss.configs.bc, sss.configs.concurrency, s.state.logger) sdm.setRootHash(currentBlockRootHash) + + sdm.SyncStarted() var wg sync.WaitGroup for i := 0; i < s.state.config.Concurrency; i++ { wg.Add(1) @@ -128,6 +142,12 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever // states should be fully synced in this stage s.state.status.statesSynced = true + if err := sss.saveProgress(s, tx); err != nil { + sss.configs.logger.Warn().Err(err). + Uint64("pivot block number", s.state.status.pivotBlock.NumberU64()). + Msg(WrapStagedSyncMsg("save progress for statesync stage failed")) + } + /* gbm := s.state.gbm @@ -169,8 +189,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full return default: } - accountTasks, codes, storages, healtask, codetask, err := sdm.GetNextBatch() - if len(accountTasks)+len(codes)+len(storages.accounts)+len(healtask.hashes)+len(codetask.hashes) == 0 || err != nil { + accountTasks, codes, storages, healtask, codetask, nTasks, err := sdm.GetNextBatch() + if nTasks == 0 || err != nil { select { case <-ctx.Done(): return @@ -184,8 +204,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full task := accountTasks[0] origin := task.Next limit := task.Last - root := sdm.root - cap := maxRequestSize + root := task.root + cap := task.cap retAccounts, proof, stid, err := sss.configs.protocol.GetAccountRange(ctx, root, origin, limit, uint64(cap)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { @@ -234,10 +254,10 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full } else if len(storages.accounts) > 0 { - root := sdm.root + root := storages.root roots := storages.roots accounts := storages.accounts - cap := maxRequestSize + cap := storages.cap origin := storages.origin limit := storages.limit mainTask := storages.mainTask @@ -276,13 +296,14 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full } else { // assign trie node Heal Tasks if len(healtask.hashes) > 0 { - root := sdm.root + root := healtask.root task := healtask.task hashes := healtask.hashes pathsets := healtask.pathsets paths := healtask.paths + bytes := healtask.bytes - nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, maxRequestSize) + nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, uint64(bytes)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { sss.configs.protocol.StreamFailed(stid, "GetTrieNodes failed") @@ -316,7 +337,8 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full if len(codetask.hashes) > 0 { task := codetask.task hashes := codetask.hashes - retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, maxRequestSize) + bytes := codetask.bytes + retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, uint64(bytes)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { sss.configs.protocol.StreamFailed(stid, "GetByteCodes failed") @@ -354,7 +376,7 @@ func (sss *StageFullStateSync) downloadByteCodes(ctx context.Context, sdm *FullS for _, codeTask := range codeTasks { // try to get byte codes from remote peer // if any of them failed, the stid will be the id of the failed stream - retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, maxRequestSize) + retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, uint64(codeTask.cap)) if err != nil { return stid, err } @@ -413,7 +435,7 @@ func (stg *StageFullStateSync) saveProgress(s *StageState, tx kv.RwTx) (err erro } // save progress - if err = s.Update(tx, s.state.CurrentBlockNumber()); err != nil { + if err = s.Update(tx, s.state.status.pivotBlock.NumberU64()); err != nil { utils.Logger().Error(). Err(err). Msgf("[STAGED_SYNC] saving progress for block States stage failed") diff --git a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index 03340eb15..1782068b2 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -16,6 +16,7 @@ import ( "github.com/harmony-one/harmony/internal/utils" syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -405,6 +406,11 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs continue } + // TODO: enable this part after make sure all works well + // if !s.canExecute(stage) { + // continue + // } + if err := s.runStage(ctx, stage, db, tx, firstCycle, s.invalidBlock.Active); err != nil { utils.Logger().Error(). Err(err). @@ -431,6 +437,55 @@ func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firs return nil } +func (s *StagedStreamSync) canExecute(stage *Stage) bool { + // check range mode + if stage.RangeMode != LongRangeAndShortRange { + isLongRange := s.initSync + switch stage.RangeMode { + case OnlyLongRange: + if !isLongRange { + return false + } + case OnlyShortRange: + if isLongRange { + return false + } + default: + return false + } + } + + // check chain execution + if stage.ChainExecutionMode != AllChains { + shardID := s.bc.ShardID() + isBeaconNode := s.isBeaconNode + isShardChain := shardID != shard.BeaconChainShardID + isEpochChain := shardID == shard.BeaconChainShardID && !isBeaconNode + switch stage.ChainExecutionMode { + case AllChainsExceptEpochChain: + if isEpochChain { + return false + } + case OnlyBeaconNode: + if !isBeaconNode { + return false + } + case OnlyShardChain: + if !isShardChain { + return false + } + case OnlyEpochChain: + if !isEpochChain { + return false + } + default: + return false + } + } + + return true +} + // CreateView creates a view for a given db func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) error) error { if tx != nil { diff --git a/api/service/stagedstreamsync/stages.go b/api/service/stagedstreamsync/stages.go index 6ad9e4519..33f3b293b 100644 --- a/api/service/stagedstreamsync/stages.go +++ b/api/service/stagedstreamsync/stages.go @@ -8,15 +8,16 @@ import ( type SyncStageID string const ( - Heads SyncStageID = "Heads" // Heads are downloaded - ShortRange SyncStageID = "ShortRange" // short range - SyncEpoch SyncStageID = "SyncEpoch" // epoch sync - BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified - States SyncStageID = "States" // will construct most recent state from downloaded blocks - StateSync SyncStageID = "StateSync" // State sync - Receipts SyncStageID = "Receipts" // Receipts - LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well - Finish SyncStageID = "Finish" // Nominal stage after all other stages + Heads SyncStageID = "Heads" // Heads are downloaded + ShortRange SyncStageID = "ShortRange" // short range + SyncEpoch SyncStageID = "SyncEpoch" // epoch sync + BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified + States SyncStageID = "States" // will construct most recent state from downloaded blocks + StateSync SyncStageID = "StateSync" // State sync + FullStateSync SyncStageID = "FullStateSync" // Full State Sync + Receipts SyncStageID = "Receipts" // Receipts + LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well + Finish SyncStageID = "Finish" // Nominal stage after all other stages ) // GetStageName returns the stage name in string diff --git a/api/service/stagedstreamsync/state_sync_full.go b/api/service/stagedstreamsync/state_sync_full.go index c98dcbafd..14cdb1f59 100644 --- a/api/service/stagedstreamsync/state_sync_full.go +++ b/api/service/stagedstreamsync/state_sync_full.go @@ -108,6 +108,11 @@ var ( type accountTask struct { id uint64 //unique id for account task + root common.Hash + origin common.Hash + limit common.Hash + cap int + // These fields get serialized to leveldb on shutdown Next common.Hash // Next account to sync in this interval Last common.Hash // Last account to sync in this interval @@ -229,16 +234,19 @@ type byteCodeTasksBundle struct { id uint64 //unique id for bytecode task bundle task *accountTask hashes []common.Hash + cap int } type storageTaskBundle struct { id uint64 //unique id for storage task bundle + root common.Hash accounts []common.Hash roots []common.Hash mainTask *accountTask subtask *storageTask origin common.Hash limit common.Hash + cap int } // healTask represents the sync task for healing the snap-synced chunk boundaries. @@ -251,6 +259,7 @@ type healTask struct { pathsets []*message.TrieNodePathSet task *healTask root common.Hash + bytes int byteCodeReq bool } @@ -259,7 +268,6 @@ type tasks struct { storageTasks map[uint64]*storageTaskBundle // Set of trie node tasks currently queued for retrieval, indexed by path codeTasks map[uint64]*byteCodeTasksBundle // Set of byte code tasks currently queued for retrieval, indexed by hash healer map[uint64]*healTask - snapped bool // Flag to signal that snap phase is done } func newTasks() *tasks { @@ -268,7 +276,6 @@ func newTasks() *tasks { storageTasks: make(map[uint64]*storageTaskBundle, 0), codeTasks: make(map[uint64]*byteCodeTasksBundle), healer: make(map[uint64]*healTask, 0), - snapped: false, } } @@ -399,8 +406,6 @@ type FullStateDownloadManager struct { storageSynced uint64 // Number of storage slots downloaded storageBytes common.StorageSize // Number of storage trie bytes persisted to disk - pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown - stateWriter ethdb.Batch // Shared batch writer used for persisting raw states accountHealed uint64 // Number of accounts downloaded during the healing stage accountHealedBytes common.StorageSize // Number of raw account bytes persisted to disk during the healing stage @@ -420,6 +425,9 @@ type FullStateDownloadManager struct { bytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk bytecodeHealDups uint64 // Number of bytecodes already processed bytecodeHealNops uint64 // Number of bytecodes not requested + + startTime time.Time // Time instance when snapshot sync started + logTime time.Time // Time instance when status was last reported } func newFullStateDownloadManager(db ethdb.KeyValueStore, @@ -430,18 +438,19 @@ func newFullStateDownloadManager(db ethdb.KeyValueStore, logger zerolog.Logger) *FullStateDownloadManager { return &FullStateDownloadManager{ - db: db, - scheme: scheme, - bc: bc, - stateWriter: db.NewBatch(), - tx: tx, - keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState), - concurrency: concurrency, - logger: logger, - tasks: newTasks(), - requesting: newTasks(), - processing: newTasks(), - retries: newTasks(), + db: db, + scheme: scheme, + bc: bc, + stateWriter: db.NewBatch(), + tx: tx, + keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState), + concurrency: concurrency, + logger: logger, + tasks: newTasks(), + requesting: newTasks(), + processing: newTasks(), + retries: newTasks(), + trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk } } @@ -531,6 +540,12 @@ func (s *FullStateDownloadManager) commitHealer(force bool) { utils.Logger().Debug().Str("type", "trienodes").Interface("bytes", common.StorageSize(batch.ValueSize())).Msg("Persisted set of healing data") } +func (s *FullStateDownloadManager) SyncStarted() { + if s.startTime == (time.Time{}) { + s.startTime = time.Now() + } +} + func (s *FullStateDownloadManager) SyncCompleted() { defer func() { // Persist any progress, independent of failure for _, task := range s.tasks.accountTasks { @@ -556,7 +571,8 @@ func (s *FullStateDownloadManager) SyncCompleted() { utils.Logger().Debug().Interface("root", s.root).Msg("Terminating snapshot sync cycle") }() - utils.Logger().Debug().Msg("Snapshot sync already completed") + elapsed := time.Since(s.startTime) + utils.Logger().Debug().Interface("elapsed", elapsed).Msg("Snapshot sync already completed") } // getNextBatch returns objects with a maximum of n state download @@ -566,38 +582,30 @@ func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask, storages *storageTaskBundle, healtask *healTask, codetask *healTask, + nItems int, err error) { s.lock.Lock() defer s.lock.Unlock() - accounts, codes, storages, healtask, codetask = s.getBatchFromRetries() - nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes) + accounts, codes, storages, healtask, codetask, nItems = s.getBatchFromRetries() if nItems > 0 { return } if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 { - if nItems == 0 { - s.SyncCompleted() - } + s.SyncCompleted() return } // Refill available tasks from the scheduler. - withHealTasks := true - if healtask != nil || codetask != nil { - withHealTasks = false - } - newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask := s.getBatchFromUnprocessed(withHealTasks) + newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask, nItems := s.getBatchFromUnprocessed() accounts = append(accounts, newAccounts...) codes = append(codes, newCodes...) storages = newStorageTaskBundle - if withHealTasks { - healtask = newHealTask - codetask = newCodeTask - } + healtask = newHealTask + codetask = newCodeTask return } @@ -714,7 +722,7 @@ func (s *FullStateDownloadManager) loadSyncStatus() { // Either we've failed to decode the previous state, or there was none. // Start a fresh sync by chunking up the account range and scheduling // them for retrieval. - s.tasks.accountTasks = nil + s.tasks = newTasks() s.accountSynced, s.accountBytes = 0, 0 s.bytecodeSynced, s.bytecodeBytes = 0, 0 s.storageSynced, s.storageBytes = 0, 0 @@ -921,16 +929,18 @@ func (s *FullStateDownloadManager) updateStats(written, duplicate, unexpected in // getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download // tasks to send to the remote peer. -func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( +func (s *FullStateDownloadManager) getBatchFromUnprocessed() ( accounts []*accountTask, codes []*byteCodeTasksBundle, storages *storageTaskBundle, healtask *healTask, - codetask *healTask) { + codetask *healTask, + count int) { // over trie nodes as those can be written to disk and forgotten about. codes = make([]*byteCodeTasksBundle, 0) accounts = make([]*accountTask, 0) + count = 0 for i, task := range s.tasks.accountTasks { // Stop when we've gathered enough requests @@ -956,12 +966,18 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( break } + task.root = s.root + task.origin = task.Next + task.limit = task.Last + task.cap = maxRequestSize + task.requested = true s.tasks.accountTasks[i].requested = true accounts = append(accounts, task) s.requesting.addAccountTask(task.id, task) s.tasks.addAccountTask(task.id, task) // one task account is enough for an stream + count = len(accounts) return } @@ -997,6 +1013,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( id: taskID, hashes: hashes, task: task, + cap: maxRequestSize, } codes = append(codes, bytecodeTask) @@ -1005,12 +1022,14 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( // Stop when we've gathered enough requests if totalHashes >= maxCodeRequestCount { + count = totalHashes return } } // if we found some codes, can assign it to node if totalHashes > 0 { + count = totalHashes return } @@ -1020,14 +1039,8 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( continue } - // TODO: check cap calculations (shouldn't give us big chunk) - // if cap > maxRequestSize { - // cap = maxRequestSize - // } - // if cap < minRequestSize { // Don't bother with peers below a bare minimum performance - // cap = minRequestSize - // } - storageSets := maxRequestSize / 1024 + cap := maxRequestSize + storageSets := cap / 1024 storages = &storageTaskBundle{ accounts: make([]common.Hash, 0, storageSets), @@ -1089,23 +1102,21 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( storages.origin = storages.subtask.Next storages.limit = storages.subtask.Last } + storages.root = s.root + storages.cap = cap s.tasks.addStorageTaskBundle(taskID, storages) s.requesting.addStorageTaskBundle(taskID, storages) - + count = len(storages.accounts) return } if len(storages.accounts) > 0 { - return - } - - if !withHealTasks { + count = len(storages.accounts) return } // Sync phase done, run heal phase - - // Iterate over pending tasks and try to find a peer to retrieve with + // Iterate over pending tasks for (len(s.tasks.healer) > 0 && len(s.tasks.healer[0].hashes) > 0) || s.scheduler.Pending() > 0 { // If there are not enough trie tasks queued to fully assign, fill the // queue from the state sync scheduler. The trie synced schedules these @@ -1129,7 +1140,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( // If all the heal tasks are bytecodes or already downloading, bail if len(s.tasks.healer[0].trieTasks) == 0 { - return + break } // Generate the network query and send it to the peer // if cap > maxTrieRequestCount { @@ -1177,6 +1188,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( pathsets: pathsets, root: s.root, task: s.tasks.healer[0], + bytes: maxRequestSize, byteCodeReq: false, } @@ -1184,6 +1196,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( s.requesting.addHealerTask(taskID, healtask) if len(hashes) > 0 { + count = len(hashes) return } } @@ -1205,7 +1218,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( // If all the heal tasks are trienodes or already downloading, bail if len(s.tasks.healer[0].codeTasks) == 0 { - return + break } // Task pending retrieval, try to find an idle peer. If no such peer // exists, we probably assigned tasks for all (or they are stateless). @@ -1243,9 +1256,10 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) ( id: taskID, hashes: hashes, task: s.tasks.healer[0], + bytes: maxRequestSize, byteCodeReq: true, } - + count = len(hashes) s.tasks.healer[taskID] = codetask s.requesting.addHealerTask(taskID, healtask) } @@ -1272,7 +1286,8 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( codes []*byteCodeTasksBundle, storages *storageTaskBundle, healtask *healTask, - codetask *healTask) { + codetask *healTask, + count int) { // over trie nodes as those can be written to disk and forgotten about. accounts = make([]*accountTask, 0) @@ -1290,6 +1305,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } if len(accounts) > 0 { + count = len(accounts) return } @@ -1301,6 +1317,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } if len(codes) > 0 { + count = len(codes) return } @@ -1316,10 +1333,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } s.requesting.addStorageTaskBundle(storages.id, storages) s.retries.deleteStorageTaskBundle(storages.id) - return - } - - if len(storages.accounts) > 0 { + count = len(storages.accounts) return } @@ -1338,6 +1352,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } s.requesting.addHealerTask(id, task) s.retries.deleteHealerTask(id) + count = len(task.hashes) return } if task.byteCodeReq { @@ -1352,11 +1367,13 @@ func (s *FullStateDownloadManager) getBatchFromRetries() ( } s.requesting.addHealerTask(id, task) s.retries.deleteHealerTask(id) + count = len(task.hashes) return } } } + count = 0 return } @@ -1371,14 +1388,18 @@ func (s *FullStateDownloadManager) HandleRequestError(accounts []*accountTask, s.lock.Lock() defer s.lock.Unlock() - for _, task := range accounts { - s.requesting.deleteAccountTask(task.id) - s.retries.addAccountTask(task.id, task) + if accounts != nil && len(accounts) > 0 { + for _, task := range accounts { + s.requesting.deleteAccountTask(task.id) + s.retries.addAccountTask(task.id, task) + } } - for _, code := range codes { - s.requesting.deleteCodeTask(code.id) - s.retries.addCodeTask(code.id, code) + if codes != nil && len(codes) > 0 { + for _, code := range codes { + s.requesting.deleteCodeTask(code.id) + s.retries.addCodeTask(code.id, code) + } } if storages != nil { diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index e6879a523..c3bc585f2 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -90,6 +90,7 @@ func CreateStagedSync(ctx context.Context, stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, extractReceiptHashes, config.LogProgress) stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress) stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress) + stageFullStateSyncCfg := NewStageFullStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress) stageReceiptsCfg := NewStageReceiptsCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB) stageFinishCfg := NewStageFinishCfg(mainDB) @@ -103,6 +104,7 @@ func CreateStagedSync(ctx context.Context, stageShortRangeCfg, stageBodiesCfg, stageStateSyncCfg, + stageFullStateSyncCfg, stageStatesCfg, stageReceiptsCfg, lastMileCfg,