|
|
|
@ -149,9 +149,8 @@ func (lsi *lrSyncIter) fetchAndInsertBlocks(targetBN uint64) error { |
|
|
|
|
worker := &getBlocksWorker{ |
|
|
|
|
gbm: gbm, |
|
|
|
|
protocol: lsi.p, |
|
|
|
|
ctx: lsi.ctx, |
|
|
|
|
} |
|
|
|
|
go worker.workLoop() |
|
|
|
|
go worker.workLoop(lsi.ctx) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// insert the blocks to chain. Return when the target block number is reached.
|
|
|
|
@ -243,28 +242,26 @@ func (lsi *lrSyncIter) checkHaveEnoughStreams() error { |
|
|
|
|
type getBlocksWorker struct { |
|
|
|
|
gbm *getBlocksManager |
|
|
|
|
protocol syncProtocol |
|
|
|
|
|
|
|
|
|
ctx context.Context |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *getBlocksWorker) workLoop() { |
|
|
|
|
func (w *getBlocksWorker) workLoop(ctx context.Context) { |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-w.ctx.Done(): |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
return |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
batch := w.gbm.GetNextBatch() |
|
|
|
|
if len(batch) == 0 { |
|
|
|
|
select { |
|
|
|
|
case <-w.ctx.Done(): |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
return |
|
|
|
|
case <-time.After(100 * time.Millisecond): |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
blocks, stid, err := w.doBatch(batch) |
|
|
|
|
blocks, stid, err := w.doBatch(ctx, batch) |
|
|
|
|
if err != nil { |
|
|
|
|
if !errors.Is(err, context.Canceled) { |
|
|
|
|
w.protocol.RemoveStream(stid) |
|
|
|
@ -277,8 +274,8 @@ func (w *getBlocksWorker) workLoop() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *getBlocksWorker) doBatch(bns []uint64) ([]*types.Block, sttypes.StreamID, error) { |
|
|
|
|
ctx, cancel := context.WithTimeout(w.ctx, 10*time.Second) |
|
|
|
|
func (w *getBlocksWorker) doBatch(ctx context.Context, bns []uint64) ([]*types.Block, sttypes.StreamID, error) { |
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) |
|
|
|
|
defer cancel() |
|
|
|
|
|
|
|
|
|
blocks, stid, err := w.protocol.GetBlocksByNumber(ctx, bns) |
|
|
|
|