refactor staged stream sync to process fast sync and new block insertion

pull/4465/head
GheisMohammadi committed 1 year ago
parent cfc94bb4e1
commit 9954a904ae
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. api/service/stagedstreamsync/adapter.go (2 changes)
  2. api/service/stagedstreamsync/const.go (13 changes)
  3. api/service/stagedstreamsync/sig_verify.go (6 changes)
  4. api/service/stagedstreamsync/stage_short_range.go (4 changes)
  5. api/service/stagedstreamsync/stage_state.go (11 changes)
  6. api/service/stagedstreamsync/staged_stream_sync.go (1 change)
  7. api/service/stagedstreamsync/syncing.go (6 changes)

@@ -31,6 +31,6 @@ type blockChain interface {
 	engine.ChainReader
 	Engine() engine.Engine
-	InsertChain(chain types.Blocks, verifyHeaders bool) (int, error)
+	InsertChain(chain types.Blocks, verifyHeaders bool, blockExecution bool) (int, error)
 	WriteCommitSig(blockNum uint64, lastCommits []byte) error
 }
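For orientation, here is a minimal standalone sketch of what the extra flag expresses at this interface boundary; chainStub and Block are illustrative stand-ins for Harmony's real types, not code from this commit.

package main

import "fmt"

// Block is a stand-in for types.Block; only a number is needed here.
type Block struct{ Number uint64 }

// chainStub illustrates the extended InsertChain contract: with
// blockExecution=false the block is written to the chain without replaying
// its transactions (state is assumed to be obtained elsewhere, e.g. by a
// fast-sync state/receipts stage); with blockExecution=true it behaves like
// the classic full-sync insertion.
type chainStub struct{ head uint64 }

func (c *chainStub) InsertChain(chain []*Block, verifyHeaders bool, blockExecution bool) (int, error) {
	for _, b := range chain {
		if blockExecution {
			// full-sync path: execute transactions and update state here
		}
		c.head = b.Number // the block itself is stored in both paths
	}
	return len(chain), nil
}

func main() {
	c := &chainStub{}
	n, _ := c.InsertChain([]*Block{{Number: 1}, {Number: 2}}, true, false)
	fmt.Println("inserted", n, "blocks, head =", c.head)
}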

@@ -40,6 +40,16 @@ const (
 	ShortRangeTimeout time.Duration = 1 * time.Minute
 )
 
+// SyncMode represents the synchronization mode of the downloader.
+// It is a uint32 as it is used with atomic operations.
+type SyncMode uint32
+
+const (
+	FullSync SyncMode = iota // Synchronize the entire blockchain history from full blocks
+	FastSync                 // Download all blocks and states
+	SnapSync                 // Download the chain and the state via compact snapshots
+)
+
 type (
 	// Config is the downloader config
 	Config struct {
@@ -47,6 +57,9 @@ type (
 		// TODO: remove this when stream sync is fully up.
 		ServerOnly bool
 
+		// Synchronization mode of the downloader
+		SyncMode SyncMode
+
 		// parameters
 		Network     nodeconfig.NetworkType
 		Concurrency int // Number of concurrent sync requests
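A small self-contained sketch of how the new mode could be consumed; the mapping to blockExecution mirrors the `!fastSync` argument passed in syncing.go further down, everything else is assumed for illustration.

package main

import "fmt"

// SyncMode mirrors the constants added above.
type SyncMode uint32

const (
	FullSync SyncMode = iota // re-execute every block while inserting
	FastSync                 // insert blocks, fetch state/receipts separately
	SnapSync                 // state via snapshots (not exercised here)
)

// blockExecutionFor mirrors `!fastSync` in CreateStagedSync: only fast sync
// skips transaction execution during block insertion.
func blockExecutionFor(m SyncMode) bool {
	return m != FastSync
}

func main() {
	for _, m := range []SyncMode{FullSync, FastSync, SnapSync} {
		fmt.Printf("mode=%d blockExecution=%v\n", m, blockExecutionFor(m))
	}
}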

@@ -20,16 +20,16 @@ func (e *sigVerifyErr) Error() string {
 	return fmt.Sprintf("[VerifyHeaderSignature] %v", e.err.Error())
 }
 
-func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks) (int, error) {
+func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks, blockExecution bool) (int, error) {
 	for i, block := range blocks {
-		if err := verifyAndInsertBlock(bc, block, blocks[i+1:]...); err != nil {
+		if err := verifyAndInsertBlock(bc, block, blockExecution, blocks[i+1:]...); err != nil {
 			return i, err
 		}
 	}
 	return len(blocks), nil
 }
 
-func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error {
+func verifyAndInsertBlock(bc blockChain, block *types.Block, blockExecution bool, nextBlocks ...*types.Block) error {
 	var (
 		sigBytes bls.SerializedSignature
 		bitmap   []byte
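The loop above hands each block the blocks that follow it (blocks[i+1:]), so the verifier can read commit-signature data carried by a later block, and the returned index doubles as the count of blocks inserted before a failure. A toy sketch of that contract, with stub bodies standing in for the real verification:

package main

import "fmt"

// insertAll mirrors the loop above: block i receives blocks[i+1:] and the
// error return carries how many blocks made it in before the failure.
func insertAll(blocks []uint64, blockExecution bool) (int, error) {
	for i, b := range blocks {
		if err := insertOne(b, blockExecution, blocks[i+1:]...); err != nil {
			return i, err
		}
	}
	return len(blocks), nil
}

// insertOne is a stub standing in for verifyAndInsertBlock.
func insertOne(num uint64, blockExecution bool, next ...uint64) error {
	fmt.Printf("block %d: execute=%v, %d follow-up blocks available\n", num, blockExecution, len(next))
	return nil
}

func main() {
	n, _ := insertAll([]uint64{100, 101, 102}, false)
	fmt.Println("inserted:", n)
}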

@@ -136,7 +136,9 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
 		sh.streamsFailed(whitelist, "remote nodes cannot provide blocks with target hashes")
 	}
 
-	n, err := verifyAndInsertBlocks(sr.configs.bc, blocks)
+	utils.Logger().Info().Int("num blocks", len(blocks)).Msg("getBlockByHashes result")
+	n, err := verifyAndInsertBlocks(sr.configs.bc, blocks, true)
 	numBlocksInsertedShortRangeHistogramVec.With(s.state.promLabels()).Observe(float64(n))
 	if err != nil {
 		utils.Logger().Warn().Err(err).Int("blocks inserted", n).Msg("Insert block failed")

@@ -23,6 +23,7 @@ type StageStatesCfg struct {
 	db             kv.RwDB
 	blockDBs       []kv.RwDB
 	concurrency    int
+	blockExecution bool
 	logger         zerolog.Logger
 	logProgress    bool
 }
@@ -38,6 +39,7 @@ func NewStageStatesCfg(
 	db kv.RwDB,
 	blockDBs []kv.RwDB,
 	concurrency int,
+	blockExecution bool,
 	logger zerolog.Logger,
 	logProgress bool) StageStatesCfg {
@@ -46,6 +48,7 @@ func NewStageStatesCfg(
 		db:             db,
 		blockDBs:       blockDBs,
 		concurrency:    concurrency,
+		blockExecution: blockExecution,
 		logger:         logger,
 		logProgress:    logProgress,
 	}
@@ -108,6 +111,8 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
 		fmt.Print("\033[s") // save the cursor position
 	}
 
+	s.state.currentCycle.ReceiptHashes = make(map[uint64]common.Hash)
+
 	for i := currProgress + 1; i <= targetHeight; i++ {
 		blkKey := marshalData(i)
 		loopID, streamID := gbm.GetDownloadDetails(i)
@@ -157,7 +162,7 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
 			return ErrInvalidBlockNumber
 		}
 
-		if err := verifyAndInsertBlock(stg.configs.bc, block); err != nil {
+		if err := verifyAndInsertBlock(stg.configs.bc, block, stg.configs.blockExecution); err != nil {
 			stg.configs.logger.Warn().Err(err).Uint64("cycle target block", targetHeight).
 				Uint64("block number", block.NumberU64()).
 				Msg(WrapStagedSyncMsg("insert blocks failed in long range"))
@@ -169,6 +174,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
 			return err
 		}
 
+		// TODO: only for fast sync
+		// add receipt hash for next stage
+		s.state.currentCycle.ReceiptHashes[block.NumberU64()] = block.Header().ReceiptHash()
+
 		if invalidBlockRevert {
 			if s.state.invalidBlock.Number == i {
 				s.state.invalidBlock.resolve()
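The map filled here is presumably consumed by the new receipts stage to check downloaded receipts against the receipt roots recorded from the block headers; that stage is not shown in this diff, so the checker below is only an illustrative sketch with hypothetical names.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// checkReceiptRoots is hypothetical: it compares receipt roots recorded in
// currentCycle.ReceiptHashes against roots derived from downloaded receipts.
func checkReceiptRoots(expected, derived map[uint64]common.Hash) error {
	for num, want := range expected {
		got, ok := derived[num]
		if !ok || got != want {
			return fmt.Errorf("receipt root mismatch at block %d: want %x, got %x", num, want, got)
		}
	}
	return nil
}

func main() {
	root := common.HexToHash("0x01")
	exp := map[uint64]common.Hash{100: root}
	fmt.Println(checkReceiptRoots(exp, map[uint64]common.Hash{100: root})) // <nil>
}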

@@ -104,6 +104,7 @@ type Timing struct {
 
 type SyncCycle struct {
 	Number        uint64
 	TargetHeight  uint64
+	ReceiptHashes map[uint64]common.Hash
 	lock          sync.RWMutex
}
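Since SyncCycle already embeds an RWMutex, writers of the new map would presumably take the lock; the setter below is a hypothetical sketch over a trimmed copy of the struct, not code from this commit.

package main

import (
	"fmt"
	"sync"

	"github.com/ethereum/go-ethereum/common"
)

// SyncCycle is trimmed to the fields shown above.
type SyncCycle struct {
	Number        uint64
	TargetHeight  uint64
	ReceiptHashes map[uint64]common.Hash
	lock          sync.RWMutex
}

// setReceiptHash shows the lock discipline the embedded RWMutex suggests.
func (c *SyncCycle) setReceiptHash(num uint64, root common.Hash) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.ReceiptHashes == nil {
		c.ReceiptHashes = make(map[uint64]common.Hash)
	}
	c.ReceiptHashes[num] = root
}

func main() {
	c := &SyncCycle{}
	c.setReceiptHash(42, common.HexToHash("0xabc"))
	fmt.Println("recorded roots:", len(c.ReceiptHashes))
}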

@@ -81,13 +81,16 @@ func CreateStagedSync(ctx context.Context,
 		return nil, errInitDB
 	}
 
+	fastSync := config.SyncMode == FastSync
+
 	stageHeadsCfg := NewStageHeadersCfg(bc, mainDB)
 	stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB)
 	stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB)
 	stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
-	stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress)
+	stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, !fastSync, logger, config.LogProgress)
 	stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress)
+	stageReceiptsCfg := NewStageReceiptsCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
 	lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB)
 	stageFinishCfg := NewStageFinishCfg(mainDB)
@@ -98,6 +101,7 @@ func CreateStagedSync(ctx context.Context,
 		stageBodiesCfg,
 		stageStateSyncCfg,
 		stageStatesCfg,
+		stageReceiptsCfg,
 		lastMileCfg,
 		stageFinishCfg,
 	)
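Pulling the wiring together: fast sync turns block execution off in the states stage and a receipts stage is slotted in after it. A compact standalone sketch of that decision; only the stages visible in the hunk above are listed, and the helper name is an assumption.

package main

import "fmt"

// stagePlan mirrors the wiring above: `fastSync := config.SyncMode == FastSync`
// feeds `!fastSync` into the states stage, and receipts run after states.
func stagePlan(fastSync bool) (blockExecution bool, stages []string) {
	return !fastSync, []string{
		"bodies", "state_sync", "states", "receipts", "lastmile", "finish",
	}
}

func main() {
	exec, stages := stagePlan(true) // fast sync
	fmt.Println("blockExecution:", exec)
	fmt.Println("stage order:", stages)
}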
