diff --git a/api/service/stagedstreamsync/default_stages.go b/api/service/stagedstreamsync/default_stages.go index 3bebe8a00..60e9f4962 100644 --- a/api/service/stagedstreamsync/default_stages.go +++ b/api/service/stagedstreamsync/default_stages.go @@ -63,9 +63,8 @@ func initFastSyncStagesOrder() { SyncEpoch, ShortRange, BlockBodies, - States, - StateSync, Receipts, + StateSync, LastMile, Finish, } @@ -73,9 +72,8 @@ func initFastSyncStagesOrder() { StagesRevertOrder = RevertOrder{ Finish, LastMile, - Receipts, StateSync, - States, + Receipts, BlockBodies, ShortRange, SyncEpoch, @@ -85,9 +83,8 @@ func initFastSyncStagesOrder() { StagesCleanUpOrder = CleanUpOrder{ Finish, LastMile, - Receipts, StateSync, - States, + Receipts, BlockBodies, ShortRange, SyncEpoch, diff --git a/api/service/stagedstreamsync/stage_bodies.go b/api/service/stagedstreamsync/stage_bodies.go index b5d92e3a1..401a8bc6c 100644 --- a/api/service/stagedstreamsync/stage_bodies.go +++ b/api/service/stagedstreamsync/stage_bodies.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" @@ -26,6 +27,7 @@ type StageBodiesCfg struct { concurrency int protocol syncProtocol isBeacon bool + extractReceiptHashes bool logProgress bool } @@ -35,7 +37,7 @@ func NewStageBodies(cfg StageBodiesCfg) *StageBodies { } } -func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, logProgress bool) StageBodiesCfg { +func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, extractReceiptHashes bool, logProgress bool) StageBodiesCfg { return StageBodiesCfg{ bc: bc, db: db, @@ -43,6 +45,7 @@ func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concu concurrency: concurrency, protocol: protocol, isBeacon: isBeacon, + extractReceiptHashes: extractReceiptHashes, logProgress: logProgress, } } @@ -118,7 +121,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev for i := 0; i != s.state.config.Concurrency; i++ { wg.Add(1) - go b.runBlockWorkerLoop(ctx, s.state.gbm, &wg, i, startTime) + go b.runBlockWorkerLoop(ctx, s.state.gbm, &wg, i, s, startTime) } wg.Wait() @@ -133,7 +136,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev } // runBlockWorkerLoop creates a work loop for download blocks -func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time) { +func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownloadManager, wg *sync.WaitGroup, loopID int, s *StageState, startTime time.Time) { currentBlock := int(b.configs.bc.CurrentBlock().NumberU64()) @@ -184,6 +187,12 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload gbm.HandleRequestError(batch, err, stid) b.configs.protocol.RemoveStream(stid) } else { + if b.configs.extractReceiptHashes { + if err = b.verifyBlockAndExtractReceiptsData(blockBytes, sigBytes, s); err != nil { + gbm.HandleRequestError(batch, err, stid) + continue + } + } if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil { panic(ErrSaveBlocksToDbFailed) } @@ -204,6 +213,37 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload } } +func (b *StageBodies) verifyBlockAndExtractReceiptsData(batchBlockBytes [][]byte, batchSigBytes [][]byte, s *StageState) error { + var block *types.Block + for i := uint64(0); i < uint64(len(batchBlockBytes)); i++ { + blockBytes := batchBlockBytes[i] + sigBytes := batchSigBytes[i] + if blockBytes == nil { + continue + } + if err := rlp.DecodeBytes(blockBytes, &block); err != nil { + utils.Logger().Error(). + Uint64("block number", i). + Msg("block size invalid") + return ErrInvalidBlockBytes + } + if sigBytes != nil { + block.SetCurrentCommitSig(sigBytes) + } + + if block.NumberU64() != i { + return ErrInvalidBlockNumber + } + + if err := verifyBlock(b.configs.bc, block); err != nil { + return err + } + // add receipt hash for next stage + s.state.currentCycle.ReceiptHashes[block.NumberU64()] = block.Header().ReceiptHash() + } + return nil +} + // redownloadBadBlock tries to redownload the bad block from other streams func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) error { diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index c741151cd..7abf00e7a 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -81,14 +81,14 @@ func CreateStagedSync(ctx context.Context, return nil, errInitDB } - fastSync := config.SyncMode == FastSync + blockExecution := config.SyncMode == FullSync + extractReceiptHashes := config.SyncMode == FastSync || config.SyncMode == SnapSync stageHeadsCfg := NewStageHeadersCfg(bc, mainDB) stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB) stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB) - - stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) - stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, !fastSync, logger, config.LogProgress) + stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, extractReceiptHashes, config.LogProgress) + stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, blockExecution, logger, config.LogProgress) stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress) stageReceiptsCfg := NewStageReceiptsCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB)