You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
513 lines
12 KiB
513 lines
12 KiB
package downloader
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/harmony-one/harmony/core/types"
|
|
syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
|
|
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
|
|
"github.com/pkg/errors"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
// doLongRangeSync does the long range sync.
|
|
// One LongRangeSync consists of several iterations.
|
|
// For each iteration, estimate the current block number, then fetch block & insert to blockchain
|
|
func (d *Downloader) doLongRangeSync() (int, error) {
|
|
var totalInserted int
|
|
|
|
d.startSyncing()
|
|
defer d.finishSyncing()
|
|
|
|
for {
|
|
ctx, cancel := context.WithCancel(d.ctx)
|
|
|
|
iter := &lrSyncIter{
|
|
bc: d.bc,
|
|
p: d.syncProtocol,
|
|
ih: d.ih,
|
|
d: d,
|
|
ctx: ctx,
|
|
config: d.config,
|
|
logger: d.logger.With().Str("mode", "long range").Logger(),
|
|
}
|
|
if err := iter.doLongRangeSync(); err != nil {
|
|
cancel()
|
|
return totalInserted + iter.inserted, err
|
|
}
|
|
cancel()
|
|
|
|
totalInserted += iter.inserted
|
|
|
|
if iter.inserted < lastMileThres {
|
|
return totalInserted, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// lrSyncIter run a single iteration of a full long range sync.
|
|
// First get a rough estimate of the current block height, and then sync to this
|
|
// block number
|
|
type lrSyncIter struct {
|
|
bc blockChain
|
|
p syncProtocol
|
|
ih insertHelper
|
|
d *Downloader
|
|
|
|
gbm *getBlocksManager // initialized when finished get block number
|
|
inserted int
|
|
|
|
config Config
|
|
ctx context.Context
|
|
logger zerolog.Logger
|
|
}
|
|
|
|
func (lsi *lrSyncIter) doLongRangeSync() error {
|
|
if err := lsi.checkPrerequisites(); err != nil {
|
|
return err
|
|
}
|
|
bn, err := lsi.estimateCurrentNumber()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
lsi.logger.Info().Uint64("target number", bn).Msg("estimated remote current number")
|
|
lsi.d.status.setTargetBN(bn)
|
|
|
|
return lsi.fetchAndInsertBlocks(bn)
|
|
}
|
|
|
|
func (lsi *lrSyncIter) checkPrerequisites() error {
|
|
return lsi.checkHaveEnoughStreams()
|
|
}
|
|
|
|
// estimateCurrentNumber roughly estimate the current block number.
|
|
// The block number does not need to be exact, but just a temporary target of the iteration
|
|
func (lsi *lrSyncIter) estimateCurrentNumber() (uint64, error) {
|
|
var (
|
|
cnResults = make(map[sttypes.StreamID]uint64)
|
|
lock sync.Mutex
|
|
wg sync.WaitGroup
|
|
)
|
|
wg.Add(lsi.config.Concurrency)
|
|
for i := 0; i != lsi.config.Concurrency; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
bn, stid, err := lsi.doGetCurrentNumberRequest()
|
|
if err != nil {
|
|
lsi.logger.Err(err).Str("streamID", string(stid)).
|
|
Msg("getCurrentNumber request failed. Removing stream")
|
|
if !errors.Is(err, context.Canceled) {
|
|
lsi.p.RemoveStream(stid)
|
|
}
|
|
return
|
|
}
|
|
lock.Lock()
|
|
cnResults[stid] = bn
|
|
lock.Unlock()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
if len(cnResults) == 0 {
|
|
select {
|
|
case <-lsi.ctx.Done():
|
|
return 0, lsi.ctx.Err()
|
|
default:
|
|
}
|
|
return 0, errors.New("zero block number response from remote nodes")
|
|
}
|
|
bn := computeBNMaxVote(cnResults)
|
|
return bn, nil
|
|
}
|
|
|
|
func (lsi *lrSyncIter) doGetCurrentNumberRequest() (uint64, sttypes.StreamID, error) {
|
|
ctx, cancel := context.WithTimeout(lsi.ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
bn, stid, err := lsi.p.GetCurrentBlockNumber(ctx, syncproto.WithHighPriority())
|
|
if err != nil {
|
|
return 0, stid, err
|
|
}
|
|
return bn, stid, nil
|
|
}
|
|
|
|
// fetchAndInsertBlocks use the pipeline pattern to boost the performance of inserting blocks.
|
|
// TODO: For resharding, use the pipeline to do fast sync (epoch loop, header loop, body loop)
|
|
func (lsi *lrSyncIter) fetchAndInsertBlocks(targetBN uint64) error {
|
|
gbm := newGetBlocksManager(lsi.bc, targetBN, lsi.logger)
|
|
lsi.gbm = gbm
|
|
|
|
// Setup workers to fetch blocks from remote node
|
|
for i := 0; i != lsi.config.Concurrency; i++ {
|
|
worker := &getBlocksWorker{
|
|
gbm: gbm,
|
|
protocol: lsi.p,
|
|
ctx: lsi.ctx,
|
|
}
|
|
go worker.workLoop()
|
|
}
|
|
|
|
// insert the blocks to chain. Return when the target block number is reached.
|
|
lsi.insertChainLoop(targetBN)
|
|
|
|
select {
|
|
case <-lsi.ctx.Done():
|
|
return lsi.ctx.Err()
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (lsi *lrSyncIter) insertChainLoop(targetBN uint64) {
|
|
var (
|
|
gbm = lsi.gbm
|
|
t = time.NewTicker(100 * time.Millisecond)
|
|
resultC = make(chan struct{}, 1)
|
|
)
|
|
|
|
trigger := func() {
|
|
select {
|
|
case resultC <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-lsi.ctx.Done():
|
|
return
|
|
|
|
case <-t.C:
|
|
// Redundancy, periodically check whether there is blocks that can be processed
|
|
trigger()
|
|
|
|
case <-gbm.resultC:
|
|
// New block arrive in resultQueue
|
|
trigger()
|
|
|
|
case <-resultC:
|
|
blockResults := gbm.PullContinuousBlocks(blocksPerInsert)
|
|
if len(blockResults) > 0 {
|
|
lsi.processBlocks(blockResults, targetBN)
|
|
// more blocks is expected being able to be pulled from queue
|
|
trigger()
|
|
}
|
|
if lsi.bc.CurrentBlock().NumberU64() >= targetBN {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (lsi *lrSyncIter) processBlocks(results []*blockResult, targetBN uint64) {
|
|
blocks := blockResultsToBlocks(results)
|
|
|
|
for i, block := range blocks {
|
|
if err := lsi.ih.verifyAndInsertBlock(block); err != nil {
|
|
lsi.logger.Warn().Err(err).Uint64("target block", targetBN).
|
|
Uint64("block number", block.NumberU64()).
|
|
Msg("insert blocks failed in long range")
|
|
|
|
lsi.p.RemoveStream(results[i].stid)
|
|
lsi.gbm.HandleInsertError(results, i)
|
|
return
|
|
}
|
|
|
|
lsi.inserted++
|
|
}
|
|
lsi.gbm.HandleInsertResult(results)
|
|
}
|
|
|
|
func (lsi *lrSyncIter) checkHaveEnoughStreams() error {
|
|
numStreams := lsi.p.NumStreams()
|
|
if numStreams < lsi.config.MinStreams {
|
|
return fmt.Errorf("number of streams smaller than minimum: %v < %v",
|
|
numStreams, lsi.config.MinStreams)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// getBlocksWorker does the request job
|
|
type getBlocksWorker struct {
|
|
gbm *getBlocksManager
|
|
protocol syncProtocol
|
|
|
|
ctx context.Context
|
|
}
|
|
|
|
func (w *getBlocksWorker) workLoop() {
|
|
for {
|
|
select {
|
|
case <-w.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
batch := w.gbm.GetNextBatch()
|
|
if len(batch) == 0 {
|
|
select {
|
|
case <-w.ctx.Done():
|
|
return
|
|
case <-time.After(100 * time.Millisecond):
|
|
continue
|
|
}
|
|
}
|
|
|
|
blocks, stid, err := w.doBatch(batch)
|
|
if err != nil {
|
|
if !errors.Is(err, context.Canceled) {
|
|
w.protocol.RemoveStream(stid)
|
|
}
|
|
err = errors.Wrap(err, "request error")
|
|
w.gbm.HandleRequestError(batch, err, stid)
|
|
} else {
|
|
w.gbm.HandleRequestResult(batch, blocks, stid)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *getBlocksWorker) doBatch(bns []uint64) ([]*types.Block, sttypes.StreamID, error) {
|
|
ctx, cancel := context.WithTimeout(w.ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
blocks, stid, err := w.protocol.GetBlocksByNumber(ctx, bns)
|
|
if err != nil {
|
|
return nil, stid, err
|
|
}
|
|
if err := validateGetBlocksResult(bns, blocks); err != nil {
|
|
return nil, stid, err
|
|
}
|
|
return blocks, stid, nil
|
|
}
|
|
|
|
// getBlocksManager is the helper structure for get blocks request management
|
|
type getBlocksManager struct {
|
|
chain blockChain
|
|
|
|
targetBN uint64
|
|
requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received
|
|
processing map[uint64]struct{} // block numbers received requests but not inserted
|
|
retries *prioritizedNumbers // requests where error happens
|
|
rq *resultQueue // result queue wait to be inserted into blockchain
|
|
|
|
resultC chan struct{}
|
|
logger zerolog.Logger
|
|
lock sync.Mutex
|
|
}
|
|
|
|
func newGetBlocksManager(chain blockChain, targetBN uint64, logger zerolog.Logger) *getBlocksManager {
|
|
return &getBlocksManager{
|
|
chain: chain,
|
|
targetBN: targetBN,
|
|
requesting: make(map[uint64]struct{}),
|
|
processing: make(map[uint64]struct{}),
|
|
retries: newPrioritizedNumbers(),
|
|
rq: newResultQueue(),
|
|
resultC: make(chan struct{}, 1),
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// GetNextBatch get the next block numbers batch
|
|
func (gbm *getBlocksManager) GetNextBatch() []uint64 {
|
|
gbm.lock.Lock()
|
|
defer gbm.lock.Unlock()
|
|
|
|
cap := numBlocksByNumPerRequest
|
|
|
|
bns := gbm.getBatchFromRetries(cap)
|
|
cap -= len(bns)
|
|
gbm.addBatchToRequesting(bns)
|
|
|
|
if gbm.availableForMoreTasks() {
|
|
addBNs := gbm.getBatchFromUnprocessed(cap)
|
|
gbm.addBatchToRequesting(addBNs)
|
|
bns = append(bns, addBNs...)
|
|
}
|
|
|
|
return bns
|
|
}
|
|
|
|
// HandleRequestError handles the error result
|
|
func (gbm *getBlocksManager) HandleRequestError(bns []uint64, err error, stid sttypes.StreamID) {
|
|
gbm.lock.Lock()
|
|
defer gbm.lock.Unlock()
|
|
|
|
gbm.logger.Warn().Err(err).Str("stream", string(stid)).Msg("get blocks error")
|
|
|
|
// add requested block numbers to retries
|
|
for _, bn := range bns {
|
|
delete(gbm.requesting, bn)
|
|
gbm.retries.push(bn)
|
|
}
|
|
|
|
// remove results from result queue by the stream and add back to retries
|
|
removed := gbm.rq.removeResultsByStreamID(stid)
|
|
for _, bn := range removed {
|
|
delete(gbm.processing, bn)
|
|
gbm.retries.push(bn)
|
|
}
|
|
}
|
|
|
|
// HandleRequestResult handles get blocks result
|
|
func (gbm *getBlocksManager) HandleRequestResult(bns []uint64, blocks []*types.Block, stid sttypes.StreamID) {
|
|
gbm.lock.Lock()
|
|
defer gbm.lock.Unlock()
|
|
|
|
for i, bn := range bns {
|
|
delete(gbm.requesting, bn)
|
|
if blocks[i] == nil {
|
|
gbm.retries.push(bn)
|
|
} else {
|
|
gbm.processing[bn] = struct{}{}
|
|
}
|
|
}
|
|
gbm.rq.addBlockResults(blocks, stid)
|
|
select {
|
|
case gbm.resultC <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// HandleInsertResult handle the insert result
|
|
func (gbm *getBlocksManager) HandleInsertResult(inserted []*blockResult) {
|
|
gbm.lock.Lock()
|
|
defer gbm.lock.Unlock()
|
|
|
|
for _, block := range inserted {
|
|
delete(gbm.processing, block.getBlockNumber())
|
|
}
|
|
}
|
|
|
|
// HandleInsertError handles the error during InsertChain
|
|
func (gbm *getBlocksManager) HandleInsertError(results []*blockResult, n int) {
|
|
gbm.lock.Lock()
|
|
defer gbm.lock.Unlock()
|
|
|
|
var (
|
|
inserted []*blockResult
|
|
errResult *blockResult
|
|
abandoned []*blockResult
|
|
)
|
|
inserted = results[:n]
|
|
errResult = results[n]
|
|
if n != len(results) {
|
|
abandoned = results[n+1:]
|
|
}
|
|
|
|
for _, res := range inserted {
|
|
delete(gbm.processing, res.getBlockNumber())
|
|
}
|
|
for _, res := range abandoned {
|
|
gbm.rq.addBlockResults([]*types.Block{res.block}, res.stid)
|
|
}
|
|
|
|
delete(gbm.processing, errResult.getBlockNumber())
|
|
gbm.retries.push(errResult.getBlockNumber())
|
|
|
|
removed := gbm.rq.removeResultsByStreamID(errResult.stid)
|
|
for _, bn := range removed {
|
|
delete(gbm.processing, bn)
|
|
gbm.retries.push(bn)
|
|
}
|
|
}
|
|
|
|
// PullContinuousBlocks pull continuous blocks from request queue
|
|
func (gbm *getBlocksManager) PullContinuousBlocks(cap int) []*blockResult {
|
|
gbm.lock.Lock()
|
|
defer gbm.lock.Unlock()
|
|
|
|
expHeight := gbm.chain.CurrentBlock().NumberU64() + 1
|
|
results, stales := gbm.rq.popBlockResults(expHeight, cap)
|
|
// For stale blocks, we remove them from processing
|
|
for _, bn := range stales {
|
|
delete(gbm.processing, bn)
|
|
}
|
|
return results
|
|
}
|
|
|
|
func (gbm *getBlocksManager) getBatchFromRetries(cap int) []uint64 {
|
|
var (
|
|
requestBNs []uint64
|
|
curHeight = gbm.chain.CurrentBlock().NumberU64()
|
|
)
|
|
for cnt := 0; cnt < cap; cnt++ {
|
|
bn := gbm.retries.pop()
|
|
if bn == 0 {
|
|
break // no more retries
|
|
}
|
|
if bn <= curHeight {
|
|
continue
|
|
}
|
|
requestBNs = append(requestBNs, bn)
|
|
}
|
|
return requestBNs
|
|
}
|
|
|
|
func (gbm *getBlocksManager) getBatchFromUnprocessed(cap int) []uint64 {
|
|
var (
|
|
requestBNs []uint64
|
|
curHeight = gbm.chain.CurrentBlock().NumberU64()
|
|
)
|
|
bn := curHeight + 1
|
|
// TODO: this algorithm can be potentially optimized.
|
|
for cnt := 0; cnt < cap && bn <= gbm.targetBN; cnt++ {
|
|
for bn <= gbm.targetBN {
|
|
_, ok1 := gbm.requesting[bn]
|
|
_, ok2 := gbm.processing[bn]
|
|
if !ok1 && !ok2 {
|
|
requestBNs = append(requestBNs, bn)
|
|
bn++
|
|
break
|
|
}
|
|
bn++
|
|
}
|
|
}
|
|
return requestBNs
|
|
}
|
|
|
|
func (gbm *getBlocksManager) availableForMoreTasks() bool {
|
|
return gbm.rq.results.Len() < softQueueCap
|
|
}
|
|
|
|
func (gbm *getBlocksManager) addBatchToRequesting(bns []uint64) {
|
|
for _, bn := range bns {
|
|
gbm.requesting[bn] = struct{}{}
|
|
}
|
|
}
|
|
|
|
func validateGetBlocksResult(requested []uint64, result []*types.Block) error {
|
|
if len(result) != len(requested) {
|
|
return fmt.Errorf("unexpected number of blocks delivered: %v / %v", len(result), len(requested))
|
|
}
|
|
for i, block := range result {
|
|
if block != nil && block.NumberU64() != requested[i] {
|
|
return fmt.Errorf("block with unexpected number delivered: %v / %v", block.NumberU64(), requested[i])
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func computeBNMaxVote(votes map[sttypes.StreamID]uint64) uint64 {
|
|
var (
|
|
nm = make(map[uint64]int)
|
|
res uint64
|
|
maxCnt int
|
|
)
|
|
for _, bn := range votes {
|
|
_, ok := nm[bn]
|
|
if !ok {
|
|
nm[bn] = 0
|
|
}
|
|
nm[bn]++
|
|
cnt := nm[bn]
|
|
|
|
if cnt > maxCnt || (cnt == maxCnt && bn > res) {
|
|
res = bn
|
|
maxCnt = cnt
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|