From d442ba9efc469b2626fa4fd69d1bcd4d41c2ce97 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 26 Oct 2021 11:04:38 -0700 Subject: [PATCH] Cache block processing results and reuse during block commitment to db (#3905) * Cache block processing results and reuse during block commitment to db * fix comments --- core/blockchain.go | 11 ++++--- core/state_processor.go | 73 ++++++++++++++++++++++++++++++++--------- core/types.go | 7 ++-- hmy/tracer.go | 4 +-- node/node_newblock.go | 4 +++ node/worker/worker.go | 44 ++++++++++++++++++------- 6 files changed, 107 insertions(+), 36 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index a4686a4d7..df67b784a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -258,15 +258,15 @@ func NewBlockChain( // ValidateNewBlock validates new block. func (bc *BlockChain) ValidateNewBlock(block *types.Block) error { state, err := state.New(bc.CurrentBlock().Root(), bc.stateCache) - if err != nil { return err } // NOTE Order of mutating state here matters. // Process block using the parent state as reference point. - receipts, cxReceipts, _, usedGas, _, err := bc.processor.Process( - block, state, bc.vmConfig, + // Do not read cache from processor. + receipts, cxReceipts, _, usedGas, _, _, err := bc.processor.Process( + block, state, bc.vmConfig, false, ) if err != nil { bc.reportBlock(block, receipts, err) @@ -1448,9 +1448,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int, } // Process block using the parent state as reference point. - receipts, cxReceipts, logs, usedGas, payout, err := bc.processor.Process( - block, state, bc.vmConfig, + receipts, cxReceipts, logs, usedGas, payout, newState, err := bc.processor.Process( + block, state, bc.vmConfig, true, ) + state = newState // update state in case the new state is cached. if err != nil { bc.reportBlock(block, receipts, err) return i, events, coalescedLogs, err diff --git a/core/state_processor.go b/core/state_processor.go index d89e8ce21..dafe81cd9 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -20,6 +20,8 @@ import ( "math/big" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" @@ -37,24 +39,40 @@ import ( "github.com/pkg/errors" ) +const ( + resultCacheLimit = 64 // The number of cached results from processing blocks +) + // StateProcessor is a basic Processor, which takes care of transitioning // state from one point to another. // // StateProcessor implements Processor. type StateProcessor struct { - config *params.ChainConfig // Chain configuration options - bc *BlockChain // Canonical block chain - engine consensus_engine.Engine // Consensus engine used for block rewards + config *params.ChainConfig // Chain configuration options + bc *BlockChain // Canonical block chain + engine consensus_engine.Engine // Consensus engine used for block rewards + resultCache *lru.Cache // Cache for result after a certain block is processed +} + +type ProcessorResult struct { + Receipts types.Receipts + CxReceipts types.CXReceipts + Logs []*types.Log + UsedGas uint64 + Reward reward.Reader + State *state.DB } // NewStateProcessor initialises a new StateProcessor. func NewStateProcessor( config *params.ChainConfig, bc *BlockChain, engine consensus_engine.Engine, ) *StateProcessor { + resultCache, _ := lru.New(resultCacheLimit) return &StateProcessor{ - config: config, - bc: bc, - engine: engine, + config: config, + bc: bc, + engine: engine, + resultCache: resultCache, } } @@ -66,11 +84,22 @@ func NewStateProcessor( // returns the amount of gas that was used in the process. If any of the // transactions failed to execute due to insufficient gas it will return an error. func (p *StateProcessor) Process( - block *types.Block, statedb *state.DB, cfg vm.Config, + block *types.Block, statedb *state.DB, cfg vm.Config, readCache bool, ) ( types.Receipts, types.CXReceipts, - []*types.Log, uint64, reward.Reader, error, + []*types.Log, uint64, reward.Reader, *state.DB, error, ) { + cacheKey := block.Hash() + if readCache { + if cached, ok := p.resultCache.Get(cacheKey); ok { + // Return the cached result to avoid process the same block again. + // Only the successful results are cached in case for retry. + result := cached.(*ProcessorResult) + utils.Logger().Info().Str("block num", block.Number().String()).Msg("result cache hit.") + return result.Receipts, result.CxReceipts, result.Logs, result.UsedGas, result.Reward, result.State, nil + } + } + var ( receipts types.Receipts outcxs types.CXReceipts @@ -83,7 +112,7 @@ func (p *StateProcessor) Process( beneficiary, err := p.bc.GetECDSAFromCoinbase(header) if err != nil { - return nil, nil, nil, 0, nil, err + return nil, nil, nil, 0, nil, statedb, err } startTime := time.Now() @@ -94,7 +123,7 @@ func (p *StateProcessor) Process( p.config, p.bc, &beneficiary, gp, statedb, header, tx, usedGas, cfg, ) if err != nil { - return nil, nil, nil, 0, nil, err + return nil, nil, nil, 0, nil, statedb, err } receipts = append(receipts, receipt) if cxReceipt != nil { @@ -113,7 +142,7 @@ func (p *StateProcessor) Process( p.config, p.bc, &beneficiary, gp, statedb, header, tx, usedGas, cfg, ) if err != nil { - return nil, nil, nil, 0, nil, err + return nil, nil, nil, 0, nil, statedb, err } receipts = append(receipts, receipt) allLogs = append(allLogs, receipt.Logs...) @@ -127,14 +156,14 @@ func (p *StateProcessor) Process( p.config, statedb, header, cx, ); err != nil { return nil, nil, - nil, 0, nil, errors.New("[Process] Cannot apply incoming receipts") + nil, 0, nil, statedb, errors.New("[Process] Cannot apply incoming receipts") } } slashes := slash.Records{} if s := header.Slashes(); len(s) > 0 { if err := rlp.DecodeBytes(s, &slashes); err != nil { - return nil, nil, nil, 0, nil, errors.New( + return nil, nil, nil, 0, nil, statedb, errors.New( "[Process] Cannot finalize block", ) } @@ -151,10 +180,24 @@ func (p *StateProcessor) Process( receipts, outcxs, incxs, block.StakingTransactions(), slashes, sigsReady, func() uint64 { return header.ViewID().Uint64() }, ) if err != nil { - return nil, nil, nil, 0, nil, errors.New("[Process] Cannot finalize block") + return nil, nil, nil, 0, nil, statedb, errors.New("[Process] Cannot finalize block") } - return receipts, outcxs, allLogs, *usedGas, payout, nil + result := &ProcessorResult{ + Receipts: receipts, + CxReceipts: outcxs, + Logs: allLogs, + UsedGas: *usedGas, + Reward: payout, + State: statedb, + } + p.resultCache.Add(cacheKey, result) + return receipts, outcxs, allLogs, *usedGas, payout, statedb, nil +} + +// CacheProcessorResult caches the process result on the cache key. +func (p *StateProcessor) CacheProcessorResult(cacheKey interface{}, result *ProcessorResult) { + p.resultCache.Add(cacheKey, result) } // return true if it is valid diff --git a/core/types.go b/core/types.go index 8a6282dba..1f89096d5 100644 --- a/core/types.go +++ b/core/types.go @@ -54,9 +54,12 @@ type Validator interface { // initial state is based. It should return the receipts generated, amount // of gas used in the process and return an error if any of the internal rules // failed. +// Process will cache the result of successfully processed blocks. +// readCache decides whether the method will try reading from result cache. type Processor interface { - Process(block *types.Block, statedb *state.DB, cfg vm.Config) ( + Process(block *types.Block, statedb *state.DB, cfg vm.Config, readCache bool) ( types.Receipts, types.CXReceipts, - []*types.Log, uint64, reward.Reader, error, + []*types.Log, uint64, reward.Reader, *state.DB, error, ) + CacheProcessorResult(cacheKey interface{}, result *ProcessorResult) } diff --git a/hmy/tracer.go b/hmy/tracer.go index 8d9c508ca..5f292e06c 100644 --- a/hmy/tracer.go +++ b/hmy/tracer.go @@ -280,7 +280,7 @@ func (hmy *Harmony) TraceChain(ctx context.Context, start, end *types.Block, con traced += uint64(len(txs)) } // Generate the next state snapshot fast without tracing - _, _, _, _, _, err := hmy.BlockChain.Processor().Process(block, statedb, vm.Config{}) + _, _, _, _, _, _, err := hmy.BlockChain.Processor().Process(block, statedb, vm.Config{}, false) if err != nil { failed = err break @@ -674,7 +674,7 @@ func (hmy *Harmony) ComputeStateDB(block *types.Block, reexec uint64) (*state.DB if block = hmy.BlockChain.GetBlockByNumber(block.NumberU64() + 1); block == nil { return nil, fmt.Errorf("block #%d not found", block.NumberU64()+1) } - _, _, _, _, _, err := hmy.BlockChain.Processor().Process(block, statedb, vm.Config{}) + _, _, _, _, _, _, err := hmy.BlockChain.Processor().Process(block, statedb, vm.Config{}, false) if err != nil { return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err) } diff --git a/node/node_newblock.go b/node/node_newblock.go index c29d8cca2..8ceac2d4b 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -303,6 +303,10 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error) utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Failed verifying the new block header") return nil, err } + + // Save process result in the cache for later use for faster block commitment to db. + result := node.Worker.GetCurrentResult() + node.Blockchain().Processor().CacheProcessorResult(finalizedBlock.Hash(), result) return finalizedBlock, nil } diff --git a/node/worker/worker.go b/node/worker/worker.go index cee42f03b..cd023328f 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -7,6 +7,8 @@ import ( "sort" "time" + "github.com/harmony-one/harmony/consensus/reward" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/crypto/bls" @@ -41,6 +43,8 @@ type environment struct { txs []*types.Transaction stakingTxs []*staking.StakingTransaction receipts []*types.Receipt + logs []*types.Log + reward reward.Reader outcxs []*types.CXReceipt // cross shard transaction receipts (source shard) incxs []*types.CXReceiptsProof // cross shard receipts and its proof (desitinatin shard) slashes slash.Records @@ -97,7 +101,7 @@ func (w *Worker) CommitSortedTransactions( // Start executing the transaction w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs)) - _, err := w.commitTransaction(tx, coinbase) + err := w.commitTransaction(tx, coinbase) sender, _ := common2.AddressToBech32(from) switch err { @@ -161,7 +165,7 @@ func (w *Worker) CommitTransactions( // Start executing the transaction w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs)+len(w.current.stakingTxs)) // THESE CODE ARE DUPLICATED AS ABOVE>> - if _, err := w.commitStakingTransaction(tx, coinbase); err != nil { + if err := w.commitStakingTransaction(tx, coinbase); err != nil { txID := tx.Hash().Hex() utils.Logger().Error().Err(err). Str("stakingTxID", txID). @@ -186,7 +190,7 @@ func (w *Worker) CommitTransactions( func (w *Worker) commitStakingTransaction( tx *staking.StakingTransaction, coinbase common.Address, -) ([]*types.Log, error) { +) error { snap := w.current.state.Snapshot() gasUsed := w.current.header.GasUsed() receipt, _, err := core.ApplyStakingTransaction( @@ -199,15 +203,17 @@ func (w *Worker) commitStakingTransaction( utils.Logger().Error(). Err(err).Interface("stkTxn", tx). Msg("Staking transaction failed commitment") - return nil, err + return err } if receipt == nil { - return nil, fmt.Errorf("nil staking receipt") + return fmt.Errorf("nil staking receipt") } w.current.stakingTxs = append(w.current.stakingTxs, tx) w.current.receipts = append(w.current.receipts, receipt) - return receipt.Logs, nil + w.current.logs = append(w.current.logs, receipt.Logs...) + + return nil } var ( @@ -216,7 +222,7 @@ var ( func (w *Worker) commitTransaction( tx *types.Transaction, coinbase common.Address, -) ([]*types.Log, error) { +) error { snap := w.current.state.Snapshot() gasUsed := w.current.header.GasUsed() receipt, cx, _, err := core.ApplyTransaction( @@ -236,20 +242,21 @@ func (w *Worker) commitTransaction( utils.Logger().Error(). Err(err).Interface("txn", tx). Msg("Transaction failed commitment") - return nil, errNilReceipt + return errNilReceipt } if receipt == nil { utils.Logger().Warn().Interface("tx", tx).Interface("cx", cx).Msg("Receipt is Nil!") - return nil, errNilReceipt + return errNilReceipt } w.current.txs = append(w.current.txs, tx) w.current.receipts = append(w.current.receipts, receipt) + w.current.logs = append(w.current.logs, receipt.Logs...) if cx != nil { w.current.outcxs = append(w.current.outcxs, cx) } - return receipt.Logs, nil + return nil } // CommitReceipts commits a list of already verified incoming cross shard receipts @@ -316,6 +323,18 @@ func (w *Worker) makeCurrent(parent *types.Block, header *block.Header) error { return nil } +// GetCurrentResult gets the current block processing result. +func (w *Worker) GetCurrentResult() *core.ProcessorResult { + return &core.ProcessorResult{ + Receipts: w.current.receipts, + CxReceipts: w.current.outcxs, + Logs: w.current.logs, + UsedGas: w.current.header.GasUsed(), + Reward: w.current.reward, + State: w.current.state, + } +} + // GetCurrentState gets the current state. func (w *Worker) GetCurrentState() *state.DB { return w.current.state @@ -497,7 +516,7 @@ func (w *Worker) FinalizeNewBlock( return nil, err } } - state := w.current.state.Copy() + state := w.current.state copyHeader := types.CopyHeader(w.current.header) sigsReady := make(chan bool) @@ -524,7 +543,7 @@ func (w *Worker) FinalizeNewBlock( } }() - block, _, err := w.engine.Finalize( + block, payout, err := w.engine.Finalize( w.chain, copyHeader, state, w.current.txs, w.current.receipts, w.current.outcxs, w.current.incxs, w.current.stakingTxs, w.current.slashes, sigsReady, viewID, @@ -532,6 +551,7 @@ func (w *Worker) FinalizeNewBlock( if err != nil { return nil, errors.Wrapf(err, "cannot finalize block") } + w.current.reward = payout return block, nil }