staged dns sync v1.0 (#4316)
* staged dns sync v1.0
* enabled stream downloader for localnet
* fix code review issues
* remove extra lock

Co-authored-by: GheisMohammadi <Gheis.Mohammadi@gmail.com>

parent 4f5102a12c
commit 54742e73e1
@ -0,0 +1,86 @@

package stagedsync

import (
	"context"
)

type ForwardOrder []SyncStageID
type RevertOrder []SyncStageID
type CleanUpOrder []SyncStageID

var DefaultForwardOrder = ForwardOrder{
	Heads,
	BlockHashes,
	BlockBodies,
	// Stages below don't use Internet
	States,
	LastMile,
	Finish,
}

var DefaultRevertOrder = RevertOrder{
	Finish,
	LastMile,
	States,
	BlockBodies,
	BlockHashes,
	Heads,
}

var DefaultCleanUpOrder = CleanUpOrder{
	Finish,
	LastMile,
	States,
	BlockBodies,
	BlockHashes,
	Heads,
}

func DefaultStages(ctx context.Context,
	headsCfg StageHeadsCfg,
	blockHashesCfg StageBlockHashesCfg,
	bodiesCfg StageBodiesCfg,
	statesCfg StageStatesCfg,
	lastMileCfg StageLastMileCfg,
	finishCfg StageFinishCfg) []*Stage {

	handlerStageHeads := NewStageHeads(headsCfg)
	handlerStageBlockHashes := NewStageBlockHashes(blockHashesCfg)
	handlerStageBodies := NewStageBodies(bodiesCfg)
	handleStageStates := NewStageStates(statesCfg)
	handlerStageLastMile := NewStageLastMile(lastMileCfg)
	handlerStageFinish := NewStageFinish(finishCfg)

	return []*Stage{
		{
			ID:          Heads,
			Description: "Retrieve Chain Heads",
			Handler:     handlerStageHeads,
		},
		{
			ID:          BlockHashes,
			Description: "Download block hashes",
			Handler:     handlerStageBlockHashes,
		},
		{
			ID:          BlockBodies,
			Description: "Download block bodies",
			Handler:     handlerStageBodies,
		},
		{
			ID:          States,
			Description: "Insert new blocks and update blockchain states",
			Handler:     handleStageStates,
		},
		{
			ID:          LastMile,
			Description: "update status for blocks after sync and update last mile blocks as well",
			Handler:     handlerStageLastMile,
		},
		{
			ID:          Finish,
			Description: "Final stage to update current block for the RPC API",
			Handler:     handlerStageFinish,
		},
	}
}
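As a reading aid (not part of the diff): the orderings above are just lists that a scheduler walks forward each cycle and would walk in reverse on a revert. Below is a minimal, self-contained sketch of that idea; the names stageID, stage and handler are hypothetical stand-ins for SyncStageID, Stage and StageHandler, not the package's actual API.

package main

import "fmt"

// Hypothetical analogues of SyncStageID and Stage, for illustration only.
type stageID string

type stage struct {
	id      stageID
	handler func() error
}

func main() {
	// Mirrors DefaultForwardOrder: stages run in this order each sync cycle.
	forward := []stage{
		{id: "Heads", handler: func() error { fmt.Println("retrieve chain heads"); return nil }},
		{id: "BlockHashes", handler: func() error { fmt.Println("download block hashes"); return nil }},
		{id: "BlockBodies", handler: func() error { fmt.Println("download block bodies"); return nil }},
		{id: "States", handler: func() error { fmt.Println("insert blocks, update states"); return nil }},
	}
	for _, st := range forward {
		if err := st.handler(); err != nil {
			// a real scheduler would walk DefaultRevertOrder here
			fmt.Println("stage", st.id, "failed:", err)
			return
		}
	}
}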
@ -0,0 +1,51 @@

package stagedsync

import (
	"fmt"
)

// Errors ...
var (
	ErrRegistrationFail = WrapStagedSyncError("registration failed")
	ErrGetBlock = WrapStagedSyncError("get block failed")
	ErrGetBlockHash = WrapStagedSyncError("get block hash failed")
	ErrGetConsensusHashes = WrapStagedSyncError("get consensus hashes failed")
	ErrGenStateSyncTaskQueue = WrapStagedSyncError("generate state sync task queue failed")
	ErrDownloadBlocks = WrapStagedSyncError("download blocks failed")
	ErrUpdateBlockAndStatus = WrapStagedSyncError("update block and status failed")
	ErrGenerateNewState = WrapStagedSyncError("generate new state failed")
	ErrFetchBlockHashProgressFail = WrapStagedSyncError("fetch cache progress for block hashes stage failed")
	ErrFetchCachedBlockHashFail = WrapStagedSyncError("fetch cached block hashes failed")
	ErrNotEnoughBlockHashes = WrapStagedSyncError("peers haven't sent all requested block hashes")
	ErrRetrieveCachedProgressFail = WrapStagedSyncError("retrieving cache progress for block hashes stage failed")
	ErrRetrieveCachedHashProgressFail = WrapStagedSyncError("retrieving cache progress for block hashes stage failed")
	ErrSaveBlockHashesProgressFail = WrapStagedSyncError("saving progress for block hashes stage failed")
	ErrSaveCachedBlockHashesProgressFail = WrapStagedSyncError("saving cache progress for block hashes stage failed")
	ErrSavingCacheLastBlockHashFail = WrapStagedSyncError("saving cache last block hash for block hashes stage failed")
	ErrCachingBlockHashFail = WrapStagedSyncError("caching downloaded block hashes failed")
	ErrCommitTransactionFail = WrapStagedSyncError("failed to write db commit")
	ErrUnexpectedNumberOfBlocks = WrapStagedSyncError("unexpected number of blocks delivered")
	ErrSavingBodiesProgressFail = WrapStagedSyncError("saving progress for block bodies stage failed")
	ErrAddTasksToQueueFail = WrapStagedSyncError("cannot add task to queue")
	ErrSavingCachedBodiesProgressFail = WrapStagedSyncError("saving cache progress for blocks stage failed")
	ErrRetrievingCachedBodiesProgressFail = WrapStagedSyncError("retrieving cache progress for blocks stage failed")
	ErrNoConnectedPeers = WrapStagedSyncError("haven't connected to any peer yet")
	ErrNotEnoughConnectedPeers = WrapStagedSyncError("not enough connected peers")
	ErrSaveStateProgressFail = WrapStagedSyncError("saving progress for block states stage failed")
	ErrPruningCursorCreationFail = WrapStagedSyncError("failed to create cursor for pruning")
	ErrInvalidBlockNumber = WrapStagedSyncError("invalid block number")
	ErrInvalidBlockBytes = WrapStagedSyncError("invalid block bytes to insert into chain")
	ErrAddTaskFailed = WrapStagedSyncError("cannot add task to queue")
	ErrNodeNotEnoughBlockHashes = WrapStagedSyncError("some of the nodes didn't provide all block hashes")
	ErrCachingBlocksFail = WrapStagedSyncError("caching downloaded block bodies failed")
	ErrSaveBlocksFail = WrapStagedSyncError("save downloaded block bodies failed")
	ErrStageNotFound = WrapStagedSyncError("stage not found")
	ErrSomeNodesNotReady = WrapStagedSyncError("some nodes are not ready")
	ErrSomeNodesBlockHashFail = WrapStagedSyncError("some nodes failed to download block hashes")
	ErrMaxPeerHeightFail = WrapStagedSyncError("get max peer height failed")
)

// WrapStagedSyncError wraps an error message for staged sync and returns an error object
func WrapStagedSyncError(context string) error {
	return fmt.Errorf("[STAGED_SYNC]: %s", context)
}
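A short usage note, not part of the diff: because WrapStagedSyncError builds each sentinel once at package initialization, callers can match these errors by identity with errors.Is. A hedged, self-contained sketch of that pattern, using a locally defined stand-in rather than the package's actual variables:

package main

import (
	"errors"
	"fmt"
)

// Local copy of the pattern above, for illustration only.
var errNotEnoughConnectedPeers = fmt.Errorf("[STAGED_SYNC]: %s", "not enough connected peers")

func connect(peers int) error {
	if peers < 2 {
		// returning the sentinel value directly keeps errors.Is comparisons working
		return errNotEnoughConnectedPeers
	}
	return nil
}

func main() {
	if err := connect(1); errors.Is(err, errNotEnoughConnectedPeers) {
		fmt.Println("retry later:", err)
	}
}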
@ -0,0 +1,106 @@

package stagedsync

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ledgerwatch/erigon-lib/kv"
)

type ExecFunc func(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error

type StageHandler interface {
	// Exec is the execution function for the stage to move forward.
	// * firstCycle - is it the first cycle of syncing.
	// * invalidBlockRevert - whether the execution is to solve the invalid block.
	// * s - is the current state of the stage and contains stage data.
	// * reverter - if the stage needs to cause reverting, `reverter` methods can be used.
	Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error

	// Revert is the reverting logic of the stage.
	// * firstCycle - is it the first cycle of syncing.
	// * u - contains information about the revert itself.
	// * s - represents the state of this stage at the beginning of revert.
	Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) error

	// CleanUp is the execution function for the stage to prune old data.
	// * firstCycle - is it the first cycle of syncing.
	// * p - is the current state of the stage and contains stage data.
	CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) error
}

// Stage is a single sync stage in staged sync.
type Stage struct {
	// ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with a reverse domain to avoid clashes (`com.example.my-stage`).
	ID SyncStageID
	// Handler handles the logic for the stage.
	Handler StageHandler
	// Description is a string that is shown in the logs.
	Description string
	// DisabledDescription shows in the log with a message if the stage is disabled. Here, you can show which command line flags should be provided to enable the stage.
	DisabledDescription string
	// Disabled defines if the stage is disabled. It is set when the stage is built by its `StageBuilder`.
	Disabled bool
}

// StageState is the state of the stage.
type StageState struct {
	state       *StagedSync
	ID          SyncStageID
	BlockNumber uint64 // BlockNumber is the current block number of the stage at the beginning of the state execution.
}

func (s *StageState) LogPrefix() string { return s.state.LogPrefix() }

func (s *StageState) CurrentStageProgress(db kv.Getter) (uint64, error) {
	return GetStageProgress(db, s.ID, s.state.isBeacon)
}

func (s *StageState) StageProgress(db kv.Getter, id SyncStageID) (uint64, error) {
	return GetStageProgress(db, id, s.state.isBeacon)
}

// Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution.
func (s *StageState) Update(db kv.Putter, newBlockNum uint64) error {
	return SaveStageProgress(db, s.ID, s.state.isBeacon, newBlockNum)
}

func (s *StageState) UpdateCleanUp(db kv.Putter, blockNum uint64) error {
	return SaveStageCleanUpProgress(db, s.ID, s.state.isBeacon, blockNum)
}

// Reverter allows the stage to cause a revert.
type Reverter interface {
	// RevertTo begins staged sync revert to the specified block.
	RevertTo(revertPoint uint64, invalidBlock common.Hash)
}

// RevertState contains the information about the revert.
type RevertState struct {
	ID SyncStageID
	// RevertPoint is the block to revert to.
	RevertPoint        uint64
	CurrentBlockNumber uint64
	// If the revert is caused by a bad block, this hash is not empty.
	InvalidBlock common.Hash
	state        *StagedSync
}

func (u *RevertState) LogPrefix() string { return u.state.LogPrefix() }

// Done updates the DB state of the stage.
func (u *RevertState) Done(db kv.Putter) error {
	return SaveStageProgress(db, u.ID, u.state.isBeacon, u.RevertPoint)
}

type CleanUpState struct {
	ID              SyncStageID
	ForwardProgress uint64 // progress of the stage's forward move
	CleanUpProgress uint64 // progress of the stage's prune move; after a sync cycle it becomes equal to ForwardProgress via the Done() method
	state           *StagedSync
}

func (s *CleanUpState) LogPrefix() string { return s.state.LogPrefix() + " CleanUp" }

func (s *CleanUpState) Done(db kv.Putter) error {
	return SaveStageCleanUpProgress(db, s.ID, s.state.isBeacon, s.ForwardProgress)
}

func (s *CleanUpState) DoneAt(db kv.Putter, blockNum uint64) error {
	return SaveStageCleanUpProgress(db, s.ID, s.state.isBeacon, blockNum)
}
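To make the contract above concrete, here is a minimal sketch of what a stage implementation looks like, assuming trimmed-down stand-ins for StageState, RevertState and CleanUpState (the real methods also receive kv.RwTx and a Reverter); everything below is illustrative, not the package's code.

package main

import "fmt"

// Hypothetical, trimmed-down analogues of the types above.
type stageState struct{ blockNumber uint64 }
type revertState struct{ revertPoint uint64 }
type cleanUpState struct{ forwardProgress uint64 }

type stageHandler interface {
	Exec(firstCycle bool, s *stageState) error
	Revert(firstCycle bool, u *revertState, s *stageState) error
	CleanUp(firstCycle bool, p *cleanUpState) error
}

// noopStage shows the minimal shape a stage implementation takes.
type noopStage struct{}

func (noopStage) Exec(firstCycle bool, s *stageState) error {
	fmt.Println("exec at block", s.blockNumber)
	return nil
}

func (noopStage) Revert(firstCycle bool, u *revertState, s *stageState) error {
	fmt.Println("revert to block", u.revertPoint)
	return nil
}

func (noopStage) CleanUp(firstCycle bool, p *cleanUpState) error {
	fmt.Println("prune up to block", p.forwardProgress)
	return nil
}

func main() {
	var h stageHandler = noopStage{}
	_ = h.Exec(true, &stageState{blockNumber: 42})
}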
@ -0,0 +1,698 @@

package stagedsync

import (
	"context"
	"encoding/hex"
	"fmt"
	"strconv"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/harmony-one/harmony/core"
	"github.com/harmony-one/harmony/internal/utils"
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon-lib/kv/mdbx"
	"github.com/ledgerwatch/log/v3"
)

type StageBlockHashes struct {
	configs StageBlockHashesCfg
}

type StageBlockHashesCfg struct {
	ctx           context.Context
	bc            core.BlockChain
	db            kv.RwDB
	turbo         bool
	turboModeCh   chan struct{}
	bgProcRunning bool
	isBeacon      bool
	cachedb       kv.RwDB
	logProgress   bool
}

func NewStageBlockHashes(cfg StageBlockHashesCfg) *StageBlockHashes {
	return &StageBlockHashes{
		configs: cfg,
	}
}

func NewStageBlockHashesCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB, isBeacon bool, turbo bool, logProgress bool) StageBlockHashesCfg {
	cachedb, err := initHashesCacheDB(ctx, isBeacon)
	if err != nil {
		panic("can't initialize sync caches")
	}
	return StageBlockHashesCfg{
		ctx:         ctx,
		bc:          bc,
		db:          db,
		turbo:       turbo,
		isBeacon:    isBeacon,
		cachedb:     cachedb,
		logProgress: logProgress,
	}
}

func initHashesCacheDB(ctx context.Context, isBeacon bool) (db kv.RwDB, err error) {
	// create the cache db
	cachedbName := BlockHashesCacheDB
	if isBeacon {
		cachedbName = "beacon_" + cachedbName
	}
	cachedb := mdbx.NewMDBX(log.New()).Path(cachedbName).MustOpen()
	// create a transaction on cachedb
	tx, errRW := cachedb.BeginRw(ctx)
	if errRW != nil {
		utils.Logger().Error().
			Err(errRW).
			Msg("[STAGED_SYNC] initializing sync caches failed")
		return nil, errRW
	}
	defer tx.Rollback()
	if err := tx.CreateBucket(BlockHashesBucket); err != nil {
		utils.Logger().Error().
			Err(err).
			Msg("[STAGED_SYNC] creating cache bucket failed")
		return nil, err
	}
	if err := tx.CreateBucket(StageProgressBucket); err != nil {
		utils.Logger().Error().
			Err(err).
			Msg("[STAGED_SYNC] creating progress bucket failed")
		return nil, err
	}
	if err := tx.Commit(); err != nil {
		return nil, err
	}
	return cachedb, nil
}

func (bh *StageBlockHashes) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {

	if len(s.state.syncConfig.peers) < NumPeersLowBound {
		return ErrNotEnoughConnectedPeers
	}

	maxPeersHeight := s.state.syncStatus.MaxPeersHeight
	currentHead := bh.configs.bc.CurrentBlock().NumberU64()
	if currentHead >= maxPeersHeight {
		return nil
	}
	currProgress := uint64(0)
	targetHeight := s.state.syncStatus.currentCycle.TargetHeight
	isBeacon := s.state.isBeacon
	startHash := bh.configs.bc.CurrentBlock().Hash()
	isLastCycle := targetHeight >= maxPeersHeight
	canRunInTurboMode := bh.configs.turbo && !isLastCycle

	// retrieve the progress
	if errV := CreateView(bh.configs.ctx, bh.configs.db, tx, func(etx kv.Tx) error {
		if currProgress, err = s.CurrentStageProgress(etx); err != nil { // GetStageProgress(etx, BlockHashes, isBeacon); err != nil {
			return err
		}
		if currProgress > 0 {
			key := strconv.FormatUint(currProgress, 10)
			bucketName := GetBucketName(BlockHashesBucket, isBeacon)
			currHash := []byte{}
			if currHash, err = etx.GetOne(bucketName, []byte(key)); err != nil || len(currHash[:]) == 0 {
				// TODO: currProgress and DB don't match. Either re-download all or verify db and set currProgress to last
				return err
			}
			startHash.SetBytes(currHash[:])
		}
		return nil
	}); errV != nil {
		return errV
	}

	if currProgress == 0 {
		if err := bh.clearBlockHashesBucket(tx, s.state.isBeacon); err != nil {
			return err
		}
		startHash = bh.configs.bc.CurrentBlock().Hash()
		currProgress = currentHead
	}

	if currProgress >= targetHeight {
		if canRunInTurboMode && currProgress < maxPeersHeight {
			bh.configs.turboModeCh = make(chan struct{})
			go bh.runBackgroundProcess(nil, s, isBeacon, currProgress, maxPeersHeight, startHash)
		}
		return nil
	}

	// check whether any block hashes after the current height are cached
	if bh.configs.turbo && !firstCycle {
		var cacheHash []byte
		if cacheHash, err = bh.getHashFromCache(currProgress + 1); err != nil {
			utils.Logger().Error().
				Err(err).
				Msgf("[STAGED_SYNC] fetch cache progress for block hashes stage failed")
		} else {
			if len(cacheHash[:]) > 0 {
				// get hashes from the cache db rather than calling peers, and update the current progress
				newProgress, newStartHash, err := bh.loadBlockHashesFromCache(s, cacheHash, currProgress, targetHeight, tx)
				if err != nil {
					utils.Logger().Error().
						Err(err).
						Msgf("[STAGED_SYNC] fetch cached block hashes failed")
					bh.clearCache()
					bh.clearBlockHashesBucket(tx, isBeacon)
				} else {
					currProgress = newProgress
					startHash.SetBytes(newStartHash[:])
				}
			}
		}
	}

	if currProgress >= targetHeight {
		if canRunInTurboMode && currProgress < maxPeersHeight {
			bh.configs.turboModeCh = make(chan struct{})
			go bh.runBackgroundProcess(nil, s, isBeacon, currProgress, maxPeersHeight, startHash)
		}
		return nil
	}

	size := uint32(0)

	startTime := time.Now()
	startBlock := currProgress
	if bh.configs.logProgress {
		fmt.Print("\033[s") // save the cursor position
	}

	for ok := true; ok; ok = currProgress < targetHeight {
		size = uint32(targetHeight - currProgress)
		if size > SyncLoopBatchSize {
			size = SyncLoopBatchSize
		}
		// get consensus hashes
		if err := s.state.getConsensusHashes(startHash[:], size, false); err != nil {
			return err
		}
		// select the most common peer config based on their block hashes and do the clean up
		if err := s.state.syncConfig.GetBlockHashesConsensusAndCleanUp(false); err != nil {
			return err
		}
		// double check block hashes
		if s.state.DoubleCheckBlockHashes {
			invalidPeersMap, validBlockHashes, err := s.state.getInvalidPeersByBlockHashes(tx)
			if err != nil {
				return err
			}
			if validBlockHashes < int(size) {
				return ErrNotEnoughBlockHashes
			}
			s.state.syncConfig.cleanUpInvalidPeers(invalidPeersMap)
		}
		// save the downloaded hashes to db
		if currProgress, startHash, err = bh.saveDownloadedBlockHashes(s, currProgress, startHash, tx); err != nil {
			return err
		}
		// log the stage progress in the console
		if bh.configs.logProgress {
			// calculate block speed
			dt := time.Now().Sub(startTime).Seconds()
			speed := float64(0)
			if dt > 0 {
				speed = float64(currProgress-startBlock) / dt
			}
			blockSpeed := fmt.Sprintf("%.2f", speed)
			fmt.Print("\033[u\033[K") // restore the cursor position and clear the line
			fmt.Println("downloading block hash progress:", currProgress, "/", targetHeight, "(", blockSpeed, "blocks/s", ")")
		}
	}

	// continue downloading in the background
	if canRunInTurboMode && currProgress < maxPeersHeight {
		bh.configs.turboModeCh = make(chan struct{})
		go bh.runBackgroundProcess(nil, s, isBeacon, currProgress, maxPeersHeight, startHash)
	}
	return nil
}

// runBackgroundProcess continues downloading block hashes in the background and caching them on disk while the next stages are running.
// In the next sync cycle, this stage will use cached block hashes rather than download them from peers.
// This helps performance and reduces stage duration. It also helps to use the resources more efficiently.
func (bh *StageBlockHashes) runBackgroundProcess(tx kv.RwTx, s *StageState, isBeacon bool, startHeight uint64, targetHeight uint64, startHash common.Hash) error {
	size := uint32(0)
	currProgress := startHeight
	currHash := startHash
	bh.configs.bgProcRunning = true

	defer func() {
		if bh.configs.bgProcRunning {
			close(bh.configs.turboModeCh)
			bh.configs.bgProcRunning = false
		}
	}()

	// retrieve background progress and the last hash
	errV := bh.configs.cachedb.View(context.Background(), func(rtx kv.Tx) error {
		if progressBytes, err := rtx.GetOne(StageProgressBucket, []byte(LastBlockHeight)); err != nil {
			utils.Logger().Error().
				Err(err).
				Msgf("[STAGED_SYNC] retrieving cache progress for block hashes stage failed")
			return ErrRetrieveCachedProgressFail
		} else {
			if len(progressBytes[:]) > 0 {
				savedProgress, _ := unmarshalData(progressBytes)
				if savedProgress > startHeight {
					currProgress = savedProgress
					// retrieve the start hash
					if lastBlockHash, err := rtx.GetOne(StageProgressBucket, []byte(LastBlockHash)); err != nil {
						utils.Logger().Error().
							Err(err).
							Msgf("[STAGED_SYNC] retrieving cache progress for block hashes stage failed")
						return ErrRetrieveCachedHashProgressFail
					} else {
						currHash.SetBytes(lastBlockHash[:])
					}
				}
			}
		}
		return nil
	})
	if errV != nil {
		return errV
	}

	for {
		select {
		case <-bh.configs.turboModeCh:
			return nil
		default:
			if currProgress >= targetHeight {
				return nil
			}

			size = uint32(targetHeight - currProgress)
			if size > SyncLoopBatchSize {
				size = SyncLoopBatchSize
			}

			// get consensus hashes
			if err := s.state.getConsensusHashes(currHash[:], size, true); err != nil {
				return err
			}

			// select the most common peer config based on their block hashes and do the clean up
			if err := s.state.syncConfig.GetBlockHashesConsensusAndCleanUp(true); err != nil {
				return err
			}

			// save the downloaded hashes to the cache db
			var err error
			if currProgress, currHash, err = bh.saveBlockHashesInCacheDB(s, currProgress, currHash); err != nil {
				return err
			}
		}
		// TODO: do we need to sleep a few milliseconds? e.g. time.Sleep(1 * time.Millisecond)
	}
}

func (bh *StageBlockHashes) clearBlockHashesBucket(tx kv.RwTx, isBeacon bool) error {
	useInternalTx := tx == nil
	if useInternalTx {
		var err error
		tx, err = bh.configs.db.BeginRw(context.Background())
		if err != nil {
			return err
		}
		defer tx.Rollback()
	}
	bucketName := GetBucketName(BlockHashesBucket, isBeacon)
	if err := tx.ClearBucket(bucketName); err != nil {
		return err
	}

	if useInternalTx {
		if err := tx.Commit(); err != nil {
			return err
		}
	}
	return nil
}

// saveDownloadedBlockHashes saves block hashes to db (a map from block height to block hash)
func (bh *StageBlockHashes) saveDownloadedBlockHashes(s *StageState, progress uint64, startHash common.Hash, tx kv.RwTx) (p uint64, h common.Hash, err error) {
	p = progress
	h.SetBytes(startHash.Bytes())
	lastAddedID := int(0) // the first id won't be added
	saved := false

	useInternalTx := tx == nil
	if useInternalTx {
		var err error
		tx, err = bh.configs.db.BeginRw(context.Background())
		if err != nil {
			return p, h, err
		}
		defer tx.Rollback()
	}

	s.state.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
		if len(configPeer.blockHashes) == 0 {
			return // fetch the rest from another peer
		}

		for id := 0; id < len(configPeer.blockHashes); id++ {
			if id <= lastAddedID {
				continue
			}
			blockHash := configPeer.blockHashes[id]
			if len(blockHash) == 0 {
				return // fetch the rest from another peer
			}
			key := strconv.FormatUint(p+1, 10)
			bucketName := GetBucketName(BlockHashesBucket, s.state.isBeacon)
			if err := tx.Put(bucketName, []byte(key), blockHash); err != nil {
				utils.Logger().Error().
					Err(err).
					Int("block hash index", id).
					Str("block hash", hex.EncodeToString(blockHash)).
					Msg("[STAGED_SYNC] adding block hash to db failed")
				return
			}
			p++
			h.SetBytes(blockHash[:])
			lastAddedID = id
		}
		// if all block hashes are added to db, break the loop
		if lastAddedID == len(configPeer.blockHashes)-1 {
			saved = true
			brk = true
		}
		return
	})

	// save progress
	if err = s.Update(tx, p); err != nil {
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] saving progress for block hashes stage failed")
		return progress, startHash, ErrSaveBlockHashesProgressFail
	}

	if len(s.state.syncConfig.peers) > 0 && len(s.state.syncConfig.peers[0].blockHashes) > 0 && !saved {
		return progress, startHash, ErrSaveBlockHashesProgressFail
	}

	if useInternalTx {
		if err := tx.Commit(); err != nil {
			return progress, startHash, err
		}
	}
	return p, h, nil
}

// saveBlockHashesInCacheDB saves block hashes to the cache db (a map from block height to block hash)
func (bh *StageBlockHashes) saveBlockHashesInCacheDB(s *StageState, progress uint64, startHash common.Hash) (p uint64, h common.Hash, err error) {
	p = progress
	h.SetBytes(startHash[:])
	lastAddedID := int(0) // the first id won't be added
	saved := false

	etx, err := bh.configs.cachedb.BeginRw(context.Background())
	if err != nil {
		return p, h, err
	}
	defer etx.Rollback()

	s.state.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
		for id, blockHash := range configPeer.blockHashes {
			if id <= lastAddedID {
				continue
			}
			key := strconv.FormatUint(p+1, 10)
			if err := etx.Put(BlockHashesBucket, []byte(key), blockHash); err != nil {
				utils.Logger().Error().
					Err(err).
					Int("block hash index", id).
					Str("block hash", hex.EncodeToString(blockHash)).
					Msg("[STAGED_SYNC] adding block hash to db failed")
				return
			}
			p++
			h.SetBytes(blockHash[:])
			lastAddedID = id
		}

		// if all block hashes are added to db, break the loop
		if lastAddedID == len(configPeer.blockHashes)-1 {
			saved = true
			brk = true
		}
		return
	})

	// save cache progress (last block height)
	if err = etx.Put(StageProgressBucket, []byte(LastBlockHeight), marshalData(p)); err != nil {
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] saving cache progress for block hashes stage failed")
		return p, h, ErrSaveCachedBlockHashesProgressFail
	}

	// save cache progress (last block hash)
	if err = etx.Put(StageProgressBucket, []byte(LastBlockHash), h.Bytes()); err != nil {
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] saving cache last block hash for block hashes stage failed")
		return p, h, ErrSavingCacheLastBlockHashFail
	}

	// if the node was connected to other peers and had some hashes to store in db, but it failed to save them, return an error
	if len(s.state.syncConfig.peers) > 0 && len(s.state.syncConfig.peers[0].blockHashes) > 0 && !saved {
		return p, h, ErrCachingBlockHashFail
	}

	// commit the transaction to db to cache all downloaded block hashes
	if err := etx.Commit(); err != nil {
		return p, h, err
	}

	// the block hashes were cached successfully, so return the cache progress and the last cached block hash
	return p, h, nil
}

// clearCache removes block hashes from the cache db
func (bh *StageBlockHashes) clearCache() error {
	tx, err := bh.configs.cachedb.BeginRw(context.Background())
	if err != nil {
		return nil
	}
	defer tx.Rollback()
	if err := tx.ClearBucket(BlockHashesBucket); err != nil {
		return nil
	}

	if err := tx.Commit(); err != nil {
		return err
	}

	return nil
}

// getHashFromCache fetches a block hash from the cache db
func (bh *StageBlockHashes) getHashFromCache(height uint64) (h []byte, err error) {

	tx, err := bh.configs.cachedb.BeginRw(context.Background())
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	var cacheHash []byte
	key := strconv.FormatUint(height, 10)
	if exist, err := tx.Has(BlockHashesBucket, []byte(key)); !exist || err != nil {
		return nil, ErrFetchBlockHashProgressFail
	}
	if cacheHash, err = tx.GetOne(BlockHashesBucket, []byte(key)); err != nil {
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] fetch cache progress for block hashes stage failed")
		return nil, ErrFetchBlockHashProgressFail
	}
	hv, _ := unmarshalData(cacheHash)
	if len(cacheHash) <= 1 || hv == 0 {
		return nil, ErrFetchBlockHashProgressFail
	}
	if err := tx.Commit(); err != nil {
		return nil, err
	}

	return cacheHash[:], nil
}

// loadBlockHashesFromCache loads block hashes from the cache db into the main sync db and updates the progress
func (bh *StageBlockHashes) loadBlockHashesFromCache(s *StageState, startHash []byte, startHeight uint64, targetHeight uint64, tx kv.RwTx) (p uint64, h common.Hash, err error) {

	p = startHeight
	h.SetBytes(startHash[:])
	useInternalTx := tx == nil
	if useInternalTx {
		tx, err = bh.configs.db.BeginRw(bh.configs.ctx)
		if err != nil {
			return p, h, err
		}
		defer tx.Rollback()
	}

	if errV := bh.configs.cachedb.View(context.Background(), func(rtx kv.Tx) error {
		// load block hashes from the cache db and copy them to the main sync db
		for ok := true; ok; ok = p < targetHeight {
			key := strconv.FormatUint(p+1, 10)
			lastHash, err := rtx.GetOne(BlockHashesBucket, []byte(key))
			if err != nil {
				utils.Logger().Error().
					Err(err).
					Str("block height", key).
					Msg("[STAGED_SYNC] retrieve block hash from cache failed")
				return err
			}
			if len(lastHash[:]) == 0 {
				return nil
			}
			bucketName := GetBucketName(BlockHashesBucket, s.state.isBeacon)
			if err = tx.Put(bucketName, []byte(key), lastHash); err != nil {
				return err
			}
			h.SetBytes(lastHash[:])
			p++
		}
		// load extra block hashes from the cache db and copy them to the background map to be downloaded in background by the block stage
		s.state.syncStatus.currentCycle.lock.Lock()
		defer s.state.syncStatus.currentCycle.lock.Unlock()
		pExtraHashes := p
		s.state.syncStatus.currentCycle.ExtraHashes = make(map[uint64][]byte)
		for ok := true; ok; ok = pExtraHashes < p+s.state.MaxBackgroundBlocks {
			key := strconv.FormatUint(pExtraHashes+1, 10)
			newHash, err := rtx.GetOne(BlockHashesBucket, []byte(key))
			if err != nil {
				utils.Logger().Error().
					Err(err).
					Str("block height", key).
					Msg("[STAGED_SYNC] retrieve extra block hashes for background process failed")
				break
			}
			if len(newHash[:]) == 0 {
				return nil
			}
			s.state.syncStatus.currentCycle.ExtraHashes[pExtraHashes+1] = newHash
			pExtraHashes++
		}
		return nil
	}); errV != nil {
		return startHeight, h, errV
	}

	// save progress
	if err = s.Update(tx, p); err != nil {
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] saving retrieved cached progress for block hashes stage failed")
		h.SetBytes(startHash[:])
		return startHeight, h, err
	}

	// update the progress
	if useInternalTx {
		if err := tx.Commit(); err != nil {
			h.SetBytes(startHash[:])
			return startHeight, h, err
		}
	}
	return p, h, nil
}

func (bh *StageBlockHashes) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
	useInternalTx := tx == nil
	if useInternalTx {
		tx, err = bh.configs.db.BeginRw(bh.configs.ctx)
		if err != nil {
			return err
		}
		defer tx.Rollback()
	}

	// terminate the background process in turbo mode
	if bh.configs.bgProcRunning {
		bh.configs.bgProcRunning = false
		bh.configs.turboModeCh <- struct{}{}
		close(bh.configs.turboModeCh)
	}

	// clean the block hashes db
	hashesBucketName := GetBucketName(BlockHashesBucket, bh.configs.isBeacon)
	if err = tx.ClearBucket(hashesBucketName); err != nil {
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] clear block hashes bucket after revert failed")
		return err
	}

	// clean the cache db as well
	if err := bh.clearCache(); err != nil {
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] clear block hashes cache failed")
		return err
	}

	// clear extra block hashes
	s.state.syncStatus.currentCycle.ExtraHashes = make(map[uint64][]byte)

	// save progress
	currentHead := bh.configs.bc.CurrentBlock().NumberU64()
	if err = s.Update(tx, currentHead); err != nil {
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] saving progress for block hashes stage after revert failed")
		return err
	}

	if err = u.Done(tx); err != nil {
		utils.Logger().Error().
			Err(err).
			Msgf("[STAGED_SYNC] reset after revert failed")
		return err
	}

	if useInternalTx {
		if err = tx.Commit(); err != nil {
			return ErrCommitTransactionFail
		}
	}
	return nil
}

func (bh *StageBlockHashes) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
	useInternalTx := tx == nil
	if useInternalTx {
		tx, err = bh.configs.db.BeginRw(bh.configs.ctx)
		if err != nil {
			return err
		}
		defer tx.Rollback()
	}

	// terminate the background process in turbo mode
	if bh.configs.bgProcRunning {
		bh.configs.bgProcRunning = false
		bh.configs.turboModeCh <- struct{}{}
		close(bh.configs.turboModeCh)
	}

	hashesBucketName := GetBucketName(BlockHashesBucket, bh.configs.isBeacon)
	tx.ClearBucket(hashesBucketName)

	if useInternalTx {
		if err = tx.Commit(); err != nil {
			return err
		}
	}
	return nil
}
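An aside on the turbo-mode lifecycle above: the background downloader is stopped by sending on turboModeCh and then closing it from Revert/CleanUp, while the worker checks the channel in a non-blocking select. A small, self-contained sketch of that stop-channel pattern, with hypothetical names and timings that are not part of the diff:

package main

import (
	"fmt"
	"time"
)

// worker mimics runBackgroundProcess: it loops until told to stop via stopCh.
func worker(stopCh chan struct{}, done chan struct{}) {
	defer close(done)
	for {
		select {
		case <-stopCh:
			fmt.Println("background download stopped")
			return
		default:
			// stand-in for one batch of hash downloads
			time.Sleep(10 * time.Millisecond)
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	done := make(chan struct{})
	go worker(stopCh, done)

	// mirrors the Revert/CleanUp paths above: signal, then close
	time.Sleep(50 * time.Millisecond)
	stopCh <- struct{}{}
	close(stopCh)
	<-done
}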
@ -0,0 +1,784 @@

package stagedsync

import (
	"context"
	"encoding/hex"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/Workiva/go-datastructures/queue"
	"github.com/harmony-one/harmony/core"
	"github.com/harmony-one/harmony/internal/utils"
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon-lib/kv/mdbx"
	"github.com/ledgerwatch/log/v3"
)

type StageBodies struct {
	configs StageBodiesCfg
}

type StageBodiesCfg struct {
	ctx           context.Context
	bc            core.BlockChain
	db            kv.RwDB
	turbo         bool
	turboModeCh   chan struct{}
	bgProcRunning bool
	isBeacon      bool
	cachedb       kv.RwDB
	logProgress   bool
}

func NewStageBodies(cfg StageBodiesCfg) *StageBodies {
	return &StageBodies{
		configs: cfg,
	}
}

func NewStageBodiesCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB, isBeacon bool, turbo bool, logProgress bool) StageBodiesCfg {
	cachedb, err := initBlocksCacheDB(ctx, isBeacon)
	if err != nil {
		panic("can't initialize sync caches")
	}
	return StageBodiesCfg{
		ctx:         ctx,
		bc:          bc,
		db:          db,
		turbo:       turbo,
		isBeacon:    isBeacon,
		cachedb:     cachedb,
		logProgress: logProgress,
	}
}

func initBlocksCacheDB(ctx context.Context, isBeacon bool) (db kv.RwDB, err error) {
	// create the cache db
	cachedbName := BlockCacheDB
	if isBeacon {
		cachedbName = "beacon_" + cachedbName
	}
	cachedb := mdbx.NewMDBX(log.New()).Path(cachedbName).MustOpen()
	tx, errRW := cachedb.BeginRw(ctx)
	if errRW != nil {
		utils.Logger().Error().
			Err(errRW).
			Msg("[STAGED_SYNC] initializing sync caches failed")
		return nil, errRW
	}
	defer tx.Rollback()
	if err := tx.CreateBucket(DownloadedBlocksBucket); err != nil {
		utils.Logger().Error().
			Err(err).
			Msg("[STAGED_SYNC] creating cache bucket failed")
		return nil, err
	}
	if err := tx.CreateBucket(StageProgressBucket); err != nil {
		utils.Logger().Error().
			Err(err).
			Msg("[STAGED_SYNC] creating progress bucket failed")
		return nil, err
	}
	if err := tx.Commit(); err != nil {
		return nil, err
	}
	return cachedb, nil
}

// Exec progresses the Bodies stage in the forward direction
func (b *StageBodies) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {

	maxPeersHeight := s.state.syncStatus.MaxPeersHeight
	currentHead := b.configs.bc.CurrentBlock().NumberU64()
	if currentHead >= maxPeersHeight {
		return nil
	}
	currProgress := uint64(0)
	targetHeight := s.state.syncStatus.currentCycle.TargetHeight
	isBeacon := s.state.isBeacon
	isLastCycle := targetHeight >= maxPeersHeight
	canRunInTurboMode := b.configs.turbo && !isLastCycle

	if errV := CreateView(b.configs.ctx, b.configs.db, tx, func(etx kv.Tx) error {
		if currProgress, err = s.CurrentStageProgress(etx); err != nil {
			return err
		}
		return nil
	}); errV != nil {
		return errV
	}

	if currProgress == 0 {
		if err := b.clearBlocksBucket(tx, s.state.isBeacon); err != nil {
			return err
		}
		currProgress = currentHead
	}

	if currProgress >= targetHeight {
		return nil
	}

	// load cached blocks into the main sync db
	if b.configs.turbo && !firstCycle {
		if currProgress, err = b.loadBlocksFromCache(s, currProgress, tx); err != nil {
			return err
		}
	}

	if currProgress >= targetHeight {
		return nil
	}

	size := uint64(0)
	startTime := time.Now()
	startBlock := currProgress
	if b.configs.logProgress {
		fmt.Print("\033[s") // save the cursor position
	}

	for ok := true; ok; ok = currProgress < targetHeight {
		maxSize := targetHeight - currProgress
		size = uint64(downloadTaskBatch * len(s.state.syncConfig.peers))
		if size > maxSize {
			size = maxSize
		}
		if err = b.loadBlockHashesToTaskQueue(s, currProgress+1, size, tx); err != nil {
			s.state.RevertTo(b.configs.bc.CurrentBlock().NumberU64(), b.configs.bc.CurrentBlock().Hash())
			return err
		}

		// download blocks
		verifyAllSig := true // TODO: move it to configs
		if err = b.downloadBlocks(s, verifyAllSig, tx); err != nil {
			return nil
		}
		// save blocks and update the current progress
		if currProgress, err = b.saveDownloadedBlocks(s, currProgress, tx); err != nil {
			return err
		}
		// log the stage progress in the console
		if b.configs.logProgress {
			// calculate block speed
			dt := time.Now().Sub(startTime).Seconds()
			speed := float64(0)
			if dt > 0 {
				speed = float64(currProgress-startBlock) / dt
			}
			blockSpeed := fmt.Sprintf("%.2f", speed)
			fmt.Print("\033[u\033[K") // restore the cursor position and clear the line
			fmt.Println("downloading blocks progress:", currProgress, "/", targetHeight, "(", blockSpeed, "blocks/s", ")")
		}
	}

	// run the background process in turbo mode
	if canRunInTurboMode && currProgress < maxPeersHeight {
		b.configs.turboModeCh = make(chan struct{})
		go b.runBackgroundProcess(tx, s, isBeacon, currProgress, currProgress+s.state.MaxBackgroundBlocks)
	}
	return nil
}

// runBackgroundProcess continues downloading blocks in the background and caching them on disk while the next stages are running.
// In the next sync cycle, this stage will use cached blocks rather than download them from peers.
// This helps performance and reduces stage duration. It also helps to use the resources more efficiently.
func (b *StageBodies) runBackgroundProcess(tx kv.RwTx, s *StageState, isBeacon bool, startHeight uint64, targetHeight uint64) error {

	s.state.syncStatus.currentCycle.lock.RLock()
	defer s.state.syncStatus.currentCycle.lock.RUnlock()

	if s.state.syncStatus.currentCycle.Number == 0 || len(s.state.syncStatus.currentCycle.ExtraHashes) == 0 {
		return nil
	}
	currProgress := startHeight
	var err error
	size := uint64(0)
	b.configs.bgProcRunning = true

	defer func() {
		if b.configs.bgProcRunning {
			close(b.configs.turboModeCh)
			b.configs.bgProcRunning = false
		}
	}()

	for ok := true; ok; ok = currProgress < targetHeight {
		select {
		case <-b.configs.turboModeCh:
			return nil
		default:
			if currProgress >= targetHeight {
				return nil
			}

			maxSize := targetHeight - currProgress
			size = uint64(downloadTaskBatch * len(s.state.syncConfig.peers))
			if size > maxSize {
				size = maxSize
			}
			if err = b.loadExtraBlockHashesToTaskQueue(s, currProgress+1, size); err != nil {
				return err
			}
			// download blocks
			verifyAllSig := true // TODO: move it to configs
			if err = b.downloadBlocks(s, verifyAllSig, nil); err != nil {
				return nil
			}
			// cache blocks and update the current progress
			if currProgress, err = b.cacheBlocks(s, currProgress); err != nil {
				return err
			}
		}
	}
	return nil
}

func (b *StageBodies) clearBlocksBucket(tx kv.RwTx, isBeacon bool) error {
	useInternalTx := tx == nil
	if useInternalTx {
		var err error
		tx, err = b.configs.db.BeginRw(context.Background())
		if err != nil {
			return err
		}
		defer tx.Rollback()
	}
	bucketName := GetBucketName(DownloadedBlocksBucket, isBeacon)
	if err := tx.ClearBucket(bucketName); err != nil {
		return err
	}

	if useInternalTx {
		if err := tx.Commit(); err != nil {
			return err
		}
	}
	return nil
}

// downloadBlocks downloads blocks from the state sync task queue.
func (b *StageBodies) downloadBlocks(s *StageState, verifyAllSig bool, tx kv.RwTx) (err error) {
	ss := s.state
	var wg sync.WaitGroup
	taskQueue := downloadTaskQueue{ss.stateSyncTaskQueue}
	s.state.InitDownloadedBlocksMap()

	ss.syncConfig.ForEachPeer(func(peerConfig *SyncPeerConfig) (brk bool) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if !peerConfig.client.IsReady() {
				// try to connect
				if ready := peerConfig.client.WaitForConnection(1000 * time.Millisecond); !ready {
					if !peerConfig.client.IsConnecting() { // if it's idle or closed then remove it
						ss.syncConfig.RemovePeer(peerConfig, "not ready to download blocks")
					}
					return
				}
			}
			for !taskQueue.empty() {
				tasks, err := taskQueue.poll(downloadTaskBatch, time.Millisecond)
				if err != nil || len(tasks) == 0 {
					if err == queue.ErrDisposed {
						continue
					}
					utils.Logger().Error().
						Err(err).
						Msg("[STAGED_SYNC] downloadBlocks: ss.stateSyncTaskQueue poll timeout")
					break
				}
				payload, err := peerConfig.GetBlocks(tasks.blockHashes())
				if err != nil {
					isBrokenPeer := peerConfig.AddFailedTime(downloadBlocksRetryLimit)
					utils.Logger().Error().
						Err(err).
						Str("peerID", peerConfig.ip).
						Str("port", peerConfig.port).
						Msg("[STAGED_SYNC] downloadBlocks: GetBlocks failed")
					if err := taskQueue.put(tasks); err != nil {
						utils.Logger().Error().
							Err(err).
							Interface("taskIndexes", tasks.indexes()).
							Msg("cannot add task back to queue")
					}
					if isBrokenPeer {
						ss.syncConfig.RemovePeer(peerConfig, "get blocks failed")
					}
					return
				}
				if len(payload) == 0 {
					isBrokenPeer := peerConfig.AddFailedTime(downloadBlocksRetryLimit)
					utils.Logger().Error().
						Str("peerID", peerConfig.ip).
						Str("port", peerConfig.port).
						Msg("[STAGED_SYNC] downloadBlocks: no more retrievable blocks")
					if err := taskQueue.put(tasks); err != nil {
						utils.Logger().Error().
							Err(err).
							Interface("taskIndexes", tasks.indexes()).
							Interface("taskBlocks", tasks.blockHashesStr()).
							Msg("downloadBlocks: cannot add task")
					}
					if isBrokenPeer {
						ss.syncConfig.RemovePeer(peerConfig, "no blocks in payload")
					}
					return
				}
				// the node received blocks from the peer, so it is working now
				peerConfig.failedTimes = 0

				failedTasks, err := b.handleBlockSyncResult(s, payload, tasks, verifyAllSig, tx)
				if err != nil {
					isBrokenPeer := peerConfig.AddFailedTime(downloadBlocksRetryLimit)
					utils.Logger().Error().
						Err(err).
						Str("peerID", peerConfig.ip).
						Str("port", peerConfig.port).
						Msg("[STAGED_SYNC] downloadBlocks: handleBlockSyncResult failed")
					if err := taskQueue.put(tasks); err != nil {
						utils.Logger().Error().
							Err(err).
							Interface("taskIndexes", tasks.indexes()).
							Interface("taskBlocks", tasks.blockHashesStr()).
							Msg("downloadBlocks: cannot add task")
					}
					if isBrokenPeer {
						ss.syncConfig.RemovePeer(peerConfig, "handleBlockSyncResult failed")
					}
					return
				}

				if len(failedTasks) != 0 {
					isBrokenPeer := peerConfig.AddFailedTime(downloadBlocksRetryLimit)
					utils.Logger().Error().
						Str("peerID", peerConfig.ip).
						Str("port", peerConfig.port).
						Msg("[STAGED_SYNC] downloadBlocks: some tasks failed")
					if err := taskQueue.put(failedTasks); err != nil {
						utils.Logger().Error().
							Err(err).
							Interface("task Indexes", failedTasks.indexes()).
							Interface("task Blocks", tasks.blockHashesStr()).
							Msg("cannot add task")
					}
					if isBrokenPeer {
						ss.syncConfig.RemovePeer(peerConfig, "some blocks failed to handle")
					}
					return
				}
			}
		}()
		return
	})
	wg.Wait()
	return nil
}
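A reading aid for the function above, not part of the diff: downloadBlocks fans out one goroutine per peer, and all of them drain a single shared task queue until it is empty, joined by a sync.WaitGroup. Below is a self-contained sketch of that fan-out pattern; it uses a channel and plain strings as stand-ins for the real downloadTaskQueue and SyncPeerConfig.

package main

import (
	"fmt"
	"sync"
)

func main() {
	// shared task queue: every "peer" pulls batches from it until it is drained
	tasks := make(chan int, 8)
	for i := 1; i <= 8; i++ {
		tasks <- i
	}
	close(tasks)

	peers := []string{"peer-a", "peer-b", "peer-c"}
	var wg sync.WaitGroup
	for _, peer := range peers {
		wg.Add(1)
		go func(peer string) {
			defer wg.Done()
			for task := range tasks {
				// stand-in for GetBlocks plus handleBlockSyncResult
				fmt.Println(peer, "downloaded batch", task)
			}
		}(peer)
	}
	wg.Wait() // mirrors wg.Wait() at the end of downloadBlocks
}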
||||
|
||||
func (b *StageBodies) handleBlockSyncResult(s *StageState, payload [][]byte, tasks syncBlockTasks, verifyAllSig bool, tx kv.RwTx) (syncBlockTasks, error) { |
||||
if len(payload) > len(tasks) { |
||||
utils.Logger().Error(). |
||||
Err(ErrUnexpectedNumberOfBlocks). |
||||
Int("expect", len(tasks)). |
||||
Int("got", len(payload)) |
||||
return tasks, ErrUnexpectedNumberOfBlocks |
||||
} |
||||
|
||||
var failedTasks syncBlockTasks |
||||
if len(payload) < len(tasks) { |
||||
utils.Logger().Warn(). |
||||
Err(ErrUnexpectedNumberOfBlocks). |
||||
Int("expect", len(tasks)). |
||||
Int("got", len(payload)) |
||||
failedTasks = append(failedTasks, tasks[len(payload):]...) |
||||
} |
||||
|
||||
s.state.lockBlocks.Lock() |
||||
defer s.state.lockBlocks.Unlock() |
||||
|
||||
for i, blockBytes := range payload { |
||||
if len(blockBytes[:]) <= 1 { |
||||
failedTasks = append(failedTasks, tasks[i]) |
||||
continue |
||||
} |
||||
k := uint64(tasks[i].index) // fmt.Sprintf("%d", tasks[i].index) //fmt.Sprintf("%020d", tasks[i].index)
|
||||
s.state.downloadedBlocks[k] = make([]byte, len(blockBytes)) |
||||
copy(s.state.downloadedBlocks[k], blockBytes[:]) |
||||
} |
||||
|
||||
return failedTasks, nil |
||||
} |
||||
|
||||
func (b *StageBodies) saveProgress(s *StageState, progress uint64, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
var err error |
||||
tx, err = b.configs.db.BeginRw(context.Background()) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
// save progress
|
||||
if err = s.Update(tx, progress); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] saving progress for block bodies stage failed") |
||||
return ErrSavingBodiesProgressFail |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err := tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (b *StageBodies) loadBlockHashesToTaskQueue(s *StageState, startIndex uint64, size uint64, tx kv.RwTx) error { |
||||
s.state.stateSyncTaskQueue = queue.New(0) |
||||
if errV := CreateView(b.configs.ctx, b.configs.db, tx, func(etx kv.Tx) error { |
||||
|
||||
for i := startIndex; i < startIndex+size; i++ { |
||||
key := strconv.FormatUint(i, 10) |
||||
id := int(i - startIndex) |
||||
bucketName := GetBucketName(BlockHashesBucket, s.state.isBeacon) |
||||
blockHash, err := etx.GetOne(bucketName, []byte(key)) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if blockHash == nil || len(blockHash) == 0 { |
||||
break |
||||
} |
||||
if err := s.state.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}); err != nil { |
||||
s.state.stateSyncTaskQueue = queue.New(0) |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Int("taskIndex", id). |
||||
Str("taskBlock", hex.EncodeToString(blockHash)). |
||||
Msg("[STAGED_SYNC] loadBlockHashesToTaskQueue: cannot add task") |
||||
break |
||||
} |
||||
} |
||||
return nil |
||||
|
||||
}); errV != nil { |
||||
return errV |
||||
} |
||||
|
||||
if s.state.stateSyncTaskQueue.Len() != int64(size) { |
||||
return ErrAddTaskFailed |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (b *StageBodies) loadExtraBlockHashesToTaskQueue(s *StageState, startIndex uint64, size uint64) error { |
||||
|
||||
s.state.stateSyncTaskQueue = queue.New(0) |
||||
|
||||
for i := startIndex; i < startIndex+size; i++ { |
||||
id := int(i - startIndex) |
||||
blockHash := s.state.syncStatus.currentCycle.ExtraHashes[i] |
||||
if len(blockHash[:]) == 0 { |
||||
break |
||||
} |
||||
if err := s.state.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}); err != nil { |
||||
s.state.stateSyncTaskQueue = queue.New(0) |
||||
utils.Logger().Warn(). |
||||
Err(err). |
||||
Int("taskIndex", id). |
||||
Str("taskBlock", hex.EncodeToString(blockHash)). |
||||
Msg("[STAGED_SYNC] loadBlockHashesToTaskQueue: cannot add task") |
||||
break |
||||
} |
||||
} |
||||
|
||||
if s.state.stateSyncTaskQueue.Len() != int64(size) { |
||||
return ErrAddTasksToQueueFail |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (b *StageBodies) saveDownloadedBlocks(s *StageState, progress uint64, tx kv.RwTx) (p uint64, err error) { |
||||
p = progress |
||||
|
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
var err error |
||||
tx, err = b.configs.db.BeginRw(context.Background()) |
||||
if err != nil { |
||||
return p, err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
downloadedBlocks := s.state.GetDownloadedBlocks() |
||||
|
||||
for i := uint64(0); i < uint64(len(downloadedBlocks)); i++ { |
||||
blockBytes := downloadedBlocks[i] |
||||
n := progress + i + 1 |
||||
blkNumber := marshalData(n) |
||||
bucketName := GetBucketName(DownloadedBlocksBucket, s.state.isBeacon) |
||||
if err := tx.Put(bucketName, blkNumber, blockBytes); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Uint64("block height", n). |
||||
Msg("[STAGED_SYNC] adding block to db failed") |
||||
return p, err |
||||
} |
||||
p++ |
||||
} |
||||
// check if all block hashes are added to db break the loop
|
||||
if p-progress != uint64(len(downloadedBlocks)) { |
||||
return progress, ErrSaveBlocksFail |
||||
} |
||||
// save progress
|
||||
if err = s.Update(tx, p); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] saving progress for block bodies stage failed") |
||||
return progress, ErrSavingBodiesProgressFail |
||||
} |
||||
// if it's using its own transaction, commit transaction to db to cache all downloaded blocks
|
||||
if useInternalTx { |
||||
if err := tx.Commit(); err != nil { |
||||
return progress, err |
||||
} |
||||
} |
||||
// it cached blocks successfully, so, it returns the cache progress
|
||||
return p, nil |
||||
} |
||||
|
||||
func (b *StageBodies) cacheBlocks(s *StageState, progress uint64) (p uint64, err error) { |
||||
p = progress |
||||
|
||||
tx, err := b.configs.cachedb.BeginRw(context.Background()) |
||||
if err != nil { |
||||
return p, err |
||||
} |
||||
defer tx.Rollback() |
||||
|
||||
downloadedBlocks := s.state.GetDownloadedBlocks() |
||||
|
||||
for i := uint64(0); i < uint64(len(downloadedBlocks)); i++ { |
||||
blockBytes := downloadedBlocks[i] |
||||
n := progress + i + 1 |
||||
blkNumber := marshalData(n) // fmt.Sprintf("%020d", p+1)
|
||||
if err := tx.Put(DownloadedBlocksBucket, blkNumber, blockBytes); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Uint64("block height", p). |
||||
Msg("[STAGED_SYNC] caching block failed") |
||||
return p, err |
||||
} |
||||
p++ |
||||
} |
||||
// check if all block hashes are added to db break the loop
|
||||
if p-progress != uint64(len(downloadedBlocks)) { |
||||
return p, ErrCachingBlocksFail |
||||
} |
||||
|
||||
// save progress
|
||||
if err = tx.Put(StageProgressBucket, []byte(LastBlockHeight), marshalData(p)); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] saving cache progress for blocks stage failed") |
||||
return p, ErrSavingCachedBodiesProgressFail |
||||
} |
||||
|
||||
if err := tx.Commit(); err != nil { |
||||
return p, err |
||||
} |
||||
|
||||
return p, nil |
||||
} |
||||
|
||||
// clearCache removes block hashes from cache db
|
||||
func (b *StageBodies) clearCache() error { |
||||
tx, err := b.configs.cachedb.BeginRw(context.Background()) |
||||
if err != nil { |
||||
return nil |
||||
} |
||||
defer tx.Rollback() |
||||
|
||||
if err := tx.ClearBucket(DownloadedBlocksBucket); err != nil { |
||||
return nil |
||||
} |
||||
|
||||
if err := tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// load blocks from cache db to main sync db and update the progress
|
||||
func (b *StageBodies) loadBlocksFromCache(s *StageState, startHeight uint64, tx kv.RwTx) (p uint64, err error) { |
||||
|
||||
p = startHeight |
||||
|
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = b.configs.db.BeginRw(b.configs.ctx) |
||||
if err != nil { |
||||
return p, err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
defer func() { |
||||
// Clear cache db
|
||||
b.configs.cachedb.Update(context.Background(), func(etx kv.RwTx) error { |
||||
if err := etx.ClearBucket(DownloadedBlocksBucket); err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
}) |
||||
}() |
||||
|
||||
errV := b.configs.cachedb.View(context.Background(), func(rtx kv.Tx) error { |
||||
lastCachedHeightBytes, err := rtx.GetOne(StageProgressBucket, []byte(LastBlockHeight)) |
||||
if err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] retrieving cache progress for blocks stage failed") |
||||
return ErrRetrievingCachedBodiesProgressFail |
||||
} |
||||
lastHeight, err := unmarshalData(lastCachedHeightBytes) |
||||
if err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] retrieving cache progress for blocks stage failed") |
||||
return ErrRetrievingCachedBodiesProgressFail |
||||
} |
||||
|
||||
if startHeight >= lastHeight { |
||||
return nil |
||||
} |
||||
|
||||
// load blocks from the cache db and copy them to the main sync db
|
||||
for ok := true; ok; ok = p < lastHeight { |
||||
key := marshalData(p + 1) |
||||
blkBytes, err := rtx.GetOne(DownloadedBlocksBucket, []byte(key)) |
||||
if err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Uint64("block height", p+1). |
||||
Msg("[STAGED_SYNC] retrieve block from cache failed") |
||||
return err |
||||
} |
||||
if len(blkBytes[:]) <= 1 { |
||||
break |
||||
} |
||||
bucketName := GetBucketName(DownloadedBlocksBucket, s.state.isBeacon) |
||||
if err = tx.Put(bucketName, []byte(key), blkBytes); err != nil { |
||||
return err |
||||
} |
||||
p++ |
||||
} |
||||
return nil |
||||
}) |
||||
if errV != nil { |
||||
return startHeight, errV |
||||
} |
||||
|
||||
// save progress
|
||||
if err = s.Update(tx, p); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] saving retrieved cached progress for blocks stage failed") |
||||
return startHeight, ErrSavingCachedBodiesProgressFail |
||||
} |
||||
|
||||
// commit the transaction if it was opened internally
|
||||
if useInternalTx { |
||||
if err := tx.Commit(); err != nil { |
||||
return startHeight, err |
||||
} |
||||
} |
||||
|
||||
return p, nil |
||||
} |
||||
|
||||
func (b *StageBodies) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = b.configs.db.BeginRw(b.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
// terminate background process in turbo mode
|
||||
if b.configs.bgProcRunning { |
||||
b.configs.bgProcRunning = false |
||||
b.configs.turboModeCh <- struct{}{} |
||||
close(b.configs.turboModeCh) |
||||
} |
||||
|
||||
// clear the downloaded blocks bucket
|
||||
blocksBucketName := GetBucketName(DownloadedBlocksBucket, b.configs.isBeacon) |
||||
if err = tx.ClearBucket(blocksBucketName); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] clear blocks bucket after revert failed") |
||||
return err |
||||
} |
||||
|
||||
// clean cache db as well
|
||||
if err := b.clearCache(); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] clear blocks cache failed") |
||||
return err |
||||
} |
||||
|
||||
// save progress
|
||||
currentHead := b.configs.bc.CurrentBlock().NumberU64() |
||||
if err = s.Update(tx, currentHead); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] saving progress for block bodies stage after revert failed") |
||||
return err |
||||
} |
||||
|
||||
if err = u.Done(tx); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err = tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (b *StageBodies) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = b.configs.db.BeginRw(b.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
// terminate background process in turbo mode
|
||||
if b.configs.bgProcRunning { |
||||
b.configs.bgProcRunning = false |
||||
b.configs.turboModeCh <- struct{}{} |
||||
close(b.configs.turboModeCh) |
||||
} |
||||
blocksBucketName := GetBucketName(DownloadedBlocksBucket, b.configs.isBeacon) |
||||
if err = tx.ClearBucket(blocksBucketName); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err = tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,114 @@ |
||||
package stagedsync |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/ledgerwatch/erigon-lib/kv" |
||||
) |
||||
|
||||
type StageFinish struct { |
||||
configs StageFinishCfg |
||||
} |
||||
|
||||
type StageFinishCfg struct { |
||||
ctx context.Context |
||||
db kv.RwDB |
||||
} |
||||
|
||||
func NewStageFinish(cfg StageFinishCfg) *StageFinish { |
||||
return &StageFinish{ |
||||
configs: cfg, |
||||
} |
||||
} |
||||
|
||||
func NewStageFinishCfg(ctx context.Context, db kv.RwDB) StageFinishCfg { |
||||
return StageFinishCfg{ |
||||
ctx: ctx, |
||||
db: db, |
||||
} |
||||
} |
||||
|
||||
func (finish *StageFinish) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
var err error |
||||
tx, err = finish.configs.db.BeginRw(context.Background()) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
// TODO: prepare indices (useful for RPC) and finalize
|
||||
|
||||
if useInternalTx { |
||||
if err := tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (bh *StageFinish) clearBucket(tx kv.RwTx, isBeacon bool) error { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
var err error |
||||
tx, err = bh.configs.db.BeginRw(context.Background()) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
bucketName := GetBucketName(BlockHashesBucket, isBeacon) |
||||
if err := tx.ClearBucket(bucketName); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err := tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (finish *StageFinish) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = finish.configs.db.BeginRw(finish.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
if err = u.Done(tx); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err = tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (finish *StageFinish) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = finish.configs.db.BeginRw(finish.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err = tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,146 @@ |
||||
package stagedsync |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/harmony-one/harmony/core" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/ledgerwatch/erigon-lib/kv" |
||||
) |
||||
|
||||
type StageHeads struct { |
||||
configs StageHeadsCfg |
||||
} |
||||
|
||||
type StageHeadsCfg struct { |
||||
ctx context.Context |
||||
bc core.BlockChain |
||||
db kv.RwDB |
||||
} |
||||
|
||||
func NewStageHeads(cfg StageHeadsCfg) *StageHeads { |
||||
return &StageHeads{ |
||||
configs: cfg, |
||||
} |
||||
} |
||||
|
||||
func NewStageHeadersCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB) StageHeadsCfg { |
||||
return StageHeadsCfg{ |
||||
ctx: ctx, |
||||
bc: bc, |
||||
db: db, |
||||
} |
||||
} |
||||
|
||||
func (heads *StageHeads) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error { |
||||
|
||||
if len(s.state.syncConfig.peers) < NumPeersLowBound { |
||||
return ErrNotEnoughConnectedPeers |
||||
} |
||||
|
||||
// no need to update the target if we are redoing the stages because of a bad block
|
||||
if invalidBlockRevert { |
||||
return nil |
||||
} |
||||
|
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
var err error |
||||
tx, err = heads.configs.db.BeginRw(heads.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
maxPeersHeight := s.state.syncStatus.MaxPeersHeight |
||||
maxBlocksPerSyncCycle := s.state.MaxBlocksPerSyncCycle |
||||
currentHeight := heads.configs.bc.CurrentBlock().NumberU64() |
||||
s.state.syncStatus.currentCycle.TargetHeight = maxPeersHeight |
||||
targetHeight := uint64(0) |
||||
if errV := CreateView(heads.configs.ctx, heads.configs.db, tx, func(etx kv.Tx) (err error) { |
||||
if targetHeight, err = s.CurrentStageProgress(etx); err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
}); errV != nil { |
||||
return errV |
||||
} |
||||
|
||||
// if the current height is ahead of the target height, recalculate the target height
|
||||
if targetHeight <= currentHeight { |
||||
if maxPeersHeight <= currentHeight { |
||||
return nil |
||||
} |
||||
utils.Logger().Info(). |
||||
Uint64("max blocks per sync cycle", maxBlocksPerSyncCycle). |
||||
Uint64("maxPeersHeight", maxPeersHeight). |
||||
Msgf("[STAGED_SYNC] current height is ahead of target height, target height is readjusted to max peers height") |
||||
targetHeight = maxPeersHeight |
||||
} |
||||
|
||||
if targetHeight > maxPeersHeight { |
||||
targetHeight = maxPeersHeight |
||||
} |
||||
|
||||
if maxBlocksPerSyncCycle > 0 && targetHeight-currentHeight > maxBlocksPerSyncCycle { |
||||
targetHeight = currentHeight + maxBlocksPerSyncCycle |
||||
} |
||||
|
||||
s.state.syncStatus.currentCycle.TargetHeight = targetHeight |
||||
|
||||
if err := s.Update(tx, targetHeight); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] saving progress for headers stage failed") |
||||
return err |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err := tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
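The target-height selection above folds three rules into one value: never aim past the maximum peers height, re-aim at it when the saved target has already fallen behind the current head, and never schedule more than MaxBlocksPerSyncCycle blocks in a single cycle. A minimal standalone sketch of that clamping rule follows; the helper name is hypothetical and not part of the diff.

// computeTargetHeight mirrors the clamping performed in StageHeads.Exec.
// savedTarget is the stage's stored progress, current is the chain head,
// maxPeersHeight is the highest height reported by peers, and
// maxBlocksPerCycle limits how far a single cycle may reach (0 = unlimited).
func computeTargetHeight(savedTarget, current, maxPeersHeight, maxBlocksPerCycle uint64) uint64 {
	target := savedTarget
	if target <= current {
		// the head already passed the saved target, so re-aim at the peers' max height
		target = maxPeersHeight
	}
	if target > maxPeersHeight {
		target = maxPeersHeight
	}
	if maxBlocksPerCycle > 0 && target-current > maxBlocksPerCycle {
		target = current + maxBlocksPerCycle
	}
	return target
}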
|
||||
func (heads *StageHeads) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = heads.configs.db.BeginRw(context.Background()) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
if err = u.Done(tx); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err := tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (heads *StageHeads) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = heads.configs.db.BeginRw(context.Background()) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err = tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,121 @@ |
||||
package stagedsync |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/harmony-one/harmony/core" |
||||
"github.com/harmony-one/harmony/core/types" |
||||
"github.com/ledgerwatch/erigon-lib/kv" |
||||
) |
||||
|
||||
type StageLastMile struct { |
||||
configs StageLastMileCfg |
||||
} |
||||
|
||||
type StageLastMileCfg struct { |
||||
ctx context.Context |
||||
bc core.BlockChain |
||||
db kv.RwDB |
||||
} |
||||
|
||||
func NewStageLastMile(cfg StageLastMileCfg) *StageLastMile { |
||||
return &StageLastMile{ |
||||
configs: cfg, |
||||
} |
||||
} |
||||
|
||||
func NewStageLastMileCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB) StageLastMileCfg { |
||||
return StageLastMileCfg{ |
||||
ctx: ctx, |
||||
bc: bc, |
||||
db: db, |
||||
} |
||||
} |
||||
|
||||
func (lm *StageLastMile) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { |
||||
|
||||
maxPeersHeight := s.state.syncStatus.MaxPeersHeight |
||||
targetHeight := s.state.syncStatus.currentCycle.TargetHeight |
||||
isLastCycle := targetHeight >= maxPeersHeight |
||||
if !isLastCycle { |
||||
return nil |
||||
} |
||||
|
||||
bc := lm.configs.bc |
||||
// update blocks received after the node started syncing
|
||||
parentHash := bc.CurrentBlock().Hash() |
||||
for { |
||||
block := s.state.getMaxConsensusBlockFromParentHash(parentHash) |
||||
if block == nil { |
||||
break |
||||
} |
||||
err = s.state.UpdateBlockAndStatus(block, bc, true) |
||||
if err != nil { |
||||
break |
||||
} |
||||
parentHash = block.Hash() |
||||
} |
||||
// TODO ek – Do we need to hold syncMux now that syncConfig has its own mutex?
|
||||
s.state.syncMux.Lock() |
||||
s.state.syncConfig.ForEachPeer(func(peer *SyncPeerConfig) (brk bool) { |
||||
peer.newBlocks = []*types.Block{} |
||||
return |
||||
}) |
||||
s.state.syncMux.Unlock() |
||||
|
||||
// update last mile blocks if any
|
||||
parentHash = bc.CurrentBlock().Hash() |
||||
for { |
||||
block := s.state.getBlockFromLastMileBlocksByParentHash(parentHash) |
||||
if block == nil { |
||||
break |
||||
} |
||||
err = s.state.UpdateBlockAndStatus(block, bc, false) |
||||
if err != nil { |
||||
break |
||||
} |
||||
parentHash = block.Hash() |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (lm *StageLastMile) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = lm.configs.db.BeginRw(lm.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
if err = u.Done(tx); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err = tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (lm *StageLastMile) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = lm.configs.db.BeginRw(lm.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err = tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,330 @@ |
||||
package stagedsync |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/harmony-one/harmony/core" |
||||
"github.com/harmony-one/harmony/core/types" |
||||
"github.com/harmony-one/harmony/internal/chain" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/ledgerwatch/erigon-lib/kv" |
||||
"github.com/pkg/errors" |
||||
) |
||||
|
||||
type StageStates struct { |
||||
configs StageStatesCfg |
||||
} |
||||
type StageStatesCfg struct { |
||||
ctx context.Context |
||||
bc core.BlockChain |
||||
db kv.RwDB |
||||
logProgress bool |
||||
} |
||||
|
||||
func NewStageStates(cfg StageStatesCfg) *StageStates { |
||||
return &StageStates{ |
||||
configs: cfg, |
||||
} |
||||
} |
||||
|
||||
func NewStageStatesCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB, logProgress bool) StageStatesCfg { |
||||
return StageStatesCfg{ |
||||
ctx: ctx, |
||||
bc: bc, |
||||
db: db, |
||||
logProgress: logProgress, |
||||
} |
||||
} |
||||
|
||||
func getBlockHashByHeight(h uint64, isBeacon bool, tx kv.RwTx) common.Hash { |
||||
var invalidBlockHash common.Hash |
||||
hashesBucketName := GetBucketName(BlockHashesBucket, isBeacon) |
||||
blockHeight := marshalData(h) |
||||
if invalidBlockHashBytes, err := tx.GetOne(hashesBucketName, blockHeight); err == nil { |
||||
invalidBlockHash.SetBytes(invalidBlockHashBytes) |
||||
} |
||||
return invalidBlockHash |
||||
} |
||||
|
||||
// Exec progresses the States stage in the forward direction
|
||||
func (stg *StageStates) Exec(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { |
||||
|
||||
maxPeersHeight := s.state.syncStatus.MaxPeersHeight |
||||
currentHead := stg.configs.bc.CurrentBlock().NumberU64() |
||||
if currentHead >= maxPeersHeight { |
||||
return nil |
||||
} |
||||
currProgress := stg.configs.bc.CurrentBlock().NumberU64() |
||||
targetHeight := s.state.syncStatus.currentCycle.TargetHeight |
||||
if currProgress >= targetHeight { |
||||
return nil |
||||
} |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
var err error |
||||
tx, err = stg.configs.db.BeginRw(stg.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
blocksBucketName := GetBucketName(DownloadedBlocksBucket, s.state.isBeacon) |
||||
isLastCycle := targetHeight >= maxPeersHeight |
||||
verifyAllSig := s.state.VerifyAllSig || isLastCycle // if it's the last cycle, we have to check all signatures
|
||||
startTime := time.Now() |
||||
startBlock := currProgress |
||||
var newBlocks types.Blocks |
||||
nBlock := int(0) |
||||
|
||||
if stg.configs.logProgress { |
||||
fmt.Print("\033[s") // save the cursor position
|
||||
} |
||||
|
||||
for i := currProgress + 1; i <= targetHeight; i++ { |
||||
key := marshalData(i) |
||||
blockBytes, err := tx.GetOne(blocksBucketName, key) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// if the block size is invalid, we have to break out of the state update loop
|
||||
// no rollback is needed, because the latest batch hasn't been added to the chain yet
|
||||
sz := len(blockBytes) |
||||
if sz <= 1 { |
||||
utils.Logger().Error(). |
||||
Uint64("block number", i). |
||||
Msg("block size invalid") |
||||
invalidBlockHash := getBlockHashByHeight(i, s.state.isBeacon, tx) |
||||
s.state.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), invalidBlockHash) |
||||
return ErrInvalidBlockBytes |
||||
} |
||||
|
||||
block, err := RlpDecodeBlockOrBlockWithSig(blockBytes) |
||||
if err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Uint64("block number", i). |
||||
Msg("block RLP decode failed") |
||||
invalidBlockHash := getBlockHashByHeight(i, s.state.isBeacon, tx) |
||||
s.state.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), invalidBlockHash) |
||||
return err |
||||
} |
||||
|
||||
/* |
||||
// TODO: use hash as key and here check key (which is hash) against block.header.hash
|
||||
gotHash := block.Hash() |
||||
if !bytes.Equal(gotHash[:], tasks[i].blockHash) { |
||||
utils.Logger().Warn(). |
||||
Err(errors.New("wrong block delivery")). |
||||
Str("expectHash", hex.EncodeToString(tasks[i].blockHash)). |
||||
Str("gotHash", hex.EncodeToString(gotHash[:])) |
||||
continue |
||||
} |
||||
*/ |
||||
if block.NumberU64() != i { |
||||
invalidBlockHash := getBlockHashByHeight(i, s.state.isBeacon, tx) |
||||
s.state.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), invalidBlockHash) |
||||
return ErrInvalidBlockNumber |
||||
} |
||||
if block.NumberU64() <= currProgress { |
||||
continue |
||||
} |
||||
|
||||
// Verify block signatures
|
||||
if block.NumberU64() > 1 { |
||||
// Verify the signature every N blocks (where N is VerifyHeaderBatchSize and can be adjusted in the configs)
|
||||
haveCurrentSig := len(block.GetCurrentCommitSig()) != 0 |
||||
verifySeal := block.NumberU64()%s.state.VerifyHeaderBatchSize == 0 || verifyAllSig |
||||
verifyCurrentSig := verifyAllSig && haveCurrentSig |
||||
bc := stg.configs.bc |
||||
if err = stg.verifyBlockSignatures(bc, block, verifyCurrentSig, verifySeal, verifyAllSig); err != nil { |
||||
invalidBlockHash := getBlockHashByHeight(i, s.state.isBeacon, tx) |
||||
s.state.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), invalidBlockHash) |
||||
return err |
||||
} |
||||
|
||||
/* |
||||
//TODO: we are handling the bad blocks and already blocks are verified, so do we need verify header?
|
||||
err := stg.configs.bc.Engine().VerifyHeader(stg.configs.bc, block.Header(), verifySeal) |
||||
if err == engine.ErrUnknownAncestor { |
||||
return err |
||||
} else if err != nil { |
||||
utils.Logger().Error().Err(err).Msgf("[STAGED_SYNC] failed verifying signatures for new block %d", block.NumberU64()) |
||||
if !verifyAllSig { |
||||
utils.Logger().Info().Interface("block", stg.configs.bc.CurrentBlock()).Msg("[STAGED_SYNC] Rolling back last 99 blocks!") |
||||
for i := uint64(0); i < s.state.VerifyHeaderBatchSize-1; i++ { |
||||
if rbErr := stg.configs.bc.Rollback([]common.Hash{stg.configs.bc.CurrentBlock().Hash()}); rbErr != nil { |
||||
utils.Logger().Err(rbErr).Msg("[STAGED_SYNC] UpdateBlockAndStatus: failed to rollback") |
||||
return err |
||||
} |
||||
} |
||||
currProgress = stg.configs.bc.CurrentBlock().NumberU64() |
||||
} |
||||
return err |
||||
} |
||||
*/ |
||||
} |
||||
|
||||
newBlocks = append(newBlocks, block) |
||||
if nBlock < s.state.InsertChainBatchSize-1 && block.NumberU64() < targetHeight { |
||||
nBlock++ |
||||
continue |
||||
} |
||||
|
||||
// insert downloaded block into chain
|
||||
headBeforeNewBlocks := stg.configs.bc.CurrentBlock().NumberU64() |
||||
headHashBeforeNewBlocks := stg.configs.bc.CurrentBlock().Hash() |
||||
_, err = stg.configs.bc.InsertChain(newBlocks, false) //TODO: verifyHeaders can be done here
|
||||
if err != nil { |
||||
// TODO: handle chain rollback because of bad block
|
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Uint64("block number", block.NumberU64()). |
||||
Uint32("shard", block.ShardID()). |
||||
Msgf("[STAGED_SYNC] UpdateBlockAndStatus: Error adding new block to blockchain") |
||||
// rollback bc
|
||||
utils.Logger().Info(). |
||||
Interface("block", stg.configs.bc.CurrentBlock()). |
||||
Msg("[STAGED_SYNC] Rolling back last added blocks!") |
||||
if rbErr := stg.configs.bc.Rollback([]common.Hash{headHashBeforeNewBlocks}); rbErr != nil { |
||||
utils.Logger().Error(). |
||||
Err(rbErr). |
||||
Msg("[STAGED_SYNC] UpdateBlockAndStatus: failed to rollback") |
||||
return err |
||||
} |
||||
s.state.RevertTo(headBeforeNewBlocks, headHashBeforeNewBlocks) |
||||
return err |
||||
} |
||||
utils.Logger().Info(). |
||||
Uint64("blockHeight", block.NumberU64()). |
||||
Uint64("blockEpoch", block.Epoch().Uint64()). |
||||
Str("blockHex", block.Hash().Hex()). |
||||
Uint32("ShardID", block.ShardID()). |
||||
Msg("[STAGED_SYNC] UpdateBlockAndStatus: New Block Added to Blockchain") |
||||
|
||||
// update cur progress
|
||||
currProgress = stg.configs.bc.CurrentBlock().NumberU64() |
||||
|
||||
for i, tx := range block.StakingTransactions() { |
||||
utils.Logger().Info(). |
||||
Msgf( |
||||
"StakingTxn %d: %s, %v", i, tx.StakingType().String(), tx.StakingMessage(), |
||||
) |
||||
} |
||||
|
||||
nBlock = 0 |
||||
newBlocks = newBlocks[:0] |
||||
// log the stage progress in console
|
||||
if stg.configs.logProgress { |
||||
//calculating block speed
|
||||
dt := time.Now().Sub(startTime).Seconds() |
||||
speed := float64(0) |
||||
if dt > 0 { |
||||
speed = float64(currProgress-startBlock) / dt |
||||
} |
||||
blockSpeed := fmt.Sprintf("%.2f", speed) |
||||
fmt.Print("\033[u\033[K") // restore the cursor position and clear the line
|
||||
fmt.Println("insert blocks progress:", currProgress, "/", targetHeight, "(", blockSpeed, "blocks/s", ")") |
||||
} |
||||
|
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err := tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
//verifyBlockSignatures verifies block signatures
|
||||
func (stg *StageStates) verifyBlockSignatures(bc core.BlockChain, block *types.Block, verifyCurrentSig bool, verifySeal bool, verifyAllSig bool) (err error) { |
||||
if verifyCurrentSig { |
||||
sig, bitmap, err := chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig()) |
||||
if err != nil { |
||||
return errors.Wrap(err, "parse commitSigAndBitmap") |
||||
} |
||||
|
||||
startTime := time.Now() |
||||
if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sig, bitmap); err != nil { |
||||
return errors.Wrapf(err, "verify header signature %v", block.Hash().String()) |
||||
} |
||||
utils.Logger().Debug(). |
||||
Int64("elapsed time", time.Now().Sub(startTime).Milliseconds()). |
||||
Msg("[STAGED_SYNC] VerifyHeaderSignature") |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// saveProgress saves the stage progress
|
||||
func (stg *StageStates) saveProgress(s *StageState, tx kv.RwTx) (err error) { |
||||
|
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
var err error |
||||
tx, err = stg.configs.db.BeginRw(context.Background()) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
// save progress
|
||||
if err = s.Update(tx, stg.configs.bc.CurrentBlock().NumberU64()); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msgf("[STAGED_SYNC] saving progress for block States stage failed") |
||||
return ErrSaveStateProgressFail |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err := tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (stg *StageStates) Revert(firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = stg.configs.db.BeginRw(stg.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
if err = u.Done(tx); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err = tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (stg *StageStates) CleanUp(firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { |
||||
useInternalTx := tx == nil |
||||
if useInternalTx { |
||||
tx, err = stg.configs.db.BeginRw(stg.configs.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
|
||||
if useInternalTx { |
||||
if err = tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
File diff suppressed because it is too large
@ -0,0 +1,94 @@ |
||||
package stagedsync |
||||
|
||||
import ( |
||||
"encoding/binary" |
||||
"fmt" |
||||
|
||||
"github.com/ledgerwatch/erigon-lib/kv" |
||||
) |
||||
|
||||
// SyncStageID represents the stages in the Mode.StagedSync mode
|
||||
type SyncStageID string |
||||
|
||||
const ( |
||||
Heads SyncStageID = "Heads" // Heads are downloaded
|
||||
BlockHashes SyncStageID = "BlockHashes" // block hashes are downloaded from peers
|
||||
BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified
|
||||
States SyncStageID = "States" // will construct most recent state from downloaded blocks
|
||||
LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well
|
||||
Finish SyncStageID = "Finish" // Nominal stage after all other stages
|
||||
) |
||||
|
||||
func GetStageName(stage string, isBeacon bool, prune bool) string { |
||||
name := stage |
||||
if isBeacon { |
||||
name = "beacon_" + name |
||||
} |
||||
if prune { |
||||
name = "prune_" + name |
||||
} |
||||
return name |
||||
} |
||||
|
||||
func GetStageID(stage SyncStageID, isBeacon bool, prune bool) []byte { |
||||
return []byte(GetStageName(string(stage), isBeacon, prune)) |
||||
} |
||||
|
||||
func GetBucketName(bucketName string, isBeacon bool) string { |
||||
name := bucketName |
||||
if isBeacon { |
||||
name = "Beacon" + name |
||||
} |
||||
return name |
||||
} |
||||
|
||||
// GetStageProgress retrieves saved progress of given sync stage from the database
|
||||
func GetStageProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error) { |
||||
stgID := GetStageID(stage, isBeacon, false) |
||||
v, err := db.GetOne(kv.SyncStageProgress, stgID) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return unmarshalData(v) |
||||
} |
||||
|
||||
// SaveStageProgress saves progress of given sync stage
|
||||
func SaveStageProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error { |
||||
stgID := GetStageID(stage, isBeacon, false) |
||||
return db.Put(kv.SyncStageProgress, stgID, marshalData(progress)) |
||||
} |
||||
|
||||
// GetStageCleanUpProgress retrieves saved progress of given sync stage from the database
|
||||
func GetStageCleanUpProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error) { |
||||
stgID := GetStageID(stage, isBeacon, true) |
||||
v, err := db.GetOne(kv.SyncStageProgress, stgID) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return unmarshalData(v) |
||||
} |
||||
|
||||
func SaveStageCleanUpProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error { |
||||
stgID := GetStageID(stage, isBeacon, true) |
||||
return db.Put(kv.SyncStageProgress, stgID, marshalData(progress)) |
||||
} |
||||
|
||||
func marshalData(blockNumber uint64) []byte { |
||||
return encodeBigEndian(blockNumber) |
||||
} |
||||
|
||||
func unmarshalData(data []byte) (uint64, error) { |
||||
if len(data) == 0 { |
||||
return 0, nil |
||||
} |
||||
if len(data) < 8 { |
||||
return 0, fmt.Errorf("value must be at least 8 bytes, got %d", len(data)) |
||||
} |
||||
return binary.BigEndian.Uint64(data[:8]), nil |
||||
} |
||||
|
||||
func encodeBigEndian(n uint64) []byte { |
||||
var v [8]byte |
||||
binary.BigEndian.PutUint64(v[:], n) |
||||
return v[:] |
||||
} |
@ -0,0 +1,401 @@ |
||||
package stagedsync |
||||
|
||||
import ( |
||||
"bytes" |
||||
"encoding/hex" |
||||
"errors" |
||||
"math/rand" |
||||
"reflect" |
||||
"sort" |
||||
"sync" |
||||
|
||||
"github.com/harmony-one/harmony/api/service/legacysync/downloader" |
||||
pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto" |
||||
"github.com/harmony-one/harmony/core/types" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
// Constants for syncing.
|
||||
const ( |
||||
downloadBlocksRetryLimit = 3 // downloadBlocks service retry limit
|
||||
RegistrationNumber = 3 |
||||
SyncingPortDifference = 3000 |
||||
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
|
||||
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
|
||||
verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now)
|
||||
LastMileBlocksSize = 50 |
||||
|
||||
// after cutting off a number of connected peers, the resulting number of peers
|
||||
// shall be between numPeersLowBound and numPeersHighBound
|
||||
NumPeersLowBound = 3 |
||||
numPeersHighBound = 5 |
||||
|
||||
// NumPeersReserved is the number reserved peers which will be replaced with any broken peer
|
||||
NumPeersReserved = 2 |
||||
|
||||
// downloadTaskBatch is the number of tasks per each downloader request
|
||||
downloadTaskBatch = 5 |
||||
) |
||||
|
||||
// SyncPeerConfig is peer config to sync.
|
||||
type SyncPeerConfig struct { |
||||
ip string |
||||
port string |
||||
peerHash []byte |
||||
client *downloader.Client |
||||
blockHashes [][]byte // block hashes collected before the node starts syncing
|
||||
newBlocks []*types.Block // blocks received while the node is syncing
|
||||
mux sync.RWMutex |
||||
failedTimes uint64 |
||||
} |
||||
|
||||
// CreateTestSyncPeerConfig used for testing.
|
||||
func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig { |
||||
return &SyncPeerConfig{ |
||||
client: client, |
||||
blockHashes: blockHashes, |
||||
} |
||||
} |
||||
|
||||
// GetClient returns client pointer of downloader.Client
|
||||
func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client { |
||||
return peerConfig.client |
||||
} |
||||
|
||||
// AddFailedTime considers one more peer failure and checks against max allowed failed times
|
||||
func (peerConfig *SyncPeerConfig) AddFailedTime(maxFailures uint64) (mustStop bool) { |
||||
peerConfig.mux.Lock() |
||||
defer peerConfig.mux.Unlock() |
||||
peerConfig.failedTimes++ |
||||
if peerConfig.failedTimes > maxFailures { |
||||
return true |
||||
} |
||||
return false |
||||
} |
||||
|
||||
// IsEqual checks the equality between two sync peers
|
||||
func (peerConfig *SyncPeerConfig) IsEqual(pc2 *SyncPeerConfig) bool { |
||||
return peerConfig.ip == pc2.ip && peerConfig.port == pc2.port |
||||
} |
||||
|
||||
// GetBlocks gets blocks by sending a gRPC request to the corresponding peer.
|
||||
func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) { |
||||
response := peerConfig.client.GetBlocksAndSigs(hashes) |
||||
if response == nil { |
||||
return nil, ErrGetBlock |
||||
} |
||||
return response.Payload, nil |
||||
} |
||||
|
||||
func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error { |
||||
response := peerConfig.client.Register(peerHash, ip, port) |
||||
if response == nil || response.Type == pb.DownloaderResponse_FAIL { |
||||
return ErrRegistrationFail |
||||
} else if response.Type == pb.DownloaderResponse_SUCCESS { |
||||
return nil |
||||
} |
||||
return ErrRegistrationFail |
||||
} |
||||
|
||||
// CompareSyncPeerConfigByblockHashes compares two SyncPeerConfig by blockHashes.
|
||||
func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int { |
||||
if len(a.blockHashes) != len(b.blockHashes) { |
||||
if len(a.blockHashes) < len(b.blockHashes) { |
||||
return -1 |
||||
} |
||||
return 1 |
||||
} |
||||
for id := range a.blockHashes { |
||||
if !reflect.DeepEqual(a.blockHashes[id], b.blockHashes[id]) { |
||||
return bytes.Compare(a.blockHashes[id], b.blockHashes[id]) |
||||
} |
||||
} |
||||
return 0 |
||||
} |
||||
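The comparator above is meant to be fed to sort.Slice, as GetBlockHashesConsensusAndCleanUp does further down, so that peers with identical hash lists end up consecutive; getHowManyMaxConsensus relies on that ordering. A minimal sketch, assuming it lives in the same package (the helper name is not part of the diff):

// sortPeersByBlockHashes orders peers so equal blockHashes become adjacent.
func sortPeersByBlockHashes(peers []*SyncPeerConfig) {
	sort.Slice(peers, func(i, j int) bool {
		return CompareSyncPeerConfigByblockHashes(peers[i], peers[j]) == -1
	})
}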
|
||||
// SyncBlockTask is the task struct to sync a specific block.
|
||||
type SyncBlockTask struct { |
||||
index int |
||||
blockHash []byte |
||||
} |
||||
|
||||
type syncBlockTasks []SyncBlockTask |
||||
|
||||
func (tasks syncBlockTasks) blockHashes() [][]byte { |
||||
hashes := make([][]byte, 0, len(tasks)) |
||||
for _, task := range tasks { |
||||
hash := make([]byte, len(task.blockHash)) |
||||
copy(hash, task.blockHash) |
||||
hashes = append(hashes, hash) |
||||
} |
||||
return hashes |
||||
} |
||||
|
||||
func (tasks syncBlockTasks) blockHashesStr() []string { |
||||
hashes := make([]string, 0, len(tasks)) |
||||
for _, task := range tasks { |
||||
hash := hex.EncodeToString(task.blockHash) |
||||
hashes = append(hashes, hash) |
||||
} |
||||
return hashes |
||||
} |
||||
|
||||
func (tasks syncBlockTasks) indexes() []int { |
||||
indexes := make([]int, 0, len(tasks)) |
||||
for _, task := range tasks { |
||||
indexes = append(indexes, task.index) |
||||
} |
||||
return indexes |
||||
} |
||||
|
||||
// SyncConfig contains an array of SyncPeerConfig.
|
||||
type SyncConfig struct { |
||||
// mtx locks peers, and *SyncPeerConfig pointers in peers.
|
||||
// SyncPeerConfig itself is guarded by its own mutex.
|
||||
mtx sync.RWMutex |
||||
reservedPeers []*SyncPeerConfig |
||||
peers []*SyncPeerConfig |
||||
} |
||||
|
||||
// AddPeer adds the given sync peer.
|
||||
func (sc *SyncConfig) AddPeer(peer *SyncPeerConfig) { |
||||
sc.mtx.Lock() |
||||
defer sc.mtx.Unlock() |
||||
|
||||
// Ensure no duplicate peers
|
||||
for _, p2 := range sc.peers { |
||||
if peer.IsEqual(p2) { |
||||
return |
||||
} |
||||
} |
||||
sc.peers = append(sc.peers, peer) |
||||
} |
||||
|
||||
// SelectRandomPeers limits the number of peers to release some server-side resources.
|
||||
func (sc *SyncConfig) SelectRandomPeers(peers []p2p.Peer, randSeed int64) int { |
||||
numPeers := len(peers) |
||||
targetSize := calcNumPeersWithBound(numPeers, NumPeersLowBound, numPeersHighBound) |
||||
// if the number of peers does not exceed the target size, keep all of them
|
||||
if numPeers <= targetSize { |
||||
utils.Logger().Warn(). |
||||
Int("num connected peers", numPeers). |
||||
Msg("[STAGED_SYNC] not enough connected peers to sync, still sync will on going") |
||||
return numPeers |
||||
} |
||||
//shuffle peers list
|
||||
r := rand.New(rand.NewSource(randSeed)) |
||||
r.Shuffle(numPeers, func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) |
||||
|
||||
return targetSize |
||||
} |
||||
|
||||
// calcNumPeersWithBound calculates the number of connected peers with bound
|
||||
// peers are expected to be limited to half of the size, capped between lowBound and highBound.
|
||||
func calcNumPeersWithBound(size int, lowBound, highBound int) int { |
||||
if size < lowBound { |
||||
return size |
||||
} |
||||
expLen := size / 2 |
||||
if expLen < lowBound { |
||||
expLen = lowBound |
||||
} |
||||
if expLen > highBound { |
||||
expLen = highBound |
||||
} |
||||
return expLen |
||||
} |
||||
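With the bounds defined above (NumPeersLowBound = 3, numPeersHighBound = 5), the halving rule works out as in the illustrative sketch below; the helper name and sample sizes are assumptions, not part of the diff.

// illustratePeerBound shows how the halving rule behaves with the bounds above.
func illustratePeerBound() []int {
	return []int{
		calcNumPeersWithBound(2, NumPeersLowBound, numPeersHighBound),  // 2: below the low bound, keep all
		calcNumPeersWithBound(7, NumPeersLowBound, numPeersHighBound),  // 3: half rounds down to the low bound
		calcNumPeersWithBound(9, NumPeersLowBound, numPeersHighBound),  // 4: half of the peers
		calcNumPeersWithBound(20, NumPeersLowBound, numPeersHighBound), // 5: capped at the high bound
	}
}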
|
||||
// ForEachPeer calls the given function with each peer.
|
||||
// It breaks the iteration iff the function returns true.
|
||||
func (sc *SyncConfig) ForEachPeer(f func(peer *SyncPeerConfig) (brk bool)) { |
||||
sc.mtx.RLock() |
||||
peers := make([]*SyncPeerConfig, len(sc.peers)) |
||||
copy(peers, sc.peers) |
||||
sc.mtx.RUnlock() |
||||
|
||||
for _, peer := range peers { |
||||
if f(peer) { |
||||
break |
||||
} |
||||
} |
||||
} |
||||
|
||||
// RemovePeer removes a peer from SyncConfig
|
||||
func (sc *SyncConfig) RemovePeer(peer *SyncPeerConfig, reason string) { |
||||
sc.mtx.Lock() |
||||
defer sc.mtx.Unlock() |
||||
|
||||
peer.client.Close() |
||||
for i, p := range sc.peers { |
||||
if p == peer { |
||||
sc.peers = append(sc.peers[:i], sc.peers[i+1:]...) |
||||
break |
||||
} |
||||
} |
||||
utils.Logger().Info(). |
||||
Str("peerIP", peer.ip). |
||||
Str("peerPortMsg", peer.port). |
||||
Str("reason", reason). |
||||
Msg("[STAGED_SYNC] remove GRPC peer") |
||||
} |
||||
|
||||
// ReplacePeerWithReserved tries to replace a peer from reserved peer list
|
||||
func (sc *SyncConfig) ReplacePeerWithReserved(peer *SyncPeerConfig, reason string) { |
||||
sc.mtx.Lock() |
||||
defer sc.mtx.Unlock() |
||||
|
||||
peer.client.Close() |
||||
for i, p := range sc.peers { |
||||
if p == peer { |
||||
if len(sc.reservedPeers) > 0 { |
||||
sc.peers = append(sc.peers[:i], sc.peers[i+1:]...) |
||||
sc.peers = append(sc.peers, sc.reservedPeers[0]) |
||||
utils.Logger().Info(). |
||||
Str("peerIP", peer.ip). |
||||
Str("peerPort", peer.port). |
||||
Str("reservedPeerIP", sc.reservedPeers[0].ip). |
||||
Str("reservedPeerPort", sc.reservedPeers[0].port). |
||||
Str("reason", reason). |
||||
Msg("[STAGED_SYNC] replaced GRPC peer by reserved") |
||||
sc.reservedPeers = sc.reservedPeers[1:] |
||||
} else { |
||||
sc.peers = append(sc.peers[:i], sc.peers[i+1:]...) |
||||
utils.Logger().Info(). |
||||
Str("peerIP", peer.ip). |
||||
Str("peerPortMsg", peer.port). |
||||
Str("reason", reason). |
||||
Msg("[STAGED_SYNC] remove GRPC peer without replacement") |
||||
} |
||||
break |
||||
} |
||||
} |
||||
} |
||||
|
||||
// CloseConnections close grpc connections for state sync clients
|
||||
func (sc *SyncConfig) CloseConnections() { |
||||
sc.mtx.RLock() |
||||
defer sc.mtx.RUnlock() |
||||
for _, pc := range sc.peers { |
||||
pc.client.Close() |
||||
} |
||||
} |
||||
|
||||
// FindPeerByHash returns the peer with the given hash, or nil if not found.
|
||||
func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig { |
||||
sc.mtx.RLock() |
||||
defer sc.mtx.RUnlock() |
||||
for _, pc := range sc.peers { |
||||
if bytes.Equal(pc.peerHash, peerHash) { |
||||
return pc |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
|
||||
// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
|
||||
// Caller shall ensure mtx is locked for reading.
|
||||
func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) { |
||||
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
|
||||
if len(sc.peers) == 0 { |
||||
return -1, 0 |
||||
} else if len(sc.peers) == 1 { |
||||
return 0, 1 |
||||
} |
||||
maxFirstID := len(sc.peers) - 1 |
||||
for i := maxFirstID - 1; i >= 0; i-- { |
||||
if CompareSyncPeerConfigByblockHashes(sc.peers[maxFirstID], sc.peers[i]) != 0 { |
||||
break |
||||
} |
||||
maxFirstID = i |
||||
} |
||||
maxCount := len(sc.peers) - maxFirstID |
||||
return maxFirstID, maxCount |
||||
} |
||||
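A worked example of the consensus count, purely illustrative:

// Suppose five peers, already sorted, compare as A, A, B, B, B by
// CompareSyncPeerConfigByblockHashes. Scanning backwards from the last peer,
// the equal tail is the three B peers, so getHowManyMaxConsensus returns
// maxFirstID = 2 and maxCount = 3; cleanUpPeers(2) would then drop the two A peers.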
|
||||
// InitForTesting used for testing.
|
||||
func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) { |
||||
sc.mtx.RLock() |
||||
defer sc.mtx.RUnlock() |
||||
for i := range sc.peers { |
||||
sc.peers[i].blockHashes = blockHashes |
||||
sc.peers[i].client = client |
||||
} |
||||
} |
||||
|
||||
// cleanUpPeers cleans up all peers whose blockHashes are not equal to
|
||||
// consensus block hashes. Caller shall ensure mtx is locked for RW.
|
||||
func (sc *SyncConfig) cleanUpPeers(maxFirstID int) { |
||||
fixedPeer := sc.peers[maxFirstID] |
||||
countBeforeCleanUp := len(sc.peers) |
||||
for i := 0; i < len(sc.peers); i++ { |
||||
if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 { |
||||
// TODO: move it into a util delete func.
|
||||
// See tip https://github.com/golang/go/wiki/SliceTricks
|
||||
// Close the client and remove the peer from the peers list
|
||||
sc.peers[i].client.Close() |
||||
copy(sc.peers[i:], sc.peers[i+1:]) |
||||
sc.peers[len(sc.peers)-1] = nil |
||||
sc.peers = sc.peers[:len(sc.peers)-1] |
||||
i-- // recheck this index, since the next peer shifted into it
|
||||
} |
||||
} |
||||
if len(sc.peers) < countBeforeCleanUp { |
||||
utils.Logger().Debug(). |
||||
Int("removed peers", len(sc.peers)-countBeforeCleanUp). |
||||
Msg("[STAGED_SYNC] cleanUpPeers: a few peers removed") |
||||
} |
||||
} |
||||
|
||||
// cleanUpInvalidPeers cleans up all peers that missed any required block hash or sent an invalid block hash
|
||||
// It locks mtx for writing itself, so the caller must not hold it.
|
||||
func (sc *SyncConfig) cleanUpInvalidPeers(ipm map[string]bool) { |
||||
sc.mtx.Lock() |
||||
defer sc.mtx.Unlock() |
||||
countBeforeCleanUp := len(sc.peers) |
||||
for i := 0; i < len(sc.peers); i++ { |
||||
if ipm[string(sc.peers[i].peerHash)] { |
||||
sc.peers[i].client.Close() |
||||
copy(sc.peers[i:], sc.peers[i+1:]) |
||||
sc.peers[len(sc.peers)-1] = nil |
||||
sc.peers = sc.peers[:len(sc.peers)-1] |
||||
i-- // recheck this index, since the next peer shifted into it
|
||||
} |
||||
} |
||||
if len(sc.peers) < countBeforeCleanUp { |
||||
utils.Logger().Debug(). |
||||
Int("removed peers", len(sc.peers)-countBeforeCleanUp). |
||||
Msg("[STAGED_SYNC] cleanUpPeers: a few peers removed") |
||||
} |
||||
} |
||||
|
||||
// GetBlockHashesConsensusAndCleanUp selects the most common peer config based on their block hashes to download/sync.
|
||||
// Note that choosing the most common peer config does not guarantee that the blocks to be downloaded are the correct ones.
|
||||
// The subsequent node syncing steps of verifying the block header chain will give such confirmation later.
|
||||
// If later block header verification fails with the sync peer config chosen here, the entire sync loop gets retried with a new peer set.
|
||||
func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp(bgMode bool) error { |
||||
sc.mtx.Lock() |
||||
defer sc.mtx.Unlock() |
||||
// Sort all peers by the blockHashes.
|
||||
sort.Slice(sc.peers, func(i, j int) bool { |
||||
return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1 |
||||
}) |
||||
maxFirstID, maxCount := sc.getHowManyMaxConsensus() |
||||
if maxFirstID == -1 { |
||||
return errors.New("invalid peer index -1 for block hashes query") |
||||
} |
||||
utils.Logger().Info(). |
||||
Int("maxFirstID", maxFirstID). |
||||
Str("targetPeerIP", sc.peers[maxFirstID].ip). |
||||
Int("maxCount", maxCount). |
||||
Int("hashSize", len(sc.peers[maxFirstID].blockHashes)). |
||||
Msg("[STAGED_SYNC] block consensus hashes") |
||||
|
||||
if bgMode { |
||||
if maxCount != len(sc.peers) { |
||||
return ErrNodeNotEnoughBlockHashes |
||||
} |
||||
} else { |
||||
sc.cleanUpPeers(maxFirstID) |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,90 @@ |
||||
package stagedsync |
||||
|
||||
import ( |
||||
"sync" |
||||
"time" |
||||
|
||||
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" |
||||
) |
||||
|
||||
const ( |
||||
// syncStatusExpiration is the expiration timeout of a sync status.
|
||||
// If the last in-memory sync result is older than the expiration, the sync status
|
||||
// will be updated.
|
||||
syncStatusExpiration = 6 * time.Second |
||||
|
||||
// syncStatusExpirationNonValidator is the expiration of sync cache for non-validators.
|
||||
// Compared with validator nodes, the sync check for non-validators is not as strict.
|
||||
// TODO: add this field to harmony config
|
||||
syncStatusExpirationNonValidator = 12 * time.Second |
||||
) |
||||
|
||||
type ( |
||||
syncStatus struct { |
||||
lastResult SyncCheckResult |
||||
MaxPeersHeight uint64 |
||||
currentCycle SyncCycle |
||||
lastUpdateTime time.Time |
||||
lock sync.RWMutex |
||||
expiration time.Duration |
||||
} |
||||
|
||||
SyncCheckResult struct { |
||||
IsSynchronized bool |
||||
OtherHeight uint64 |
||||
HeightDiff uint64 |
||||
} |
||||
|
||||
SyncCycle struct { |
||||
Number uint64 |
||||
StartHash []byte |
||||
TargetHeight uint64 |
||||
ExtraHashes map[uint64][]byte |
||||
lock sync.RWMutex |
||||
} |
||||
) |
||||
|
||||
func NewSyncStatus(role nodeconfig.Role) syncStatus { |
||||
expiration := getSyncStatusExpiration(role) |
||||
return syncStatus{ |
||||
expiration: expiration, |
||||
} |
||||
} |
||||
|
||||
func getSyncStatusExpiration(role nodeconfig.Role) time.Duration { |
||||
switch role { |
||||
case nodeconfig.Validator: |
||||
return syncStatusExpiration |
||||
case nodeconfig.ExplorerNode: |
||||
return syncStatusExpirationNonValidator |
||||
default: |
||||
return syncStatusExpirationNonValidator |
||||
} |
||||
} |
||||
|
||||
func (status *syncStatus) Get(fallback func() SyncCheckResult) SyncCheckResult { |
||||
status.lock.RLock() |
||||
if !status.expired() { |
||||
result := status.lastResult |
||||
status.lock.RUnlock() |
||||
return result |
||||
} |
||||
status.lock.RUnlock() |
||||
|
||||
status.lock.Lock() |
||||
defer status.lock.Unlock() |
||||
if status.expired() { |
||||
result := fallback() |
||||
status.update(result) |
||||
} |
||||
return status.lastResult |
||||
} |
||||
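Get implements a double-checked, expiring cache: readers take the read lock first, and only when the cached result has expired does a writer recompute it through the fallback. A hypothetical caller is sketched below; the function name and placeholder values are assumptions, not part of the diff.

// checkSyncStatus serves the cached result while it is fresh and only runs
// the expensive peer query once per expiration window.
func checkSyncStatus(status *syncStatus) SyncCheckResult {
	return status.Get(func() SyncCheckResult {
		// the expensive peer height query would go here; values are placeholders
		return SyncCheckResult{IsSynchronized: false, OtherHeight: 100, HeightDiff: 10}
	})
}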
|
||||
func (status *syncStatus) expired() bool { |
||||
return time.Since(status.lastUpdateTime) > status.expiration |
||||
} |
||||
|
||||
func (status *syncStatus) update(result SyncCheckResult) { |
||||
status.lastUpdateTime = time.Now() |
||||
status.lastResult = result |
||||
} |
@ -0,0 +1,292 @@ |
||||
package stagedsync |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/c2h5oh/datasize" |
||||
"github.com/harmony-one/harmony/consensus" |
||||
"github.com/harmony-one/harmony/core" |
||||
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/node/worker" |
||||
"github.com/harmony-one/harmony/shard" |
||||
"github.com/ledgerwatch/erigon-lib/kv" |
||||
|
||||
"github.com/ledgerwatch/erigon-lib/kv/mdbx" |
||||
"github.com/ledgerwatch/log/v3" |
||||
) |
||||
|
||||
const ( |
||||
BlockHashesBucket = "BlockHashes" |
||||
BeaconBlockHashesBucket = "BeaconBlockHashes" |
||||
DownloadedBlocksBucket = "BlockBodies" |
||||
BeaconDownloadedBlocksBucket = "BeaconBlockBodies" // Beacon Block bodies are downloaded, TxHash and UncleHash are getting verified
|
||||
LastMileBlocksBucket = "LastMileBlocks" // last mile blocks to catch up with the consensus
|
||||
StageProgressBucket = "StageProgress" |
||||
|
||||
// cache db keys
|
||||
LastBlockHeight = "LastBlockHeight" |
||||
LastBlockHash = "LastBlockHash" |
||||
|
||||
// cache db names
|
||||
BlockHashesCacheDB = "cache_block_hashes" |
||||
BlockCacheDB = "cache_blocks" |
||||
) |
||||
|
||||
var Buckets = []string{ |
||||
BlockHashesBucket, |
||||
BeaconBlockHashesBucket, |
||||
DownloadedBlocksBucket, |
||||
BeaconDownloadedBlocksBucket, |
||||
LastMileBlocksBucket, |
||||
StageProgressBucket, |
||||
} |
||||
|
||||
// CreateStagedSync creates an instance of staged sync
|
||||
func CreateStagedSync( |
||||
ip string, |
||||
port string, |
||||
peerHash [20]byte, |
||||
bc core.BlockChain, |
||||
role nodeconfig.Role, |
||||
isExplorer bool, |
||||
TurboMode bool, |
||||
UseMemDB bool, |
||||
doubleCheckBlockHashes bool, |
||||
maxBlocksPerCycle uint64, |
||||
maxBackgroundBlocks uint64, |
||||
maxMemSyncCycleSize uint64, |
||||
verifyAllSig bool, |
||||
verifyHeaderBatchSize uint64, |
||||
insertChainBatchSize int, |
||||
logProgress bool, |
||||
) (*StagedSync, error) { |
||||
|
||||
ctx := context.Background() |
||||
isBeacon := bc.ShardID() == shard.BeaconChainShardID |
||||
|
||||
var db kv.RwDB |
||||
if UseMemDB { |
||||
// the maximum number of blocks held in memory is maxMemSyncCycleSize + maxBackgroundBlocks
|
||||
var dbMapSize datasize.ByteSize |
||||
if isBeacon { |
||||
// for memdb, an average of at most 512 KB per beacon chain block should be enough
|
||||
dbMapSize = datasize.ByteSize(maxMemSyncCycleSize+maxBackgroundBlocks) * 512 * datasize.KB |
||||
} else { |
||||
// for memdb, an average of at most 256 KB per shard chain block should be enough
|
||||
dbMapSize = datasize.ByteSize(maxMemSyncCycleSize+maxBackgroundBlocks) * 256 * datasize.KB |
||||
} |
||||
// we manually create memory db because "db = memdb.New()" sets the default map size (64 MB) which is not enough for some cases
|
||||
db = mdbx.NewMDBX(log.New()).MapSize(dbMapSize).InMem("cache_db").MustOpen() |
||||
} else { |
||||
if isBeacon { |
||||
db = mdbx.NewMDBX(log.New()).Path("cache_beacon_db").MustOpen() |
||||
} else { |
||||
db = mdbx.NewMDBX(log.New()).Path("cache_shard_db").MustOpen() |
||||
} |
||||
} |
||||
|
||||
if errInitDB := initDB(ctx, db); errInitDB != nil { |
||||
return nil, errInitDB |
||||
} |
||||
|
||||
headsCfg := NewStageHeadersCfg(ctx, bc, db) |
||||
blockHashesCfg := NewStageBlockHashesCfg(ctx, bc, db, isBeacon, TurboMode, logProgress) |
||||
bodiesCfg := NewStageBodiesCfg(ctx, bc, db, isBeacon, TurboMode, logProgress) |
||||
statesCfg := NewStageStatesCfg(ctx, bc, db, logProgress) |
||||
lastMileCfg := NewStageLastMileCfg(ctx, bc, db) |
||||
finishCfg := NewStageFinishCfg(ctx, db) |
||||
|
||||
stages := DefaultStages(ctx, |
||||
headsCfg, |
||||
blockHashesCfg, |
||||
bodiesCfg, |
||||
statesCfg, |
||||
lastMileCfg, |
||||
finishCfg, |
||||
) |
||||
|
||||
return New(ctx, |
||||
ip, |
||||
port, |
||||
peerHash, |
||||
bc, |
||||
role, |
||||
isBeacon, |
||||
isExplorer, |
||||
db, |
||||
stages, |
||||
DefaultRevertOrder, |
||||
DefaultCleanUpOrder, |
||||
TurboMode, |
||||
UseMemDB, |
||||
doubleCheckBlockHashes, |
||||
maxBlocksPerCycle, |
||||
maxBackgroundBlocks, |
||||
maxMemSyncCycleSize, |
||||
verifyAllSig, |
||||
verifyHeaderBatchSize, |
||||
insertChainBatchSize, |
||||
logProgress, |
||||
), nil |
||||
} |
||||
|
||||
// initDB initializes the sync loop's main database and creates the buckets
|
||||
func initDB(ctx context.Context, db kv.RwDB) error { |
||||
tx, errRW := db.BeginRw(ctx) |
||||
if errRW != nil { |
||||
return errRW |
||||
} |
||||
defer tx.Rollback() |
||||
for _, name := range Buckets { |
||||
// create bucket
|
||||
if err := tx.CreateBucket(GetStageName(name, false, false)); err != nil { |
||||
return err |
||||
} |
||||
// create bucket for beacon
|
||||
if err := tx.CreateBucket(GetStageName(name, true, false)); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
if err := tx.Commit(); err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// SyncLoop will keep syncing with peers until it catches up
|
||||
func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) { |
||||
|
||||
utils.Logger().Info(). |
||||
Uint64("current height", bc.CurrentBlock().NumberU64()). |
||||
Msgf("staged sync is executing ... ") |
||||
|
||||
if !s.IsBeacon() { |
||||
s.RegisterNodeInfo() |
||||
} |
||||
|
||||
// get max peers height
|
||||
maxPeersHeight, err := s.getMaxPeerHeight() |
||||
if err != nil { |
||||
return |
||||
} |
||||
utils.Logger().Info(). |
||||
Uint64("maxPeersHeight", maxPeersHeight). |
||||
Msgf("[STAGED_SYNC] max peers height") |
||||
s.syncStatus.MaxPeersHeight = maxPeersHeight |
||||
|
||||
for { |
||||
if len(s.syncConfig.peers) < NumPeersLowBound { |
||||
// TODO: try to use reserved nodes
|
||||
utils.Logger().Warn(). |
||||
Int("num peers", len(s.syncConfig.peers)). |
||||
Msgf("[STAGED_SYNC] Not enough connected peers") |
||||
break |
||||
} |
||||
startHead := bc.CurrentBlock().NumberU64() |
||||
|
||||
if startHead >= maxPeersHeight { |
||||
utils.Logger().Info(). |
||||
Bool("isBeacon", isBeacon). |
||||
Uint32("shard", bc.ShardID()). |
||||
Uint64("maxPeersHeight", maxPeersHeight). |
||||
Uint64("currentHeight", startHead). |
||||
Msgf("[STAGED_SYNC] Node is now IN SYNC!") |
||||
break |
||||
} |
||||
startTime := time.Now() |
||||
|
||||
if err := s.runSyncCycle(bc, worker, isBeacon, consensus, maxPeersHeight); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Bool("isBeacon", isBeacon). |
||||
Uint32("shard", bc.ShardID()). |
||||
Uint64("currentHeight", startHead). |
||||
Msgf("[STAGED_SYNC] sync cycle failed") |
||||
break |
||||
} |
||||
|
||||
if loopMinTime != 0 { |
||||
waitTime := loopMinTime - time.Since(startTime) |
||||
utils.Logger().Debug(). |
||||
Bool("isBeacon", isBeacon). |
||||
Uint32("shard", bc.ShardID()). |
||||
Interface("duration", waitTime). |
||||
Msgf("[STAGED SYNC] Node is syncing ..., it's waiting a few seconds until next loop") |
||||
c := time.After(waitTime) |
||||
select { |
||||
case <-s.Context().Done(): |
||||
return |
||||
case <-c: |
||||
} |
||||
} |
||||
|
||||
// calculating sync speed (blocks/second)
|
||||
currHead := bc.CurrentBlock().NumberU64() |
||||
if s.LogProgress && currHead-startHead > 0 { |
||||
dt := time.Now().Sub(startTime).Seconds() |
||||
speed := float64(0) |
||||
if dt > 0 { |
||||
speed = float64(currHead-startHead) / dt |
||||
} |
||||
syncSpeed := fmt.Sprintf("%.2f", speed) |
||||
fmt.Println("sync speed:", syncSpeed, "blocks/s (", currHead, "/", maxPeersHeight, ")") |
||||
} |
||||
|
||||
s.syncStatus.currentCycle.lock.Lock() |
||||
s.syncStatus.currentCycle.Number++ |
||||
s.syncStatus.currentCycle.lock.Unlock() |
||||
|
||||
} |
||||
|
||||
if consensus != nil { |
||||
if err := s.addConsensusLastMile(s.Blockchain(), consensus); err != nil { |
||||
utils.Logger().Error(). |
||||
Err(err). |
||||
Msg("[STAGED_SYNC] Add consensus last mile") |
||||
} |
||||
// TODO: move this to explorer handler code.
|
||||
if s.isExplorer { |
||||
consensus.UpdateConsensusInformation() |
||||
} |
||||
} |
||||
s.purgeAllBlocksFromCache() |
||||
utils.Logger().Info(). |
||||
Uint64("new height", bc.CurrentBlock().NumberU64()). |
||||
Msgf("staged sync is executed") |
||||
return |
||||
} |
||||
|
||||
// runSyncCycle will run one cycle of staged syncing
|
||||
func (s *StagedSync) runSyncCycle(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, maxPeersHeight uint64) error { |
||||
canRunCycleInOneTransaction := s.MaxBlocksPerSyncCycle > 0 && s.MaxBlocksPerSyncCycle <= s.MaxMemSyncCycleSize |
||||
var tx kv.RwTx |
||||
if canRunCycleInOneTransaction { |
||||
var err error |
||||
if tx, err = s.DB().BeginRw(context.Background()); err != nil { |
||||
return err |
||||
} |
||||
defer tx.Rollback() |
||||
} |
||||
// Do one cycle of staged sync
|
||||
initialCycle := s.syncStatus.currentCycle.Number == 0 |
||||
syncErr := s.Run(s.DB(), tx, initialCycle) |
||||
if syncErr != nil { |
||||
utils.Logger().Error(). |
||||
Err(syncErr). |
||||
Bool("isBeacon", s.IsBeacon()). |
||||
Uint32("shard", s.Blockchain().ShardID()). |
||||
Msgf("[STAGED_SYNC] Sync loop failed") |
||||
s.purgeOldBlocksFromCache() |
||||
return syncErr |
||||
} |
||||
if tx != nil { |
||||
errTx := tx.Commit() |
||||
if errTx != nil { |
||||
return errTx |
||||
} |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,38 @@ |
||||
package stagedsync |
||||
|
||||
import ( |
||||
"time" |
||||
|
||||
"github.com/Workiva/go-datastructures/queue" |
||||
) |
||||
|
||||
// downloadTaskQueue is a wrapper around Queue whose items are SyncBlockTask values
|
||||
type downloadTaskQueue struct { |
||||
q *queue.Queue |
||||
} |
||||
|
||||
func (queue downloadTaskQueue) poll(num int64, timeOut time.Duration) (syncBlockTasks, error) { |
||||
items, err := queue.q.Poll(num, timeOut) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
tasks := make(syncBlockTasks, 0, len(items)) |
||||
for _, item := range items { |
||||
task := item.(SyncBlockTask) |
||||
tasks = append(tasks, task) |
||||
} |
||||
return tasks, nil |
||||
} |
||||
|
||||
func (queue downloadTaskQueue) put(tasks syncBlockTasks) error { |
||||
for _, task := range tasks { |
||||
if err := queue.q.Put(task); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (queue downloadTaskQueue) empty() bool { |
||||
return queue.q.Empty() |
||||
} |
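A usage sketch for the wrapper, assuming it lives alongside the file above in the stagedsync package; queue.New, Put, and Poll are the Workiva queue calls already used there, while the helper name, hashes, and timeout are illustrative assumptions.

// buildAndPollTasks enqueues a couple of tasks and polls them back as one
// batch, waiting at most 100ms, mirroring how the downloader drains its queue.
func buildAndPollTasks() (syncBlockTasks, error) {
	q := downloadTaskQueue{q: queue.New(downloadTaskBatch)}
	if err := q.put(syncBlockTasks{
		{index: 0, blockHash: []byte{0x01}},
		{index: 1, blockHash: []byte{0x02}},
	}); err != nil {
		return nil, err
	}
	return q.poll(downloadTaskBatch, 100*time.Millisecond)
}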