Cache block processing results and reuse during block commitment to db (#3905)

* Cache block processing results and reuse during block commitment to db

* fix comments
pull/3912/head
Rongjian Lan 3 years ago committed by GitHub
parent c317881bbc
commit d442ba9efc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      core/blockchain.go
  2. 73
      core/state_processor.go
  3. 7
      core/types.go
  4. 4
      hmy/tracer.go
  5. 4
      node/node_newblock.go
  6. 44
      node/worker/worker.go

@ -258,15 +258,15 @@ func NewBlockChain(
// ValidateNewBlock validates new block.
func (bc *BlockChain) ValidateNewBlock(block *types.Block) error {
state, err := state.New(bc.CurrentBlock().Root(), bc.stateCache)
if err != nil {
return err
}
// NOTE Order of mutating state here matters.
// Process block using the parent state as reference point.
receipts, cxReceipts, _, usedGas, _, err := bc.processor.Process(
block, state, bc.vmConfig,
// Do not read cache from processor.
receipts, cxReceipts, _, usedGas, _, _, err := bc.processor.Process(
block, state, bc.vmConfig, false,
)
if err != nil {
bc.reportBlock(block, receipts, err)
@ -1448,9 +1448,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
}
// Process block using the parent state as reference point.
receipts, cxReceipts, logs, usedGas, payout, err := bc.processor.Process(
block, state, bc.vmConfig,
receipts, cxReceipts, logs, usedGas, payout, newState, err := bc.processor.Process(
block, state, bc.vmConfig, true,
)
state = newState // update state in case the new state is cached.
if err != nil {
bc.reportBlock(block, receipts, err)
return i, events, coalescedLogs, err

@ -20,6 +20,8 @@ import (
"math/big"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
@ -37,24 +39,40 @@ import (
"github.com/pkg/errors"
)
const (
resultCacheLimit = 64 // The number of cached results from processing blocks
)
// StateProcessor is a basic Processor, which takes care of transitioning
// state from one point to another.
//
// StateProcessor implements Processor.
type StateProcessor struct {
config *params.ChainConfig // Chain configuration options
bc *BlockChain // Canonical block chain
engine consensus_engine.Engine // Consensus engine used for block rewards
config *params.ChainConfig // Chain configuration options
bc *BlockChain // Canonical block chain
engine consensus_engine.Engine // Consensus engine used for block rewards
resultCache *lru.Cache // Cache for result after a certain block is processed
}
type ProcessorResult struct {
Receipts types.Receipts
CxReceipts types.CXReceipts
Logs []*types.Log
UsedGas uint64
Reward reward.Reader
State *state.DB
}
// NewStateProcessor initialises a new StateProcessor.
func NewStateProcessor(
config *params.ChainConfig, bc *BlockChain, engine consensus_engine.Engine,
) *StateProcessor {
resultCache, _ := lru.New(resultCacheLimit)
return &StateProcessor{
config: config,
bc: bc,
engine: engine,
config: config,
bc: bc,
engine: engine,
resultCache: resultCache,
}
}
@ -66,11 +84,22 @@ func NewStateProcessor(
// returns the amount of gas that was used in the process. If any of the
// transactions failed to execute due to insufficient gas it will return an error.
func (p *StateProcessor) Process(
block *types.Block, statedb *state.DB, cfg vm.Config,
block *types.Block, statedb *state.DB, cfg vm.Config, readCache bool,
) (
types.Receipts, types.CXReceipts,
[]*types.Log, uint64, reward.Reader, error,
[]*types.Log, uint64, reward.Reader, *state.DB, error,
) {
cacheKey := block.Hash()
if readCache {
if cached, ok := p.resultCache.Get(cacheKey); ok {
// Return the cached result to avoid process the same block again.
// Only the successful results are cached in case for retry.
result := cached.(*ProcessorResult)
utils.Logger().Info().Str("block num", block.Number().String()).Msg("result cache hit.")
return result.Receipts, result.CxReceipts, result.Logs, result.UsedGas, result.Reward, result.State, nil
}
}
var (
receipts types.Receipts
outcxs types.CXReceipts
@ -83,7 +112,7 @@ func (p *StateProcessor) Process(
beneficiary, err := p.bc.GetECDSAFromCoinbase(header)
if err != nil {
return nil, nil, nil, 0, nil, err
return nil, nil, nil, 0, nil, statedb, err
}
startTime := time.Now()
@ -94,7 +123,7 @@ func (p *StateProcessor) Process(
p.config, p.bc, &beneficiary, gp, statedb, header, tx, usedGas, cfg,
)
if err != nil {
return nil, nil, nil, 0, nil, err
return nil, nil, nil, 0, nil, statedb, err
}
receipts = append(receipts, receipt)
if cxReceipt != nil {
@ -113,7 +142,7 @@ func (p *StateProcessor) Process(
p.config, p.bc, &beneficiary, gp, statedb, header, tx, usedGas, cfg,
)
if err != nil {
return nil, nil, nil, 0, nil, err
return nil, nil, nil, 0, nil, statedb, err
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
@ -127,14 +156,14 @@ func (p *StateProcessor) Process(
p.config, statedb, header, cx,
); err != nil {
return nil, nil,
nil, 0, nil, errors.New("[Process] Cannot apply incoming receipts")
nil, 0, nil, statedb, errors.New("[Process] Cannot apply incoming receipts")
}
}
slashes := slash.Records{}
if s := header.Slashes(); len(s) > 0 {
if err := rlp.DecodeBytes(s, &slashes); err != nil {
return nil, nil, nil, 0, nil, errors.New(
return nil, nil, nil, 0, nil, statedb, errors.New(
"[Process] Cannot finalize block",
)
}
@ -151,10 +180,24 @@ func (p *StateProcessor) Process(
receipts, outcxs, incxs, block.StakingTransactions(), slashes, sigsReady, func() uint64 { return header.ViewID().Uint64() },
)
if err != nil {
return nil, nil, nil, 0, nil, errors.New("[Process] Cannot finalize block")
return nil, nil, nil, 0, nil, statedb, errors.New("[Process] Cannot finalize block")
}
return receipts, outcxs, allLogs, *usedGas, payout, nil
result := &ProcessorResult{
Receipts: receipts,
CxReceipts: outcxs,
Logs: allLogs,
UsedGas: *usedGas,
Reward: payout,
State: statedb,
}
p.resultCache.Add(cacheKey, result)
return receipts, outcxs, allLogs, *usedGas, payout, statedb, nil
}
// CacheProcessorResult caches the process result on the cache key.
func (p *StateProcessor) CacheProcessorResult(cacheKey interface{}, result *ProcessorResult) {
p.resultCache.Add(cacheKey, result)
}
// return true if it is valid

@ -54,9 +54,12 @@ type Validator interface {
// initial state is based. It should return the receipts generated, amount
// of gas used in the process and return an error if any of the internal rules
// failed.
// Process will cache the result of successfully processed blocks.
// readCache decides whether the method will try reading from result cache.
type Processor interface {
Process(block *types.Block, statedb *state.DB, cfg vm.Config) (
Process(block *types.Block, statedb *state.DB, cfg vm.Config, readCache bool) (
types.Receipts, types.CXReceipts,
[]*types.Log, uint64, reward.Reader, error,
[]*types.Log, uint64, reward.Reader, *state.DB, error,
)
CacheProcessorResult(cacheKey interface{}, result *ProcessorResult)
}

@ -280,7 +280,7 @@ func (hmy *Harmony) TraceChain(ctx context.Context, start, end *types.Block, con
traced += uint64(len(txs))
}
// Generate the next state snapshot fast without tracing
_, _, _, _, _, err := hmy.BlockChain.Processor().Process(block, statedb, vm.Config{})
_, _, _, _, _, _, err := hmy.BlockChain.Processor().Process(block, statedb, vm.Config{}, false)
if err != nil {
failed = err
break
@ -674,7 +674,7 @@ func (hmy *Harmony) ComputeStateDB(block *types.Block, reexec uint64) (*state.DB
if block = hmy.BlockChain.GetBlockByNumber(block.NumberU64() + 1); block == nil {
return nil, fmt.Errorf("block #%d not found", block.NumberU64()+1)
}
_, _, _, _, _, err := hmy.BlockChain.Processor().Process(block, statedb, vm.Config{})
_, _, _, _, _, _, err := hmy.BlockChain.Processor().Process(block, statedb, vm.Config{}, false)
if err != nil {
return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
}

@ -303,6 +303,10 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error)
utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Failed verifying the new block header")
return nil, err
}
// Save process result in the cache for later use for faster block commitment to db.
result := node.Worker.GetCurrentResult()
node.Blockchain().Processor().CacheProcessorResult(finalizedBlock.Hash(), result)
return finalizedBlock, nil
}

@ -7,6 +7,8 @@ import (
"sort"
"time"
"github.com/harmony-one/harmony/consensus/reward"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/crypto/bls"
@ -41,6 +43,8 @@ type environment struct {
txs []*types.Transaction
stakingTxs []*staking.StakingTransaction
receipts []*types.Receipt
logs []*types.Log
reward reward.Reader
outcxs []*types.CXReceipt // cross shard transaction receipts (source shard)
incxs []*types.CXReceiptsProof // cross shard receipts and its proof (desitinatin shard)
slashes slash.Records
@ -97,7 +101,7 @@ func (w *Worker) CommitSortedTransactions(
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs))
_, err := w.commitTransaction(tx, coinbase)
err := w.commitTransaction(tx, coinbase)
sender, _ := common2.AddressToBech32(from)
switch err {
@ -161,7 +165,7 @@ func (w *Worker) CommitTransactions(
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs)+len(w.current.stakingTxs))
// THESE CODE ARE DUPLICATED AS ABOVE>>
if _, err := w.commitStakingTransaction(tx, coinbase); err != nil {
if err := w.commitStakingTransaction(tx, coinbase); err != nil {
txID := tx.Hash().Hex()
utils.Logger().Error().Err(err).
Str("stakingTxID", txID).
@ -186,7 +190,7 @@ func (w *Worker) CommitTransactions(
func (w *Worker) commitStakingTransaction(
tx *staking.StakingTransaction, coinbase common.Address,
) ([]*types.Log, error) {
) error {
snap := w.current.state.Snapshot()
gasUsed := w.current.header.GasUsed()
receipt, _, err := core.ApplyStakingTransaction(
@ -199,15 +203,17 @@ func (w *Worker) commitStakingTransaction(
utils.Logger().Error().
Err(err).Interface("stkTxn", tx).
Msg("Staking transaction failed commitment")
return nil, err
return err
}
if receipt == nil {
return nil, fmt.Errorf("nil staking receipt")
return fmt.Errorf("nil staking receipt")
}
w.current.stakingTxs = append(w.current.stakingTxs, tx)
w.current.receipts = append(w.current.receipts, receipt)
return receipt.Logs, nil
w.current.logs = append(w.current.logs, receipt.Logs...)
return nil
}
var (
@ -216,7 +222,7 @@ var (
func (w *Worker) commitTransaction(
tx *types.Transaction, coinbase common.Address,
) ([]*types.Log, error) {
) error {
snap := w.current.state.Snapshot()
gasUsed := w.current.header.GasUsed()
receipt, cx, _, err := core.ApplyTransaction(
@ -236,20 +242,21 @@ func (w *Worker) commitTransaction(
utils.Logger().Error().
Err(err).Interface("txn", tx).
Msg("Transaction failed commitment")
return nil, errNilReceipt
return errNilReceipt
}
if receipt == nil {
utils.Logger().Warn().Interface("tx", tx).Interface("cx", cx).Msg("Receipt is Nil!")
return nil, errNilReceipt
return errNilReceipt
}
w.current.txs = append(w.current.txs, tx)
w.current.receipts = append(w.current.receipts, receipt)
w.current.logs = append(w.current.logs, receipt.Logs...)
if cx != nil {
w.current.outcxs = append(w.current.outcxs, cx)
}
return receipt.Logs, nil
return nil
}
// CommitReceipts commits a list of already verified incoming cross shard receipts
@ -316,6 +323,18 @@ func (w *Worker) makeCurrent(parent *types.Block, header *block.Header) error {
return nil
}
// GetCurrentResult gets the current block processing result.
func (w *Worker) GetCurrentResult() *core.ProcessorResult {
return &core.ProcessorResult{
Receipts: w.current.receipts,
CxReceipts: w.current.outcxs,
Logs: w.current.logs,
UsedGas: w.current.header.GasUsed(),
Reward: w.current.reward,
State: w.current.state,
}
}
// GetCurrentState gets the current state.
func (w *Worker) GetCurrentState() *state.DB {
return w.current.state
@ -497,7 +516,7 @@ func (w *Worker) FinalizeNewBlock(
return nil, err
}
}
state := w.current.state.Copy()
state := w.current.state
copyHeader := types.CopyHeader(w.current.header)
sigsReady := make(chan bool)
@ -524,7 +543,7 @@ func (w *Worker) FinalizeNewBlock(
}
}()
block, _, err := w.engine.Finalize(
block, payout, err := w.engine.Finalize(
w.chain, copyHeader, state, w.current.txs, w.current.receipts,
w.current.outcxs, w.current.incxs, w.current.stakingTxs,
w.current.slashes, sigsReady, viewID,
@ -532,6 +551,7 @@ func (w *Worker) FinalizeNewBlock(
if err != nil {
return nil, errors.Wrapf(err, "cannot finalize block")
}
w.current.reward = payout
return block, nil
}

Loading…
Cancel
Save