fix WriteHeadBlock, fix GetDownloadDetails index, improve fetching current block in a few stages, improve pivot calculation

pull/4465/head
“GheisMohammadi” 1 year ago
parent bdd7f142c7
commit 135c7da455
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 8
      api/service/stagedstreamsync/block_manager.go
  2. 2
      api/service/stagedstreamsync/const.go
  3. 3
      api/service/stagedstreamsync/stage_heads.go
  4. 9
      api/service/stagedstreamsync/stage_receipts.go
  5. 9
      api/service/stagedstreamsync/stage_state.go
  6. 52
      api/service/stagedstreamsync/stage_statesync.go
  7. 18
      api/service/stagedstreamsync/syncing.go
  8. 25
      core/blockchain_impl.go
  9. 2
      core/rawdb/accessors_offchain.go

@ -1,6 +1,7 @@
package stagedstreamsync
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
@ -118,11 +119,14 @@ func (gbm *blockDownloadManager) SetDownloadDetails(bns []uint64, loopID int, st
}
// GetDownloadDetails returns the download details for a block
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID) {
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID, err error) {
gbm.lock.Lock()
defer gbm.lock.Unlock()
return gbm.bdd[blockNumber].loopID, gbm.bdd[blockNumber].streamID
if dm, exist := gbm.bdd[blockNumber]; exist {
return dm.loopID, dm.streamID, nil
}
return 0, sttypes.StreamID(0), fmt.Errorf("there is no download details for the block number: %d", blockNumber)
}
// SetRootHash sets the root hash for a specific block

@ -40,7 +40,7 @@ const (
ShortRangeTimeout time.Duration = 1 * time.Minute
// pivot block distance ranges
MinPivotDistanceToHead uint64 = 1028
MinPivotDistanceToHead uint64 = 1024
MaxPivotDistanceToHead uint64 = 2048
)

@ -91,8 +91,7 @@ func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlock
// check pivot: if chain hasn't reached to pivot yet
if s.state.status.cycleSyncMode != FullSync && s.state.status.pivotBlock != nil {
// set target height on the block before pivot
// pivot block would be downloaded by StateSync stage
// set target height on the pivot block
if !s.state.status.statesSynced && targetHeight > s.state.status.pivotBlock.NumberU64() {
targetHeight = s.state.status.pivotBlock.NumberU64()
}

@ -238,7 +238,14 @@ func (r *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDo
for _, bn := range batch {
blkKey := marshalData(bn)
loopID, _ := gbm.GetDownloadDetails(bn)
loopID, _, errBDD := gbm.GetDownloadDetails(bn)
if errBDD != nil {
utils.Logger().Warn().
Err(errBDD).
Interface("block numbers", bn).
Msg(WrapStagedSyncMsg("get block download details failed"))
return
}
blockBytes, err := txs[loopID].GetOne(BlocksBucket, blkKey)
if err != nil {
return

@ -69,11 +69,11 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
}
maxHeight := s.state.status.targetBN
currentHead := stg.configs.bc.CurrentBlock().NumberU64()
currentHead := s.state.CurrentBlockNumber()
if currentHead >= maxHeight {
return nil
}
currProgress := stg.configs.bc.CurrentBlock().NumberU64()
currProgress := currentHead
targetHeight := s.state.currentCycle.TargetHeight
if currProgress >= targetHeight {
return nil
@ -115,7 +115,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
for i := currProgress + 1; i <= targetHeight; i++ {
blkKey := marshalData(i)
loopID, streamID := gbm.GetDownloadDetails(i)
loopID, streamID, errBDD := gbm.GetDownloadDetails(i)
if errBDD != nil {
return errBDD
}
blockBytes, err := txs[loopID].GetOne(BlocksBucket, blkKey)
if err != nil {

@ -55,36 +55,37 @@ func NewStageStateSyncCfg(bc core.BlockChain,
// Exec progresses States stage in the forward direction
func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// only execute this stage in fast/snap sync mode and once we reach to pivot
if s.state.status.pivotBlock == nil || s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() {
return nil
}
// for short range sync, skip this step
if !s.state.initSync {
return nil
}
maxHeight := s.state.status.targetBN
currentHead := s.state.CurrentBlockNumber()
if currentHead >= maxHeight {
return nil
}
currProgress := s.state.CurrentBlockNumber()
targetHeight := s.state.currentCycle.TargetHeight
if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error {
if currProgress, err = s.CurrentStageProgress(etx); err != nil {
return err
}
} // only execute this stage in fast/snap sync mode and once we reach to pivot
if s.state.status.pivotBlock == nil ||
s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() ||
s.state.status.statesSynced {
return nil
}); errV != nil {
return errV
}
if currProgress >= targetHeight {
return nil
}
// maxHeight := s.state.status.targetBN
// currentHead := s.state.CurrentBlockNumber()
// if currentHead >= maxHeight {
// return nil
// }
// currProgress := s.state.CurrentBlockNumber()
// targetHeight := s.state.currentCycle.TargetHeight
// if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error {
// if currProgress, err = s.CurrentStageProgress(etx); err != nil {
// return err
// }
// return nil
// }); errV != nil {
// return errV
// }
// if currProgress >= targetHeight {
// return nil
// }
useInternalTx := tx == nil
if useInternalTx {
var err error
@ -104,8 +105,9 @@ func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bo
// Fetch states from neighbors
pivotRootHash := s.state.status.pivotBlock.Root()
currentBlockRootHash := s.state.bc.CurrentFastBlock().Root()
sdm := newStateDownloadManager(tx, sss.configs.bc, sss.configs.concurrency, s.state.logger)
sdm.setRootHash(pivotRootHash)
sdm.setRootHash(currentBlockRootHash)
var wg sync.WaitGroup
for i := 0; i < s.state.config.Concurrency; i++ {
wg.Add(1)

@ -237,20 +237,14 @@ func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint6
}
pivotBlockNumber := uint64(0)
if curPivot := rawdb.ReadLastPivotNumber(s.bc.ChainDb()); curPivot != nil {
var curPivot *uint64
if curPivot = rawdb.ReadLastPivotNumber(s.bc.ChainDb()); curPivot != nil {
// if head is behind pivot, that means it is still on fast/snap sync mode
if head := s.CurrentBlockNumber(); head < *curPivot {
pivotBlockNumber = *curPivot
// pivot could be moved forward if it is far from head
if pivotBlockNumber < estimatedHeight-MaxPivotDistanceToHead {
pivotBlockNumber = estimatedHeight - MinPivotDistanceToHead
if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil {
s.logger.Warn().Err(err).
Uint64("current pivot number", *curPivot).
Uint64("new pivot number", pivotBlockNumber).
Msg(WrapStagedSyncMsg("update pivot number failed"))
pivotBlockNumber = *curPivot
}
}
}
} else {
@ -270,6 +264,14 @@ func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint6
Msg(WrapStagedSyncMsg("query peers for pivot block failed"))
return block, FastSync, err
} else {
if curPivot == nil || pivotBlockNumber != *curPivot {
if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil {
s.logger.Warn().Err(err).
Uint64("new pivot number", pivotBlockNumber).
Msg(WrapStagedSyncMsg("update pivot number failed"))
return block, FastSync, err
}
}
s.status.pivotBlock = block
s.logger.Info().
Uint64("estimatedHeight", estimatedHeight).

@ -852,6 +852,20 @@ func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
if err := rawdb.WriteHeadBlockHash(batch, block.Hash()); err != nil {
return err
}
if err := rawdb.WriteHeadHeaderHash(batch, block.Hash()); err != nil {
return err
}
isNewEpoch := block.IsLastBlockInEpoch()
if isNewEpoch {
epoch := block.Header().Epoch()
nextEpoch := epoch.Add(epoch, common.Big1)
if err := rawdb.WriteShardStateBytes(batch, nextEpoch, block.Header().ShardState()); err != nil {
utils.Logger().Error().Err(err).Msg("failed to store shard state")
return err
}
}
if err := batch.Write(); err != nil {
return err
}
@ -1328,6 +1342,17 @@ func (bc *BlockChainImpl) InsertReceiptChain(blockChain types.Blocks, receiptCha
return 0, err
}
isNewEpoch := block.IsLastBlockInEpoch()
if isNewEpoch {
epoch := block.Header().Epoch()
nextEpoch := epoch.Add(epoch, common.Big1)
err := rawdb.WriteShardStateBytes(batch, nextEpoch, block.Header().ShardState())
if err != nil {
utils.Logger().Error().Err(err).Msg("failed to store shard state")
return 0, err
}
}
stats.processed++
if batch.ValueSize() >= ethdb.IdealBatchSize {

@ -22,7 +22,7 @@ func ReadShardState(
data, err := db.Get(shardStateKey(epoch))
if err != nil {
return nil, errors.Errorf(
MsgNoShardStateFromDB, "epoch: %d", epoch,
MsgNoShardStateFromDB, "epoch: %d", epoch.Uint64(),
)
}
ss, err2 := shard.DecodeWrapper(data)

Loading…
Cancel
Save