improve stream sync current cycle and pivot checks, fix edge case issue to insert pivot block and its receipts

pull/4465/head
“GheisMohammadi” 1 year ago
parent c808f2b733
commit bdd7f142c7
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 6
      api/service/stagedstreamsync/stage_heads.go
  2. 2
      api/service/stagedstreamsync/stage_receipts.go
  3. 6
      api/service/stagedstreamsync/stage_state.go
  4. 5
      api/service/stagedstreamsync/stage_statesync.go
  5. 44
      api/service/stagedstreamsync/syncing.go
  6. 10
      api/service/stagedstreamsync/types.go

@ -90,11 +90,11 @@ func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlock
} }
// check pivot: if chain hasn't reached to pivot yet // check pivot: if chain hasn't reached to pivot yet
if s.state.status.pivotBlock != nil && s.state.CurrentBlockNumber() < s.state.status.pivotBlock.NumberU64() { if s.state.status.cycleSyncMode != FullSync && s.state.status.pivotBlock != nil {
// set target height on the block before pivot // set target height on the block before pivot
// pivot block would be downloaded by StateSync stage // pivot block would be downloaded by StateSync stage
if targetHeight >= s.state.status.pivotBlock.NumberU64() { if !s.state.status.statesSynced && targetHeight > s.state.status.pivotBlock.NumberU64() {
targetHeight = s.state.status.pivotBlock.NumberU64() - 1 targetHeight = s.state.status.pivotBlock.NumberU64()
} }
} }

@ -52,7 +52,7 @@ func NewStageReceiptsCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, con
func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// only execute this stage in fast/snap sync mode // only execute this stage in fast/snap sync mode
if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() >= s.state.status.pivotBlock.NumberU64() { if s.state.status.cycleSyncMode == FullSync {
return nil return nil
} }

@ -54,10 +54,8 @@ func NewStageStatesCfg(
// Exec progresses States stage in the forward direction // Exec progresses States stage in the forward direction
func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// only execute this stage in full sync mode // only execute this stage in full sync mode
if s.state.config.SyncMode != FullSync { if s.state.status.cycleSyncMode != FullSync {
if s.state.status.pivotBlock != nil && s.state.bc.CurrentBlock().NumberU64() < s.state.status.pivotBlock.NumberU64() { return nil
return nil
}
} }
// for short range sync, skip this step // for short range sync, skip this step

@ -56,7 +56,7 @@ func NewStageStateSyncCfg(bc core.BlockChain,
func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// only execute this stage in fast/snap sync mode and once we reach to pivot // only execute this stage in fast/snap sync mode and once we reach to pivot
if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64()-1 { if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() {
return nil return nil
} }
@ -123,6 +123,9 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo
return err return err
} }
// states should be fully synced in this stage
s.state.status.statesSynced = true
/* /*
gbm := s.state.gbm gbm := s.state.gbm

@ -224,11 +224,16 @@ func (s *StagedStreamSync) Debug(source string, msg interface{}) {
} }
} }
func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint64) (uint64, error) { // checkPivot checks pivot block and returns pivot block and cycle Sync mode
func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint64, initSync bool) (*types.Block, SyncMode, error) {
if s.config.SyncMode == FullSync {
return nil, FullSync, nil
}
// do full sync if chain is at early stage // do full sync if chain is at early stage
if estimatedHeight < MaxPivotDistanceToHead { if initSync && estimatedHeight < MaxPivotDistanceToHead {
return 0, nil return nil, FullSync, nil
} }
pivotBlockNumber := uint64(0) pivotBlockNumber := uint64(0)
@ -240,23 +245,21 @@ func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint6
if pivotBlockNumber < estimatedHeight-MaxPivotDistanceToHead { if pivotBlockNumber < estimatedHeight-MaxPivotDistanceToHead {
pivotBlockNumber = estimatedHeight - MinPivotDistanceToHead pivotBlockNumber = estimatedHeight - MinPivotDistanceToHead
if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil { if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil {
s.logger.Error().Err(err). s.logger.Warn().Err(err).
Uint64("current pivot number", *curPivot). Uint64("current pivot number", *curPivot).
Uint64("new pivot number", pivotBlockNumber). Uint64("new pivot number", pivotBlockNumber).
Msg(WrapStagedSyncMsg("update pivot number failed")) Msg(WrapStagedSyncMsg("update pivot number failed"))
return pivotBlockNumber, err pivotBlockNumber = *curPivot
} }
} }
} }
} else { } else {
pivot := estimatedHeight - MinPivotDistanceToHead if head := s.CurrentBlockNumber(); s.config.SyncMode == FastSync && head <= 1 {
if s.config.SyncMode == FastSync && s.CurrentBlockNumber() < pivot { pivotBlockNumber = estimatedHeight - MinPivotDistanceToHead
pivotBlockNumber = pivot
if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil { if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil {
s.logger.Error().Err(err). s.logger.Warn().Err(err).
Uint64("new pivot number", pivotBlockNumber). Uint64("new pivot number", pivotBlockNumber).
Msg(WrapStagedSyncMsg("update pivot number failed")) Msg(WrapStagedSyncMsg("update pivot number failed"))
return pivotBlockNumber, err
} }
} }
} }
@ -265,17 +268,17 @@ func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint6
s.logger.Error().Err(err). s.logger.Error().Err(err).
Uint64("pivot", pivotBlockNumber). Uint64("pivot", pivotBlockNumber).
Msg(WrapStagedSyncMsg("query peers for pivot block failed")) Msg(WrapStagedSyncMsg("query peers for pivot block failed"))
return pivotBlockNumber, err return block, FastSync, err
} else { } else {
s.status.pivotBlock = block s.status.pivotBlock = block
s.logger.Info().
Uint64("estimatedHeight", estimatedHeight).
Uint64("pivot number", pivotBlockNumber).
Msg(WrapStagedSyncMsg("fast/snap sync mode, pivot is set successfully"))
return block, FastSync, nil
} }
s.logger.Info().
Uint64("estimatedHeight", estimatedHeight).
Uint64("pivot number", pivotBlockNumber).
Msg(WrapStagedSyncMsg("fast/snap sync mode, pivot is set successfully"))
} }
return nil, FullSync, nil
return pivotBlockNumber, nil
} }
// doSync does the long range sync. // doSync does the long range sync.
@ -310,9 +313,12 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
// We are probably in full sync, but we might have rewound to before the // We are probably in full sync, but we might have rewound to before the
// fast/snap sync pivot, check if we should reenable // fast/snap sync pivot, check if we should reenable
if _, err := s.checkPivot(downloaderContext, estimatedHeight); err != nil { if pivotBlock, cycleSyncMode, err := s.checkPivot(downloaderContext, estimatedHeight, initSync); err != nil {
s.logger.Error().Err(err).Msg(WrapStagedSyncMsg("check pivot failed")) s.logger.Error().Err(err).Msg(WrapStagedSyncMsg("check pivot failed"))
return 0, 0, err return 0, 0, err
} else {
s.status.cycleSyncMode = cycleSyncMode
s.status.pivotBlock = pivotBlock
} }
s.startSyncing() s.startSyncing()
@ -451,7 +457,7 @@ func (s *StagedStreamSync) checkPrerequisites() error {
func (s *StagedStreamSync) CurrentBlockNumber() uint64 { func (s *StagedStreamSync) CurrentBlockNumber() uint64 {
// if current head is ahead of pivot block, return chain head regardless of sync mode // if current head is ahead of pivot block, return chain head regardless of sync mode
if s.status.pivotBlock != nil && s.bc.CurrentBlock().NumberU64() > s.status.pivotBlock.NumberU64() { if s.status.pivotBlock != nil && s.bc.CurrentBlock().NumberU64() >= s.status.pivotBlock.NumberU64() {
return s.bc.CurrentBlock().NumberU64() return s.bc.CurrentBlock().NumberU64()
} }

@ -14,10 +14,12 @@ var (
) )
type status struct { type status struct {
isSyncing bool isSyncing bool
targetBN uint64 targetBN uint64
pivotBlock *types.Block pivotBlock *types.Block
lock sync.Mutex cycleSyncMode SyncMode
statesSynced bool
lock sync.Mutex
} }
func newStatus() status { func newStatus() status {

Loading…
Cancel
Save