refactor get receipts stage to use insertReceiptsChain

pull/4465/head
“GheisMohammadi” 1 year ago
parent 8f818100a7
commit 57a77ab0f1
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 2
      api/service/stagedstreamsync/receipt_download_manager.go
  2. 85
      api/service/stagedstreamsync/stage_receipts.go
  3. 4
      api/service/stagedstreamsync/stage_state.go

@ -77,7 +77,7 @@ func (rdm *receiptDownloadManager) HandleRequestError(bns []uint64, err error, s
} }
// HandleRequestResult handles get receipts result // HandleRequestResult handles get receipts result
func (rdm *receiptDownloadManager) HandleRequestResult(bns []uint64, receipts []*types.Receipt, loopID int, streamID sttypes.StreamID) error { func (rdm *receiptDownloadManager) HandleRequestResult(bns []uint64, receipts []types.Receipts, loopID int, streamID sttypes.StreamID) error {
rdm.lock.Lock() rdm.lock.Lock()
defer rdm.lock.Unlock() defer rdm.lock.Unlock()

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
@ -89,6 +90,22 @@ func (b *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockR
// size := uint64(0) // size := uint64(0)
startTime := time.Now() startTime := time.Now()
// startBlock := currProgress // startBlock := currProgress
// prepare db transactions
txs := make([]kv.RwTx, b.configs.concurrency)
for i := 0; i < b.configs.concurrency; i++ {
txs[i], err = b.configs.blockDBs[i].BeginRw(ctx)
if err != nil {
return err
}
}
defer func() {
for i := 0; i < b.configs.concurrency; i++ {
txs[i].Rollback()
}
}()
if b.configs.logProgress { if b.configs.logProgress {
fmt.Print("\033[s") // save the cursor position fmt.Print("\033[s") // save the cursor position
} }
@ -110,7 +127,7 @@ func (b *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockR
for i := 0; i != s.state.config.Concurrency; i++ { for i := 0; i != s.state.config.Concurrency; i++ {
wg.Add(1) wg.Add(1)
go b.runReceiptWorkerLoop(ctx, s.state.rdm, &wg, i, s, startTime) go b.runReceiptWorkerLoop(ctx, s.state.rdm, &wg, i, s, txs, startTime)
} }
wg.Wait() wg.Wait()
@ -125,9 +142,10 @@ func (b *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockR
} }
// runReceiptWorkerLoop creates a work loop for download receipts // runReceiptWorkerLoop creates a work loop for download receipts
func (b *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDownloadManager, wg *sync.WaitGroup, loopID int, s *StageState, startTime time.Time) { func (b *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDownloadManager, wg *sync.WaitGroup, loopID int, s *StageState, txs []kv.RwTx, startTime time.Time) {
currentBlock := int(b.configs.bc.CurrentBlock().NumberU64()) currentBlock := int(b.configs.bc.CurrentBlock().NumberU64())
gbm := s.state.gbm
defer wg.Done() defer wg.Done()
@ -137,6 +155,7 @@ func (b *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDo
return return
default: default:
} }
// get next batch of block numbers
batch := rdm.GetNextBatch() batch := rdm.GetNextBatch()
if len(batch) == 0 { if len(batch) == 0 {
select { select {
@ -146,16 +165,43 @@ func (b *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDo
return return
} }
} }
// retrieve corresponding blocks from cache db
var hashes []common.Hash var hashes []common.Hash
var blocks []*types.Block
for _, bn := range batch { for _, bn := range batch {
/* blkKey := marshalData(bn)
// TODO: check if we can directly use bc rather than receipt hashes map loopID, _ := gbm.GetDownloadDetails(bn)
header := b.configs.bc.GetHeaderByNumber(bn) blockBytes, err := txs[loopID].GetOne(BlocksBucket, blkKey)
hashes = append(hashes, header.ReceiptHash()) if err != nil {
*/ return
receiptHash := s.state.currentCycle.ReceiptHashes[bn] }
hashes = append(hashes, receiptHash) sigBytes, err := txs[loopID].GetOne(BlockSignaturesBucket, blkKey)
if err != nil {
return
}
sz := len(blockBytes)
if sz <= 1 {
return
}
var block *types.Block
if err := rlp.DecodeBytes(blockBytes, &block); err != nil {
return
} }
if sigBytes != nil {
block.SetCurrentCommitSig(sigBytes)
}
if block.NumberU64() != bn {
return
}
if block.Header().ReceiptHash() == emptyHash {
return
}
// receiptHash := s.state.currentCycle.ReceiptHashes[bn]
hashes = append(hashes, block.Header().ReceiptHash())
blocks = append(blocks, block)
}
// download receipts
receipts, stid, err := b.downloadReceipts(ctx, hashes) receipts, stid, err := b.downloadReceipts(ctx, hashes)
if err != nil { if err != nil {
if !errors.Is(err, context.Canceled) { if !errors.Is(err, context.Canceled) {
@ -176,7 +222,17 @@ func (b *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDo
err := errors.New("downloadRawBlocks received empty reciptBytes") err := errors.New("downloadRawBlocks received empty reciptBytes")
rdm.HandleRequestError(batch, err, stid) rdm.HandleRequestError(batch, err, stid)
} else { } else {
// insert block and receipts to chain
if inserted, err := b.configs.bc.InsertReceiptChain(blocks, receipts); err != nil {
} else {
if inserted != len(blocks) {
}
}
rdm.HandleRequestResult(batch, receipts, loopID, stid) rdm.HandleRequestResult(batch, receipts, loopID, stid)
if b.configs.logProgress { if b.configs.logProgress {
//calculating block download speed //calculating block download speed
dt := time.Now().Sub(startTime).Seconds() dt := time.Now().Sub(startTime).Seconds()
@ -193,7 +249,7 @@ func (b *StageReceipts) runReceiptWorkerLoop(ctx context.Context, rdm *receiptDo
} }
} }
func (b *StageReceipts) downloadReceipts(ctx context.Context, hs []common.Hash) ([]*types.Receipt, sttypes.StreamID, error) { func (b *StageReceipts) downloadReceipts(ctx context.Context, hs []common.Hash) ([]types.Receipts, sttypes.StreamID, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
@ -207,14 +263,7 @@ func (b *StageReceipts) downloadReceipts(ctx context.Context, hs []common.Hash)
return receipts, stid, nil return receipts, stid, nil
} }
func (b *StageReceipts) downloadRawBlocks(ctx context.Context, bns []uint64) ([][]byte, [][]byte, sttypes.StreamID, error) { func validateGetReceiptsResult(requested []common.Hash, result []types.Receipts) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
return b.configs.protocol.GetRawBlocksByNumber(ctx, bns)
}
func validateGetReceiptsResult(requested []common.Hash, result []*types.Receipt) error {
// TODO: validate each receipt here // TODO: validate each receipt here
return nil return nil

@ -174,10 +174,6 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
return err return err
} }
// TODO: only for fast sync
// add receipt hash for next stage
s.state.currentCycle.ReceiptHashes[block.NumberU64()] = block.Header().ReceiptHash()
if invalidBlockRevert { if invalidBlockRevert {
if s.state.invalidBlock.Number == i { if s.state.invalidBlock.Number == i {
s.state.invalidBlock.resolve() s.state.invalidBlock.resolve()

Loading…
Cancel
Save