refactor stage receipts and change the stages sorting

pull/4465/head
“GheisMohammadi” 1 year ago
parent 12d930fd97
commit 6f3aa67b88
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 25
      api/service/stagedstreamsync/default_stages.go
  2. 5
      api/service/stagedstreamsync/receipt_download_manager.go
  3. 327
      api/service/stagedstreamsync/stage_receipts.go
  4. 3
      api/service/stagedstreamsync/stages.go

@ -13,9 +13,9 @@ var DefaultForwardOrder = ForwardOrder{
SyncEpoch, SyncEpoch,
ShortRange, ShortRange,
BlockBodies, BlockBodies,
StateSync,
// Stages below don't use Internet
States, States,
StateSync,
Receipts,
LastMile, LastMile,
Finish, Finish,
} }
@ -23,8 +23,9 @@ var DefaultForwardOrder = ForwardOrder{
var DefaultRevertOrder = RevertOrder{ var DefaultRevertOrder = RevertOrder{
Finish, Finish,
LastMile, LastMile,
States, Receipts,
StateSync, StateSync,
States,
BlockBodies, BlockBodies,
ShortRange, ShortRange,
SyncEpoch, SyncEpoch,
@ -34,8 +35,9 @@ var DefaultRevertOrder = RevertOrder{
var DefaultCleanUpOrder = CleanUpOrder{ var DefaultCleanUpOrder = CleanUpOrder{
Finish, Finish,
LastMile, LastMile,
States, Receipts,
StateSync, StateSync,
States,
BlockBodies, BlockBodies,
ShortRange, ShortRange,
SyncEpoch, SyncEpoch,
@ -49,6 +51,7 @@ func DefaultStages(ctx context.Context,
bodiesCfg StageBodiesCfg, bodiesCfg StageBodiesCfg,
stateSyncCfg StageStateSyncCfg, stateSyncCfg StageStateSyncCfg,
statesCfg StageStatesCfg, statesCfg StageStatesCfg,
receiptsCfg StageReceiptsCfg,
lastMileCfg StageLastMileCfg, lastMileCfg StageLastMileCfg,
finishCfg StageFinishCfg, finishCfg StageFinishCfg,
) []*Stage { ) []*Stage {
@ -57,8 +60,9 @@ func DefaultStages(ctx context.Context,
handlerStageShortRange := NewStageShortRange(srCfg) handlerStageShortRange := NewStageShortRange(srCfg)
handlerStageEpochSync := NewStageEpoch(seCfg) handlerStageEpochSync := NewStageEpoch(seCfg)
handlerStageBodies := NewStageBodies(bodiesCfg) handlerStageBodies := NewStageBodies(bodiesCfg)
handlerStageStateSync := NewStageStateSync(stateSyncCfg)
handlerStageStates := NewStageStates(statesCfg) handlerStageStates := NewStageStates(statesCfg)
handlerStageStateSync := NewStageStateSync(stateSyncCfg)
handlerStageReceipts := NewStageReceipts(receiptsCfg)
handlerStageLastMile := NewStageLastMile(lastMileCfg) handlerStageLastMile := NewStageLastMile(lastMileCfg)
handlerStageFinish := NewStageFinish(finishCfg) handlerStageFinish := NewStageFinish(finishCfg)
@ -83,15 +87,20 @@ func DefaultStages(ctx context.Context,
Description: "Retrieve Block Bodies", Description: "Retrieve Block Bodies",
Handler: handlerStageBodies, Handler: handlerStageBodies,
}, },
{
ID: States,
Description: "Update Blockchain State",
Handler: handlerStageStates,
},
{ {
ID: StateSync, ID: StateSync,
Description: "Retrieve States", Description: "Retrieve States",
Handler: handlerStageStateSync, Handler: handlerStageStateSync,
}, },
{ {
ID: States, ID: Receipts,
Description: "Update Blockchain State", Description: "Retrieve Receipts",
Handler: handlerStageStates, Handler: handlerStageReceipts,
}, },
{ {
ID: LastMile, ID: LastMile,

@ -3,6 +3,7 @@ package stagedstreamsync
import ( import (
"sync" "sync"
"github.com/harmony-one/harmony/core/types"
sttypes "github.com/harmony-one/harmony/p2p/stream/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -76,13 +77,13 @@ func (rdm *receiptDownloadManager) HandleRequestError(bns []uint64, err error, s
} }
// HandleRequestResult handles get receipts result // HandleRequestResult handles get receipts result
func (rdm *receiptDownloadManager) HandleRequestResult(bns []uint64, receiptBytes [][]byte, sigBytes [][]byte, loopID int, streamID sttypes.StreamID) error { func (rdm *receiptDownloadManager) HandleRequestResult(bns []uint64, receipts []*types.Receipt, loopID int, streamID sttypes.StreamID) error {
rdm.lock.Lock() rdm.lock.Lock()
defer rdm.lock.Unlock() defer rdm.lock.Unlock()
for i, bn := range bns { for i, bn := range bns {
delete(rdm.requesting, bn) delete(rdm.requesting, bn)
if indexExists(receiptBytes, i) && len(receiptBytes[i]) <= 1 { if indexExists(receipts, i) {
rdm.retries.push(bn) rdm.retries.push(bn)
} else { } else {
rdm.processing[bn] = struct{}{} rdm.processing[bn] = struct{}{}

@ -0,0 +1,327 @@
package stagedstreamsync
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
)
// StageReceipts is the staged-stream-sync stage that downloads transaction
// receipts for blocks that have already been fetched in earlier stages.
type StageReceipts struct {
	configs StageReceiptsCfg
}
// StageReceiptsCfg holds the dependencies and settings of the receipts stage.
type StageReceiptsCfg struct {
	bc          core.BlockChain // local chain, used for headers and current height
	db          kv.RwDB         // staged-sync progress database
	blockDBs    []kv.RwDB       // per-worker block cache databases
	concurrency int             // number of per-worker block cache DBs to clean
	protocol    syncProtocol    // stream protocol used to request receipts
	isBeacon    bool            // whether this instance syncs the beacon chain
	logProgress bool            // print download progress to stdout
}
// NewStageReceipts builds the receipts stage from its configuration.
func NewStageReceipts(cfg StageReceiptsCfg) *StageReceipts {
	stage := StageReceipts{configs: cfg}
	return &stage
}
// NewStageReceiptsCfg assembles the configuration for the receipts stage.
func NewStageReceiptsCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, logProgress bool) StageReceiptsCfg {
	cfg := StageReceiptsCfg{
		bc:          bc,
		db:          db,
		blockDBs:    blockDBs,
		concurrency: concurrency,
		protocol:    protocol,
		isBeacon:    isBeacon,
		logProgress: logProgress,
	}
	return cfg
}
// Exec progresses the Receipts stage in the forward direction: it spawns a
// pool of worker goroutines that download transaction receipts for the blocks
// between the current chain head and the cycle's target height.
//
// When tx is nil an internal read-write transaction is opened and committed.
// The stage is a no-op for invalid-block reverts, for short-range sync
// cycles, and when the chain head has already reached the target.
func (b *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
	useInternalTx := tx == nil

	// Invalid-block reverts are handled by the revert path; nothing to do here.
	if invalidBlockRevert {
		return nil
	}

	// for short range sync, skip this stage
	if !s.state.initSync {
		return nil
	}

	maxHeight := s.state.status.targetBN
	currentHead := b.configs.bc.CurrentBlock().NumberU64()
	if currentHead >= maxHeight {
		return nil
	}
	currProgress := uint64(0)
	targetHeight := s.state.currentCycle.TargetHeight

	// Load the previously saved progress for this stage (read-only view).
	if errV := CreateView(ctx, b.configs.db, tx, func(etx kv.Tx) error {
		if currProgress, err = s.CurrentStageProgress(etx); err != nil {
			return err
		}
		return nil
	}); errV != nil {
		return errV
	}

	// No recorded progress yet: start from the current chain head.
	if currProgress == 0 {
		currProgress = currentHead
	}

	if currProgress >= targetHeight {
		return nil
	}

	// size := uint64(0)
	startTime := time.Now()
	// startBlock := currProgress
	if b.configs.logProgress {
		fmt.Print("\033[s") // save the cursor position
	}

	if useInternalTx {
		var err error
		tx, err = b.configs.db.BeginRw(ctx)
		if err != nil {
			return err
		}
		defer tx.Rollback()
	}

	// Fetch blocks from neighbors
	s.state.rdm = newReceiptDownloadManager(tx, b.configs.bc, targetHeight, s.state.logger)

	// Setup workers to fetch blocks from remote node
	var wg sync.WaitGroup

	for i := 0; i != s.state.config.Concurrency; i++ {
		wg.Add(1)
		go b.runReceiptWorkerLoop(ctx, s.state.rdm, &wg, i, startTime)
	}

	// Block until every worker loop has drained its work or the context ended.
	wg.Wait()

	if useInternalTx {
		if err := tx.Commit(); err != nil {
			return err
		}
	}

	return nil
}
// runReceiptWorkerLoop is the work loop of one receipt-download worker. It
// repeatedly asks the download manager for a batch of block numbers, resolves
// their receipt-root hashes from the local header chain, downloads the
// receipts from a peer stream, and reports success or failure back to the
// manager. The loop exits when the context is canceled or no batch is
// available.
func (b *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time) {
	currentBlock := int(b.configs.bc.CurrentBlock().NumberU64())
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		batch := rdm.GetNextBatch()
		if len(batch) == 0 {
			// No work available: back off briefly, then exit the loop.
			select {
			case <-ctx.Done():
				return
			case <-time.After(100 * time.Millisecond):
				return
			}
		}

		// Resolve the receipt roots for the batch. A missing header would
		// otherwise panic on header.ReceiptHash(); report the whole batch as
		// failed so the manager can retry it.
		var hashes []common.Hash
		missingHeader := false
		for _, bn := range batch {
			header := b.configs.bc.GetHeaderByNumber(bn)
			if header == nil {
				missingHeader = true
				break
			}
			hashes = append(hashes, header.ReceiptHash())
		}
		if missingHeader {
			var stid sttypes.StreamID // no stream involved in this failure
			err := errors.New("downloadReceipts failed: local header not found")
			utils.Logger().Error().
				Err(err).
				Interface("block numbers", batch).
				Msg(WrapStagedSyncMsg("downloadReceipts failed"))
			rdm.HandleRequestError(batch, err, stid)
			continue
		}

		receipts, stid, err := b.downloadReceipts(ctx, hashes)
		if err != nil {
			if !errors.Is(err, context.Canceled) {
				b.configs.protocol.StreamFailed(stid, "downloadReceipts failed")
			}
			utils.Logger().Error().
				Err(err).
				Str("stream", string(stid)).
				Interface("block numbers", batch).
				Msg(WrapStagedSyncMsg("downloadReceipts failed"))
			err = errors.Wrap(err, "request error")
			rdm.HandleRequestError(batch, err, stid)
		} else if len(receipts) == 0 {
			utils.Logger().Warn().
				Str("stream", string(stid)).
				Interface("block numbers", batch).
				Msg(WrapStagedSyncMsg("downloadReceipts failed, received empty receipts"))
			err := errors.New("downloadReceipts received empty receipts")
			rdm.HandleRequestError(batch, err, stid)
		} else {
			rdm.HandleRequestResult(batch, receipts, loopID, stid)
			if b.configs.logProgress {
				// calculating the receipt download speed
				dt := time.Since(startTime).Seconds()
				speed := float64(0)
				if dt > 0 {
					speed = float64(len(rdm.rdd)) / dt
				}
				blockSpeed := fmt.Sprintf("%.2f", speed)
				fmt.Print("\033[u\033[K") // restore the cursor position and clear the line
				fmt.Println("downloaded blocks:", currentBlock+len(rdm.rdd), "/", int(rdm.targetBN), "(", blockSpeed, "blocks/s", ")")
			}
		}
	}
}
// downloadReceipts requests the receipts for the given receipt-root hashes
// from a peer stream, bounded by a 10-second timeout, and runs the reply
// through validateGetReceiptsResult before returning it.
func (b *StageReceipts) downloadReceipts(ctx context.Context, hs []common.Hash) ([]*types.Receipt, sttypes.StreamID, error) {
	reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	receipts, stid, err := b.configs.protocol.GetReceipts(reqCtx, hs)
	if err == nil {
		err = validateGetReceiptsResult(hs, receipts)
	}
	if err != nil {
		return nil, stid, err
	}
	return receipts, stid, nil
}
// downloadRawBlocks fetches raw block bytes and signature bytes for the given
// block numbers from a peer stream, bounded by a 10-second timeout.
func (b *StageReceipts) downloadRawBlocks(ctx context.Context, bns []uint64) ([][]byte, [][]byte, sttypes.StreamID, error) {
	reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	return b.configs.protocol.GetRawBlocksByNumber(reqCtx, bns)
}
// validateGetReceiptsResult sanity-checks the receipts returned for the
// requested receipt-root hashes.
//
// NOTE(review): currently a no-op that accepts every response; per the TODO,
// each receipt should eventually be verified against its requested root.
func validateGetReceiptsResult(requested []common.Hash, result []*types.Receipt) error {
	// TODO: validate each receipt here

	return nil
}
// saveProgress persists the given block-number progress for this stage.
// When tx is nil an internal read-write transaction is opened and committed.
func (b *StageReceipts) saveProgress(ctx context.Context, s *StageState, progress uint64, tx kv.RwTx) (err error) {
	useInternalTx := tx == nil
	if useInternalTx {
		var err error
		tx, err = b.configs.db.BeginRw(ctx)
		if err != nil {
			return err
		}
		defer tx.Rollback()
	}

	// save progress
	if err = s.Update(tx, progress); err != nil {
		// Log message fixed: this is the receipts stage, not block bodies.
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] saving progress for receipts stage failed")
		// NOTE(review): reuses the bodies-stage sentinel error; a dedicated
		// receipts sentinel would let callers tell the stages apart.
		return ErrSavingBodiesProgressFail
	}

	if useInternalTx {
		if err := tx.Commit(); err != nil {
			return err
		}
	}
	return nil
}
// cleanBlocksDB wipes the cached block bodies and block signatures held in
// the per-worker block database identified by loopID.
func (b *StageReceipts) cleanBlocksDB(ctx context.Context, loopID int) (err error) {
	tx, errb := b.configs.blockDBs[loopID].BeginRw(ctx)
	if errb != nil {
		return errb
	}
	defer tx.Rollback()

	// Clear the block-bodies bucket first, then the block-signatures bucket.
	buckets := []struct {
		name   string
		errMsg string
	}{
		{BlocksBucket, "[STAGED_STREAM_SYNC] clear blocks bucket after revert failed"},
		{BlockSignaturesBucket, "[STAGED_STREAM_SYNC] clear block signatures bucket after revert failed"},
	}
	for _, bkt := range buckets {
		if err = tx.ClearBucket(bkt.name); err != nil {
			utils.Logger().Error().
				Err(err).
				Msg(bkt.errMsg)
			return err
		}
	}

	return tx.Commit()
}
// cleanAllBlockDBs clears every per-worker block cache database, one per
// configured concurrency slot.
func (b *StageReceipts) cleanAllBlockDBs(ctx context.Context) (err error) {
	for loopID := 0; loopID < b.configs.concurrency; loopID++ {
		if err := b.cleanBlocksDB(ctx, loopID); err != nil {
			return err
		}
	}
	return nil
}
// Revert unwinds the receipts stage: it clears all per-worker block caches
// and resets the stage progress to the current chain head, then marks the
// revert as done. When tx is nil an internal read-write transaction is
// opened and committed.
func (b *StageReceipts) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
	//clean all blocks DBs
	if err := b.cleanAllBlockDBs(ctx); err != nil {
		return err
	}

	useInternalTx := tx == nil
	if useInternalTx {
		tx, err = b.configs.db.BeginRw(ctx)
		if err != nil {
			return err
		}
		defer tx.Rollback()
	}

	// Reset the stage progress to the current chain head.
	currentHead := b.configs.bc.CurrentBlock().NumberU64()
	if err = s.Update(tx, currentHead); err != nil {
		// Log message fixed: this is the receipts stage, not block bodies.
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] saving progress for receipts stage after revert failed")
		return err
	}

	if err = u.Done(tx); err != nil {
		return err
	}

	if useInternalTx {
		if err = tx.Commit(); err != nil {
			return err
		}
	}
	return nil
}
// CleanUp releases the stage's resources after a sync cycle by clearing all
// per-worker block cache databases.
func (b *StageReceipts) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
	return b.cleanAllBlockDBs(ctx)
}

@ -12,8 +12,9 @@ const (
ShortRange SyncStageID = "ShortRange" // short range ShortRange SyncStageID = "ShortRange" // short range
SyncEpoch SyncStageID = "SyncEpoch" // epoch sync SyncEpoch SyncStageID = "SyncEpoch" // epoch sync
BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified
StateSync SyncStageID = "StateSync" // State sync
States SyncStageID = "States" // will construct most recent state from downloaded blocks States SyncStageID = "States" // will construct most recent state from downloaded blocks
StateSync SyncStageID = "StateSync" // State sync
Receipts SyncStageID = "Receipts" // Receipts
LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well
Finish SyncStageID = "Finish" // Nominal stage after all other stages Finish SyncStageID = "Finish" // Nominal stage after all other stages
) )

Loading…
Cancel
Save