diff --git a/api/service/stagedstreamsync/default_stages.go b/api/service/stagedstreamsync/default_stages.go index 55986ff6e..4a1e719f2 100644 --- a/api/service/stagedstreamsync/default_stages.go +++ b/api/service/stagedstreamsync/default_stages.go @@ -13,6 +13,7 @@ var DefaultForwardOrder = ForwardOrder{ SyncEpoch, ShortRange, BlockBodies, + StateSync, // Stages below don't use Internet States, LastMile, @@ -23,6 +24,7 @@ var DefaultRevertOrder = RevertOrder{ Finish, LastMile, States, + StateSync, BlockBodies, ShortRange, SyncEpoch, @@ -33,6 +35,7 @@ var DefaultCleanUpOrder = CleanUpOrder{ Finish, LastMile, States, + StateSync, BlockBodies, ShortRange, SyncEpoch, @@ -44,6 +47,7 @@ func DefaultStages(ctx context.Context, seCfg StageEpochCfg, srCfg StageShortRangeCfg, bodiesCfg StageBodiesCfg, + stateSyncCfg StageStateSyncCfg, statesCfg StageStatesCfg, lastMileCfg StageLastMileCfg, finishCfg StageFinishCfg, @@ -53,6 +57,7 @@ func DefaultStages(ctx context.Context, handlerStageShortRange := NewStageShortRange(srCfg) handlerStageEpochSync := NewStageEpoch(seCfg) handlerStageBodies := NewStageBodies(bodiesCfg) + handlerStageStateSync := NewStageStateSync(stateSyncCfg) handlerStageStates := NewStageStates(statesCfg) handlerStageLastMile := NewStageLastMile(lastMileCfg) handlerStageFinish := NewStageFinish(finishCfg) @@ -78,6 +83,11 @@ func DefaultStages(ctx context.Context, Description: "Retrieve Block Bodies", Handler: handlerStageBodies, }, + { + ID: StateSync, + Description: "Retrieve States", + Handler: handlerStageStateSync, + }, { ID: States, Description: "Update Blockchain State", diff --git a/api/service/stagedstreamsync/stage_statesync.go b/api/service/stagedstreamsync/stage_statesync.go new file mode 100644 index 000000000..10cce8462 --- /dev/null +++ b/api/service/stagedstreamsync/stage_statesync.go @@ -0,0 +1,215 @@ +package stagedstreamsync + +import ( + "context" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/trie" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/rawdb" + "github.com/harmony-one/harmony/core/state" + "github.com/harmony-one/harmony/internal/utils" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/prometheus/client_golang/prometheus" + "github.com/rs/zerolog" + "golang.org/x/crypto/sha3" +) + +type StageStateSync struct { + configs StageStateSyncCfg +} + +// trieTask represents a single trie node download task, containing a set of +// peers already attempted retrieval from to detect stalled syncs and abort. +type trieTask struct { + hash common.Hash + path [][]byte + attempts map[string]struct{} +} + +// codeTask represents a single byte code download task, containing a set of +// peers already attempted retrieval from to detect stalled syncs and abort. +type codeTask struct { + attempts map[string]struct{} +} + +type StageStateSyncCfg struct { + bc core.BlockChain + protocol syncProtocol + db kv.RwDB + root common.Hash // State root currently being synced + sched *trie.Sync // State trie sync scheduler defining the tasks + keccak crypto.KeccakState // Keccak256 hasher to verify deliveries with + trieTasks map[string]*trieTask // Set of trie node tasks currently queued for retrieval, indexed by path + codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval, indexed by hash + concurrency int + logger zerolog.Logger + logProgress bool +} + +func NewStageStateSync(cfg StageStateSyncCfg) *StageStateSync { + return &StageStateSync{ + configs: cfg, + } +} + +func NewStageStateSyncCfg(bc core.BlockChain, + db kv.RwDB, + root common.Hash, + concurrency int, + protocol syncProtocol, + logger zerolog.Logger, + logProgress bool) StageStateSyncCfg { + + return StageStateSyncCfg{ + bc: bc, + db: db, + root: root, + sched: state.NewStateSync(root, bc.ChainDb(), nil, rawdb.HashScheme), + keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState), + trieTasks: make(map[string]*trieTask), + codeTasks: make(map[common.Hash]*codeTask), + concurrency: concurrency, + logger: logger, + logProgress: logProgress, + } +} + +// Exec progresses States stage in the forward direction +func (stg *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { + + // for short range sync, skip this step + if !s.state.initSync { + return nil + } + + maxHeight := s.state.status.targetBN + currentHead := stg.configs.bc.CurrentBlock().NumberU64() + if currentHead >= maxHeight { + return nil + } + currProgress := stg.configs.bc.CurrentBlock().NumberU64() + targetHeight := s.state.currentCycle.TargetHeight + if currProgress >= targetHeight { + return nil + } + useInternalTx := tx == nil + if useInternalTx { + var err error + tx, err = stg.configs.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + // isLastCycle := targetHeight >= maxHeight + startTime := time.Now() + startBlock := currProgress + + if stg.configs.logProgress { + fmt.Print("\033[s") // save the cursor position + } + + for i := currProgress + 1; i <= targetHeight; i++ { + // log the stage progress in console + if stg.configs.logProgress { + //calculating block speed + dt := time.Now().Sub(startTime).Seconds() + speed := float64(0) + if dt > 0 { + speed = float64(currProgress-startBlock) / dt + } + blockSpeed := fmt.Sprintf("%.2f", speed) + fmt.Print("\033[u\033[K") // restore the cursor position and clear the line + fmt.Println("insert blocks progress:", currProgress, "/", targetHeight, "(", blockSpeed, "blocks/s", ")") + } + + } + + if useInternalTx { + if err := tx.Commit(); err != nil { + return err + } + } + + return nil +} + +func (stg *StageStateSync) insertChain(gbm *blockDownloadManager, + protocol syncProtocol, + lbls prometheus.Labels, + targetBN uint64) { + +} + +func (stg *StageStateSync) saveProgress(s *StageState, tx kv.RwTx) (err error) { + + useInternalTx := tx == nil + if useInternalTx { + var err error + tx, err = stg.configs.db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + } + + // save progress + if err = s.Update(tx, stg.configs.bc.CurrentBlock().NumberU64()); err != nil { + utils.Logger().Error(). + Err(err). + Msgf("[STAGED_SYNC] saving progress for block States stage failed") + return ErrSaveStateProgressFail + } + + if useInternalTx { + if err := tx.Commit(); err != nil { + return err + } + } + return nil +} + +func (stg *StageStateSync) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { + useInternalTx := tx == nil + if useInternalTx { + tx, err = stg.configs.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = u.Done(tx); err != nil { + return err + } + + if useInternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} + +func (stg *StageStateSync) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { + useInternalTx := tx == nil + if useInternalTx { + tx, err = stg.configs.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if useInternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/api/service/stagedstreamsync/stages.go b/api/service/stagedstreamsync/stages.go index 6a21fe707..cb6efa0cd 100644 --- a/api/service/stagedstreamsync/stages.go +++ b/api/service/stagedstreamsync/stages.go @@ -12,6 +12,7 @@ const ( ShortRange SyncStageID = "ShortRange" // short range SyncEpoch SyncStageID = "SyncEpoch" // epoch sync BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified + StateSync SyncStageID = "StateSync" // State sync States SyncStageID = "States" // will construct most recent state from downloaded blocks LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well Finish SyncStageID = "Finish" // Nominal stage after all other stages diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index 9e8926468..ba5ab3a20 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -10,6 +10,7 @@ import ( "time" "github.com/harmony-one/harmony/consensus" + "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" @@ -84,9 +85,11 @@ func CreateStagedSync(ctx context.Context, stageHeadsCfg := NewStageHeadersCfg(bc, mainDB) stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB) stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB) + stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress) lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB) + stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, common.Hash{}, config.Concurrency, protocol, logger, config.LogProgress) stageFinishCfg := NewStageFinishCfg(mainDB) stages := DefaultStages(ctx, @@ -94,6 +97,7 @@ func CreateStagedSync(ctx context.Context, stageSyncEpochCfg, stageShortRangeCfg, stageBodiesCfg, + stageStateSyncCfg, stageStatesCfg, lastMileCfg, stageFinishCfg,