refactor stage bodies to extract receip hashes in this stage rather than stage state

pull/4465/head
“GheisMohammadi” 1 year ago
parent 1f26944a33
commit 7c3807a525
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 9
      api/service/stagedstreamsync/default_stages.go
  2. 46
      api/service/stagedstreamsync/stage_bodies.go
  3. 8
      api/service/stagedstreamsync/syncing.go

@ -63,9 +63,8 @@ func initFastSyncStagesOrder() {
SyncEpoch,
ShortRange,
BlockBodies,
States,
StateSync,
Receipts,
StateSync,
LastMile,
Finish,
}
@ -73,9 +72,8 @@ func initFastSyncStagesOrder() {
StagesRevertOrder = RevertOrder{
Finish,
LastMile,
Receipts,
StateSync,
States,
Receipts,
BlockBodies,
ShortRange,
SyncEpoch,
@ -85,9 +83,8 @@ func initFastSyncStagesOrder() {
StagesCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
Receipts,
StateSync,
States,
Receipts,
BlockBodies,
ShortRange,
SyncEpoch,

@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
@ -26,6 +27,7 @@ type StageBodiesCfg struct {
concurrency int
protocol syncProtocol
isBeacon bool
extractReceiptHashes bool
logProgress bool
}
@ -35,7 +37,7 @@ func NewStageBodies(cfg StageBodiesCfg) *StageBodies {
}
}
func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, logProgress bool) StageBodiesCfg {
func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, extractReceiptHashes bool, logProgress bool) StageBodiesCfg {
return StageBodiesCfg{
bc: bc,
db: db,
@ -43,6 +45,7 @@ func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concu
concurrency: concurrency,
protocol: protocol,
isBeacon: isBeacon,
extractReceiptHashes: extractReceiptHashes,
logProgress: logProgress,
}
}
@ -118,7 +121,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
for i := 0; i != s.state.config.Concurrency; i++ {
wg.Add(1)
go b.runBlockWorkerLoop(ctx, s.state.gbm, &wg, i, startTime)
go b.runBlockWorkerLoop(ctx, s.state.gbm, &wg, i, s, startTime)
}
wg.Wait()
@ -133,7 +136,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
}
// runBlockWorkerLoop creates a work loop for download blocks
func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time) {
func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownloadManager, wg *sync.WaitGroup, loopID int, s *StageState, startTime time.Time) {
currentBlock := int(b.configs.bc.CurrentBlock().NumberU64())
@ -184,6 +187,12 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload
gbm.HandleRequestError(batch, err, stid)
b.configs.protocol.RemoveStream(stid)
} else {
if b.configs.extractReceiptHashes {
if err = b.verifyBlockAndExtractReceiptsData(blockBytes, sigBytes, s); err != nil {
gbm.HandleRequestError(batch, err, stid)
continue
}
}
if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil {
panic(ErrSaveBlocksToDbFailed)
}
@ -204,6 +213,37 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload
}
}
func (b *StageBodies) verifyBlockAndExtractReceiptsData(batchBlockBytes [][]byte, batchSigBytes [][]byte, s *StageState) error {
var block *types.Block
for i := uint64(0); i < uint64(len(batchBlockBytes)); i++ {
blockBytes := batchBlockBytes[i]
sigBytes := batchSigBytes[i]
if blockBytes == nil {
continue
}
if err := rlp.DecodeBytes(blockBytes, &block); err != nil {
utils.Logger().Error().
Uint64("block number", i).
Msg("block size invalid")
return ErrInvalidBlockBytes
}
if sigBytes != nil {
block.SetCurrentCommitSig(sigBytes)
}
if block.NumberU64() != i {
return ErrInvalidBlockNumber
}
if err := verifyBlock(b.configs.bc, block); err != nil {
return err
}
// add receipt hash for next stage
s.state.currentCycle.ReceiptHashes[block.NumberU64()] = block.Header().ReceiptHash()
}
return nil
}
// redownloadBadBlock tries to redownload the bad block from other streams
func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) error {

@ -81,14 +81,14 @@ func CreateStagedSync(ctx context.Context,
return nil, errInitDB
}
fastSync := config.SyncMode == FastSync
blockExecution := config.SyncMode == FullSync
extractReceiptHashes := config.SyncMode == FastSync || config.SyncMode == SnapSync
stageHeadsCfg := NewStageHeadersCfg(bc, mainDB)
stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB)
stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, !fastSync, logger, config.LogProgress)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, extractReceiptHashes, config.LogProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, blockExecution, logger, config.LogProgress)
stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress)
stageReceiptsCfg := NewStageReceiptsCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB)

Loading…
Cancel
Save