diff --git a/api/service/stagedstreamsync/stage_heads.go b/api/service/stagedstreamsync/stage_heads.go index 99e0248ba..46ebed1d2 100644 --- a/api/service/stagedstreamsync/stage_heads.go +++ b/api/service/stagedstreamsync/stage_heads.go @@ -90,11 +90,11 @@ func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlock } // check pivot: if chain hasn't reached to pivot yet - if s.state.status.pivotBlock != nil && s.state.CurrentBlockNumber() < s.state.status.pivotBlock.NumberU64() { + if s.state.status.cycleSyncMode != FullSync && s.state.status.pivotBlock != nil { // set target height on the block before pivot // pivot block would be downloaded by StateSync stage - if targetHeight >= s.state.status.pivotBlock.NumberU64() { - targetHeight = s.state.status.pivotBlock.NumberU64() - 1 + if !s.state.status.statesSynced && targetHeight > s.state.status.pivotBlock.NumberU64() { + targetHeight = s.state.status.pivotBlock.NumberU64() } } diff --git a/api/service/stagedstreamsync/stage_receipts.go b/api/service/stagedstreamsync/stage_receipts.go index 0a2d8ab02..63f09f986 100644 --- a/api/service/stagedstreamsync/stage_receipts.go +++ b/api/service/stagedstreamsync/stage_receipts.go @@ -52,7 +52,7 @@ func NewStageReceiptsCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, con func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { // only execute this stage in fast/snap sync mode - if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() >= s.state.status.pivotBlock.NumberU64() { + if s.state.status.cycleSyncMode == FullSync { return nil } diff --git a/api/service/stagedstreamsync/stage_state.go b/api/service/stagedstreamsync/stage_state.go index 80a3faa0e..c477f4309 100644 --- a/api/service/stagedstreamsync/stage_state.go +++ b/api/service/stagedstreamsync/stage_state.go @@ -54,10 +54,8 @@ func NewStageStatesCfg( // Exec progresses States stage in the forward direction func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { // only execute this stage in full sync mode - if s.state.config.SyncMode != FullSync { - if s.state.status.pivotBlock != nil && s.state.bc.CurrentBlock().NumberU64() < s.state.status.pivotBlock.NumberU64() { - return nil - } + if s.state.status.cycleSyncMode != FullSync { + return nil } // for short range sync, skip this step diff --git a/api/service/stagedstreamsync/stage_statesync.go b/api/service/stagedstreamsync/stage_statesync.go index 130f7f71f..1a973c13e 100644 --- a/api/service/stagedstreamsync/stage_statesync.go +++ b/api/service/stagedstreamsync/stage_statesync.go @@ -56,7 +56,7 @@ func NewStageStateSyncCfg(bc core.BlockChain, func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { // only execute this stage in fast/snap sync mode and once we reach to pivot - if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64()-1 { + if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() { return nil } @@ -123,6 +123,9 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo return err } + // states should be fully synced in this stage + s.state.status.statesSynced = true + /* gbm := s.state.gbm diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index 03043525b..88e0a0857 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -224,11 +224,16 @@ func (s *StagedStreamSync) Debug(source string, msg interface{}) { } } -func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint64) (uint64, error) { +// checkPivot checks pivot block and returns pivot block and cycle Sync mode +func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint64, initSync bool) (*types.Block, SyncMode, error) { + + if s.config.SyncMode == FullSync { + return nil, FullSync, nil + } // do full sync if chain is at early stage - if estimatedHeight < MaxPivotDistanceToHead { - return 0, nil + if initSync && estimatedHeight < MaxPivotDistanceToHead { + return nil, FullSync, nil } pivotBlockNumber := uint64(0) @@ -240,23 +245,21 @@ func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint6 if pivotBlockNumber < estimatedHeight-MaxPivotDistanceToHead { pivotBlockNumber = estimatedHeight - MinPivotDistanceToHead if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil { - s.logger.Error().Err(err). + s.logger.Warn().Err(err). Uint64("current pivot number", *curPivot). Uint64("new pivot number", pivotBlockNumber). Msg(WrapStagedSyncMsg("update pivot number failed")) - return pivotBlockNumber, err + pivotBlockNumber = *curPivot } } } } else { - pivot := estimatedHeight - MinPivotDistanceToHead - if s.config.SyncMode == FastSync && s.CurrentBlockNumber() < pivot { - pivotBlockNumber = pivot + if head := s.CurrentBlockNumber(); s.config.SyncMode == FastSync && head <= 1 { + pivotBlockNumber = estimatedHeight - MinPivotDistanceToHead if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil { - s.logger.Error().Err(err). + s.logger.Warn().Err(err). Uint64("new pivot number", pivotBlockNumber). Msg(WrapStagedSyncMsg("update pivot number failed")) - return pivotBlockNumber, err } } } @@ -265,17 +268,17 @@ func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint6 s.logger.Error().Err(err). Uint64("pivot", pivotBlockNumber). Msg(WrapStagedSyncMsg("query peers for pivot block failed")) - return pivotBlockNumber, err + return block, FastSync, err } else { s.status.pivotBlock = block + s.logger.Info(). + Uint64("estimatedHeight", estimatedHeight). + Uint64("pivot number", pivotBlockNumber). + Msg(WrapStagedSyncMsg("fast/snap sync mode, pivot is set successfully")) + return block, FastSync, nil } - s.logger.Info(). - Uint64("estimatedHeight", estimatedHeight). - Uint64("pivot number", pivotBlockNumber). - Msg(WrapStagedSyncMsg("fast/snap sync mode, pivot is set successfully")) } - - return pivotBlockNumber, nil + return nil, FullSync, nil } // doSync does the long range sync. @@ -310,9 +313,12 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo // We are probably in full sync, but we might have rewound to before the // fast/snap sync pivot, check if we should reenable - if _, err := s.checkPivot(downloaderContext, estimatedHeight); err != nil { + if pivotBlock, cycleSyncMode, err := s.checkPivot(downloaderContext, estimatedHeight, initSync); err != nil { s.logger.Error().Err(err).Msg(WrapStagedSyncMsg("check pivot failed")) return 0, 0, err + } else { + s.status.cycleSyncMode = cycleSyncMode + s.status.pivotBlock = pivotBlock } s.startSyncing() @@ -451,7 +457,7 @@ func (s *StagedStreamSync) checkPrerequisites() error { func (s *StagedStreamSync) CurrentBlockNumber() uint64 { // if current head is ahead of pivot block, return chain head regardless of sync mode - if s.status.pivotBlock != nil && s.bc.CurrentBlock().NumberU64() > s.status.pivotBlock.NumberU64() { + if s.status.pivotBlock != nil && s.bc.CurrentBlock().NumberU64() >= s.status.pivotBlock.NumberU64() { return s.bc.CurrentBlock().NumberU64() } diff --git a/api/service/stagedstreamsync/types.go b/api/service/stagedstreamsync/types.go index 17a3d345f..e46b61429 100644 --- a/api/service/stagedstreamsync/types.go +++ b/api/service/stagedstreamsync/types.go @@ -14,10 +14,12 @@ var ( ) type status struct { - isSyncing bool - targetBN uint64 - pivotBlock *types.Block - lock sync.Mutex + isSyncing bool + targetBN uint64 + pivotBlock *types.Block + cycleSyncMode SyncMode + statesSynced bool + lock sync.Mutex } func newStatus() status {