revert stream stages context to background context

fix/streamsync_context
“GheisMohammadi” 1 year ago
parent e4088f9e83
commit ba2e482f7a
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 8
      api/service/stagedstreamsync/stage_bodies.go
  2. 4
      api/service/stagedstreamsync/stage_epoch.go
  3. 4
      api/service/stagedstreamsync/stage_finish.go
  4. 4
      api/service/stagedstreamsync/stage_heads.go
  5. 4
      api/service/stagedstreamsync/stage_short_range.go
  6. 4
      api/service/stagedstreamsync/stage_state.go
  7. 6
      api/service/stagedstreamsync/staged_stream_sync.go
  8. 2
      api/service/stagedstreamsync/syncing.go

@ -97,7 +97,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
if useInternalTx {
var err error
tx, err = b.configs.db.BeginRw(ctx)
tx, err = b.configs.db.BeginRw(context.Background())
if err != nil {
return err
}
@ -218,7 +218,7 @@ func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) err
continue
}
s.state.gbm.SetDownloadDetails(batch, 0, stid)
if errU := b.configs.blockDBs[0].Update(ctx, func(tx kv.RwTx) error {
if errU := b.configs.blockDBs[0].Update(context.Background(), func(tx kv.RwTx) error {
if err = b.saveBlocks(ctx, tx, batch, blockBytes, sigBytes, 0, stid); err != nil {
return errors.Errorf("[STAGED_STREAM_SYNC] saving re-downloaded bad block to db failed.")
}
@ -267,7 +267,7 @@ func validateGetBlocksResult(requested []uint64, result []*types.Block) error {
// saveBlocks saves the blocks into db
func (b *StageBodies) saveBlocks(ctx context.Context, tx kv.RwTx, bns []uint64, blockBytes [][]byte, sigBytes [][]byte, loopID int, stid sttypes.StreamID) error {
tx, err := b.configs.blockDBs[loopID].BeginRw(ctx)
tx, err := b.configs.blockDBs[loopID].BeginRw(context.Background())
if err != nil {
return err
}
@ -310,7 +310,7 @@ func (b *StageBodies) saveProgress(ctx context.Context, s *StageState, progress
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = b.configs.db.BeginRw(ctx)
tx, err = b.configs.db.BeginRw(context.Background())
if err != nil {
return err
}

@ -153,7 +153,7 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage
func (sr *StageEpoch) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = sr.configs.db.BeginRw(ctx)
tx, err = sr.configs.db.BeginRw(context.Background())
if err != nil {
return err
}
@ -175,7 +175,7 @@ func (sr *StageEpoch) Revert(ctx context.Context, firstCycle bool, u *RevertStat
func (sr *StageEpoch) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = sr.configs.db.BeginRw(ctx)
tx, err = sr.configs.db.BeginRw(context.Background())
if err != nil {
return err
}

@ -30,7 +30,7 @@ func (finish *StageFinish) Exec(ctx context.Context, firstCycle bool, invalidBlo
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = finish.configs.db.BeginRw(ctx)
tx, err = finish.configs.db.BeginRw(context.Background())
if err != nil {
return err
}
@ -52,7 +52,7 @@ func (finish *StageFinish) clearBucket(ctx context.Context, tx kv.RwTx, isBeacon
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = finish.configs.db.BeginRw(ctx)
tx, err = finish.configs.db.BeginRw(context.Background())
if err != nil {
return err
}

@ -110,7 +110,7 @@ func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlock
func (heads *StageHeads) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = heads.configs.db.BeginRw(ctx)
tx, err = heads.configs.db.BeginRw(context.Background())
if err != nil {
return err
}
@ -132,7 +132,7 @@ func (heads *StageHeads) Revert(ctx context.Context, firstCycle bool, u *RevertS
func (heads *StageHeads) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = heads.configs.db.BeginRw(ctx)
tx, err = heads.configs.db.BeginRw(context.Background())
if err != nil {
return err
}

@ -152,7 +152,7 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
func (sr *StageShortRange) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = sr.configs.db.BeginRw(ctx)
tx, err = sr.configs.db.BeginRw(context.Background())
if err != nil {
return err
}
@ -174,7 +174,7 @@ func (sr *StageShortRange) Revert(ctx context.Context, firstCycle bool, u *Rever
func (sr *StageShortRange) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = sr.configs.db.BeginRw(ctx)
tx, err = sr.configs.db.BeginRw(context.Background())
if err != nil {
return err
}

@ -70,7 +70,7 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = stg.configs.db.BeginRw(ctx)
tx, err = stg.configs.db.BeginRw(context.Background())
if err != nil {
return err
}
@ -215,7 +215,7 @@ func (stg *StageStates) saveProgress(ctx context.Context, s *StageState, tx kv.R
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = stg.configs.db.BeginRw(ctx)
tx, err = stg.configs.db.BeginRw(context.Background())
if err != nil {
return err
}

@ -120,7 +120,7 @@ func (s *StagedStreamSync) CleanUpStageState(ctx context.Context, id SyncStageID
var pruneProgress uint64
var err error
if errV := CreateView(ctx, db, tx, func(tx kv.Tx) error {
if errV := CreateView(context.Background(), db, tx, func(tx kv.Tx) error {
pruneProgress, err = GetStageCleanUpProgress(tx, id, s.isBeacon)
if err != nil {
return err
@ -216,7 +216,7 @@ func (s *StagedStreamSync) SetCurrentStage(id SyncStageID) error {
func (s *StagedStreamSync) StageState(ctx context.Context, stage SyncStageID, tx kv.Tx, db kv.RwDB) (*StageState, error) {
var blockNum uint64
var err error
if errV := CreateView(ctx, db, tx, func(rtx kv.Tx) error {
if errV := CreateView(context.Background(), db, tx, func(rtx kv.Tx) error {
blockNum, err = GetStageProgress(rtx, stage, s.isBeacon)
if err != nil {
return err
@ -403,7 +403,7 @@ func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) erro
if tx != nil {
return f(tx)
}
return db.View(ctx, func(etx kv.Tx) error {
return db.View(context.Background(), func(etx kv.Tx) error {
return f(etx)
})
}

@ -232,7 +232,7 @@ func (s *StagedStreamSync) doSyncCycle(ctx context.Context, initSync bool) (int,
var tx kv.RwTx
if canRunCycleInOneTransaction {
var err error
if tx, err = s.DB().BeginRw(ctx); err != nil {
if tx, err = s.DB().BeginRw(context.Background()); err != nil {
return totalInserted, err
}
defer tx.Rollback()

Loading…
Cancel
Save