From 6f3aa67b88d4e026fc1117b7c2db88b0fc81a1ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 29 Jun 2023 18:43:05 +0800 Subject: [PATCH] refactor stage receipts and change the stages sorting --- .../stagedstreamsync/default_stages.go | 25 +- .../receipt_download_manager.go | 5 +- .../stagedstreamsync/stage_receipts.go | 327 ++++++++++++++++++ api/service/stagedstreamsync/stages.go | 3 +- 4 files changed, 349 insertions(+), 11 deletions(-) create mode 100644 api/service/stagedstreamsync/stage_receipts.go diff --git a/api/service/stagedstreamsync/default_stages.go b/api/service/stagedstreamsync/default_stages.go index 4a1e719f2..e1bb8578d 100644 --- a/api/service/stagedstreamsync/default_stages.go +++ b/api/service/stagedstreamsync/default_stages.go @@ -13,9 +13,9 @@ var DefaultForwardOrder = ForwardOrder{ SyncEpoch, ShortRange, BlockBodies, - StateSync, - // Stages below don't use Internet States, + StateSync, + Receipts, LastMile, Finish, } @@ -23,8 +23,9 @@ var DefaultForwardOrder = ForwardOrder{ var DefaultRevertOrder = RevertOrder{ Finish, LastMile, - States, + Receipts, StateSync, + States, BlockBodies, ShortRange, SyncEpoch, @@ -34,8 +35,9 @@ var DefaultRevertOrder = RevertOrder{ var DefaultCleanUpOrder = CleanUpOrder{ Finish, LastMile, - States, + Receipts, StateSync, + States, BlockBodies, ShortRange, SyncEpoch, @@ -49,6 +51,7 @@ func DefaultStages(ctx context.Context, bodiesCfg StageBodiesCfg, stateSyncCfg StageStateSyncCfg, statesCfg StageStatesCfg, + receiptsCfg StageReceiptsCfg, lastMileCfg StageLastMileCfg, finishCfg StageFinishCfg, ) []*Stage { @@ -57,8 +60,9 @@ func DefaultStages(ctx context.Context, handlerStageShortRange := NewStageShortRange(srCfg) handlerStageEpochSync := NewStageEpoch(seCfg) handlerStageBodies := NewStageBodies(bodiesCfg) - handlerStageStateSync := NewStageStateSync(stateSyncCfg) handlerStageStates := NewStageStates(statesCfg) + handlerStageStateSync := NewStageStateSync(stateSyncCfg) + handlerStageReceipts := NewStageReceipts(receiptsCfg) handlerStageLastMile := NewStageLastMile(lastMileCfg) handlerStageFinish := NewStageFinish(finishCfg) @@ -83,15 +87,20 @@ func DefaultStages(ctx context.Context, Description: "Retrieve Block Bodies", Handler: handlerStageBodies, }, + { + ID: States, + Description: "Update Blockchain State", + Handler: handlerStageStates, + }, { ID: StateSync, Description: "Retrieve States", Handler: handlerStageStateSync, }, { - ID: States, - Description: "Update Blockchain State", - Handler: handlerStageStates, + ID: Receipts, + Description: "Retrieve Receipts", + Handler: handlerStageReceipts, }, { ID: LastMile, diff --git a/api/service/stagedstreamsync/receipt_download_manager.go b/api/service/stagedstreamsync/receipt_download_manager.go index ffe20d0be..2eaa3ca45 100644 --- a/api/service/stagedstreamsync/receipt_download_manager.go +++ b/api/service/stagedstreamsync/receipt_download_manager.go @@ -3,6 +3,7 @@ package stagedstreamsync import ( "sync" + "github.com/harmony-one/harmony/core/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/ledgerwatch/erigon-lib/kv" "github.com/rs/zerolog" @@ -76,13 +77,13 @@ func (rdm *receiptDownloadManager) HandleRequestError(bns []uint64, err error, s } // HandleRequestResult handles get receipts result -func (rdm *receiptDownloadManager) HandleRequestResult(bns []uint64, receiptBytes [][]byte, sigBytes [][]byte, loopID int, streamID sttypes.StreamID) error { +func (rdm *receiptDownloadManager) HandleRequestResult(bns []uint64, receipts []*types.Receipt, loopID int, streamID sttypes.StreamID) error { rdm.lock.Lock() defer rdm.lock.Unlock() for i, bn := range bns { delete(rdm.requesting, bn) - if indexExists(receiptBytes, i) && len(receiptBytes[i]) <= 1 { + if indexExists(receipts, i) { rdm.retries.push(bn) } else { rdm.processing[bn] = struct{}{} diff --git a/api/service/stagedstreamsync/stage_receipts.go b/api/service/stagedstreamsync/stage_receipts.go new file mode 100644 index 000000000..a9bffa30f --- /dev/null +++ b/api/service/stagedstreamsync/stage_receipts.go @@ -0,0 +1,327 @@ +package stagedstreamsync + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/utils" + sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/pkg/errors" +) + +type StageReceipts struct { + configs StageReceiptsCfg +} + +type StageReceiptsCfg struct { + bc core.BlockChain + db kv.RwDB + blockDBs []kv.RwDB + concurrency int + protocol syncProtocol + isBeacon bool + logProgress bool +} + +func NewStageReceipts(cfg StageReceiptsCfg) *StageReceipts { + return &StageReceipts{ + configs: cfg, + } +} + +func NewStageReceiptsCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, logProgress bool) StageReceiptsCfg { + return StageReceiptsCfg{ + bc: bc, + db: db, + blockDBs: blockDBs, + concurrency: concurrency, + protocol: protocol, + isBeacon: isBeacon, + logProgress: logProgress, + } +} + +// Exec progresses Bodies stage in the forward direction +func (b *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { + + useInternalTx := tx == nil + + if invalidBlockRevert { + return nil + } + + // for short range sync, skip this stage + if !s.state.initSync { + return nil + } + + maxHeight := s.state.status.targetBN + currentHead := b.configs.bc.CurrentBlock().NumberU64() + if currentHead >= maxHeight { + return nil + } + currProgress := uint64(0) + targetHeight := s.state.currentCycle.TargetHeight + + if errV := CreateView(ctx, b.configs.db, tx, func(etx kv.Tx) error { + if currProgress, err = s.CurrentStageProgress(etx); err != nil { + return err + } + return nil + }); errV != nil { + return errV + } + + if currProgress == 0 { + currProgress = currentHead + } + + if currProgress >= targetHeight { + return nil + } + + // size := uint64(0) + startTime := time.Now() + // startBlock := currProgress + if b.configs.logProgress { + fmt.Print("\033[s") // save the cursor position + } + + if useInternalTx { + var err error + tx, err = b.configs.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + // Fetch blocks from neighbors + s.state.rdm = newReceiptDownloadManager(tx, b.configs.bc, targetHeight, s.state.logger) + + // Setup workers to fetch blocks from remote node + var wg sync.WaitGroup + + for i := 0; i != s.state.config.Concurrency; i++ { + wg.Add(1) + go b.runReceiptWorkerLoop(ctx, s.state.rdm, &wg, i, startTime) + } + + wg.Wait() + + if useInternalTx { + if err := tx.Commit(); err != nil { + return err + } + } + + return nil +} + +// runReceiptWorkerLoop creates a work loop for download receipts +func (b *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time) { + + currentBlock := int(b.configs.bc.CurrentBlock().NumberU64()) + + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + default: + } + batch := rdm.GetNextBatch() + if len(batch) == 0 { + select { + case <-ctx.Done(): + return + case <-time.After(100 * time.Millisecond): + return + } + } + var hashes []common.Hash + for _, bn := range batch { + header := b.configs.bc.GetHeaderByNumber(bn) + hashes = append(hashes, header.ReceiptHash()) + } + receipts, stid, err := b.downloadReceipts(ctx, hashes) + if err != nil { + if !errors.Is(err, context.Canceled) { + b.configs.protocol.StreamFailed(stid, "downloadRawBlocks failed") + } + utils.Logger().Error(). + Err(err). + Str("stream", string(stid)). + Interface("block numbers", batch). + Msg(WrapStagedSyncMsg("downloadRawBlocks failed")) + err = errors.Wrap(err, "request error") + rdm.HandleRequestError(batch, err, stid) + } else if receipts == nil || len(receipts) == 0 { + utils.Logger().Warn(). + Str("stream", string(stid)). + Interface("block numbers", batch). + Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty reciptBytes")) + err := errors.New("downloadRawBlocks received empty reciptBytes") + rdm.HandleRequestError(batch, err, stid) + } else { + rdm.HandleRequestResult(batch, receipts, loopID, stid) + if b.configs.logProgress { + //calculating block download speed + dt := time.Now().Sub(startTime).Seconds() + speed := float64(0) + if dt > 0 { + speed = float64(len(rdm.rdd)) / dt + } + blockSpeed := fmt.Sprintf("%.2f", speed) + + fmt.Print("\033[u\033[K") // restore the cursor position and clear the line + fmt.Println("downloaded blocks:", currentBlock+len(rdm.rdd), "/", int(rdm.targetBN), "(", blockSpeed, "blocks/s", ")") + } + } + } +} + +func (b *StageReceipts) downloadReceipts(ctx context.Context, hs []common.Hash) ([]*types.Receipt, sttypes.StreamID, error) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + receipts, stid, err := b.configs.protocol.GetReceipts(ctx, hs) + if err != nil { + return nil, stid, err + } + if err := validateGetReceiptsResult(hs, receipts); err != nil { + return nil, stid, err + } + return receipts, stid, nil +} + +func (b *StageReceipts) downloadRawBlocks(ctx context.Context, bns []uint64) ([][]byte, [][]byte, sttypes.StreamID, error) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + return b.configs.protocol.GetRawBlocksByNumber(ctx, bns) +} + +func validateGetReceiptsResult(requested []common.Hash, result []*types.Receipt) error { + // TODO: validate each receipt here + + return nil +} + +func (b *StageReceipts) saveProgress(ctx context.Context, s *StageState, progress uint64, tx kv.RwTx) (err error) { + useInternalTx := tx == nil + if useInternalTx { + var err error + tx, err = b.configs.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + // save progress + if err = s.Update(tx, progress); err != nil { + utils.Logger().Error(). + Err(err). + Msgf("[STAGED_SYNC] saving progress for block bodies stage failed") + return ErrSavingBodiesProgressFail + } + + if useInternalTx { + if err := tx.Commit(); err != nil { + return err + } + } + return nil +} + +func (b *StageReceipts) cleanBlocksDB(ctx context.Context, loopID int) (err error) { + tx, errb := b.configs.blockDBs[loopID].BeginRw(ctx) + if errb != nil { + return errb + } + defer tx.Rollback() + + // clean block bodies db + if err = tx.ClearBucket(BlocksBucket); err != nil { + utils.Logger().Error(). + Err(err). + Msgf("[STAGED_STREAM_SYNC] clear blocks bucket after revert failed") + return err + } + // clean block signatures db + if err = tx.ClearBucket(BlockSignaturesBucket); err != nil { + utils.Logger().Error(). + Err(err). + Msgf("[STAGED_STREAM_SYNC] clear block signatures bucket after revert failed") + return err + } + + if err = tx.Commit(); err != nil { + return err + } + + return nil +} + +func (b *StageReceipts) cleanAllBlockDBs(ctx context.Context) (err error) { + //clean all blocks DBs + for i := 0; i < b.configs.concurrency; i++ { + if err := b.cleanBlocksDB(ctx, i); err != nil { + return err + } + } + return nil +} + +func (b *StageReceipts) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { + + //clean all blocks DBs + if err := b.cleanAllBlockDBs(ctx); err != nil { + return err + } + + useInternalTx := tx == nil + if useInternalTx { + tx, err = b.configs.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + // save progress + currentHead := b.configs.bc.CurrentBlock().NumberU64() + if err = s.Update(tx, currentHead); err != nil { + utils.Logger().Error(). + Err(err). + Msgf("[STAGED_SYNC] saving progress for block bodies stage after revert failed") + return err + } + + if err = u.Done(tx); err != nil { + return err + } + + if useInternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} + +func (b *StageReceipts) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { + //clean all blocks DBs + if err := b.cleanAllBlockDBs(ctx); err != nil { + return err + } + + return nil +} diff --git a/api/service/stagedstreamsync/stages.go b/api/service/stagedstreamsync/stages.go index cb6efa0cd..909bb25c0 100644 --- a/api/service/stagedstreamsync/stages.go +++ b/api/service/stagedstreamsync/stages.go @@ -12,8 +12,9 @@ const ( ShortRange SyncStageID = "ShortRange" // short range SyncEpoch SyncStageID = "SyncEpoch" // epoch sync BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified - StateSync SyncStageID = "StateSync" // State sync States SyncStageID = "States" // will construct most recent state from downloaded blocks + StateSync SyncStageID = "StateSync" // State sync + Receipts SyncStageID = "Receipts" // Receipts LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well Finish SyncStageID = "Finish" // Nominal stage after all other stages )