Merge pull request #4465 from harmony-one/feature/fastsync

Initial Version of Fast Sync
pull/4589/head
Adam Androulidakis 12 months ago committed by GitHub
commit c82599b6f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      api/service/stagedstreamsync/adapter.go
  2. 48
      api/service/stagedstreamsync/block_manager.go
  3. 29
      api/service/stagedstreamsync/const.go
  4. 78
      api/service/stagedstreamsync/default_stages.go
  5. 21
      api/service/stagedstreamsync/helpers.go
  6. 146
      api/service/stagedstreamsync/proof.go
  7. 84
      api/service/stagedstreamsync/range.go
  8. 180
      api/service/stagedstreamsync/receipt_download_manager.go
  9. 13
      api/service/stagedstreamsync/sig_verify.go
  10. 46
      api/service/stagedstreamsync/stage_bodies.go
  11. 10
      api/service/stagedstreamsync/stage_heads.go
  12. 398
      api/service/stagedstreamsync/stage_receipts.go
  13. 2
      api/service/stagedstreamsync/stage_short_range.go
  14. 14
      api/service/stagedstreamsync/stage_state.go
  15. 310
      api/service/stagedstreamsync/stage_statesync.go
  16. 469
      api/service/stagedstreamsync/stage_statesync_full.go
  17. 35
      api/service/stagedstreamsync/staged_stream_sync.go
  18. 2
      api/service/stagedstreamsync/stages.go
  19. 432
      api/service/stagedstreamsync/state_download_manager.go
  20. 2418
      api/service/stagedstreamsync/state_sync_full.go
  21. 164
      api/service/stagedstreamsync/syncing.go
  22. 3
      api/service/stagedstreamsync/types.go
  23. 5
      cmd/harmony/default.go
  24. 12
      cmd/harmony/flags.go
  25. 3
      cmd/harmony/main.go
  26. 24
      core/blockchain.go
  27. 271
      core/blockchain_impl.go
  28. 10
      core/blockchain_stub.go
  29. 5
      core/rawdb/accessors_chain.go
  30. 2
      core/rawdb/accessors_offchain.go
  31. 1
      internal/configs/harmony/harmony.go
  32. 7
      p2p/stream/protocols/sync/chain.go
  33. 27
      p2p/stream/protocols/sync/client.go

@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
)
@ -20,6 +21,10 @@ type syncProtocol interface {
GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error)
GetReceipts(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (receipts []types.Receipts, stid sttypes.StreamID, err error)
GetNodeData(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (data [][]byte, stid sttypes.StreamID, err error)
GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error)
GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error)
GetByteCodes(ctx context.Context, hs []common.Hash, bytes uint64, opts ...syncproto.Option) (codes [][]byte, stid sttypes.StreamID, err error)
GetTrieNodes(ctx context.Context, root common.Hash, paths []*message.TrieNodePathSet, bytes uint64, opts ...syncproto.Option) (nodes [][]byte, stid sttypes.StreamID, err error)
RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream
StreamFailed(stID sttypes.StreamID, reason string)

@ -1,8 +1,10 @@
package stagedstreamsync
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog"
@ -11,6 +13,7 @@ import (
type BlockDownloadDetails struct {
loopID int
streamID sttypes.StreamID
rootHash common.Hash
}
// blockDownloadManager is the helper structure for get blocks request management
@ -23,7 +26,7 @@ type blockDownloadManager struct {
processing map[uint64]struct{} // block numbers received requests but not inserted
retries *prioritizedNumbers // requests where error happens
rq *resultQueue // result queue wait to be inserted into blockchain
bdd map[uint64]BlockDownloadDetails // details about how this block was downloaded
bdd map[uint64]*BlockDownloadDetails // details about how this block was downloaded
logger zerolog.Logger
lock sync.Mutex
@ -38,26 +41,26 @@ func newBlockDownloadManager(tx kv.RwTx, chain blockChain, targetBN uint64, logg
processing: make(map[uint64]struct{}),
retries: newPrioritizedNumbers(),
rq: newResultQueue(),
bdd: make(map[uint64]BlockDownloadDetails),
bdd: make(map[uint64]*BlockDownloadDetails),
logger: logger,
}
}
// GetNextBatch get the next block numbers batch
func (gbm *blockDownloadManager) GetNextBatch() []uint64 {
func (gbm *blockDownloadManager) GetNextBatch(curHeight uint64) []uint64 {
gbm.lock.Lock()
defer gbm.lock.Unlock()
cap := BlocksPerRequest
bns := gbm.getBatchFromRetries(cap)
bns := gbm.getBatchFromRetries(cap, curHeight)
if len(bns) > 0 {
cap -= len(bns)
gbm.addBatchToRequesting(bns)
}
if gbm.availableForMoreTasks() {
addBNs := gbm.getBatchFromUnprocessed(cap)
addBNs := gbm.getBatchFromUnprocessed(cap, curHeight)
gbm.addBatchToRequesting(addBNs)
bns = append(bns, addBNs...)
}
@ -88,7 +91,7 @@ func (gbm *blockDownloadManager) HandleRequestResult(bns []uint64, blockBytes []
gbm.retries.push(bn)
} else {
gbm.processing[bn] = struct{}{}
gbm.bdd[bn] = BlockDownloadDetails{
gbm.bdd[bn] = &BlockDownloadDetails{
loopID: loopID,
streamID: streamID,
}
@ -107,7 +110,7 @@ func (gbm *blockDownloadManager) SetDownloadDetails(bns []uint64, loopID int, st
defer gbm.lock.Unlock()
for _, bn := range bns {
gbm.bdd[bn] = BlockDownloadDetails{
gbm.bdd[bn] = &BlockDownloadDetails{
loopID: loopID,
streamID: streamID,
}
@ -116,25 +119,43 @@ func (gbm *blockDownloadManager) SetDownloadDetails(bns []uint64, loopID int, st
}
// GetDownloadDetails returns the download details for a block
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID) {
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID, err error) {
gbm.lock.Lock()
defer gbm.lock.Unlock()
return gbm.bdd[blockNumber].loopID, gbm.bdd[blockNumber].streamID
if dm, exist := gbm.bdd[blockNumber]; exist {
return dm.loopID, dm.streamID, nil
}
return 0, sttypes.StreamID(fmt.Sprint(0)), fmt.Errorf("there is no download details for the block number: %d", blockNumber)
}
// SetRootHash sets the root hash for a specific block
func (gbm *blockDownloadManager) SetRootHash(blockNumber uint64, root common.Hash) {
gbm.lock.Lock()
defer gbm.lock.Unlock()
gbm.bdd[blockNumber].rootHash = root
}
// GetRootHash returns the root hash for a specific block
func (gbm *blockDownloadManager) GetRootHash(blockNumber uint64) common.Hash {
gbm.lock.Lock()
defer gbm.lock.Unlock()
return gbm.bdd[blockNumber].rootHash
}
// getBatchFromRetries get the block number batch to be requested from retries.
func (gbm *blockDownloadManager) getBatchFromRetries(cap int) []uint64 {
func (gbm *blockDownloadManager) getBatchFromRetries(cap int, fromBlockNumber uint64) []uint64 {
var (
requestBNs []uint64
curHeight = gbm.chain.CurrentBlock().NumberU64()
)
for cnt := 0; cnt < cap; cnt++ {
bn := gbm.retries.pop()
if bn == 0 {
break // no more retries
}
if bn <= curHeight {
if bn <= fromBlockNumber {
continue
}
requestBNs = append(requestBNs, bn)
@ -143,10 +164,9 @@ func (gbm *blockDownloadManager) getBatchFromRetries(cap int) []uint64 {
}
// getBatchFromUnprocessed returns a batch of block numbers to be requested from unprocessed.
func (gbm *blockDownloadManager) getBatchFromUnprocessed(cap int) []uint64 {
func (gbm *blockDownloadManager) getBatchFromUnprocessed(cap int, curHeight uint64) []uint64 {
var (
requestBNs []uint64
curHeight = gbm.chain.CurrentBlock().NumberU64()
)
bn := curHeight + 1
// TODO: this algorithm can be potentially optimized.

@ -23,9 +23,35 @@ const (
// no more request will be assigned to workers to wait for InsertChain to finish.
SoftQueueCap int = 100
// number of get nodes by hashes for each request
StatesPerRequest int = 100
// maximum number of blocks for get receipts request
ReceiptsPerRequest int = 10
// DefaultConcurrency is the default settings for concurrency
DefaultConcurrency int = 4
// MaxTriesToFetchNodeData is the maximum number of tries to fetch node data
MaxTriesToFetchNodeData int = 5
// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
// to restart automatically when stuck in `getBlockHashes`
ShortRangeTimeout time.Duration = 1 * time.Minute
// pivot block distance ranges
MinPivotDistanceToHead uint64 = 1024
MaxPivotDistanceToHead uint64 = 2048
)
// SyncMode represents the synchronization mode of the downloader.
// It is a uint32 as it is used with atomic operations.
type SyncMode uint32
const (
FullSync SyncMode = iota // Synchronize the entire blockchain history from full blocks
FastSync // Download all blocks and states
SnapSync // Download the chain and the state via compact snapshots
)
type (
@ -35,6 +61,9 @@ type (
// TODO: remove this when stream sync is fully up.
ServerOnly bool
// Synchronization mode of the downloader
SyncMode SyncMode
// parameters
Network nodeconfig.NetworkType
Concurrency int // Number of concurrent sync requests

@ -8,43 +8,101 @@ type ForwardOrder []SyncStageID
type RevertOrder []SyncStageID
type CleanUpOrder []SyncStageID
var DefaultForwardOrder = ForwardOrder{
var (
StagesForwardOrder ForwardOrder
StagesRevertOrder RevertOrder
StagesCleanUpOrder CleanUpOrder
)
func initStagesOrder(syncMode SyncMode) {
switch syncMode {
case FullSync:
initFullSyncStagesOrder()
case FastSync:
initFastSyncStagesOrder()
default:
panic("not supported sync mode")
}
}
func initFullSyncStagesOrder() {
StagesForwardOrder = ForwardOrder{
Heads,
SyncEpoch,
ShortRange,
BlockBodies,
States,
LastMile,
Finish,
}
StagesRevertOrder = RevertOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}
StagesCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}
}
func initFastSyncStagesOrder() {
StagesForwardOrder = ForwardOrder{
Heads,
SyncEpoch,
ShortRange,
BlockBodies,
// Stages below don't use Internet
Receipts,
StateSync,
States,
LastMile,
Finish,
}
var DefaultRevertOrder = RevertOrder{
StagesRevertOrder = RevertOrder{
Finish,
LastMile,
States,
StateSync,
Receipts,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}
var DefaultCleanUpOrder = CleanUpOrder{
StagesCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
StateSync,
Receipts,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}
}
func DefaultStages(ctx context.Context,
headsCfg StageHeadsCfg,
seCfg StageEpochCfg,
srCfg StageShortRangeCfg,
bodiesCfg StageBodiesCfg,
stateSyncCfg StageStateSyncCfg,
statesCfg StageStatesCfg,
receiptsCfg StageReceiptsCfg,
lastMileCfg StageLastMileCfg,
finishCfg StageFinishCfg,
) []*Stage {
@ -54,6 +112,8 @@ func DefaultStages(ctx context.Context,
handlerStageEpochSync := NewStageEpoch(seCfg)
handlerStageBodies := NewStageBodies(bodiesCfg)
handlerStageStates := NewStageStates(statesCfg)
handlerStageStateSync := NewStageStateSync(stateSyncCfg)
handlerStageReceipts := NewStageReceipts(receiptsCfg)
handlerStageLastMile := NewStageLastMile(lastMileCfg)
handlerStageFinish := NewStageFinish(finishCfg)
@ -83,6 +143,16 @@ func DefaultStages(ctx context.Context,
Description: "Update Blockchain State",
Handler: handlerStageStates,
},
{
ID: StateSync,
Description: "Retrieve States",
Handler: handlerStageStateSync,
},
{
ID: Receipts,
Description: "Retrieve Receipts",
Handler: handlerStageReceipts,
},
{
ID: LastMile,
Description: "update status for blocks after sync and update last mile blocks as well",

@ -73,6 +73,27 @@ func checkGetBlockByHashesResult(blocks []*types.Block, hashes []common.Hash) er
return nil
}
func getBlockByMaxVote(blocks []*types.Block) (*types.Block, error) {
hashesVote := make(map[common.Hash]int)
maxVote := int(-1)
maxVotedBlockIndex := int(0)
for i, block := range blocks {
if block == nil {
continue
}
hashesVote[block.Header().Hash()]++
if hashesVote[block.Header().Hash()] > maxVote {
maxVote = hashesVote[block.Header().Hash()]
maxVotedBlockIndex = i
}
}
if maxVote < 0 {
return nil, ErrInvalidBlockBytes
}
return blocks[maxVotedBlockIndex], nil
}
func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes.StreamID]struct{}) (common.Hash, map[sttypes.StreamID]struct{}) {
var (
voteM = make(map[common.Hash]int)

@ -0,0 +1,146 @@
package stagedstreamsync
import (
"errors"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
)
// ProofSet stores a set of trie nodes. It implements trie.Database and can also
// act as a cache for another trie.Database.
type ProofSet struct {
nodes map[string][]byte
order []string
dataSize int
lock sync.RWMutex
}
// NewProofSet creates an empty node set
func NewProofSet() *ProofSet {
return &ProofSet{
nodes: make(map[string][]byte),
}
}
// Put stores a new node in the set
func (db *ProofSet) Put(key []byte, value []byte) error {
db.lock.Lock()
defer db.lock.Unlock()
if _, ok := db.nodes[string(key)]; ok {
return nil
}
keystr := string(key)
db.nodes[keystr] = common.CopyBytes(value)
db.order = append(db.order, keystr)
db.dataSize += len(value)
return nil
}
// Delete removes a node from the set
func (db *ProofSet) Delete(key []byte) error {
db.lock.Lock()
defer db.lock.Unlock()
delete(db.nodes, string(key))
return nil
}
// Get returns a stored node
func (db *ProofSet) Get(key []byte) ([]byte, error) {
db.lock.RLock()
defer db.lock.RUnlock()
if entry, ok := db.nodes[string(key)]; ok {
return entry, nil
}
return nil, errors.New("not found")
}
// Has returns true if the node set contains the given key
func (db *ProofSet) Has(key []byte) (bool, error) {
_, err := db.Get(key)
return err == nil, nil
}
// KeyCount returns the number of nodes in the set
func (db *ProofSet) KeyCount() int {
db.lock.RLock()
defer db.lock.RUnlock()
return len(db.nodes)
}
// DataSize returns the aggregated data size of nodes in the set
func (db *ProofSet) DataSize() int {
db.lock.RLock()
defer db.lock.RUnlock()
return db.dataSize
}
// List converts the node set to a ProofList
func (db *ProofSet) List() ProofList {
db.lock.RLock()
defer db.lock.RUnlock()
var values ProofList
for _, key := range db.order {
values = append(values, db.nodes[key])
}
return values
}
// Store writes the contents of the set to the given database
func (db *ProofSet) Store(target ethdb.KeyValueWriter) {
db.lock.RLock()
defer db.lock.RUnlock()
for key, value := range db.nodes {
target.Put([]byte(key), value)
}
}
// ProofList stores an ordered list of trie nodes. It implements ethdb.KeyValueWriter.
type ProofList []rlp.RawValue
// Store writes the contents of the list to the given database
func (n ProofList) Store(db ethdb.KeyValueWriter) {
for _, node := range n {
db.Put(crypto.Keccak256(node), node)
}
}
// Set converts the node list to a ProofSet
func (n ProofList) Set() *ProofSet {
db := NewProofSet()
n.Store(db)
return db
}
// Put stores a new node at the end of the list
func (n *ProofList) Put(key []byte, value []byte) error {
*n = append(*n, value)
return nil
}
// Delete panics as there's no reason to remove a node from the list.
func (n *ProofList) Delete(key []byte) error {
panic("not supported")
}
// DataSize returns the aggregated data size of nodes in the list
func (n ProofList) DataSize() int {
var size int
for _, node := range n {
size += len(node)
}
return size
}

@ -0,0 +1,84 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package stagedstreamsync
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
)
// hashSpace is the total size of the 256 bit hash space for accounts.
var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil)
// hashRange is a utility to handle ranges of hashes, Split up the
// hash-space into sections, and 'walk' over the sections
type hashRange struct {
current *uint256.Int
step *uint256.Int
}
// newHashRange creates a new hashRange, initiated at the start position,
// and with the step set to fill the desired 'num' chunks
func newHashRange(start common.Hash, num uint64) *hashRange {
left := new(big.Int).Sub(hashSpace, start.Big())
step := new(big.Int).Div(
new(big.Int).Add(left, new(big.Int).SetUint64(num-1)),
new(big.Int).SetUint64(num),
)
step256 := new(uint256.Int)
step256.SetFromBig(step)
return &hashRange{
current: new(uint256.Int).SetBytes32(start[:]),
step: step256,
}
}
// Next pushes the hash range to the next interval.
func (r *hashRange) Next() bool {
next, overflow := new(uint256.Int).AddOverflow(r.current, r.step)
if overflow {
return false
}
r.current = next
return true
}
// Start returns the first hash in the current interval.
func (r *hashRange) Start() common.Hash {
return r.current.Bytes32()
}
// End returns the last hash in the current interval.
func (r *hashRange) End() common.Hash {
// If the end overflows (non divisible range), return a shorter interval
next, overflow := new(uint256.Int).AddOverflow(r.current, r.step)
if overflow {
return common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
}
return next.SubUint64(next, 1).Bytes32()
}
// incHash returns the next hash, in lexicographical order (a.k.a plus one)
func incHash(h common.Hash) common.Hash {
var a uint256.Int
a.SetBytes32(h[:])
a.AddUint64(&a, 1)
return common.Hash(a.Bytes32())
}

@ -0,0 +1,180 @@
package stagedstreamsync
import (
"sync"
"github.com/harmony-one/harmony/core/types"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog"
)
type ReceiptDownloadDetails struct {
streamID sttypes.StreamID
}
type Received struct {
streamID sttypes.StreamID
block *types.Block
receipts types.Receipts
}
// receiptDownloadManager is the helper structure for get receipts request management
type receiptDownloadManager struct {
chain blockChain
tx kv.RwTx
targetBN uint64
requesting map[uint64]struct{} // receipt numbers that have been assigned to workers but not received
processing map[uint64]struct{} // receipt numbers received requests but not inserted
retries *prioritizedNumbers // requests where error happens
rdd map[uint64]ReceiptDownloadDetails // details about how this receipt was downloaded
received map[uint64]Received
logger zerolog.Logger
lock sync.Mutex
}
func newReceiptDownloadManager(tx kv.RwTx, chain blockChain, targetBN uint64, logger zerolog.Logger) *receiptDownloadManager {
return &receiptDownloadManager{
chain: chain,
tx: tx,
targetBN: targetBN,
requesting: make(map[uint64]struct{}),
processing: make(map[uint64]struct{}),
retries: newPrioritizedNumbers(),
rdd: make(map[uint64]ReceiptDownloadDetails),
received: make(map[uint64]Received),
logger: logger,
}
}
// GetNextBatch get the next receipt numbers batch
func (rdm *receiptDownloadManager) GetNextBatch(curHeight uint64) []uint64 {
rdm.lock.Lock()
defer rdm.lock.Unlock()
cap := ReceiptsPerRequest
bns := rdm.getBatchFromRetries(cap, curHeight)
if len(bns) > 0 {
cap -= len(bns)
rdm.addBatchToRequesting(bns)
}
if rdm.availableForMoreTasks() {
addBNs := rdm.getBatchFromUnprocessed(cap, curHeight)
rdm.addBatchToRequesting(addBNs)
bns = append(bns, addBNs...)
}
return bns
}
// HandleRequestError handles the error result
func (rdm *receiptDownloadManager) HandleRequestError(bns []uint64, err error) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
// add requested receipt numbers to retries
for _, bn := range bns {
delete(rdm.requesting, bn)
rdm.retries.push(bn)
}
}
// HandleRequestResult handles get receipts result
func (rdm *receiptDownloadManager) HandleRequestResult(bns []uint64, receivedReceipts []types.Receipts, receivedBlocks []*types.Block, streamID sttypes.StreamID) error {
rdm.lock.Lock()
defer rdm.lock.Unlock()
for i, bn := range bns {
delete(rdm.requesting, bn)
if !indexExists(receivedBlocks, i) || !indexExists(receivedReceipts, i) {
rdm.retries.push(bn)
} else {
rdm.processing[bn] = struct{}{}
rdm.rdd[bn] = ReceiptDownloadDetails{
streamID: streamID,
}
rdm.received[bn] = Received{
block: receivedBlocks[i],
receipts: receivedReceipts[i],
}
}
}
return nil
}
// SetDownloadDetails sets the download details for a batch of blocks
func (rdm *receiptDownloadManager) SetDownloadDetails(bns []uint64, streamID sttypes.StreamID) error {
rdm.lock.Lock()
defer rdm.lock.Unlock()
for _, bn := range bns {
rdm.rdd[bn] = ReceiptDownloadDetails{
streamID: streamID,
}
}
return nil
}
// GetDownloadDetails returns the download details for a certain block number
func (rdm *receiptDownloadManager) GetDownloadDetails(blockNumber uint64) (streamID sttypes.StreamID) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
return rdm.rdd[blockNumber].streamID
}
// getBatchFromRetries get the receipt number batch to be requested from retries.
func (rdm *receiptDownloadManager) getBatchFromRetries(cap int, fromBlockNumber uint64) []uint64 {
var (
requestBNs []uint64
)
for cnt := 0; cnt < cap; cnt++ {
bn := rdm.retries.pop()
if bn == 0 {
break // no more retries
}
if bn <= fromBlockNumber {
continue
}
requestBNs = append(requestBNs, bn)
}
return requestBNs
}
// getBatchFromUnprocessed returns a batch of receipt numbers to be requested from unprocessed.
func (rdm *receiptDownloadManager) getBatchFromUnprocessed(cap int, curHeight uint64) []uint64 {
var (
requestBNs []uint64
)
bn := curHeight + 1
// TODO: this algorithm can be potentially optimized.
for cnt := 0; cnt < cap && bn <= rdm.targetBN; cnt++ {
for bn <= rdm.targetBN {
_, ok1 := rdm.requesting[bn]
_, ok2 := rdm.processing[bn]
if !ok1 && !ok2 {
requestBNs = append(requestBNs, bn)
bn++
break
}
bn++
}
}
return requestBNs
}
func (rdm *receiptDownloadManager) availableForMoreTasks() bool {
return len(rdm.requesting) < SoftQueueCap
}
func (rdm *receiptDownloadManager) addBatchToRequesting(bns []uint64) {
for _, bn := range bns {
rdm.requesting[bn] = struct{}{}
}
}

@ -29,7 +29,7 @@ func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks) (int, error) {
return len(blocks), nil
}
func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error {
func verifyBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error {
var (
sigBytes bls.SerializedSignature
bitmap []byte
@ -61,7 +61,18 @@ func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*type
case err != nil:
return errors.Wrap(err, "[InsertChain]")
default:
}
return nil
}
func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error {
//verify block
if err := verifyBlock(bc, block, nextBlocks...); err != nil {
return err
}
// insert block
if _, err := bc.InsertChain(types.Blocks{block}, false); err != nil {
return errors.Wrap(err, "[InsertChain]")
}
return nil
}

@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
@ -26,6 +27,7 @@ type StageBodiesCfg struct {
concurrency int
protocol syncProtocol
isBeacon bool
extractReceiptHashes bool
logProgress bool
}
@ -35,7 +37,7 @@ func NewStageBodies(cfg StageBodiesCfg) *StageBodies {
}
}
func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, logProgress bool) StageBodiesCfg {
func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, extractReceiptHashes bool, logProgress bool) StageBodiesCfg {
return StageBodiesCfg{
bc: bc,
db: db,
@ -43,6 +45,7 @@ func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concu
concurrency: concurrency,
protocol: protocol,
isBeacon: isBeacon,
extractReceiptHashes: extractReceiptHashes,
logProgress: logProgress,
}
}
@ -67,7 +70,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
}
maxHeight := s.state.status.targetBN
currentHead := b.configs.bc.CurrentBlock().NumberU64()
currentHead := s.state.CurrentBlockNumber()
if currentHead >= maxHeight {
return nil
}
@ -118,7 +121,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
for i := 0; i != s.state.config.Concurrency; i++ {
wg.Add(1)
go b.runBlockWorkerLoop(ctx, s.state.gbm, &wg, i, startTime)
go b.runBlockWorkerLoop(ctx, s.state.gbm, &wg, i, s, startTime)
}
wg.Wait()
@ -133,9 +136,9 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
}
// runBlockWorkerLoop creates a work loop for download blocks
func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time) {
func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownloadManager, wg *sync.WaitGroup, loopID int, s *StageState, startTime time.Time) {
currentBlock := int(b.configs.bc.CurrentBlock().NumberU64())
currentBlock := int(s.state.CurrentBlockNumber())
defer wg.Done()
@ -145,7 +148,8 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload
return
default:
}
batch := gbm.GetNextBatch()
curHeight := s.state.CurrentBlockNumber()
batch := gbm.GetNextBatch(curHeight)
if len(batch) == 0 {
select {
case <-ctx.Done():
@ -204,6 +208,34 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload
}
}
func (b *StageBodies) verifyBlockAndExtractReceiptsData(batchBlockBytes [][]byte, batchSigBytes [][]byte, s *StageState) error {
var block *types.Block
for i := uint64(0); i < uint64(len(batchBlockBytes)); i++ {
blockBytes := batchBlockBytes[i]
sigBytes := batchSigBytes[i]
if blockBytes == nil {
continue
}
if err := rlp.DecodeBytes(blockBytes, &block); err != nil {
utils.Logger().Error().
Uint64("block number", i).
Msg("block size invalid")
return ErrInvalidBlockBytes
}
if sigBytes != nil {
block.SetCurrentCommitSig(sigBytes)
}
// if block.NumberU64() != i {
// return ErrInvalidBlockNumber
// }
if err := verifyBlock(b.configs.bc, block); err != nil {
return err
}
}
return nil
}
// redownloadBadBlock tries to redownload the bad block from other streams
func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) error {
@ -403,7 +435,7 @@ func (b *StageBodies) Revert(ctx context.Context, firstCycle bool, u *RevertStat
defer tx.Rollback()
}
// save progress
currentHead := b.configs.bc.CurrentBlock().NumberU64()
currentHead := s.state.CurrentBlockNumber()
if err = s.Update(tx, currentHead); err != nil {
utils.Logger().Error().
Err(err).

@ -53,7 +53,7 @@ func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlock
maxHeight := s.state.status.targetBN
maxBlocksPerSyncCycle := uint64(1024) // TODO: should be in config -> s.state.MaxBlocksPerSyncCycle
currentHeight := heads.configs.bc.CurrentBlock().NumberU64()
currentHeight := s.state.CurrentBlockNumber()
s.state.currentCycle.TargetHeight = maxHeight
targetHeight := uint64(0)
if errV := CreateView(ctx, heads.configs.db, tx, func(etx kv.Tx) (err error) {
@ -89,6 +89,14 @@ func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlock
targetHeight = currentHeight + maxBlocksPerSyncCycle
}
// check pivot: if chain hasn't reached to pivot yet
if s.state.status.cycleSyncMode != FullSync && s.state.status.pivotBlock != nil {
// set target height on the pivot block
if !s.state.status.statesSynced && targetHeight > s.state.status.pivotBlock.NumberU64() {
targetHeight = s.state.status.pivotBlock.NumberU64()
}
}
s.state.currentCycle.TargetHeight = targetHeight
if err := s.Update(tx, targetHeight); err != nil {

@ -0,0 +1,398 @@
package stagedstreamsync
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
)
type StageReceipts struct {
configs StageReceiptsCfg
}
type StageReceiptsCfg struct {
bc core.BlockChain
db kv.RwDB
blockDBs []kv.RwDB
concurrency int
protocol syncProtocol
isBeacon bool
logProgress bool
}
func NewStageReceipts(cfg StageReceiptsCfg) *StageReceipts {
return &StageReceipts{
configs: cfg,
}
}
func NewStageReceiptsCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, logProgress bool) StageReceiptsCfg {
return StageReceiptsCfg{
bc: bc,
db: db,
blockDBs: blockDBs,
concurrency: concurrency,
protocol: protocol,
isBeacon: isBeacon,
logProgress: logProgress,
}
}
// Exec progresses receipts stage in the forward direction
func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// only execute this stage in fast/snap sync mode
if s.state.status.cycleSyncMode == FullSync {
return nil
}
useInternalTx := tx == nil
if invalidBlockRevert {
return nil
}
// for short range sync, skip this stage
if !s.state.initSync {
return nil
}
maxHeight := s.state.status.targetBN
currentHead := s.state.CurrentBlockNumber()
if currentHead >= maxHeight {
return nil
}
currProgress := uint64(0)
targetHeight := s.state.currentCycle.TargetHeight
if errV := CreateView(ctx, r.configs.db, tx, func(etx kv.Tx) error {
if currProgress, err = s.CurrentStageProgress(etx); err != nil {
return err
}
return nil
}); errV != nil {
return errV
}
if currProgress == 0 {
currProgress = currentHead
}
if currProgress >= targetHeight {
return nil
}
// size := uint64(0)
startTime := time.Now()
// startBlock := currProgress
if r.configs.logProgress {
fmt.Print("\033[s") // save the cursor position
}
if useInternalTx {
var err error
tx, err = r.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
for {
// check if there is no any more to download break the loop
curBn := s.state.CurrentBlockNumber()
if curBn == targetHeight {
break
}
// calculate the block numbers range to download
toBn := curBn + uint64(ReceiptsPerRequest*s.state.config.Concurrency)
if toBn > targetHeight {
toBn = targetHeight
}
// Fetch receipts from connected peers
rdm := newReceiptDownloadManager(tx, r.configs.bc, toBn, s.state.logger)
// Setup workers to fetch blocks from remote node
var wg sync.WaitGroup
for i := 0; i < s.state.config.Concurrency; i++ {
wg.Add(1)
go func() {
// prepare db transactions
txs := make([]kv.RwTx, r.configs.concurrency)
for i := 0; i < r.configs.concurrency; i++ {
txs[i], err = r.configs.blockDBs[i].BeginRw(ctx)
if err != nil {
return
}
}
// rollback the transactions after worker loop
defer func() {
for i := 0; i < r.configs.concurrency; i++ {
txs[i].Rollback()
}
}()
r.runReceiptWorkerLoop(ctx, rdm, &wg, s, txs, startTime)
}()
}
wg.Wait()
// insert all downloaded blocks and receipts to chain
if err := r.insertBlocksAndReceipts(ctx, rdm, toBn, s); err != nil {
utils.Logger().Err(err).Msg(WrapStagedSyncMsg("InsertReceiptChain failed"))
}
}
if useInternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
func (r *StageReceipts) insertBlocksAndReceipts(ctx context.Context, rdm *receiptDownloadManager, toBn uint64, s *StageState) error {
if len(rdm.received) == 0 {
return nil
}
var (
bns []uint64
blocks []*types.Block
receipts []types.Receipts
streamIDs []sttypes.StreamID
)
// populate blocks and receipts in separate array
// this way helps to sort blocks and receipts by block number
for bn := s.state.CurrentBlockNumber() + 1; bn <= toBn; bn++ {
if received, ok := rdm.received[bn]; !ok {
return errors.New("some blocks are missing")
} else {
bns = append(bns, bn)
blocks = append(blocks, received.block)
receipts = append(receipts, received.receipts)
streamIDs = append(streamIDs, received.streamID)
}
}
// insert sorted blocks and receipts to chain
if inserted, err := r.configs.bc.InsertReceiptChain(blocks, receipts); err != nil {
utils.Logger().Err(err).
Interface("streams", streamIDs).
Interface("block numbers", bns).
Msg(WrapStagedSyncMsg("InsertReceiptChain failed"))
rdm.HandleRequestError(bns, err)
return fmt.Errorf("InsertReceiptChain failed: %s", err.Error())
} else {
if inserted != len(blocks) {
utils.Logger().Warn().
Interface("block numbers", bns).
Int("inserted", inserted).
Int("blocks to insert", len(blocks)).
Msg(WrapStagedSyncMsg("InsertReceiptChain couldn't insert all downloaded blocks/receipts"))
}
}
return nil
}
// runReceiptWorkerLoop creates a work loop for download receipts
func (r *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDownloadManager, wg *sync.WaitGroup, s *StageState, txs []kv.RwTx, startTime time.Time) {
currentBlock := int(s.state.CurrentBlockNumber())
gbm := s.state.gbm
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
// get next batch of block numbers
curHeight := s.state.CurrentBlockNumber()
batch := rdm.GetNextBatch(curHeight)
if len(batch) == 0 {
select {
case <-ctx.Done():
return
case <-time.After(100 * time.Millisecond):
return
}
}
// retrieve corresponding blocks from cache db
var hashes []common.Hash
var blocks []*types.Block
for _, bn := range batch {
blkKey := marshalData(bn)
loopID, _, errBDD := gbm.GetDownloadDetails(bn)
if errBDD != nil {
utils.Logger().Warn().
Err(errBDD).
Interface("block numbers", bn).
Msg(WrapStagedSyncMsg("get block download details failed"))
return
}
blockBytes, err := txs[loopID].GetOne(BlocksBucket, blkKey)
if err != nil {
return
}
sigBytes, err := txs[loopID].GetOne(BlockSignaturesBucket, blkKey)
if err != nil {
return
}
sz := len(blockBytes)
if sz <= 1 {
return
}
var block *types.Block
if err := rlp.DecodeBytes(blockBytes, &block); err != nil {
return
}
if sigBytes != nil {
block.SetCurrentCommitSig(sigBytes)
}
if block.NumberU64() != bn {
return
}
if block.Header().ReceiptHash() == emptyHash {
return
}
// receiptHash := s.state.currentCycle.ReceiptHashes[bn]
gbm.SetRootHash(bn, block.Header().Root())
hashes = append(hashes, block.Header().Hash())
blocks = append(blocks, block)
}
// download receipts
receipts, stid, err := r.downloadReceipts(ctx, hashes)
if err != nil {
if !errors.Is(err, context.Canceled) {
r.configs.protocol.StreamFailed(stid, "downloadRawBlocks failed")
}
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Interface("block numbers", batch).
Msg(WrapStagedSyncMsg("downloadRawBlocks failed"))
err = errors.Wrap(err, "request error")
rdm.HandleRequestError(batch, err)
} else {
// handle request result
rdm.HandleRequestResult(batch, receipts, blocks, stid)
// log progress
if r.configs.logProgress {
//calculating block download speed
dt := time.Now().Sub(startTime).Seconds()
speed := float64(0)
if dt > 0 {
speed = float64(len(rdm.rdd)) / dt
}
blockReceiptSpeed := fmt.Sprintf("%.2f", speed)
fmt.Print("\033[u\033[K") // restore the cursor position and clear the line
fmt.Println("downloaded blocks and receipts:", currentBlock+len(rdm.rdd), "/", int(rdm.targetBN), "(", blockReceiptSpeed, "BlocksAndReceipts/s", ")")
}
}
}
}
func (r *StageReceipts) downloadReceipts(ctx context.Context, hs []common.Hash) ([]types.Receipts, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
receipts, stid, err := r.configs.protocol.GetReceipts(ctx, hs)
if err != nil {
return nil, stid, err
}
if err := validateGetReceiptsResult(hs, receipts); err != nil {
return nil, stid, err
}
return receipts, stid, nil
}
func validateGetReceiptsResult(requested []common.Hash, result []types.Receipts) error {
// TODO: validate each receipt here
return nil
}
func (r *StageReceipts) saveProgress(ctx context.Context, s *StageState, progress uint64, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = r.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
// save progress
if err = s.Update(tx, progress); err != nil {
utils.Logger().Error().
Err(err).
Msgf("[STAGED_SYNC] saving progress for receipt stage failed")
return ErrSavingBodiesProgressFail
}
if useInternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
func (r *StageReceipts) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = r.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if err = u.Done(tx); err != nil {
return err
}
if useInternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}
func (r *StageReceipts) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = r.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if useInternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}

@ -136,6 +136,8 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
sh.streamsFailed(whitelist, "remote nodes cannot provide blocks with target hashes")
}
utils.Logger().Info().Int("num blocks", len(blocks)).Msg("getBlockByHashes result")
n, err := verifyAndInsertBlocks(sr.configs.bc, blocks)
numBlocksInsertedShortRangeHistogramVec.With(s.state.promLabels()).Observe(float64(n))
if err != nil {

@ -53,6 +53,11 @@ func NewStageStatesCfg(
// Exec progresses States stage in the forward direction
func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// only execute this stage in full sync mode
if s.state.status.cycleSyncMode != FullSync {
return nil
}
// for short range sync, skip this step
if !s.state.initSync {
return nil
@ -64,11 +69,11 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
}
maxHeight := s.state.status.targetBN
currentHead := stg.configs.bc.CurrentBlock().NumberU64()
currentHead := s.state.CurrentBlockNumber()
if currentHead >= maxHeight {
return nil
}
currProgress := stg.configs.bc.CurrentBlock().NumberU64()
currProgress := currentHead
targetHeight := s.state.currentCycle.TargetHeight
if currProgress >= targetHeight {
return nil
@ -110,7 +115,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
for i := currProgress + 1; i <= targetHeight; i++ {
blkKey := marshalData(i)
loopID, streamID := gbm.GetDownloadDetails(i)
loopID, streamID, errBDD := gbm.GetDownloadDetails(i)
if errBDD != nil {
return errBDD
}
blockBytes, err := txs[loopID].GetOne(BlocksBucket, blkKey)
if err != nil {

@ -0,0 +1,310 @@
package stagedstreamsync
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
)
type StageStateSync struct {
configs StageStateSyncCfg
}
type StageStateSyncCfg struct {
bc core.BlockChain
db kv.RwDB
concurrency int
protocol syncProtocol
logger zerolog.Logger
logProgress bool
}
func NewStageStateSync(cfg StageStateSyncCfg) *StageStateSync {
return &StageStateSync{
configs: cfg,
}
}
func NewStageStateSyncCfg(bc core.BlockChain,
db kv.RwDB,
concurrency int,
protocol syncProtocol,
logger zerolog.Logger,
logProgress bool) StageStateSyncCfg {
return StageStateSyncCfg{
bc: bc,
db: db,
concurrency: concurrency,
protocol: protocol,
logger: logger,
logProgress: logProgress,
}
}
// Exec progresses States stage in the forward direction
func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// for short range sync, skip this step
if !s.state.initSync {
return nil
} // only execute this stage in fast/snap sync mode and once we reach to pivot
if s.state.status.pivotBlock == nil ||
s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() ||
s.state.status.statesSynced {
return nil
}
// maxHeight := s.state.status.targetBN
// currentHead := s.state.CurrentBlockNumber()
// if currentHead >= maxHeight {
// return nil
// }
// currProgress := s.state.CurrentBlockNumber()
// targetHeight := s.state.currentCycle.TargetHeight
// if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error {
// if currProgress, err = s.CurrentStageProgress(etx); err != nil {
// return err
// }
// return nil
// }); errV != nil {
// return errV
// }
// if currProgress >= targetHeight {
// return nil
// }
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = sss.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
// isLastCycle := targetHeight >= maxHeight
startTime := time.Now()
if sss.configs.logProgress {
fmt.Print("\033[s") // save the cursor position
}
// Fetch states from neighbors
// pivotRootHash := s.state.status.pivotBlock.Root()
currentBlockRootHash := s.state.bc.CurrentFastBlock().Root()
sdm := newStateDownloadManager(tx, sss.configs.bc, sss.configs.concurrency, s.state.logger)
sdm.setRootHash(currentBlockRootHash)
var wg sync.WaitGroup
for i := 0; i < s.state.config.Concurrency; i++ {
wg.Add(1)
go sss.runStateWorkerLoop(ctx, sdm, &wg, i, startTime, s)
}
wg.Wait()
// insert block
if err := sss.configs.bc.WriteHeadBlock(s.state.status.pivotBlock); err != nil {
sss.configs.logger.Warn().Err(err).
Uint64("pivot block number", s.state.status.pivotBlock.NumberU64()).
Msg(WrapStagedSyncMsg("insert pivot block failed"))
// TODO: panic("pivot block is failed to insert in chain.")
return err
}
// states should be fully synced in this stage
s.state.status.statesSynced = true
/*
gbm := s.state.gbm
// Setup workers to fetch states from remote node
var wg sync.WaitGroup
curHeight := s.state.CurrentBlockNumber()
for bn := curHeight + 1; bn <= gbm.targetBN; bn++ {
root := gbm.GetRootHash(bn)
if root == emptyHash {
continue
}
sdm.setRootHash(root)
for i := 0; i < s.state.config.Concurrency; i++ {
wg.Add(1)
go sss.runStateWorkerLoop(ctx, sdm, &wg, i, startTime, s)
}
wg.Wait()
}
*/
if useInternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
// runStateWorkerLoop creates a work loop for download states
func (sss *StageStateSync) runStateWorkerLoop(ctx context.Context, sdm *StateDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time, s *StageState) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
nodes, paths, codes, err := sdm.GetNextBatch()
if len(nodes)+len(codes) == 0 || err != nil {
select {
case <-ctx.Done():
return
case <-time.After(100 * time.Millisecond):
return
}
}
data, stid, err := sss.downloadStates(ctx, nodes, codes)
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
sss.configs.protocol.StreamFailed(stid, "downloadStates failed")
}
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("downloadStates failed"))
err = errors.Wrap(err, "request error")
sdm.HandleRequestError(codes, paths, stid, err)
} else if data == nil || len(data) == 0 {
utils.Logger().Warn().
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("downloadStates failed, received empty data bytes"))
err := errors.New("downloadStates received empty data bytes")
sdm.HandleRequestError(codes, paths, stid, err)
} else {
sdm.HandleRequestResult(nodes, paths, data, loopID, stid)
if sss.configs.logProgress {
//calculating block download speed
dt := time.Now().Sub(startTime).Seconds()
speed := float64(0)
if dt > 0 {
speed = float64(len(data)) / dt
}
stateDownloadSpeed := fmt.Sprintf("%.2f", speed)
fmt.Print("\033[u\033[K") // restore the cursor position and clear the line
fmt.Println("state download speed:", stateDownloadSpeed, "states/s")
}
}
}
}
func (sss *StageStateSync) downloadStates(ctx context.Context, nodes []common.Hash, codes []common.Hash) ([][]byte, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
hashes := append(codes, nodes...)
data, stid, err := sss.configs.protocol.GetNodeData(ctx, hashes)
if err != nil {
return nil, stid, err
}
if err := validateGetNodeDataResult(hashes, data); err != nil {
return nil, stid, err
}
return data, stid, nil
}
func validateGetNodeDataResult(requested []common.Hash, result [][]byte) error {
if len(result) != len(requested) {
return fmt.Errorf("unexpected number of nodes delivered: %v / %v", len(result), len(requested))
}
return nil
}
func (stg *StageStateSync) insertChain(gbm *blockDownloadManager,
protocol syncProtocol,
lbls prometheus.Labels,
targetBN uint64) {
}
func (stg *StageStateSync) saveProgress(s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = stg.configs.db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
}
// save progress
if err = s.Update(tx, s.state.CurrentBlockNumber()); err != nil {
utils.Logger().Error().
Err(err).
Msgf("[STAGED_SYNC] saving progress for block States stage failed")
return ErrSaveStateProgressFail
}
if useInternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
func (stg *StageStateSync) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = stg.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if err = u.Done(tx); err != nil {
return err
}
if useInternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}
func (stg *StageStateSync) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = stg.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if useInternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}

@ -0,0 +1,469 @@
package stagedstreamsync
import (
"context"
"fmt"
"sync"
"time"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/pkg/errors"
//sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
)
type StageFullStateSync struct {
configs StageFullStateSyncCfg
}
type StageFullStateSyncCfg struct {
bc core.BlockChain
db kv.RwDB
concurrency int
protocol syncProtocol
logger zerolog.Logger
logProgress bool
}
func NewStageFullStateSync(cfg StageFullStateSyncCfg) *StageFullStateSync {
return &StageFullStateSync{
configs: cfg,
}
}
func NewStageFullStateSyncCfg(bc core.BlockChain,
db kv.RwDB,
concurrency int,
protocol syncProtocol,
logger zerolog.Logger,
logProgress bool) StageFullStateSyncCfg {
return StageFullStateSyncCfg{
bc: bc,
db: db,
concurrency: concurrency,
protocol: protocol,
logger: logger,
logProgress: logProgress,
}
}
// Exec progresses States stage in the forward direction
func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) {
// for short range sync, skip this step
if !s.state.initSync {
return nil
} // only execute this stage in fast/snap sync mode and once we reach to pivot
if s.state.status.pivotBlock == nil ||
s.state.CurrentBlockNumber() != s.state.status.pivotBlock.NumberU64() ||
s.state.status.statesSynced {
return nil
}
// maxHeight := s.state.status.targetBN
// currentHead := s.state.CurrentBlockNumber()
// if currentHead >= maxHeight {
// return nil
// }
// currProgress := s.state.CurrentBlockNumber()
// targetHeight := s.state.currentCycle.TargetHeight
// if errV := CreateView(ctx, sss.configs.db, tx, func(etx kv.Tx) error {
// if currProgress, err = s.CurrentStageProgress(etx); err != nil {
// return err
// }
// return nil
// }); errV != nil {
// return errV
// }
// if currProgress >= targetHeight {
// return nil
// }
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = sss.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
// isLastCycle := targetHeight >= maxHeight
startTime := time.Now()
if sss.configs.logProgress {
fmt.Print("\033[s") // save the cursor position
}
// Fetch states from neighbors
currentBlockRootHash := s.state.bc.CurrentFastBlock().Root()
scheme := sss.configs.bc.TrieDB().Scheme()
sdm := newFullStateDownloadManager(sss.configs.bc.ChainDb(), scheme, tx, sss.configs.bc, sss.configs.concurrency, s.state.logger)
sdm.setRootHash(currentBlockRootHash)
var wg sync.WaitGroup
for i := 0; i < s.state.config.Concurrency; i++ {
wg.Add(1)
go sss.runStateWorkerLoop(ctx, sdm, &wg, i, startTime, s)
}
wg.Wait()
// insert block
if err := sss.configs.bc.WriteHeadBlock(s.state.status.pivotBlock); err != nil {
sss.configs.logger.Warn().Err(err).
Uint64("pivot block number", s.state.status.pivotBlock.NumberU64()).
Msg(WrapStagedSyncMsg("insert pivot block failed"))
// TODO: panic("pivot block is failed to insert in chain.")
return err
}
// states should be fully synced in this stage
s.state.status.statesSynced = true
/*
gbm := s.state.gbm
// Setup workers to fetch states from remote node
var wg sync.WaitGroup
curHeight := s.state.CurrentBlockNumber()
for bn := curHeight + 1; bn <= gbm.targetBN; bn++ {
root := gbm.GetRootHash(bn)
if root == emptyHash {
continue
}
sdm.setRootHash(root)
for i := 0; i < s.state.config.Concurrency; i++ {
wg.Add(1)
go sss.runStateWorkerLoop(ctx, sdm, &wg, i, startTime, s)
}
wg.Wait()
}
*/
if useInternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
// runStateWorkerLoop creates a work loop for download states
func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *FullStateDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time, s *StageState) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
accountTasks, codes, storages, healtask, codetask, err := sdm.GetNextBatch()
if len(accountTasks)+len(codes)+len(storages.accounts)+len(healtask.hashes)+len(codetask.hashes) == 0 || err != nil {
select {
case <-ctx.Done():
return
case <-time.After(100 * time.Millisecond):
return
}
}
if len(accountTasks) > 0 {
task := accountTasks[0]
origin := task.Next
limit := task.Last
root := sdm.root
cap := maxRequestSize
retAccounts, proof, stid, err := sss.configs.protocol.GetAccountRange(ctx, root, origin, limit, uint64(cap))
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
sss.configs.protocol.StreamFailed(stid, "GetAccountRange failed")
}
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetAccountRange failed"))
err = errors.Wrap(err, "request error")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
} else if retAccounts == nil || len(retAccounts) == 0 {
utils.Logger().Warn().
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetAccountRange failed, received empty accounts"))
err := errors.New("GetAccountRange received empty slots")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
if err := sdm.HandleAccountRequestResult(task, retAccounts, proof, origin[:], limit[:], loopID, stid); err != nil {
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetAccountRange handle result failed"))
err = errors.Wrap(err, "handle result error")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
} else if len(codes) > 0 {
stid, err := sss.downloadByteCodes(ctx, sdm, codes, loopID)
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
sss.configs.protocol.StreamFailed(stid, "downloadByteCodes failed")
}
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("downloadByteCodes failed"))
err = errors.Wrap(err, "request error")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
} else if len(storages.accounts) > 0 {
root := sdm.root
roots := storages.roots
accounts := storages.accounts
cap := maxRequestSize
origin := storages.origin
limit := storages.limit
mainTask := storages.mainTask
subTask := storages.subtask
slots, proof, stid, err := sss.configs.protocol.GetStorageRanges(ctx, root, accounts, origin, limit, uint64(cap))
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
sss.configs.protocol.StreamFailed(stid, "GetStorageRanges failed")
}
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetStorageRanges failed"))
err = errors.Wrap(err, "request error")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
} else if slots == nil || len(slots) == 0 {
utils.Logger().Warn().
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetStorageRanges failed, received empty slots"))
err := errors.New("GetStorageRanges received empty slots")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
if err := sdm.HandleStorageRequestResult(mainTask, subTask, accounts, roots, origin, limit, slots, proof, loopID, stid); err != nil {
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetStorageRanges handle result failed"))
err = errors.Wrap(err, "handle result error")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
} else {
// assign trie node Heal Tasks
if len(healtask.hashes) > 0 {
root := sdm.root
task := healtask.task
hashes := healtask.hashes
pathsets := healtask.pathsets
paths := healtask.paths
nodes, stid, err := sss.configs.protocol.GetTrieNodes(ctx, root, pathsets, maxRequestSize)
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
sss.configs.protocol.StreamFailed(stid, "GetTrieNodes failed")
}
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetTrieNodes failed"))
err = errors.Wrap(err, "request error")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
} else if nodes == nil || len(nodes) == 0 {
utils.Logger().Warn().
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetTrieNodes failed, received empty nodes"))
err := errors.New("GetTrieNodes received empty nodes")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
if err := sdm.HandleTrieNodeHealRequestResult(task, paths, hashes, nodes, loopID, stid); err != nil {
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetTrieNodes handle result failed"))
err = errors.Wrap(err, "handle result error")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
}
if len(codetask.hashes) > 0 {
task := codetask.task
hashes := codetask.hashes
retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, hashes, maxRequestSize)
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
sss.configs.protocol.StreamFailed(stid, "GetByteCodes failed")
}
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetByteCodes failed"))
err = errors.Wrap(err, "request error")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
} else if retCodes == nil || len(retCodes) == 0 {
utils.Logger().Warn().
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetByteCodes failed, received empty codes"))
err := errors.New("GetByteCodes received empty codes")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
if err := sdm.HandleBytecodeRequestResult(task, hashes, retCodes, loopID, stid); err != nil {
utils.Logger().Error().
Err(err).
Str("stream", string(stid)).
Msg(WrapStagedSyncMsg("GetByteCodes handle result failed"))
err = errors.Wrap(err, "handle result error")
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err)
return
}
}
}
}
}
func (sss *StageFullStateSync) downloadByteCodes(ctx context.Context, sdm *FullStateDownloadManager, codeTasks []*byteCodeTasksBundle, loopID int) (stid sttypes.StreamID, err error) {
for _, codeTask := range codeTasks {
// try to get byte codes from remote peer
// if any of them failed, the stid will be the id of the failed stream
retCodes, stid, err := sss.configs.protocol.GetByteCodes(ctx, codeTask.hashes, maxRequestSize)
if err != nil {
return stid, err
}
if len(retCodes) == 0 {
return stid, errors.New("empty codes array")
}
if err = sdm.HandleBytecodeRequestResult(codeTask.task, codeTask.hashes, retCodes, loopID, stid); err != nil {
return stid, err
}
}
return
}
// func (sss *StageFullStateSync) downloadStates(ctx context.Context,
// root common.Hash,
// origin common.Hash,
// accounts []*accountTask,
// codes []common.Hash,
// storages *storageTaskBundle) ([][]byte, sttypes.StreamID, error) {
// ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// defer cancel()
// // if there is any account task, first we have to complete that
// if len(accounts) > 0 {
// }
// // hashes := append(codes, nodes...)
// // data, stid, err := sss.configs.protocol.GetNodeData(ctx, hashes)
// // if err != nil {
// // return nil, stid, err
// // }
// // if err := validateGetNodeDataResult(hashes, data); err != nil {
// // return nil, stid, err
// // }
// return data, stid, nil
// }
func (stg *StageFullStateSync) insertChain(gbm *blockDownloadManager,
protocol syncProtocol,
lbls prometheus.Labels,
targetBN uint64) {
}
func (stg *StageFullStateSync) saveProgress(s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
var err error
tx, err = stg.configs.db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
}
// save progress
if err = s.Update(tx, s.state.CurrentBlockNumber()); err != nil {
utils.Logger().Error().
Err(err).
Msgf("[STAGED_SYNC] saving progress for block States stage failed")
return ErrSaveStateProgressFail
}
if useInternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
func (stg *StageFullStateSync) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = stg.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if err = u.Done(tx); err != nil {
return err
}
if useInternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}
func (stg *StageFullStateSync) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
useInternalTx := tx == nil
if useInternalTx {
tx, err = stg.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if useInternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}

@ -75,7 +75,6 @@ type StagedStreamSync struct {
status *status //TODO: merge this with currentSyncCycle
initSync bool // if sets to true, node start long range syncing
UseMemDB bool
revertPoint *uint64 // used to run stages
prevRevertPoint *uint64 // used to get value from outside of staged sync after cycle (for example to notify RPCDaemon)
invalidBlock InvalidBlock
@ -267,8 +266,18 @@ func New(
logger zerolog.Logger,
) *StagedStreamSync {
revertStages := make([]*Stage, len(stagesList))
for i, stageIndex := range DefaultRevertOrder {
forwardStages := make([]*Stage, len(StagesForwardOrder))
for i, stageIndex := range StagesForwardOrder {
for _, s := range stagesList {
if s.ID == stageIndex {
forwardStages[i] = s
break
}
}
}
revertStages := make([]*Stage, len(StagesRevertOrder))
for i, stageIndex := range StagesRevertOrder {
for _, s := range stagesList {
if s.ID == stageIndex {
revertStages[i] = s
@ -276,8 +285,9 @@ func New(
}
}
}
pruneStages := make([]*Stage, len(stagesList))
for i, stageIndex := range DefaultCleanUpOrder {
pruneStages := make([]*Stage, len(StagesCleanUpOrder))
for i, stageIndex := range StagesCleanUpOrder {
for _, s := range stagesList {
if s.ID == stageIndex {
pruneStages[i] = s
@ -306,7 +316,7 @@ func New(
inserted: 0,
config: config,
logger: logger,
stages: stagesList,
stages: forwardStages,
currentStage: 0,
revertOrder: revertStages,
pruningOrder: pruneStages,
@ -327,6 +337,18 @@ func (s *StagedStreamSync) doGetCurrentNumberRequest(ctx context.Context) (uint6
return bn, stid, nil
}
// doGetBlockByNumberRequest returns block by its number and corresponding stream
func (s *StagedStreamSync) doGetBlockByNumberRequest(ctx context.Context, bn uint64) (*types.Block, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
blocks, stid, err := s.protocol.GetBlocksByNumber(ctx, []uint64{bn}, syncproto.WithHighPriority())
if err != nil || len(blocks) != 1 {
return nil, stid, err
}
return blocks[0], stid, nil
}
// promLabels returns a prometheus labels for current shard id
func (s *StagedStreamSync) promLabels() prometheus.Labels {
sid := s.bc.ShardID()
@ -472,7 +494,6 @@ func (s *StagedStreamSync) runStage(ctx context.Context, stage *Stage, db kv.RwD
if err != nil {
return err
}
if err = stage.Handler.Exec(ctx, firstCycle, invalidBlockRevert, stageState, s, tx); err != nil {
utils.Logger().Error().
Err(err).

@ -13,6 +13,8 @@ const (
SyncEpoch SyncStageID = "SyncEpoch" // epoch sync
BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified
States SyncStageID = "States" // will construct most recent state from downloaded blocks
StateSync SyncStageID = "StateSync" // State sync
Receipts SyncStageID = "Receipts" // Receipts
LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well
Finish SyncStageID = "Finish" // Nominal stage after all other stages
)

@ -0,0 +1,432 @@
package stagedstreamsync
import (
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/trie"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog"
"golang.org/x/crypto/sha3"
)
// codeTask represents a single byte code download task, containing a set of
// peers already attempted retrieval from to detect stalled syncs and abort.
type codeTask struct {
attempts map[sttypes.StreamID]int
}
// trieTask represents a single trie node download task, containing a set of
// peers already attempted retrieval from to detect stalled syncs and abort.
type trieTask struct {
hash common.Hash
path [][]byte
attempts map[sttypes.StreamID]int
}
type task struct {
trieTasks map[string]*trieTask // Set of trie node tasks currently queued for retrieval, indexed by path
codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval, indexed by hash
}
func newTask() *task {
return &task{
trieTasks: make(map[string]*trieTask),
codeTasks: make(map[common.Hash]*codeTask),
}
}
func (t *task) addCodeTask(h common.Hash, ct *codeTask) {
t.codeTasks[h] = &codeTask{
attempts: ct.attempts,
}
}
func (t *task) getCodeTask(h common.Hash) *codeTask {
if task, ok := t.codeTasks[h]; ok {
return task
}
return nil
}
func (t *task) addNewCodeTask(h common.Hash) {
t.codeTasks[h] = &codeTask{
attempts: make(map[sttypes.StreamID]int),
}
}
func (t *task) deleteCodeTask(hash common.Hash) {
if _, ok := t.codeTasks[hash]; ok {
delete(t.codeTasks, hash)
}
}
func (t *task) deleteCodeTaskAttempts(h common.Hash, stID sttypes.StreamID) {
if task, ok := t.codeTasks[h]; ok {
if _, ok := task.attempts[stID]; ok {
delete(t.codeTasks[h].attempts, stID)
}
}
}
func (t *task) addTrieTask(path string, tt *trieTask) {
t.trieTasks[path] = &trieTask{
hash: tt.hash,
path: tt.path,
attempts: tt.attempts,
}
}
func (t *task) getTrieTask(path string) *trieTask {
if task, ok := t.trieTasks[path]; ok {
return task
}
return nil
}
func (t *task) addNewTrieTask(hash common.Hash, path string) {
t.trieTasks[path] = &trieTask{
hash: hash,
path: trie.NewSyncPath([]byte(path)),
attempts: make(map[sttypes.StreamID]int),
}
}
func (t *task) deleteTrieTask(path string) {
if _, ok := t.trieTasks[path]; ok {
delete(t.trieTasks, path)
}
}
func (t *task) deleteTrieTaskAttempts(path string, stID sttypes.StreamID) {
if task, ok := t.trieTasks[path]; ok {
if _, ok := task.attempts[stID]; ok {
delete(t.trieTasks[path].attempts, stID)
}
}
}
// StateDownloadManager is the helper structure for get blocks request management
type StateDownloadManager struct {
bc core.BlockChain
tx kv.RwTx
protocol syncProtocol
root common.Hash // State root currently being synced
sched *trie.Sync // State trie sync scheduler defining the tasks
keccak crypto.KeccakState // Keccak256 hasher to verify deliveries with
concurrency int
logger zerolog.Logger
lock sync.Mutex
numUncommitted int
bytesUncommitted int
tasks *task
requesting *task
processing *task
retries *task
}
func newStateDownloadManager(tx kv.RwTx,
bc core.BlockChain,
concurrency int,
logger zerolog.Logger) *StateDownloadManager {
return &StateDownloadManager{
bc: bc,
tx: tx,
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
concurrency: concurrency,
logger: logger,
tasks: newTask(),
requesting: newTask(),
processing: newTask(),
retries: newTask(),
}
}
func (s *StateDownloadManager) setRootHash(root common.Hash) {
s.root = root
s.sched = state.NewStateSync(root, s.bc.ChainDb(), nil, rawdb.HashScheme)
}
// fillTasks fills the tasks to send to the remote peer.
func (s *StateDownloadManager) fillTasks(n int) error {
if fill := n - (len(s.tasks.trieTasks) + len(s.tasks.codeTasks)); fill > 0 {
paths, hashes, codes := s.sched.Missing(fill)
for i, path := range paths {
s.tasks.addNewTrieTask(hashes[i], path)
}
for _, hash := range codes {
s.tasks.addNewCodeTask(hash)
}
}
return nil
}
// getNextBatch returns objects with a maximum of n state download
// tasks to send to the remote peer.
func (s *StateDownloadManager) GetNextBatch() (nodes []common.Hash, paths []string, codes []common.Hash, err error) {
s.lock.Lock()
defer s.lock.Unlock()
cap := StatesPerRequest
nodes, paths, codes = s.getBatchFromRetries(cap)
nItems := len(nodes) + len(codes)
cap -= nItems
if cap > 0 {
// Refill available tasks from the scheduler.
if s.sched.Pending() == 0 {
return
}
if err = s.commit(false); err != nil {
return
}
if err = s.fillTasks(cap); err != nil {
return
}
newNodes, newPaths, newCodes := s.getBatchFromUnprocessed(cap)
nodes = append(nodes, newNodes...)
paths = append(paths, newPaths...)
codes = append(codes, newCodes...)
}
return
}
func (s *StateDownloadManager) commit(force bool) error {
if !force && s.bytesUncommitted < ethdb.IdealBatchSize {
return nil
}
start := time.Now()
b := s.bc.ChainDb().NewBatch()
if err := s.sched.Commit(b); err != nil {
return err
}
if err := b.Write(); err != nil {
return fmt.Errorf("DB write error: %v", err)
}
s.updateStats(s.numUncommitted, 0, 0, time.Since(start))
s.numUncommitted = 0
s.bytesUncommitted = 0
return nil
}
// updateStats bumps the various state sync progress counters and displays a log
// message for the user to see.
func (s *StateDownloadManager) updateStats(written, duplicate, unexpected int, duration time.Duration) {
// TODO: here it updates the stats for total pending, processed, duplicates and unexpected
// for now, we just jog current stats
if written > 0 || duplicate > 0 || unexpected > 0 {
utils.Logger().Info().
Int("count", written).
Int("duplicate", duplicate).
Int("unexpected", unexpected).
Msg("Imported new state entries")
}
}
// getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download
// tasks to send to the remote peer.
func (s *StateDownloadManager) getBatchFromUnprocessed(n int) (nodes []common.Hash, paths []string, codes []common.Hash) {
// over trie nodes as those can be written to disk and forgotten about.
nodes = make([]common.Hash, 0, n)
paths = make([]string, 0, n)
codes = make([]common.Hash, 0, n)
for hash, t := range s.tasks.codeTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
codes = append(codes, hash)
s.requesting.addCodeTask(hash, t)
s.tasks.deleteCodeTask(hash)
}
for path, t := range s.tasks.trieTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
nodes = append(nodes, t.hash)
paths = append(paths, path)
s.requesting.addTrieTask(path, t)
s.tasks.deleteTrieTask(path)
}
return nodes, paths, codes
}
// getBatchFromRetries get the block number batch to be requested from retries.
func (s *StateDownloadManager) getBatchFromRetries(n int) ([]common.Hash, []string, []common.Hash) {
// over trie nodes as those can be written to disk and forgotten about.
nodes := make([]common.Hash, 0, n)
paths := make([]string, 0, n)
codes := make([]common.Hash, 0, n)
for hash, t := range s.retries.codeTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
codes = append(codes, hash)
s.requesting.addCodeTask(hash, t)
s.retries.deleteCodeTask(hash)
}
for path, t := range s.retries.trieTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
nodes = append(nodes, t.hash)
paths = append(paths, path)
s.requesting.addTrieTask(path, t)
s.retries.deleteTrieTask(path)
}
return nodes, paths, codes
}
// HandleRequestError handles the error result
func (s *StateDownloadManager) HandleRequestError(codeHashes []common.Hash, triePaths []string, streamID sttypes.StreamID, err error) {
s.lock.Lock()
defer s.lock.Unlock()
// add requested code hashes to retries
for _, h := range codeHashes {
task := s.requesting.getCodeTask(h)
s.retries.addCodeTask(h, task)
s.requesting.deleteCodeTask(h)
}
// add requested trie paths to retries
for _, path := range triePaths {
task := s.requesting.getTrieTask(path)
s.retries.addTrieTask(path, task)
s.requesting.deleteTrieTask(path)
}
}
// HandleRequestResult handles get trie paths and code hashes result
func (s *StateDownloadManager) HandleRequestResult(codeHashes []common.Hash, triePaths []string, response [][]byte, loopID int, streamID sttypes.StreamID) error {
s.lock.Lock()
defer s.lock.Unlock()
// Collect processing stats and update progress if valid data was received
duplicate, unexpected, successful := 0, 0, 0
for _, blob := range response {
hash, err := s.processNodeData(codeHashes, triePaths, blob)
switch err {
case nil:
s.numUncommitted++
s.bytesUncommitted += len(blob)
successful++
case trie.ErrNotRequested:
unexpected++
case trie.ErrAlreadyProcessed:
duplicate++
default:
return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
}
for _, path := range triePaths {
task := s.requesting.getTrieTask(path)
if task == nil {
// it is already removed from requesting
// either it has been completed and deleted by processNodeData or it does not exist
continue
}
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
if len(response) > 0 { //TODO: if timeout also do same
s.requesting.deleteTrieTaskAttempts(path, streamID)
} else if task.attempts[streamID] >= MaxTriesToFetchNodeData {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
return fmt.Errorf("trie node %s failed with peer %s (%d tries)", task.hash.TerminalString(), streamID, task.attempts[streamID])
}
// Missing item, place into the retry queue.
s.retries.addTrieTask(path, task)
s.requesting.deleteTrieTask(path)
}
for _, hash := range codeHashes {
task := s.requesting.getCodeTask(hash)
if task == nil {
// it is already removed from requesting
// either it has been completed and deleted by processNodeData or it does not exist
continue
}
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
if len(response) > 0 { //TODO: if timeout also do same
s.requesting.deleteCodeTaskAttempts(hash, streamID) //TODO: do we need delete attempts???
} else if task.attempts[streamID] >= MaxTriesToFetchNodeData {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
return fmt.Errorf("byte code %s failed with peer %s (%d tries)", hash.TerminalString(), streamID, task.attempts[streamID])
}
// Missing item, place into the retry queue.
s.retries.addCodeTask(hash, task)
s.requesting.deleteCodeTask(hash)
}
return nil
}
// processNodeData tries to inject a trie node data blob delivered from a remote
// peer into the state trie, returning whether anything useful was written or any
// error occurred.
//
// If multiple requests correspond to the same hash, this method will inject the
// blob as a result for the first one only, leaving the remaining duplicates to
// be fetched again.
func (s *StateDownloadManager) processNodeData(codeHashes []common.Hash, triePaths []string, responseData []byte) (common.Hash, error) {
var hash common.Hash
s.keccak.Reset()
s.keccak.Write(responseData)
s.keccak.Read(hash[:])
//TODO: remove from requesting
if _, present := s.requesting.codeTasks[hash]; present {
err := s.sched.ProcessCode(trie.CodeSyncResult{
Hash: hash,
Data: responseData,
})
s.requesting.deleteCodeTask(hash)
return hash, err
}
for _, path := range triePaths {
task := s.requesting.getTrieTask(path)
if task == nil {
// this shouldn't happen while the path is given from triPaths and triPaths
// are given from requesting queue
continue
}
if task.hash == hash {
err := s.sched.ProcessNode(trie.NodeSyncResult{
Path: path,
Data: responseData,
})
s.requesting.deleteTrieTask(path)
return hash, err
}
}
return common.Hash{}, trie.ErrNotRequested
}

File diff suppressed because it is too large Load Diff

@ -11,6 +11,8 @@ import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
@ -81,20 +83,28 @@ func CreateStagedSync(ctx context.Context,
return nil, errInitDB
}
extractReceiptHashes := config.SyncMode == FastSync || config.SyncMode == SnapSync
stageHeadsCfg := NewStageHeadersCfg(bc, mainDB)
stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB)
stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, extractReceiptHashes, config.LogProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress)
stageStateSyncCfg := NewStageStateSyncCfg(bc, mainDB, config.Concurrency, protocol, logger, config.LogProgress)
stageReceiptsCfg := NewStageReceiptsCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB)
stageFinishCfg := NewStageFinishCfg(mainDB)
stages := DefaultStages(ctx,
// init stages order based on sync mode
initStagesOrder(config.SyncMode)
defaultStages := DefaultStages(ctx,
stageHeadsCfg,
stageSyncEpochCfg,
stageShortRangeCfg,
stageBodiesCfg,
stageStateSyncCfg,
stageStatesCfg,
stageReceiptsCfg,
lastMileCfg,
stageFinishCfg,
)
@ -112,7 +122,7 @@ func CreateStagedSync(ctx context.Context,
bc,
consensus,
mainDB,
stages,
defaultStages,
isBeaconNode,
protocol,
isBeaconNode,
@ -214,6 +224,65 @@ func (s *StagedStreamSync) Debug(source string, msg interface{}) {
}
}
// checkPivot checks pivot block and returns pivot block and cycle Sync mode
func (s *StagedStreamSync) checkPivot(ctx context.Context, estimatedHeight uint64, initSync bool) (*types.Block, SyncMode, error) {
if s.config.SyncMode == FullSync {
return nil, FullSync, nil
}
// do full sync if chain is at early stage
if initSync && estimatedHeight < MaxPivotDistanceToHead {
return nil, FullSync, nil
}
pivotBlockNumber := uint64(0)
var curPivot *uint64
if curPivot = rawdb.ReadLastPivotNumber(s.bc.ChainDb()); curPivot != nil {
// if head is behind pivot, that means it is still on fast/snap sync mode
if head := s.CurrentBlockNumber(); head < *curPivot {
pivotBlockNumber = *curPivot
// pivot could be moved forward if it is far from head
if pivotBlockNumber < estimatedHeight-MaxPivotDistanceToHead {
pivotBlockNumber = estimatedHeight - MinPivotDistanceToHead
}
}
} else {
if head := s.CurrentBlockNumber(); s.config.SyncMode == FastSync && head <= 1 {
pivotBlockNumber = estimatedHeight - MinPivotDistanceToHead
if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil {
s.logger.Warn().Err(err).
Uint64("new pivot number", pivotBlockNumber).
Msg(WrapStagedSyncMsg("update pivot number failed"))
}
}
}
if pivotBlockNumber > 0 {
if block, err := s.queryAllPeersForBlockByNumber(ctx, pivotBlockNumber); err != nil {
s.logger.Error().Err(err).
Uint64("pivot", pivotBlockNumber).
Msg(WrapStagedSyncMsg("query peers for pivot block failed"))
return block, FastSync, err
} else {
if curPivot == nil || pivotBlockNumber != *curPivot {
if err := rawdb.WriteLastPivotNumber(s.bc.ChainDb(), pivotBlockNumber); err != nil {
s.logger.Warn().Err(err).
Uint64("new pivot number", pivotBlockNumber).
Msg(WrapStagedSyncMsg("update pivot number failed"))
return block, FastSync, err
}
}
s.status.pivotBlock = block
s.logger.Info().
Uint64("estimatedHeight", estimatedHeight).
Uint64("pivot number", pivotBlockNumber).
Msg(WrapStagedSyncMsg("fast/snap sync mode, pivot is set successfully"))
return block, FastSync, nil
}
}
return nil, FullSync, nil
}
// doSync does the long range sync.
// One LongRangeSync consists of several iterations.
// For each iteration, estimate the current block number, then fetch block & insert to blockchain
@ -224,7 +293,6 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
var totalInserted int
s.initSync = initSync
if err := s.checkPrerequisites(); err != nil {
return 0, 0, err
}
@ -238,13 +306,23 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
//TODO: use directly currentCycle var
s.status.setTargetBN(estimatedHeight)
}
if curBN := s.bc.CurrentBlock().NumberU64(); estimatedHeight <= curBN {
if curBN := s.CurrentBlockNumber(); estimatedHeight <= curBN {
s.logger.Info().Uint64("current number", curBN).Uint64("target number", estimatedHeight).
Msg(WrapStagedSyncMsg("early return of long range sync (chain is already ahead of target height)"))
return estimatedHeight, 0, nil
}
}
// We are probably in full sync, but we might have rewound to before the
// fast/snap sync pivot, check if we should reenable
if pivotBlock, cycleSyncMode, err := s.checkPivot(downloaderContext, estimatedHeight, initSync); err != nil {
s.logger.Error().Err(err).Msg(WrapStagedSyncMsg("check pivot failed"))
return 0, 0, err
} else {
s.status.cycleSyncMode = cycleSyncMode
s.status.pivotBlock = pivotBlock
}
s.startSyncing()
defer s.finishSyncing()
@ -289,7 +367,7 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
}
// add consensus last mile blocks
if s.consensus != nil {
if s.consensus != nil && s.isBeaconNode {
if hashes, err := s.addConsensusLastMile(s.Blockchain(), s.consensus); err != nil {
utils.Logger().Error().Err(err).
Msg("[STAGED_STREAM_SYNC] Add consensus last mile failed")
@ -315,7 +393,7 @@ func (s *StagedStreamSync) doSyncCycle(ctx context.Context) (int, error) {
var totalInserted int
s.inserted = 0
startHead := s.bc.CurrentBlock().NumberU64()
startHead := s.CurrentBlockNumber()
canRunCycleInOneTransaction := false
var tx kv.RwTx
@ -379,6 +457,36 @@ func (s *StagedStreamSync) checkPrerequisites() error {
return s.checkHaveEnoughStreams()
}
func (s *StagedStreamSync) CurrentBlockNumber() uint64 {
// if current head is ahead of pivot block, return chain head regardless of sync mode
if s.status.pivotBlock != nil && s.bc.CurrentBlock().NumberU64() >= s.status.pivotBlock.NumberU64() {
return s.bc.CurrentBlock().NumberU64()
}
current := uint64(0)
switch s.config.SyncMode {
case FullSync:
current = s.bc.CurrentBlock().NumberU64()
case FastSync:
current = s.bc.CurrentFastBlock().NumberU64()
case SnapSync:
current = s.bc.CurrentHeader().Number().Uint64()
}
return current
}
func (s *StagedStreamSync) stateSyncStage() bool {
switch s.config.SyncMode {
case FullSync:
return false
case FastSync:
return s.status.pivotBlock != nil && s.bc.CurrentFastBlock().NumberU64() == s.status.pivotBlock.NumberU64()-1
case SnapSync:
return false
}
return false
}
// estimateCurrentNumber roughly estimates the current block number.
// The block number does not need to be exact, but just a temporary target of the iteration
func (s *StagedStreamSync) estimateCurrentNumber(ctx context.Context) (uint64, error) {
@ -418,3 +526,45 @@ func (s *StagedStreamSync) estimateCurrentNumber(ctx context.Context) (uint64, e
bn := computeBlockNumberByMaxVote(cnResults)
return bn, nil
}
// queryAllPeersForBlockByNumber queries all connected streams for a block by its number.
func (s *StagedStreamSync) queryAllPeersForBlockByNumber(ctx context.Context, bn uint64) (*types.Block, error) {
var (
blkResults []*types.Block
lock sync.Mutex
wg sync.WaitGroup
)
wg.Add(s.config.Concurrency)
for i := 0; i != s.config.Concurrency; i++ {
go func() {
defer wg.Done()
block, stid, err := s.doGetBlockByNumberRequest(ctx, bn)
if err != nil {
s.logger.Err(err).Str("streamID", string(stid)).
Msg(WrapStagedSyncMsg("getBlockByNumber request failed"))
if !errors.Is(err, context.Canceled) {
s.protocol.StreamFailed(stid, "getBlockByNumber request failed")
}
return
}
lock.Lock()
blkResults = append(blkResults, block)
lock.Unlock()
}()
}
wg.Wait()
if len(blkResults) == 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return nil, ErrZeroBlockResponse
}
block, err := getBlockByMaxVote(blkResults)
if err != nil {
return nil, err
}
return block, nil
}

@ -16,6 +16,9 @@ var (
type status struct {
isSyncing bool
targetBN uint64
pivotBlock *types.Block
cycleSyncMode SyncMode
statesSynced bool
lock sync.Mutex
}

@ -192,6 +192,7 @@ var defaultStagedSyncConfig = harmonyconfig.StagedSyncConfig{
var (
defaultMainnetSyncConfig = harmonyconfig.SyncConfig{
Enabled: false,
SyncMode: 0,
Downloader: false,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
@ -207,6 +208,7 @@ var (
defaultTestNetSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
SyncMode: 0,
Downloader: false,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
@ -222,6 +224,7 @@ var (
defaultLocalNetSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
SyncMode: 0,
Downloader: true,
StagedSync: true,
StagedSyncCfg: defaultStagedSyncConfig,
@ -237,6 +240,7 @@ var (
defaultPartnerSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
SyncMode: 0,
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
@ -252,6 +256,7 @@ var (
defaultElseSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
SyncMode: 0,
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,

@ -238,6 +238,7 @@ var (
syncFlags = []cli.Flag{
syncStreamEnabledFlag,
syncModeFlag,
syncDownloaderFlag,
syncStagedSyncFlag,
syncConcurrencyFlag,
@ -1876,6 +1877,13 @@ var (
Usage: "Enable the stream sync protocol (experimental feature)",
DefValue: false,
}
syncModeFlag = cli.IntFlag{
Name: "sync.mode",
Usage: "synchronization mode of the downloader (0=FullSync, 1=FastSync, 2=SnapSync)",
DefValue: 0,
}
// TODO: Deprecate this flag, and always set to true after stream sync is fully up.
syncDownloaderFlag = cli.BoolFlag{
Name: "sync.downloader",
@ -1937,6 +1945,10 @@ func applySyncFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
config.Sync.Enabled = cli.GetBoolFlagValue(cmd, syncStreamEnabledFlag)
}
if cli.IsFlagChanged(cmd, syncModeFlag) {
config.Sync.SyncMode = uint32(cli.GetIntFlagValue(cmd, syncModeFlag))
}
if cli.IsFlagChanged(cmd, syncDownloaderFlag) {
config.Sync.Downloader = cli.GetBoolFlagValue(cmd, syncDownloaderFlag)
}

@ -1005,6 +1005,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
sConfig := stagedstreamsync.Config{
ServerOnly: !hc.Sync.Downloader,
SyncMode: stagedstreamsync.SyncMode(hc.Sync.SyncMode),
Network: nodeconfig.NetworkType(hc.Network.NetworkType),
Concurrency: hc.Sync.Concurrency,
MinStreams: hc.Sync.MinPeers,
@ -1016,7 +1017,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
SmDiscBatch: hc.Sync.DiscBatch,
UseMemDB: hc.Sync.StagedSyncCfg.UseMemDB,
LogProgress: hc.Sync.StagedSyncCfg.LogProgress,
DebugMode: hc.Sync.StagedSyncCfg.DebugMode,
DebugMode: true, // hc.Sync.StagedSyncCfg.DebugMode,
}
// If we are running side chain, we will need to do some extra works for beacon

@ -52,6 +52,11 @@ type BlockChain interface {
// CurrentBlock retrieves the current head block of the canonical chain. The
// block is retrieved from the blockchain's internal cache.
CurrentBlock() *types.Block
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
// block is retrieved from the blockchain's internal cache.
CurrentFastBlock() *types.Block
// Validator returns the current validator.
Validator() Validator
// Processor returns the current processor.
Processor() Processor
// State returns a new mutable state based on the current HEAD block.
@ -100,6 +105,20 @@ type BlockChain interface {
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
Rollback(chain []common.Hash) error
// writeHeadBlock writes a new head block
WriteHeadBlock(block *types.Block) error
// WriteBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
WriteBlockWithoutState(block *types.Block) (err error)
// WriteBlockWithState writes the block and all associated state to the database.
WriteBlockWithState(
block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt,
stakeMsgs []types2.StakeMsg,
paid reward.Reader,
state *state.DB,
) (status WriteStatus, err error)
// GetMaxGarbageCollectedBlockNumber ..
GetMaxGarbageCollectedBlockNumber() int64
// InsertChain attempts to insert the given batch of blocks in to the canonical
@ -109,7 +128,10 @@ type BlockChain interface {
//
// After insertion is done, all accumulated events will be fired.
InsertChain(chain types.Blocks, verifyHeaders bool) (int, error)
// LeaderRotationMeta returns info about leader rotation.
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error)
// LeaderRotationMeta returns the number of continuous blocks by the leader.
LeaderRotationMeta() LeaderRotationMeta
// BadBlocks returns a list of the last 'bad blocks' that
// the client has seen on the network.

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/metrics"
@ -71,6 +72,7 @@ import (
var (
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
@ -186,6 +188,7 @@ type BlockChainImpl struct {
pendingSlashingCandidatesMU sync.RWMutex // pending slashing candidates
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
@ -319,6 +322,7 @@ func newBlockChainWithOptions(
}
var nilBlock *types.Block
bc.currentBlock.Store(nilBlock)
bc.currentFastBlock.Store(nilBlock)
if err := bc.loadLastState(); err != nil {
return nil, err
}
@ -612,8 +616,22 @@ func (bc *BlockChainImpl) loadLastState() error {
return errors.Wrap(err, "headerChain SetCurrentHeader")
}
// Restore the last known head fast block
bc.currentFastBlock.Store(currentBlock)
headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
}
// Issue a status log for the user
currentFastBlock := bc.CurrentFastBlock()
headerTd := bc.GetTd(currentHeader.Hash(), currentHeader.Number().Uint64())
blockTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
fastTd := bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64())
utils.Logger().Info().
Str("number", currentHeader.Number().String()).
@ -627,6 +645,12 @@ func (bc *BlockChainImpl) loadLastState() error {
Str("td", blockTd.String()).
Str("age", common.PrettyAge(time.Unix(currentBlock.Time().Int64(), 0)).String()).
Msg("Loaded most recent local full block")
utils.Logger().Info().
Str("number", currentFastBlock.Number().String()).
Str("hash", currentFastBlock.Hash().Hex()).
Str("td", fastTd.String()).
Str("age", common.PrettyAge(time.Unix(currentFastBlock.Time().Int64(), 0)).String()).
Msg("Loaded most recent local fast block")
return nil
}
@ -663,16 +687,30 @@ func (bc *BlockChainImpl) setHead(head uint64) error {
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
}
// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number().Uint64() < currentFastBlock.NumberU64() {
newHeadFastBlock := bc.GetBlock(currentHeader.Hash(), currentHeader.Number().Uint64())
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
// If either blocks reached nil, reset to the genesis state
if currentBlock := bc.CurrentBlock(); currentBlock == nil {
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil {
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
currentBlock := bc.CurrentBlock()
currentFastBlock := bc.CurrentFastBlock()
if err := rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()); err != nil {
return err
}
if err := rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil {
return err
}
return bc.loadLastState()
}
@ -685,6 +723,17 @@ func (bc *BlockChainImpl) CurrentBlock() *types.Block {
return bc.currentBlock.Load().(*types.Block)
}
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
// chain. The block is retrieved from the blockchain's internal cache.
func (bc *BlockChainImpl) CurrentFastBlock() *types.Block {
return bc.currentFastBlock.Load().(*types.Block)
}
// Validator returns the current validator.
func (bc *BlockChainImpl) Validator() Validator {
return bc.validator
}
func (bc *BlockChainImpl) Processor() Processor {
return bc.processor
}
@ -727,6 +776,8 @@ func (bc *BlockChainImpl) resetWithGenesisBlock(genesis *types.Block) error {
}
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
return nil
}
@ -828,6 +879,10 @@ func (bc *BlockChainImpl) ExportN(w io.Writer, first uint64, last uint64) error
return nil
}
func (bc *BlockChainImpl) WriteHeadBlock(block *types.Block) error {
return bc.writeHeadBlock(block)
}
// writeHeadBlock writes a new head block
func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
// If the block is on a side chain or an unknown one, force other heads onto it too
@ -841,6 +896,20 @@ func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
if err := rawdb.WriteHeadBlockHash(batch, block.Hash()); err != nil {
return err
}
if err := rawdb.WriteHeadHeaderHash(batch, block.Hash()); err != nil {
return err
}
isNewEpoch := block.IsLastBlockInEpoch()
if isNewEpoch {
epoch := block.Header().Epoch()
nextEpoch := epoch.Add(epoch, common.Big1)
if err := rawdb.WriteShardStateBytes(batch, nextEpoch, block.Header().ShardState()); err != nil {
utils.Logger().Error().Err(err).Msg("failed to store shard state")
return err
}
}
if err := batch.Write(); err != nil {
return err
}
@ -856,6 +925,9 @@ func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
if err := rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()); err != nil {
return err
}
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
return nil
}
@ -869,6 +941,9 @@ func (bc *BlockChainImpl) tikvFastForward(block *types.Block, logs []*types.Log)
return errors.Wrap(err, "HeaderChain SetCurrentHeader")
}
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
var events []interface{}
events = append(events, ChainEvent{block, block.Hash(), logs})
events = append(events, ChainHeadEvent{block})
@ -1170,6 +1245,14 @@ func (bc *BlockChainImpl) Rollback(chain []common.Hash) error {
}
}
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
if newFastBlock != nil {
bc.currentFastBlock.Store(newFastBlock)
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
}
}
if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
if newBlock != nil {
@ -1192,9 +1275,191 @@ func (bc *BlockChainImpl) Rollback(chain []common.Hash) error {
return bc.removeInValidatorList(valsToRemove)
}
// SetReceiptsData computes all the non-consensus fields of the receipts
func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts types.Receipts) error {
signer := types.MakeSigner(config, block.Epoch())
ethSigner := types.NewEIP155Signer(config.EthCompatibleChainID)
transactions, stakingTransactions, logIndex := block.Transactions(), block.StakingTransactions(), uint(0)
if len(transactions)+len(stakingTransactions) != len(receipts) {
return errors.New("transaction+stakingTransactions and receipt count mismatch")
}
// The used gas can be calculated based on previous receipts
if len(receipts) > 0 && len(transactions) > 0 {
receipts[0].GasUsed = receipts[0].CumulativeGasUsed
}
for j := 1; j < len(transactions); j++ {
// The transaction hash can be retrieved from the transaction itself
receipts[j].TxHash = transactions[j].Hash()
receipts[j].GasUsed = receipts[j].CumulativeGasUsed - receipts[j-1].CumulativeGasUsed
// The contract address can be derived from the transaction itself
if transactions[j].To() == nil {
// Deriving the signer is expensive, only do if it's actually needed
var from common.Address
if transactions[j].IsEthCompatible() {
from, _ = types.Sender(ethSigner, transactions[j])
} else {
from, _ = types.Sender(signer, transactions[j])
}
receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce())
}
// The derived log fields can simply be set from the block and transaction
for k := 0; k < len(receipts[j].Logs); k++ {
receipts[j].Logs[k].BlockNumber = block.NumberU64()
receipts[j].Logs[k].BlockHash = block.Hash()
receipts[j].Logs[k].TxHash = receipts[j].TxHash
receipts[j].Logs[k].TxIndex = uint(j)
receipts[j].Logs[k].Index = logIndex
logIndex++
}
}
// The used gas can be calculated based on previous receipts
if len(receipts) > len(transactions) && len(stakingTransactions) > 0 {
receipts[len(transactions)].GasUsed = receipts[len(transactions)].CumulativeGasUsed
}
// in a block, txns are processed before staking txns
for j := len(transactions) + 1; j < len(transactions)+len(stakingTransactions); j++ {
// The transaction hash can be retrieved from the staking transaction itself
receipts[j].TxHash = stakingTransactions[j].Hash()
receipts[j].GasUsed = receipts[j].CumulativeGasUsed - receipts[j-1].CumulativeGasUsed
// The derived log fields can simply be set from the block and transaction
for k := 0; k < len(receipts[j].Logs); k++ {
receipts[j].Logs[k].BlockNumber = block.NumberU64()
receipts[j].Logs[k].BlockHash = block.Hash()
receipts[j].Logs[k].TxHash = receipts[j].TxHash
receipts[j].Logs[k].TxIndex = uint(j) + uint(len(transactions))
receipts[j].Logs[k].Index = logIndex
logIndex++
}
}
return nil
}
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
func (bc *BlockChainImpl) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(blockChain); i++ {
if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() {
utils.Logger().Error().
Str("number", blockChain[i].Number().String()).
Str("hash", blockChain[i].Hash().Hex()).
Str("parent", blockChain[i].ParentHash().Hex()).
Str("prevnumber", blockChain[i-1].Number().String()).
Str("prevhash", blockChain[i-1].Hash().Hex()).
Msg("Non contiguous receipt insert")
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(),
blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4])
}
}
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
bytes = 0
batch = bc.db.NewBatch()
)
for i, block := range blockChain {
receipts := receiptChain[i]
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, fmt.Errorf("Premature abort during blocks processing")
}
// Add header if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
if err := rawdb.WriteHeader(batch, block.Header()); err != nil {
return 0, err
}
// return 0, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
}
// Skip if the entire data is already known
if bc.HasBlock(block.Hash(), block.NumberU64()) {
stats.ignored++
continue
}
// Compute all the non-consensus fields of the receipts
if err := SetReceiptsData(bc.chainConfig, block, receipts); err != nil {
return 0, fmt.Errorf("failed to set receipts data: %v", err)
}
// Write all the data out into the database
if err := rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()); err != nil {
return 0, err
}
if err := rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
return 0, err
}
if err := rawdb.WriteBlockTxLookUpEntries(batch, block); err != nil {
return 0, err
}
if err := rawdb.WriteBlockStxLookUpEntries(batch, block); err != nil {
return 0, err
}
isNewEpoch := block.IsLastBlockInEpoch()
if isNewEpoch {
epoch := block.Header().Epoch()
nextEpoch := epoch.Add(epoch, common.Big1)
err := rawdb.WriteShardStateBytes(batch, nextEpoch, block.Header().ShardState())
if err != nil {
utils.Logger().Error().Err(err).Msg("failed to store shard state")
return 0, err
}
}
stats.processed++
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
}
bytes += batch.ValueSize()
batch.Reset()
}
}
if batch.ValueSize() > 0 {
bytes += batch.ValueSize()
if err := batch.Write(); err != nil {
return 0, err
}
}
// Update the head fast sync block if better
head := blockChain[len(blockChain)-1]
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentFastBlock.Store(head)
utils.Logger().Info().
Int32("count", stats.processed).
Str("elapsed", common.PrettyDuration(time.Since(start)).String()).
Str("age", common.PrettyAge(time.Unix(head.Time().Int64(), 0)).String()).
Str("head", head.Number().String()).
Str("hash", head.Hash().Hex()).
Str("size", common.StorageSize(bytes).String()).
Int32("ignored", stats.ignored).
Msg("Imported new block receipts")
return int(stats.processed), nil
}
var lastWrite uint64
func (bc *BlockChainImpl) writeBlockWithState(
func (bc *BlockChainImpl) WriteBlockWithoutState(block *types.Block) (err error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
if err := rawdb.WriteBlock(bc.db, block); err != nil {
return err
}
return nil
}
func (bc *BlockChainImpl) WriteBlockWithState(
block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt,
stakeMsgs []staking.StakeMsg,
@ -1506,7 +1771,9 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
// Prune in case non-empty winner chain
if len(winner) > 0 {
// Import all the pruned blocks to make the state available
bc.chainmu.Unlock()
_, evs, logs, err := bc.insertChain(winner, true /* verifyHeaders */)
bc.chainmu.Lock()
events, coalescedLogs = evs, logs
if err != nil {
@ -1583,7 +1850,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
// Write the block to the chain and get the status.
substart = time.Now()
status, err := bc.writeBlockWithState(
status, err := bc.WriteBlockWithState(
block, receipts, cxReceipts, stakeMsgs, payout, state,
)
if err != nil {

@ -49,6 +49,10 @@ func (a Stub) CurrentBlock() *types.Block {
return nil
}
func (a Stub) CurrentFastBlock() *types.Block {
return nil
}
func (a Stub) Validator() Validator {
return nil
}
@ -120,7 +124,7 @@ func (a Stub) Rollback(chain []common.Hash) error {
return errors.Errorf("method Rollback not implemented for %s", a.Name)
}
func (a Stub) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) {
func (a Stub) WriteBlockWithoutState(block *types.Block) (err error) {
return errors.Errorf("method WriteBlockWithoutState not implemented for %s", a.Name)
}
@ -136,6 +140,10 @@ func (a Stub) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) {
return 0, errors.Errorf("method InsertChain not implemented for %s", a.Name)
}
func (a Stub) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
return 0, errors.Errorf("method InsertReceiptChain not implemented for %s", a.Name)
}
func (a Stub) BadBlocks() []BadBlock {
return nil
}

@ -597,14 +597,17 @@ func ReadLastPivotNumber(db ethdb.KeyValueReader) *uint64 {
}
// WriteLastPivotNumber stores the number of the last pivot block.
func WriteLastPivotNumber(db ethdb.KeyValueWriter, pivot uint64) {
func WriteLastPivotNumber(db ethdb.KeyValueWriter, pivot uint64) error {
enc, err := rlp.EncodeToBytes(pivot)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to encode pivot block number")
return err
}
if err := db.Put(lastPivotKey, enc); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to store pivot block number")
return err
}
return nil
}
// ReadTxIndexTail retrieves the number of oldest indexed block

@ -22,7 +22,7 @@ func ReadShardState(
data, err := db.Get(shardStateKey(epoch))
if err != nil {
return nil, errors.Errorf(
MsgNoShardStateFromDB, "epoch: %d", epoch,
MsgNoShardStateFromDB, "epoch: %d", epoch.Uint64(),
)
}
ss, err2 := shard.DecodeWrapper(data)

@ -329,6 +329,7 @@ type PrometheusConfig struct {
type SyncConfig struct {
// TODO: Remove this bool after stream sync is fully up.
Enabled bool // enable the stream sync protocol
SyncMode uint32 // sync mode (default:Full sync, 1: Fast Sync, 2: Snap Sync(not implemented yet))
Downloader bool // start the sync downloader client
StagedSync bool // use staged sync
StagedSyncCfg StagedSyncConfig // staged sync configurations

@ -171,7 +171,10 @@ func (ch *chainHelperImpl) getNodeData(hs []common.Hash) ([][]byte, error) {
entry, err = ch.chain.ValidatorCode(hash)
}
}
if err == nil && len(entry) > 0 {
if err != nil {
return nil, err
}
if len(entry) > 0 {
nodes = append(nodes, entry)
bytes += len(entry)
}
@ -196,7 +199,7 @@ func (ch *chainHelperImpl) getReceipts(hs []common.Hash) ([]types.Receipts, erro
return receipts, nil
}
// getAccountRangeRequest
// getAccountRange
func (ch *chainHelperImpl) getAccountRange(root common.Hash, origin common.Hash, limit common.Hash, bytes uint64) ([]*message.AccountData, [][]byte, error) {
if bytes > softResponseLimit {
bytes = softResponseLimit

@ -184,7 +184,7 @@ func (p *Protocol) GetNodeData(ctx context.Context, hs []common.Hash, opts ...Op
// GetAccountRange do getAccountRange through sync stream protocol.
// returns the accounts along with proofs as result, target stream id, and error
func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (accounts []*message.AccountData, proof []common.Hash, stid sttypes.StreamID, err error) {
func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error) {
timer := p.doMetricClientRequest("getAccountRange")
defer p.doMetricPostClientRequest("getAccountRange", err, timer)
@ -207,7 +207,7 @@ func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin
// GetStorageRanges do getStorageRanges through sync stream protocol.
// returns the slots along with proofs as result, target stream id, and error
func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (slots []*message.StorageData, proof []common.Hash, stid sttypes.StreamID, err error) {
func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error) {
timer := p.doMetricClientRequest("getStorageRanges")
defer p.doMetricPostClientRequest("getStorageRanges", err, timer)
@ -233,11 +233,9 @@ func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accou
if err != nil {
return
}
slots = make([]*message.StorageData, 0)
slots = make([][]*message.StorageData, 0)
for _, storage := range storages {
for _, data := range storage.Data {
slots = append(slots, data)
}
slots = append(slots, storage.Data)
}
return
}
@ -735,8 +733,7 @@ func (req *getAccountRangeRequest) Encode() ([]byte, error) {
return protobuf.Marshal(msg)
}
// []*message.AccountData, []common.Hash
func (req *getAccountRangeRequest) getAccountRangeFromResponse(resp sttypes.Response) ([]*message.AccountData, []common.Hash, error) {
func (req *getAccountRangeRequest) getAccountRangeFromResponse(resp sttypes.Response) ([]*message.AccountData, [][]byte, error) {
sResp, ok := resp.(*syncResponse)
if !ok || sResp == nil {
return nil, nil, errors.New("not sync response")
@ -744,7 +741,7 @@ func (req *getAccountRangeRequest) getAccountRangeFromResponse(resp sttypes.Resp
return req.parseGetAccountRangeResponse(sResp)
}
func (req *getAccountRangeRequest) parseGetAccountRangeResponse(resp *syncResponse) ([]*message.AccountData, []common.Hash, error) {
func (req *getAccountRangeRequest) parseGetAccountRangeResponse(resp *syncResponse) ([]*message.AccountData, [][]byte, error) {
if errResp := resp.pb.GetErrorResponse(); errResp != nil {
return nil, nil, errors.New(errResp.Error)
}
@ -752,9 +749,9 @@ func (req *getAccountRangeRequest) parseGetAccountRangeResponse(resp *syncRespon
if grResp == nil {
return nil, nil, errors.New("response not GetAccountRange")
}
proofs := make([]common.Hash, 0)
proofs := make([][]byte, 0)
for _, proofBytes := range grResp.Proof {
var proof common.Hash
var proof []byte
if err := rlp.DecodeBytes(proofBytes, &proof); err != nil {
return nil, nil, errors.Wrap(err, "[GetAccountRangeResponse]")
}
@ -817,7 +814,7 @@ func (req *getStorageRangesRequest) Encode() ([]byte, error) {
}
// []*message.AccountData, []common.Hash
func (req *getStorageRangesRequest) getStorageRangesFromResponse(resp sttypes.Response) ([]*message.StoragesData, []common.Hash, error) {
func (req *getStorageRangesRequest) getStorageRangesFromResponse(resp sttypes.Response) ([]*message.StoragesData, [][]byte, error) {
sResp, ok := resp.(*syncResponse)
if !ok || sResp == nil {
return nil, nil, errors.New("not sync response")
@ -825,7 +822,7 @@ func (req *getStorageRangesRequest) getStorageRangesFromResponse(resp sttypes.Re
return req.parseGetStorageRangesResponse(sResp)
}
func (req *getStorageRangesRequest) parseGetStorageRangesResponse(resp *syncResponse) ([]*message.StoragesData, []common.Hash, error) {
func (req *getStorageRangesRequest) parseGetStorageRangesResponse(resp *syncResponse) ([]*message.StoragesData, [][]byte, error) {
if errResp := resp.pb.GetErrorResponse(); errResp != nil {
return nil, nil, errors.New(errResp.Error)
}
@ -833,9 +830,9 @@ func (req *getStorageRangesRequest) parseGetStorageRangesResponse(resp *syncResp
if grResp == nil {
return nil, nil, errors.New("response not GetStorageRanges")
}
proofs := make([]common.Hash, 0)
proofs := make([][]byte, 0)
for _, proofBytes := range grResp.Proof {
var proof common.Hash
var proof []byte
if err := rlp.DecodeBytes(proofBytes, &proof); err != nil {
return nil, nil, errors.Wrap(err, "[GetStorageRangesResponse]")
}

Loading…
Cancel
Save