Fixed a bug with storing an already-cancelled context. (#4445)

* Fixed context usages.

* Additional fixes.

* Increased the block hash request timeout from 1 to 10 seconds.

* Additional logs.

* Fixed short-range sync.

Branch: improvement/stream_sync_single_tx
Authored by Konstantin 1 year ago; committed by GitHub
parent 098900291c
commit e4088f9e83
11 changed files (change counts in parentheses):

  1. api/service/stagedstreamsync/short_range_helper.go (44)
  2. api/service/stagedstreamsync/stage.go (9)
  3. api/service/stagedstreamsync/stage_bodies.go (77)
  4. api/service/stagedstreamsync/stage_epoch.go (39)
  5. api/service/stagedstreamsync/stage_finish.go (28)
  6. api/service/stagedstreamsync/stage_heads.go (31)
  7. api/service/stagedstreamsync/stage_short_range.go (42)
  8. api/service/stagedstreamsync/stage_state.go (36)
  9. api/service/stagedstreamsync/staged_stream_sync.go (59)
  10. api/service/stagedstreamsync/syncing.go (30)
  11. consensus/consensus_service.go (2)

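The diff below removes the context.Context fields that srHelper and the stage configs stored at construction time and instead threads a caller-supplied context through every call. The following self-contained Go sketch (illustrative only; storedCtxHelper, passedCtxHelper and doWork are invented names, not Harmony code) shows why the stored-context pattern is the bug named in the commit title: once the stored context is cancelled at the end of a cycle, every later request made through the helper fails immediately, whereas a context passed per call can always be a live one.

package main

import (
    "context"
    "fmt"
    "time"
)

// Buggy pattern (what the commit removes): the helper keeps the context it was
// built with. If that context gets cancelled after one sync cycle, every later
// call made through the helper fails immediately with context.Canceled.
type storedCtxHelper struct {
    ctx context.Context
}

func (h *storedCtxHelper) request() error {
    reqCtx, cancel := context.WithTimeout(h.ctx, 10*time.Second)
    defer cancel()
    return doWork(reqCtx)
}

// Fixed pattern (what the commit introduces): the caller supplies a live
// context on every call, and the helper only derives a per-request timeout from it.
type passedCtxHelper struct{}

func (h *passedCtxHelper) request(ctx context.Context) error {
    reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()
    return doWork(reqCtx)
}

// doWork stands in for a network request; it only honours cancellation.
func doWork(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(10 * time.Millisecond):
        return nil
    }
}

func main() {
    cycleCtx, cancel := context.WithCancel(context.Background())
    stored := &storedCtxHelper{ctx: cycleCtx}
    cancel() // the first cycle ends and cancels its context

    // The stored context is now permanently cancelled.
    fmt.Println("stored ctx:", stored.request()) // context.Canceled

    // A fresh context per call keeps the helper usable.
    passed := &passedCtxHelper{}
    fmt.Println("passed ctx:", passed.request(context.Background())) // <nil>
}
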
@ -15,13 +15,11 @@ import (
type srHelper struct {
syncProtocol syncProtocol
ctx context.Context
config Config
logger zerolog.Logger
config Config
logger zerolog.Logger
}
func (sh *srHelper) getHashChain(bns []uint64) ([]common.Hash, []sttypes.StreamID, error) {
func (sh *srHelper) getHashChain(ctx context.Context, bns []uint64) ([]common.Hash, []sttypes.StreamID, error) {
results := newBlockHashResults(bns)
var wg sync.WaitGroup
@ -31,7 +29,7 @@ func (sh *srHelper) getHashChain(bns []uint64) ([]common.Hash, []sttypes.StreamI
go func(index int) {
defer wg.Done()
hashes, stid, err := sh.doGetBlockHashesRequest(bns)
hashes, stid, err := sh.doGetBlockHashesRequest(ctx, bns)
if err != nil {
sh.logger.Warn().Err(err).Str("StreamID", string(stid)).
Msg(WrapStagedSyncMsg("doGetBlockHashes return error"))
@ -43,10 +41,10 @@ func (sh *srHelper) getHashChain(bns []uint64) ([]common.Hash, []sttypes.StreamI
wg.Wait()
select {
case <-sh.ctx.Done():
sh.logger.Info().Err(sh.ctx.Err()).Int("num blocks", results.numBlocksWithResults()).
case <-ctx.Done():
sh.logger.Info().Err(ctx.Err()).Int("num blocks", results.numBlocksWithResults()).
Msg(WrapStagedSyncMsg("short range sync get hashes timed out"))
return nil, nil, sh.ctx.Err()
return nil, nil, ctx.Err()
default:
}
@ -56,13 +54,12 @@ func (sh *srHelper) getHashChain(bns []uint64) ([]common.Hash, []sttypes.StreamI
return hashChain, wl, nil
}
func (sh *srHelper) getBlocksChain(bns []uint64) ([]*types.Block, sttypes.StreamID, error) {
return sh.doGetBlocksByNumbersRequest(bns)
func (sh *srHelper) getBlocksChain(ctx context.Context, bns []uint64) ([]*types.Block, sttypes.StreamID, error) {
return sh.doGetBlocksByNumbersRequest(ctx, bns)
}
func (sh *srHelper) getBlocksByHashes(hashes []common.Hash, whitelist []sttypes.StreamID) ([]*types.Block, []sttypes.StreamID, error) {
ctx, cancel := context.WithCancel(sh.ctx)
defer cancel()
func (sh *srHelper) getBlocksByHashes(ctx context.Context, hashes []common.Hash, whitelist []sttypes.StreamID) ([]*types.Block, []sttypes.StreamID, error) {
m := newGetBlocksByHashManager(hashes, whitelist)
var (
@ -80,7 +77,8 @@ func (sh *srHelper) getBlocksByHashes(hashes []common.Hash, whitelist []sttypes.
for i := 0; i != concurrency; i++ {
go func(index int) {
defer wg.Done()
defer cancel() // it's ok to cancel context more than once
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
if m.isDone() {
@ -121,11 +119,11 @@ func (sh *srHelper) getBlocksByHashes(hashes []common.Hash, whitelist []sttypes.
return nil, nil, gErr
}
select {
case <-sh.ctx.Done():
case <-ctx.Done():
res, _, _ := m.getResults()
sh.logger.Info().Err(sh.ctx.Err()).Int("num blocks", len(res)).
sh.logger.Info().Err(ctx.Err()).Int("num blocks", len(res)).
Msg(WrapStagedSyncMsg("short range sync get blocks timed out"))
return nil, nil, sh.ctx.Err()
return nil, nil, ctx.Err()
default:
}
@ -149,8 +147,8 @@ func (sh *srHelper) prepareBlockHashNumbers(curNumber uint64) []uint64 {
return res
}
func (sh *srHelper) doGetBlockHashesRequest(bns []uint64) ([]common.Hash, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(sh.ctx, 1*time.Second)
func (sh *srHelper) doGetBlockHashesRequest(ctx context.Context, bns []uint64) ([]common.Hash, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
hashes, stid, err := sh.syncProtocol.GetBlockHashes(ctx, bns)
@ -171,8 +169,8 @@ func (sh *srHelper) doGetBlockHashesRequest(bns []uint64) ([]common.Hash, sttype
return hashes, stid, nil
}
func (sh *srHelper) doGetBlocksByNumbersRequest(bns []uint64) ([]*types.Block, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(sh.ctx, 10*time.Second)
func (sh *srHelper) doGetBlocksByNumbersRequest(ctx context.Context, bns []uint64) ([]*types.Block, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
blocks, stid, err := sh.syncProtocol.GetBlocksByNumber(ctx, bns)
@ -186,7 +184,7 @@ func (sh *srHelper) doGetBlocksByNumbersRequest(bns []uint64) ([]*types.Block, s
}
func (sh *srHelper) doGetBlocksByHashesRequest(ctx context.Context, hashes []common.Hash, wl []sttypes.StreamID) ([]*types.Block, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(sh.ctx, 10*time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
blocks, stid, err := sh.syncProtocol.GetBlocksByHashes(ctx, hashes,

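In the reworked getBlocksByHashes above, the function-level context.WithCancel is gone and each download goroutine now derives and cancels its own child context from the caller's context instead of cancelling a context shared with the other workers. A minimal sketch of that per-worker pattern (fetch and runWorkers are illustrative stand-ins, not the actual Harmony functions):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// fetch stands in for a per-stream request; it only honours cancellation.
func fetch(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(10 * time.Millisecond):
        return nil
    }
}

// Each worker derives its own cancellable child context from the caller's
// context. Cancelling the child on exit releases that worker's resources
// without tearing down its siblings, while cancelling the parent still stops
// every worker.
func runWorkers(ctx context.Context, concurrency int) {
    var wg sync.WaitGroup
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func(worker int) {
            defer wg.Done()
            workerCtx, cancel := context.WithCancel(ctx)
            defer cancel()
            fmt.Printf("worker %d: %v\n", worker, fetch(workerCtx))
        }(i)
    }
    wg.Wait()
}

func main() {
    runWorkers(context.Background(), 3)
}
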
@ -16,21 +16,18 @@ type StageHandler interface {
// * invalidBlockRevert - whether the execution is to solve the invalid block
// * s - is the current state of the stage and contains stage data.
// * reverter - if the stage needs to cause reverting, `reverter` methods can be used.
Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error
Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error
// Revert is the reverting logic of the stage.
// * firstCycle - is it the first cycle of syncing.
// * u - contains information about the revert itself.
// * s - represents the state of this stage at the beginning of revert.
Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) error
Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) error
// CleanUp is the execution function for the stage to prune old data.
// * firstCycle - is it the first cycle of syncing.
// * p - is the current state of the stage and contains stage data.
CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) error
// SetStageContext updates the context for stage
SetStageContext(ctx context.Context)
CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) error
}
// Stage is a single sync stage in staged sync.

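With the SetStageContext setter removed, every StageHandler method now receives the context for the current call explicitly. A trimmed-down sketch of that calling convention follows (the interface here is simplified: StageState, Reverter and the kv.RwTx parameters of the real interface are left out, and noopStage is an invented example):

package main

import (
    "context"
    "fmt"
)

// A simplified shape of the StageHandler interface after this change: every
// method takes the caller's context instead of relying on a previously set one.
type stageHandler interface {
    Exec(ctx context.Context, firstCycle bool) error
    Revert(ctx context.Context, firstCycle bool) error
    CleanUp(ctx context.Context, firstCycle bool) error
}

type noopStage struct{}

func (noopStage) Exec(ctx context.Context, firstCycle bool) error {
    // A stage only needs to check the context it was handed for this call.
    if err := ctx.Err(); err != nil {
        return err
    }
    fmt.Println("exec, firstCycle =", firstCycle)
    return nil
}

func (noopStage) Revert(ctx context.Context, firstCycle bool) error  { return ctx.Err() }
func (noopStage) CleanUp(ctx context.Context, firstCycle bool) error { return ctx.Err() }

func main() {
    var h stageHandler = noopStage{}
    ctx, cancel := context.WithCancel(context.Background())
    fmt.Println(h.Exec(ctx, true)) // runs: the context is live
    cancel()
    fmt.Println(h.Exec(ctx, false)) // context.Canceled: a stale context is rejected per call
}
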
@ -17,8 +17,8 @@ import (
type StageBodies struct {
configs StageBodiesCfg
}
type StageBodiesCfg struct {
ctx context.Context
bc core.BlockChain
db kv.RwDB
blockDBs []kv.RwDB
@ -34,9 +34,8 @@ func NewStageBodies(cfg StageBodiesCfg) *StageBodies {
}
}
func NewStageBodiesCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, logProgress bool) StageBodiesCfg {
func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, logProgress bool) StageBodiesCfg {
return StageBodiesCfg{
ctx: ctx,
bc: bc,
db: db,
blockDBs: blockDBs,
@ -47,17 +46,13 @@ func NewStageBodiesCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB, bloc
}
}
func (b *StageBodies) SetStageContext(ctx context.Context) {
b.configs.ctx = ctx
}
// Exec progresses Bodies stage in the forward direction
func (b *StageBodies) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if invalidBlockRevert {
return b.redownloadBadBlock(s)
return b.redownloadBadBlock(ctx, s)
}
// for short range sync, skip this stage
@ -72,10 +67,8 @@ func (b *StageBodies) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
}
currProgress := uint64(0)
targetHeight := s.state.currentCycle.TargetHeight
// isBeacon := s.state.isBeacon
// isLastCycle := targetHeight >= maxHeight
if errV := CreateView(b.configs.ctx, b.configs.db, tx, func(etx kv.Tx) error {
if errV := CreateView(ctx, b.configs.db, tx, func(etx kv.Tx) error {
if currProgress, err = s.CurrentStageProgress(etx); err != nil {
return err
}
@ -85,7 +78,7 @@ func (b *StageBodies) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
}
if currProgress == 0 {
if err := b.cleanAllBlockDBs(); err != nil {
if err := b.cleanAllBlockDBs(ctx); err != nil {
return err
}
currProgress = currentHead
@ -104,7 +97,7 @@ func (b *StageBodies) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
if useInternalTx {
var err error
tx, err = b.configs.db.BeginRw(context.Background())
tx, err = b.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -119,7 +112,7 @@ func (b *StageBodies) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
for i := 0; i != s.state.config.Concurrency; i++ {
wg.Add(1)
go b.runBlockWorkerLoop(s.state.gbm, &wg, i, startTime)
go b.runBlockWorkerLoop(ctx, s.state.gbm, &wg, i, startTime)
}
wg.Wait()
@ -134,7 +127,7 @@ func (b *StageBodies) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
}
// runBlockWorkerLoop creates a work loop for download blocks
func (b *StageBodies) runBlockWorkerLoop(gbm *blockDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time) {
func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time) {
currentBlock := int(b.configs.bc.CurrentBlock().NumberU64())
@ -142,21 +135,21 @@ func (b *StageBodies) runBlockWorkerLoop(gbm *blockDownloadManager, wg *sync.Wai
for {
select {
case <-b.configs.ctx.Done():
case <-ctx.Done():
return
default:
}
batch := gbm.GetNextBatch()
if len(batch) == 0 {
select {
case <-b.configs.ctx.Done():
case <-ctx.Done():
return
case <-time.After(100 * time.Millisecond):
return
}
}
blockBytes, sigBytes, stid, err := b.downloadRawBlocks(batch)
blockBytes, sigBytes, stid, err := b.downloadRawBlocks(ctx, batch)
if err != nil {
if !errors.Is(err, context.Canceled) {
b.configs.protocol.StreamFailed(stid, "downloadRawBlocks failed")
@ -176,7 +169,7 @@ func (b *StageBodies) runBlockWorkerLoop(gbm *blockDownloadManager, wg *sync.Wai
err := errors.New("downloadRawBlocks received empty blockBytes")
gbm.HandleRequestError(batch, err, stid)
} else {
if err = b.saveBlocks(gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil {
if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil {
panic(ErrSaveBlocksToDbFailed)
}
gbm.HandleRequestResult(batch, blockBytes, sigBytes, loopID, stid)
@ -197,7 +190,7 @@ func (b *StageBodies) runBlockWorkerLoop(gbm *blockDownloadManager, wg *sync.Wai
}
// redownloadBadBlock tries to redownload the bad block from other streams
func (b *StageBodies) redownloadBadBlock(s *StageState) error {
func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) error {
batch := make([]uint64, 1)
batch = append(batch, s.state.invalidBlock.Number)
@ -206,7 +199,7 @@ func (b *StageBodies) redownloadBadBlock(s *StageState) error {
if b.configs.protocol.NumStreams() == 0 {
return errors.Errorf("re-download bad block from all streams failed")
}
blockBytes, sigBytes, stid, err := b.downloadRawBlocks(batch)
blockBytes, sigBytes, stid, err := b.downloadRawBlocks(ctx, batch)
if err != nil {
if !errors.Is(err, context.Canceled) {
b.configs.protocol.StreamFailed(stid, "tried to re-download bad block from this stream, but downloadRawBlocks failed")
@ -225,8 +218,8 @@ func (b *StageBodies) redownloadBadBlock(s *StageState) error {
continue
}
s.state.gbm.SetDownloadDetails(batch, 0, stid)
if errU := b.configs.blockDBs[0].Update(context.Background(), func(tx kv.RwTx) error {
if err = b.saveBlocks(tx, batch, blockBytes, sigBytes, 0, stid); err != nil {
if errU := b.configs.blockDBs[0].Update(ctx, func(tx kv.RwTx) error {
if err = b.saveBlocks(ctx, tx, batch, blockBytes, sigBytes, 0, stid); err != nil {
return errors.Errorf("[STAGED_STREAM_SYNC] saving re-downloaded bad block to db failed.")
}
return nil
@ -238,8 +231,8 @@ func (b *StageBodies) redownloadBadBlock(s *StageState) error {
return nil
}
func (b *StageBodies) downloadBlocks(bns []uint64) ([]*types.Block, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(b.configs.ctx, 10*time.Second)
func (b *StageBodies) downloadBlocks(ctx context.Context, bns []uint64) ([]*types.Block, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
blocks, stid, err := b.configs.protocol.GetBlocksByNumber(ctx, bns)
@ -252,8 +245,8 @@ func (b *StageBodies) downloadBlocks(bns []uint64) ([]*types.Block, sttypes.Stre
return blocks, stid, nil
}
func (b *StageBodies) downloadRawBlocks(bns []uint64) ([][]byte, [][]byte, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(b.configs.ctx, 10*time.Second)
func (b *StageBodies) downloadRawBlocks(ctx context.Context, bns []uint64) ([][]byte, [][]byte, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
return b.configs.protocol.GetRawBlocksByNumber(ctx, bns)
@ -272,9 +265,9 @@ func validateGetBlocksResult(requested []uint64, result []*types.Block) error {
}
// saveBlocks saves the blocks into db
func (b *StageBodies) saveBlocks(tx kv.RwTx, bns []uint64, blockBytes [][]byte, sigBytes [][]byte, loopID int, stid sttypes.StreamID) error {
func (b *StageBodies) saveBlocks(ctx context.Context, tx kv.RwTx, bns []uint64, blockBytes [][]byte, sigBytes [][]byte, loopID int, stid sttypes.StreamID) error {
tx, err := b.configs.blockDBs[loopID].BeginRw(context.Background())
tx, err := b.configs.blockDBs[loopID].BeginRw(ctx)
if err != nil {
return err
}
@ -313,11 +306,11 @@ func (b *StageBodies) saveBlocks(tx kv.RwTx, bns []uint64, blockBytes [][]byte,
return nil
}
func (b *StageBodies) saveProgress(s *StageState, progress uint64, tx kv.RwTx) (err error) {
func (b *StageBodies) saveProgress(ctx context.Context, s *StageState, progress uint64, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = b.configs.db.BeginRw(context.Background())
tx, err = b.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -340,9 +333,8 @@ func (b *StageBodies) saveProgress(s *StageState, progress uint64, tx kv.RwTx) (
return nil
}
func (b *StageBodies) cleanBlocksDB(loopID int) (err error) {
tx, errb := b.configs.blockDBs[loopID].BeginRw(b.configs.ctx)
func (b *StageBodies) cleanBlocksDB(ctx context.Context, loopID int) (err error) {
tx, errb := b.configs.blockDBs[loopID].BeginRw(ctx)
if errb != nil {
return errb
}
@ -370,26 +362,26 @@ func (b *StageBodies) cleanBlocksDB(loopID int) (err error) {
return nil
}
func (b *StageBodies) cleanAllBlockDBs() (err error) {
func (b *StageBodies) cleanAllBlockDBs(ctx context.Context) (err error) {
//clean all blocks DBs
for i := 0; i < b.configs.concurrency; i++ {
if err := b.cleanBlocksDB(i); err != nil {
if err := b.cleanBlocksDB(ctx, i); err != nil {
return err
}
}
return nil
}
func (b *StageBodies) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
func (b *StageBodies) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
//clean all blocks DBs
if err := b.cleanAllBlockDBs(); err != nil {
if err := b.cleanAllBlockDBs(ctx); err != nil {
return err
}
useInternalTx := tx == nil
if useInternalTx {
tx, err = b.configs.db.BeginRw(b.configs.ctx)
tx, err = b.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -416,10 +408,9 @@ func (b *StageBodies) Revert(firstCycle bool, u *RevertState, s *StageState, tx
return nil
}
func (b *StageBodies) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
func (b *StageBodies) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
//clean all blocks DBs
if err := b.cleanAllBlockDBs(); err != nil {
if err := b.cleanAllBlockDBs(ctx); err != nil {
return err
}

@ -15,9 +15,8 @@ type StageEpoch struct {
}
type StageEpochCfg struct {
ctx context.Context
bc core.BlockChain
db kv.RwDB
bc core.BlockChain
db kv.RwDB
}
func NewStageEpoch(cfg StageEpochCfg) *StageEpoch {
@ -26,19 +25,14 @@ func NewStageEpoch(cfg StageEpochCfg) *StageEpoch {
}
}
func NewStageEpochCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB) StageEpochCfg {
func NewStageEpochCfg(bc core.BlockChain, db kv.RwDB) StageEpochCfg {
return StageEpochCfg{
ctx: ctx,
bc: bc,
db: db,
bc: bc,
db: db,
}
}
func (sr *StageEpoch) SetStageContext(ctx context.Context) {
sr.configs.ctx = ctx
}
func (sr *StageEpoch) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error {
func (sr *StageEpoch) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error {
// no need to update epoch chain if we are redoing the stages because of bad block
if invalidBlockRevert {
@ -54,7 +48,7 @@ func (sr *StageEpoch) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
}
// doShortRangeSyncForEpochSync
n, err := sr.doShortRangeSyncForEpochSync(s)
n, err := sr.doShortRangeSyncForEpochSync(ctx, s)
s.state.inserted = n
if err != nil {
return err
@ -63,7 +57,7 @@ func (sr *StageEpoch) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = sr.configs.db.BeginRw(sr.configs.ctx)
tx, err = sr.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -79,17 +73,16 @@ func (sr *StageEpoch) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
return nil
}
func (sr *StageEpoch) doShortRangeSyncForEpochSync(s *StageState) (int, error) {
func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *StageState) (int, error) {
numShortRangeCounterVec.With(s.state.promLabels()).Inc()
srCtx, cancel := context.WithTimeout(s.state.ctx, ShortRangeTimeout)
ctx, cancel := context.WithTimeout(ctx, ShortRangeTimeout)
defer cancel()
//TODO: merge srHelper with StageEpochConfig
sh := &srHelper{
syncProtocol: s.state.protocol,
ctx: srCtx,
config: s.state.config,
logger: utils.Logger().With().Str("mode", "epoch chain short range").Logger(),
}
@ -116,7 +109,7 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(s *StageState) (int, error) {
}
////////////////////////////////////////////////////////
hashChain, whitelist, err := sh.getHashChain(bns)
hashChain, whitelist, err := sh.getHashChain(ctx, bns)
if err != nil {
return 0, errors.Wrap(err, "getHashChain")
}
@ -124,7 +117,7 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(s *StageState) (int, error) {
// short circuit for no sync is needed
return 0, nil
}
blocks, streamID, err := sh.getBlocksByHashes(hashChain, whitelist)
blocks, streamID, err := sh.getBlocksByHashes(ctx, hashChain, whitelist)
if err != nil {
utils.Logger().Warn().Err(err).Msg("epoch sync getBlocksByHashes failed")
if !errors.Is(err, context.Canceled) {
@ -157,10 +150,10 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(s *StageState) (int, error) {
return n, nil
}
func (sr *StageEpoch) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
func (sr *StageEpoch) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = sr.configs.db.BeginRw(context.Background())
tx, err = sr.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -179,10 +172,10 @@ func (sr *StageEpoch) Revert(firstCycle bool, u *RevertState, s *StageState, tx
return nil
}
func (sr *StageEpoch) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
func (sr *StageEpoch) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = sr.configs.db.BeginRw(context.Background())
tx, err = sr.configs.db.BeginRw(ctx)
if err != nil {
return err
}

@ -11,8 +11,7 @@ type StageFinish struct {
}
type StageFinishCfg struct {
ctx context.Context
db kv.RwDB
db kv.RwDB
}
func NewStageFinish(cfg StageFinishCfg) *StageFinish {
@ -21,22 +20,17 @@ func NewStageFinish(cfg StageFinishCfg) *StageFinish {
}
}
func NewStageFinishCfg(ctx context.Context, db kv.RwDB) StageFinishCfg {
func NewStageFinishCfg(db kv.RwDB) StageFinishCfg {
return StageFinishCfg{
ctx: ctx,
db: db,
db: db,
}
}
func (finish *StageFinish) SetStageContext(ctx context.Context) {
finish.configs.ctx = ctx
}
func (finish *StageFinish) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error {
func (finish *StageFinish) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error {
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = finish.configs.db.BeginRw(context.Background())
tx, err = finish.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -54,11 +48,11 @@ func (finish *StageFinish) Exec(firstCycle bool, invalidBlockRevert bool, s *Sta
return nil
}
func (bh *StageFinish) clearBucket(tx kv.RwTx, isBeacon bool) error {
func (finish *StageFinish) clearBucket(ctx context.Context, tx kv.RwTx, isBeacon bool) error {
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = bh.configs.db.BeginRw(context.Background())
tx, err = finish.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -73,10 +67,10 @@ func (bh *StageFinish) clearBucket(tx kv.RwTx, isBeacon bool) error {
return nil
}
func (finish *StageFinish) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
func (finish *StageFinish) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = finish.configs.db.BeginRw(finish.configs.ctx)
tx, err = finish.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -95,10 +89,10 @@ func (finish *StageFinish) Revert(firstCycle bool, u *RevertState, s *StageState
return nil
}
func (finish *StageFinish) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
func (finish *StageFinish) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = finish.configs.db.BeginRw(finish.configs.ctx)
tx, err = finish.configs.db.BeginRw(ctx)
if err != nil {
return err
}

@ -13,9 +13,8 @@ type StageHeads struct {
}
type StageHeadsCfg struct {
ctx context.Context
bc core.BlockChain
db kv.RwDB
bc core.BlockChain
db kv.RwDB
}
func NewStageHeads(cfg StageHeadsCfg) *StageHeads {
@ -24,20 +23,14 @@ func NewStageHeads(cfg StageHeadsCfg) *StageHeads {
}
}
func NewStageHeadersCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB) StageHeadsCfg {
func NewStageHeadersCfg(bc core.BlockChain, db kv.RwDB) StageHeadsCfg {
return StageHeadsCfg{
ctx: ctx,
bc: bc,
db: db,
bc: bc,
db: db,
}
}
func (heads *StageHeads) SetStageContext(ctx context.Context) {
heads.configs.ctx = ctx
}
func (heads *StageHeads) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error {
func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error {
// no need to update target if we are redoing the stages because of bad block
if invalidBlockRevert {
return nil
@ -51,7 +44,7 @@ func (heads *StageHeads) Exec(firstCycle bool, invalidBlockRevert bool, s *Stage
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = heads.configs.db.BeginRw(heads.configs.ctx)
tx, err = heads.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -63,7 +56,7 @@ func (heads *StageHeads) Exec(firstCycle bool, invalidBlockRevert bool, s *Stage
currentHeight := heads.configs.bc.CurrentBlock().NumberU64()
s.state.currentCycle.TargetHeight = maxHeight
targetHeight := uint64(0)
if errV := CreateView(heads.configs.ctx, heads.configs.db, tx, func(etx kv.Tx) (err error) {
if errV := CreateView(ctx, heads.configs.db, tx, func(etx kv.Tx) (err error) {
if targetHeight, err = s.CurrentStageProgress(etx); err != nil {
return err
}
@ -114,10 +107,10 @@ func (heads *StageHeads) Exec(firstCycle bool, invalidBlockRevert bool, s *Stage
return nil
}
func (heads *StageHeads) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
func (heads *StageHeads) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = heads.configs.db.BeginRw(context.Background())
tx, err = heads.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -136,10 +129,10 @@ func (heads *StageHeads) Revert(firstCycle bool, u *RevertState, s *StageState,
return nil
}
func (heads *StageHeads) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
func (heads *StageHeads) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = heads.configs.db.BeginRw(context.Background())
tx, err = heads.configs.db.BeginRw(ctx)
if err != nil {
return err
}

@ -16,9 +16,8 @@ type StageShortRange struct {
}
type StageShortRangeCfg struct {
ctx context.Context
bc core.BlockChain
db kv.RwDB
bc core.BlockChain
db kv.RwDB
}
func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange {
@ -27,20 +26,14 @@ func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange {
}
}
func NewStageShortRangeCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB) StageShortRangeCfg {
func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB) StageShortRangeCfg {
return StageShortRangeCfg{
ctx: ctx,
bc: bc,
db: db,
bc: bc,
db: db,
}
}
func (sr *StageShortRange) SetStageContext(ctx context.Context) {
sr.configs.ctx = ctx
}
func (sr *StageShortRange) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error {
func (sr *StageShortRange) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error {
// no need to do short range if we are redoing the stages because of bad block
if invalidBlockRevert {
return nil
@ -56,7 +49,7 @@ func (sr *StageShortRange) Exec(firstCycle bool, invalidBlockRevert bool, s *Sta
}
// do short range sync
n, err := sr.doShortRangeSync(s)
n, err := sr.doShortRangeSync(ctx, s)
s.state.inserted = n
if err != nil {
return err
@ -65,7 +58,7 @@ func (sr *StageShortRange) Exec(firstCycle bool, invalidBlockRevert bool, s *Sta
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = sr.configs.db.BeginRw(sr.configs.ctx)
tx, err = sr.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -87,16 +80,13 @@ func (sr *StageShortRange) Exec(firstCycle bool, invalidBlockRevert bool, s *Sta
// 1. Obtain the block hashes and compute the longest hash chain..
// 2. Get blocks by hashes from computed hash chain.
// 3. Insert the blocks to blockchain.
func (sr *StageShortRange) doShortRangeSync(s *StageState) (int, error) {
func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) (int, error) {
numShortRangeCounterVec.With(s.state.promLabels()).Inc()
srCtx, cancel := context.WithTimeout(s.state.ctx, ShortRangeTimeout)
ctx, cancel := context.WithTimeout(ctx, ShortRangeTimeout)
defer cancel()
sh := &srHelper{
syncProtocol: s.state.protocol,
ctx: srCtx,
config: s.state.config,
logger: utils.Logger().With().Str("mode", "short range").Logger(),
}
@ -106,7 +96,7 @@ func (sr *StageShortRange) doShortRangeSync(s *StageState) (int, error) {
}
curBN := sr.configs.bc.CurrentBlock().NumberU64()
blkNums := sh.prepareBlockHashNumbers(curBN)
hashChain, whitelist, err := sh.getHashChain(blkNums)
hashChain, whitelist, err := sh.getHashChain(ctx, blkNums)
if err != nil {
return 0, errors.Wrap(err, "getHashChain")
}
@ -130,7 +120,7 @@ func (sr *StageShortRange) doShortRangeSync(s *StageState) (int, error) {
s.state.status.finishSyncing()
}()
blocks, stids, err := sh.getBlocksByHashes(hashChain, whitelist)
blocks, stids, err := sh.getBlocksByHashes(ctx, hashChain, whitelist)
if err != nil {
utils.Logger().Warn().Err(err).Msg("getBlocksByHashes failed")
if !errors.Is(err, context.Canceled) {
@ -159,10 +149,10 @@ func (sr *StageShortRange) doShortRangeSync(s *StageState) (int, error) {
return n, nil
}
func (sr *StageShortRange) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
func (sr *StageShortRange) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = sr.configs.db.BeginRw(context.Background())
tx, err = sr.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -181,10 +171,10 @@ func (sr *StageShortRange) Revert(firstCycle bool, u *RevertState, s *StageState
return nil
}
func (sr *StageShortRange) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
func (sr *StageShortRange) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = sr.configs.db.BeginRw(context.Background())
tx, err = sr.configs.db.BeginRw(ctx)
if err != nil {
return err
}

@ -11,7 +11,6 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
)
@ -19,7 +18,6 @@ type StageStates struct {
configs StageStatesCfg
}
type StageStatesCfg struct {
ctx context.Context
bc core.BlockChain
db kv.RwDB
blockDBs []kv.RwDB
@ -34,7 +32,7 @@ func NewStageStates(cfg StageStatesCfg) *StageStates {
}
}
func NewStageStatesCfg(ctx context.Context,
func NewStageStatesCfg(
bc core.BlockChain,
db kv.RwDB,
blockDBs []kv.RwDB,
@ -43,7 +41,6 @@ func NewStageStatesCfg(ctx context.Context,
logProgress bool) StageStatesCfg {
return StageStatesCfg{
ctx: ctx,
bc: bc,
db: db,
blockDBs: blockDBs,
@ -53,13 +50,8 @@ func NewStageStatesCfg(ctx context.Context,
}
}
func (stg *StageStates) SetStageContext(ctx context.Context) {
stg.configs.ctx = ctx
}
// Exec progresses States stage in the forward direction
func (stg *StageStates) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// for short range sync, skip this step
if !s.state.initSync {
return nil
@ -78,7 +70,7 @@ func (stg *StageStates) Exec(firstCycle bool, invalidBlockRevert bool, s *StageS
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = stg.configs.db.BeginRw(stg.configs.ctx)
tx, err = stg.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -94,7 +86,7 @@ func (stg *StageStates) Exec(firstCycle bool, invalidBlockRevert bool, s *StageS
// prepare db transactions
txs := make([]kv.RwTx, stg.configs.concurrency)
for i := 0; i < stg.configs.concurrency; i++ {
txs[i], err = stg.configs.blockDBs[i].BeginRw(context.Background())
txs[i], err = stg.configs.blockDBs[i].BeginRw(ctx)
if err != nil {
return err
}
@ -219,19 +211,11 @@ func (stg *StageStates) Exec(firstCycle bool, invalidBlockRevert bool, s *StageS
return nil
}
func (stg *StageStates) insertChain(gbm *blockDownloadManager,
protocol syncProtocol,
lbls prometheus.Labels,
targetBN uint64) {
}
func (stg *StageStates) saveProgress(s *StageState, tx kv.RwTx) (err error) {
func (stg *StageStates) saveProgress(ctx context.Context, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = stg.configs.db.BeginRw(context.Background())
tx, err = stg.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -254,10 +238,10 @@ func (stg *StageStates) saveProgress(s *StageState, tx kv.RwTx) (err error) {
return nil
}
func (stg *StageStates) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
func (stg *StageStates) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = stg.configs.db.BeginRw(stg.configs.ctx)
tx, err = stg.configs.db.BeginRw(ctx)
if err != nil {
return err
}
@ -276,10 +260,10 @@ func (stg *StageStates) Revert(firstCycle bool, u *RevertState, s *StageState, t
return nil
}
func (stg *StageStates) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
func (stg *StageStates) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = stg.configs.db.BeginRw(stg.configs.ctx)
tx, err = stg.configs.db.BeginRw(ctx)
if err != nil {
return err
}

@ -54,7 +54,6 @@ func (ib *InvalidBlock) addBadStream(bsID sttypes.StreamID) {
}
type StagedStreamSync struct {
ctx context.Context
bc core.BlockChain
isBeacon bool
isExplorer bool
@ -101,7 +100,6 @@ type SyncCycle struct {
}
func (s *StagedStreamSync) Len() int { return len(s.stages) }
func (s *StagedStreamSync) Context() context.Context { return s.ctx }
func (s *StagedStreamSync) Blockchain() core.BlockChain { return s.bc }
func (s *StagedStreamSync) DB() kv.RwDB { return s.db }
func (s *StagedStreamSync) IsBeacon() bool { return s.isBeacon }
@ -118,11 +116,11 @@ func (s *StagedStreamSync) NewRevertState(id SyncStageID, revertPoint uint64) *R
return &RevertState{id, revertPoint, s}
}
func (s *StagedStreamSync) CleanUpStageState(id SyncStageID, forwardProgress uint64, tx kv.Tx, db kv.RwDB) (*CleanUpState, error) {
func (s *StagedStreamSync) CleanUpStageState(ctx context.Context, id SyncStageID, forwardProgress uint64, tx kv.Tx, db kv.RwDB) (*CleanUpState, error) {
var pruneProgress uint64
var err error
if errV := CreateView(context.Background(), db, tx, func(tx kv.Tx) error {
if errV := CreateView(ctx, db, tx, func(tx kv.Tx) error {
pruneProgress, err = GetStageCleanUpProgress(tx, id, s.isBeacon)
if err != nil {
return err
@ -215,10 +213,10 @@ func (s *StagedStreamSync) SetCurrentStage(id SyncStageID) error {
}
// StageState retrieves the latest stage state from db
func (s *StagedStreamSync) StageState(stage SyncStageID, tx kv.Tx, db kv.RwDB) (*StageState, error) {
func (s *StagedStreamSync) StageState(ctx context.Context, stage SyncStageID, tx kv.Tx, db kv.RwDB) (*StageState, error) {
var blockNum uint64
var err error
if errV := CreateView(context.Background(), db, tx, func(rtx kv.Tx) error {
if errV := CreateView(ctx, db, tx, func(rtx kv.Tx) error {
blockNum, err = GetStageProgress(rtx, stage, s.isBeacon)
if err != nil {
return err
@ -232,7 +230,7 @@ func (s *StagedStreamSync) StageState(stage SyncStageID, tx kv.Tx, db kv.RwDB) (
}
// cleanUp cleans up the stage by calling pruneStage
func (s *StagedStreamSync) cleanUp(fromStage int, db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
func (s *StagedStreamSync) cleanUp(ctx context.Context, fromStage int, db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
found := false
for i := 0; i < len(s.pruningOrder); i++ {
if s.pruningOrder[i].ID == s.stages[fromStage].ID {
@ -241,7 +239,7 @@ func (s *StagedStreamSync) cleanUp(fromStage int, db kv.RwDB, tx kv.RwTx, firstC
if !found || s.pruningOrder[i] == nil || s.pruningOrder[i].Disabled {
continue
}
if err := s.pruneStage(firstCycle, s.pruningOrder[i], db, tx); err != nil {
if err := s.pruneStage(ctx, firstCycle, s.pruningOrder[i], db, tx); err != nil {
panic(err)
}
}
@ -249,7 +247,7 @@ func (s *StagedStreamSync) cleanUp(fromStage int, db kv.RwDB, tx kv.RwTx, firstC
}
// New creates a new StagedStreamSync instance
func New(ctx context.Context,
func New(
bc core.BlockChain,
db kv.RwDB,
stagesList []*Stage,
@ -288,7 +286,6 @@ func New(ctx context.Context,
status := newStatus()
return &StagedStreamSync{
ctx: ctx,
bc: bc,
isBeacon: isBeacon,
db: db,
@ -309,8 +306,8 @@ func New(ctx context.Context,
}
// doGetCurrentNumberRequest returns estimated current block number and corresponding stream
func (s *StagedStreamSync) doGetCurrentNumberRequest() (uint64, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(s.ctx, 10*time.Second)
func (s *StagedStreamSync) doGetCurrentNumberRequest(ctx context.Context) (uint64, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
bn, stid, err := s.protocol.GetCurrentBlockNumber(ctx, syncproto.WithHighPriority())
@ -336,16 +333,8 @@ func (s *StagedStreamSync) checkHaveEnoughStreams() error {
return nil
}
// SetNewContext sets a new context for all stages
func (s *StagedStreamSync) SetNewContext(ctx context.Context) error {
for _, s := range s.stages {
s.Handler.SetStageContext(ctx)
}
return nil
}
// Run runs a full cycle of stages
func (s *StagedStreamSync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
s.prevRevertPoint = nil
s.timings = s.timings[:0]
@ -358,7 +347,7 @@ func (s *StagedStreamSync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
if s.revertOrder[j] == nil || s.revertOrder[j].Disabled {
continue
}
if err := s.revertStage(firstCycle, s.revertOrder[j], db, tx); err != nil {
if err := s.revertStage(ctx, firstCycle, s.revertOrder[j], db, tx); err != nil {
utils.Logger().Error().
Err(err).
Interface("stage id", s.revertOrder[j].ID).
@ -383,7 +372,7 @@ func (s *StagedStreamSync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
continue
}
if err := s.runStage(stage, db, tx, firstCycle, s.invalidBlock.Active); err != nil {
if err := s.runStage(ctx, stage, db, tx, firstCycle, s.invalidBlock.Active); err != nil {
utils.Logger().Error().
Err(err).
Interface("stage id", stage.ID).
@ -393,7 +382,7 @@ func (s *StagedStreamSync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
s.NextStage()
}
if err := s.cleanUp(0, db, tx, firstCycle); err != nil {
if err := s.cleanUp(ctx, 0, db, tx, firstCycle); err != nil {
utils.Logger().Error().
Err(err).
Msgf(WrapStagedSyncMsg("stages cleanup failed"))
@ -414,7 +403,7 @@ func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) erro
if tx != nil {
return f(tx)
}
return db.View(context.Background(), func(etx kv.Tx) error {
return db.View(ctx, func(etx kv.Tx) error {
return f(etx)
})
}
@ -466,14 +455,14 @@ func printLogs(tx kv.RwTx, timings []Timing) error {
}
// runStage executes stage
func (s *StagedStreamSync) runStage(stage *Stage, db kv.RwDB, tx kv.RwTx, firstCycle bool, invalidBlockRevert bool) (err error) {
func (s *StagedStreamSync) runStage(ctx context.Context, stage *Stage, db kv.RwDB, tx kv.RwTx, firstCycle bool, invalidBlockRevert bool) (err error) {
start := time.Now()
stageState, err := s.StageState(stage.ID, tx, db)
stageState, err := s.StageState(ctx, stage.ID, tx, db)
if err != nil {
return err
}
if err = stage.Handler.Exec(firstCycle, invalidBlockRevert, stageState, s, tx); err != nil {
if err = stage.Handler.Exec(ctx, firstCycle, invalidBlockRevert, stageState, s, tx); err != nil {
utils.Logger().Error().
Err(err).
Interface("stage id", stage.ID).
@ -493,9 +482,9 @@ func (s *StagedStreamSync) runStage(stage *Stage, db kv.RwDB, tx kv.RwTx, firstC
}
// revertStage reverts stage
func (s *StagedStreamSync) revertStage(firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error {
func (s *StagedStreamSync) revertStage(ctx context.Context, firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error {
start := time.Now()
stageState, err := s.StageState(stage.ID, tx, db)
stageState, err := s.StageState(ctx, stage.ID, tx, db)
if err != nil {
return err
}
@ -510,7 +499,7 @@ func (s *StagedStreamSync) revertStage(firstCycle bool, stage *Stage, db kv.RwDB
return err
}
err = stage.Handler.Revert(firstCycle, revert, stageState, tx)
err = stage.Handler.Revert(ctx, firstCycle, revert, stageState, tx)
if err != nil {
return fmt.Errorf("[%s] %w", s.LogPrefix(), err)
}
@ -526,15 +515,15 @@ func (s *StagedStreamSync) revertStage(firstCycle bool, stage *Stage, db kv.RwDB
}
// pruneStage cleans up the stage and logs the timing
func (s *StagedStreamSync) pruneStage(firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error {
func (s *StagedStreamSync) pruneStage(ctx context.Context, firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx) error {
start := time.Now()
stageState, err := s.StageState(stage.ID, tx, db)
stageState, err := s.StageState(ctx, stage.ID, tx, db)
if err != nil {
return err
}
prune, err := s.CleanUpStageState(stage.ID, stageState.BlockNumber, tx, db)
prune, err := s.CleanUpStageState(ctx, stage.ID, stageState.BlockNumber, tx, db)
if err != nil {
return err
}
@ -542,7 +531,7 @@ func (s *StagedStreamSync) pruneStage(firstCycle bool, stage *Stage, db kv.RwDB,
return err
}
err = stage.Handler.CleanUp(firstCycle, prune, tx)
err = stage.Handler.CleanUp(ctx, firstCycle, prune, tx)
if err != nil {
return fmt.Errorf("[%s] %w", s.LogPrefix(), err)
}

@ -68,12 +68,12 @@ func CreateStagedSync(ctx context.Context,
return nil, errInitDB
}
stageHeadsCfg := NewStageHeadersCfg(ctx, bc, mainDB)
stageShortRangeCfg := NewStageShortRangeCfg(ctx, bc, mainDB)
stageSyncEpochCfg := NewStageEpochCfg(ctx, bc, mainDB)
stageBodiesCfg := NewStageBodiesCfg(ctx, bc, mainDB, dbs, config.Concurrency, protocol, isBeacon, logProgress)
stageStatesCfg := NewStageStatesCfg(ctx, bc, mainDB, dbs, config.Concurrency, logger, logProgress)
stageFinishCfg := NewStageFinishCfg(ctx, mainDB)
stageHeadsCfg := NewStageHeadersCfg(bc, mainDB)
stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB)
stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeacon, logProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, logProgress)
stageFinishCfg := NewStageFinishCfg(mainDB)
stages := DefaultStages(ctx,
stageHeadsCfg,
@ -84,7 +84,7 @@ func CreateStagedSync(ctx context.Context,
stageFinishCfg,
)
return New(ctx,
return New(
bc,
mainDB,
stages,
@ -178,7 +178,7 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
var estimatedHeight uint64
if initSync {
if h, err := s.estimateCurrentNumber(); err != nil {
if h, err := s.estimateCurrentNumber(downloaderContext); err != nil {
return 0, err
} else {
estimatedHeight = h
@ -197,8 +197,6 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
for {
ctx, cancel := context.WithCancel(downloaderContext)
s.ctx = ctx
s.SetNewContext(ctx)
n, err := s.doSyncCycle(ctx, initSync)
if err != nil {
@ -234,7 +232,7 @@ func (s *StagedStreamSync) doSyncCycle(ctx context.Context, initSync bool) (int,
var tx kv.RwTx
if canRunCycleInOneTransaction {
var err error
if tx, err = s.DB().BeginRw(context.Background()); err != nil {
if tx, err = s.DB().BeginRw(ctx); err != nil {
return totalInserted, err
}
defer tx.Rollback()
@ -244,7 +242,7 @@ func (s *StagedStreamSync) doSyncCycle(ctx context.Context, initSync bool) (int,
// Do one cycle of staged sync
initialCycle := s.currentCycle.Number == 0
if err := s.Run(s.DB(), tx, initialCycle); err != nil {
if err := s.Run(ctx, s.DB(), tx, initialCycle); err != nil {
utils.Logger().Error().
Err(err).
Bool("isBeacon", s.isBeacon).
@ -294,7 +292,7 @@ func (s *StagedStreamSync) checkPrerequisites() error {
// estimateCurrentNumber roughly estimates the current block number.
// The block number does not need to be exact, but just a temporary target of the iteration
func (s *StagedStreamSync) estimateCurrentNumber() (uint64, error) {
func (s *StagedStreamSync) estimateCurrentNumber(ctx context.Context) (uint64, error) {
var (
cnResults = make(map[sttypes.StreamID]uint64)
lock sync.Mutex
@ -304,7 +302,7 @@ func (s *StagedStreamSync) estimateCurrentNumber() (uint64, error) {
for i := 0; i != s.config.Concurrency; i++ {
go func() {
defer wg.Done()
bn, stid, err := s.doGetCurrentNumberRequest()
bn, stid, err := s.doGetCurrentNumberRequest(ctx)
if err != nil {
s.logger.Err(err).Str("streamID", string(stid)).
Msg(WrapStagedSyncMsg("getCurrentNumber request failed"))
@ -322,8 +320,8 @@ func (s *StagedStreamSync) estimateCurrentNumber() (uint64, error) {
if len(cnResults) == 0 {
select {
case <-s.ctx.Done():
return 0, s.ctx.Err()
case <-ctx.Done():
return 0, ctx.Err()
default:
}
return 0, ErrZeroBlockResponse

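In syncing.go the per-cycle context is no longer stored on the StagedStreamSync object via s.ctx and SetNewContext; each iteration of doSync derives a fresh cancellable context from the downloader context and passes it straight into the cycle. A rough sketch of that loop shape (runCycle and syncLoop are illustrative names, not the real functions):

package main

import (
    "context"
    "fmt"
    "time"
)

// runCycle stands in for doSyncCycle: it receives the per-cycle context
// directly instead of reading a context stored on the sync object.
func runCycle(ctx context.Context, n int) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(5 * time.Millisecond):
        fmt.Println("cycle", n, "done")
        return nil
    }
}

// syncLoop mirrors the shape of the new doSync loop: every iteration derives a
// fresh cancellable context from the long-lived downloader context and hands it
// down the call chain, so a context cancelled in one cycle cannot leak into the next.
func syncLoop(downloaderCtx context.Context, cycles int) {
    for n := 0; n < cycles; n++ {
        ctx, cancel := context.WithCancel(downloaderCtx)
        if err := runCycle(ctx, n); err != nil {
            cancel()
            return
        }
        cancel() // release this cycle's resources before starting the next one
    }
}

func main() {
    syncLoop(context.Background(), 3)
}
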
@ -682,7 +682,7 @@ func VerifyNewBlock(hooks *webhooks.Hooks, blockChain core.BlockChain, beaconCha
Int("numTx", len(newBlock.Transactions())).
Int("numStakingTx", len(newBlock.StakingTransactions())).
Err(err).
Msg("[VerifyNewBlock] Cannot Verify New Block!!!")
Msgf("[VerifyNewBlock] Cannot Verify New Block!!!, blockHeight %d, myHeight %d", newBlock.NumberU64(), blockChain.CurrentHeader().NumberU64())
return errors.Errorf(
"[VerifyNewBlock] Cannot Verify New Block!!! block-hash %s txn-count %d",
newBlock.Hash().Hex(),
