Snapshot integration and add cache to statedb (#4419)

* integrate snapshot feature with statedb and add cache to states

* send nil as stateCache to let the NewDatabase function decide the cache configuration

* fix test issues of using snapshot
pull/4443/head
Gheis Mohammadi 1 year ago committed by GitHub
parent 6a2f9cd4c7
commit 61b8aba82c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 116
      core/blockchain_impl.go
  2. 3
      core/evm_test.go
  3. 5
      core/state/database.go
  4. 2
      core/state/snapshot/snapshot.go
  5. 2
      core/state/statedb.go
  6. 3
      core/tx_pool_test.go
  7. 5
      internal/shardchain/shardchains.go
  8. 8
      node/worker/worker_test.go
  9. 3
      test/chain/main.go
  10. 2
      test/chain/reward/main.go

@ -39,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/block"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
@ -46,6 +47,7 @@ import (
"github.com/harmony-one/harmony/consensus/votepower"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/state/snapshot"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/crypto/bls"
@ -124,10 +126,29 @@ const (
// CacheConfig contains the configuration values for the trie caching/pruning
// that's resident in a blockchain. The Trie* fields configure the trie node
// caches; the Snapshot* fields configure the state snapshot tree.
type CacheConfig struct {
Disabled bool // Whether to disable trie write caching (archive node)
TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TriesInMemory uint64 // Block number from the head stored in disk before exiting
Disabled bool // Whether to disable trie write caching (archive node)
TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TriesInMemory uint64 // Block number from the head stored in disk before exiting
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieCleanJournal string // Disk journal for saving clean cache entries.
Preimages bool // Whether to store preimage of trie key to the disk
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory; the snapshot tree is only loaded when this is > 0
SnapshotNoBuild bool // Whether background snapshot generation is disallowed (true disables it; forwarded as the snapshot config's NoBuild)
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}
// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing). It is substituted whenever a nil *CacheConfig
// is supplied when constructing the blockchain.
var defaultCacheConfig = &CacheConfig{
Disabled: false, // keep trie write caching enabled
TrieCleanLimit: 256, // MB
TrieDirtyLimit: 256, // MB
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256, // MB; a positive value turns on snapshot loading
SnapshotWait: true, // wait for snapshot construction on startup
}
type BlockChainImpl struct {
@ -137,8 +158,10 @@ type BlockChainImpl struct {
shardID uint32 // Shard number
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
triedb *trie.Database // The database handler for maintaining trie nodes.
// The following two variables are used to clean up the cache of redis in tikv mode.
// This can improve the cache hit rate of redis
@ -225,6 +248,21 @@ func newBlockChainWithOptions(
cacheConfig *CacheConfig, chainConfig *params.ChainConfig,
engine consensus_engine.Engine, vmConfig vm.Config, options Options) (*BlockChainImpl, error) {
if cacheConfig == nil {
cacheConfig = defaultCacheConfig
}
// Open trie database with provided config
triedb := trie.NewDatabaseWithConfig(db, &trie.Config{
Cache: cacheConfig.TrieCleanLimit,
Journal: cacheConfig.TrieCleanJournal,
Preimages: cacheConfig.Preimages,
})
if stateCache == nil {
stateCache = state.NewDatabaseWithNodeDB(db, triedb)
}
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
receiptsCache, _ := lru.New(receiptsCacheLimit)
@ -248,6 +286,7 @@ func newBlockChainWithOptions(
cacheConfig: cacheConfig,
db: db,
triegc: prque.New[int64, common.Hash](nil),
triedb: triedb,
stateCache: stateCache,
quit: make(chan struct{}),
bodyCache: bodyCache,
@ -298,6 +337,28 @@ func newBlockChainWithOptions(
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, beaconChain, engine))
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool
head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.NumberU64() {
utils.Logger().Warn().Uint64("diskbase", *layer).Uint64("chainhead", head.NumberU64()).Msg("Enabling snapshot recovery")
recover = true
}
snapconfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Hash())
}
// Take ownership of this particular state
go bc.update()
return bc, nil
@ -463,7 +524,7 @@ func (bc *BlockChainImpl) ValidateNewBlock(block *types.Block, beaconChain Block
}
func (bc *BlockChainImpl) validateNewBlock(block *types.Block) error {
state, err := state.New(bc.CurrentBlock().Root(), bc.stateCache, nil)
state, err := state.New(bc.CurrentBlock().Root(), bc.stateCache, bc.snaps)
if err != nil {
return err
}
@ -521,7 +582,7 @@ func (bc *BlockChainImpl) loadLastState() error {
return bc.Reset()
}
// Make sure the state associated with the block is available
if _, err := state.New(currentBlock.Root(), bc.stateCache, nil); err != nil {
if _, err := state.New(currentBlock.Root(), bc.stateCache, bc.snaps); err != nil {
// Dangling block without a state associated, init from scratch
utils.Logger().Warn().
Str("number", currentBlock.Number().String()).
@ -618,7 +679,7 @@ func (bc *BlockChainImpl) SetHead(head uint64) error {
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock != nil {
if _, err := state.New(currentBlock.Root(), bc.stateCache, nil); err != nil {
if _, err := state.New(currentBlock.Root(), bc.stateCache, bc.snaps); err != nil {
// Rewound state missing, rolled back to before pivot, reset to genesis
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
@ -695,7 +756,12 @@ func (bc *BlockChainImpl) State() (*state.DB, error) {
}
func (bc *BlockChainImpl) StateAt(root common.Hash) (*state.DB, error) {
return state.New(root, bc.stateCache, nil)
return state.New(root, bc.stateCache, bc.snaps)
}
// Snapshots returns the blockchain snapshot tree.
//
// The returned tree may be nil: it is only created during construction when
// the cache config's SnapshotLimit is greater than zero, so callers should
// check for nil before using it.
func (bc *BlockChainImpl) Snapshots() *snapshot.Tree {
return bc.snaps
}
func (bc *BlockChainImpl) Reset() error {
@ -740,7 +806,7 @@ func (bc *BlockChainImpl) repair(head **types.Block) error {
valsToRemove := map[common.Address]struct{}{}
for {
// Abort if we've rewound to a head block that does have associated state
if _, err := state.New((*head).Root(), bc.stateCache, nil); err == nil {
if _, err := state.New((*head).Root(), bc.stateCache, bc.snaps); err == nil {
utils.Logger().Info().
Str("number", (*head).Number().String()).
Str("hash", (*head).Hash().Hex()).
@ -1052,6 +1118,15 @@ func (bc *BlockChainImpl) Stop() {
return
}
// Ensure that the entirety of the state snapshot is journalled to disk.
var snapBase common.Hash
if bc.snaps != nil {
var err error
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Header().Root()); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to journal state snapshot")
}
}
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
return
}
@ -1091,6 +1166,12 @@ func (bc *BlockChainImpl) Stop() {
}
}
}
if snapBase != (common.Hash{}) {
utils.Logger().Info().Interface("root", snapBase).Msg("Writing snapshot state to disk")
if err := triedb.Commit(snapBase, true); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to commit recent state trie")
}
}
for !bc.triegc.Empty() {
v := common.Hash(bc.triegc.PopItem())
triedb.Dereference(v)
@ -1099,6 +1180,15 @@ func (bc *BlockChainImpl) Stop() {
utils.Logger().Error().Msg("Dangling trie nodes after full cleanup")
}
}
// Flush the collected preimages to disk
if err := bc.stateCache.TrieDB().CommitPreimages(); err != nil {
utils.Logger().Error().Interface("err", err).Msg("Failed to commit trie preimages")
}
// Ensure all live cached entries are saved to disk, so that we can skip
// cache warmup when the node restarts.
if bc.cacheConfig.TrieCleanJournal != "" {
bc.triedb.SaveCache(bc.cacheConfig.TrieCleanJournal)
}
utils.Logger().Info().Msg("Blockchain manager stopped")
}
@ -1733,7 +1823,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
} else {
parent = chain[i-1]
}
state, err := state.New(parent.Root(), bc.stateCache, nil)
state, err := state.New(parent.Root(), bc.stateCache, bc.snaps)
if err != nil {
return i, events, coalescedLogs, err
}
@ -3418,14 +3508,14 @@ func (bc *BlockChainImpl) tikvCleanCache() {
for i := bc.latestCleanCacheNum + 1; i <= to; i++ {
// build previous block statedb
fromBlock := bc.GetBlockByNumber(i)
fromTrie, err := state.New(fromBlock.Root(), bc.stateCache, nil)
fromTrie, err := state.New(fromBlock.Root(), bc.stateCache, bc.snaps)
if err != nil {
continue
}
// build current block statedb
toBlock := bc.GetBlockByNumber(i + 1)
toTrie, err := state.New(toBlock.Root(), bc.stateCache, nil)
toTrie, err := state.New(toBlock.Root(), bc.stateCache, bc.snaps)
if err != nil {
continue
}
@ -3525,7 +3615,7 @@ func (bc *BlockChainImpl) InitTiKV(conf *harmonyconfig.TiKVConfig) {
// If redis is empty, the cache hit rate will be too low and block synchronization will be slow.
// Setting LOAD_PRE_FETCH to "yes" can significantly improve this.
if os.Getenv("LOAD_PRE_FETCH") == "yes" {
if trie, err := state.New(bc.CurrentBlock().Root(), bc.stateCache, nil); err == nil {
if trie, err := state.New(bc.CurrentBlock().Root(), bc.stateCache, bc.snaps); err == nil {
trie.Prefetch(512)
} else {
log.Println("LOAD_PRE_FETCH ERR: ", err)

@ -46,7 +46,8 @@ func getTestEnvironment(testBankKey ecdsa.PrivateKey) (*BlockChainImpl, *state.D
genesis := gspec.MustCommit(database)
// fake blockchain
chain, _ := NewBlockChain(database, state.NewDatabase(database), nil, nil, gspec.Config, engine, vm.Config{})
cacheConfig := &CacheConfig{SnapshotLimit: 0}
chain, _ := NewBlockChain(database, nil, nil, cacheConfig, gspec.Config, engine, vm.Config{})
db, _ := chain.StateAt(genesis.Root())
// make a fake block header (use epoch 1 so that locked tokens can be tested)

@ -139,7 +139,10 @@ func NewDatabase(db ethdb.Database) Database {
}
func NewDatabaseWithCache(db ethdb.Database, cache int) Database {
return NewDatabaseWithConfig(db, nil)
config := trie.Config{
Cache: cache,
}
return NewDatabaseWithConfig(db, &config)
}
// NewDatabaseWithConfig creates a backing store for state. The returned database

@ -206,7 +206,7 @@ func New(config Config, diskdb ethdb.KeyValueStore, triedb *trie.Database, root
utils.Logger().Warn().Err(err).Msg("Snapshot maintenance disabled (syncing)")
return snap, nil
}
// Create the building waiter iff the background generation is allowed
// Create the building waiter if the background generation is allowed
if !config.NoBuild && !config.AsyncBuild {
defer snap.waitBuild()
}

@ -24,13 +24,13 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state/snapshot"
types2 "github.com/harmony-one/harmony/core/types"
common2 "github.com/harmony-one/harmony/internal/common"

@ -161,7 +161,8 @@ func createBlockChain() *BlockChainImpl {
genesis := gspec.MustCommit(database)
_ = genesis
engine := chain2.NewEngine()
blockchain, _ := NewBlockChain(database, state.NewDatabase(database), nil, nil, gspec.Config, engine, vm.Config{})
cacheConfig := &CacheConfig{SnapshotLimit: 0}
blockchain, _ := NewBlockChain(database, nil, nil, cacheConfig, gspec.Config, engine, vm.Config{})
return blockchain
}

@ -107,7 +107,7 @@ func (sc *CollectionImpl) ShardChain(shardID uint32, options ...core.Options) (c
Msg("disable cache, running in archival mode")
} else {
cacheConfig = &core.CacheConfig{
TrieNodeLimit: 256 * 1024 * 1024,
TrieNodeLimit: 256,
TrieTimeLimit: 2 * time.Minute,
TriesInMemory: 128,
}
@ -172,9 +172,8 @@ func initStateCache(db ethdb.Database, sc *CollectionImpl, shardID uint32) (stat
return nil, err
}
return state.NewDatabaseWithCache(stateDB, 64), nil
} else {
return state.NewDatabase(db), nil
}
return nil, nil
}
// DisableCache disables caching mode for newly opened chains.

@ -5,8 +5,6 @@ import (
"math/rand"
"testing"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/ethereum/go-ethereum/common"
@ -45,7 +43,8 @@ func TestNewWorker(t *testing.T) {
genesis := gspec.MustCommit(database)
_ = genesis
chain, err := core.NewBlockChain(database, state.NewDatabase(database), &core.BlockChainImpl{}, nil, gspec.Config, engine, vm.Config{})
cacheConfig := &core.CacheConfig{SnapshotLimit: 0}
chain, err := core.NewBlockChain(database, nil, &core.BlockChainImpl{}, cacheConfig, gspec.Config, engine, vm.Config{})
if err != nil {
t.Error(err)
@ -72,7 +71,8 @@ func TestCommitTransactions(t *testing.T) {
)
gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, state.NewDatabase(database), nil, nil, gspec.Config, engine, vm.Config{})
cacheConfig := &core.CacheConfig{SnapshotLimit: 0}
chain, _ := core.NewBlockChain(database, nil, nil, cacheConfig, gspec.Config, engine, vm.Config{})
// Create a new worker
worker := New(params.TestChainConfig, chain, nil, engine)

@ -15,7 +15,6 @@ import (
blockfactory "github.com/harmony-one/harmony/block/factory"
"github.com/harmony-one/harmony/core"
core_state "github.com/harmony-one/harmony/core/state"
harmonyState "github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/crypto/hash"
@ -206,7 +205,7 @@ func playFaucetContract(chain core.BlockChain) {
func main() {
genesis := gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, harmonyState.NewDatabase(database), nil, nil, gspec.Config, chain.Engine(), vm.Config{})
chain, _ := core.NewBlockChain(database, nil, nil, nil, gspec.Config, chain.Engine(), vm.Config{})
txpool := core.NewTxPool(core.DefaultTxPoolConfig, chainConfig, chain, types.NewTransactionErrorSink())
backend := &testWorkerBackend{

@ -109,7 +109,7 @@ func main() {
genesis := gspec.MustCommit(database)
_ = genesis
engine := chain.NewEngine()
bc, _ := core.NewBlockChain(database, state.NewDatabase(database), nil, nil, gspec.Config, engine, vm.Config{})
bc, _ := core.NewBlockChain(database, nil, nil, nil, gspec.Config, engine, vm.Config{})
statedb, _ := state.New(common2.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
msg := createValidator()
statedb.AddBalance(msg.ValidatorAddress, new(big.Int).Mul(big.NewInt(5e18), big.NewInt(2000)))

Loading…
Cancel
Save