record SnapdbInfo to db

pull/4174/head
peekpi 3 years ago committed by Leo Chen
parent 9f6c94e93e
commit 8053f87f12
  1. 142
      cmd/harmony/dumpdb.go
  2. 49
      core/rawdb/accessors_snapdb.go
  3. 2
      core/rawdb/schema.go
  4. 13
      core/state/dump.go

@ -26,6 +26,8 @@ import (
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
)
var snapdbInfo = rawdb.SnapdbInfo{}
var batchFlag = cli.IntFlag{
Name: "batch",
Shorthand: "b",
@ -34,59 +36,23 @@ var batchFlag = cli.IntFlag{
}
var dumpDBCmd = &cobra.Command{
Use: "dumpdb srcdb destdb [startKey [endKey [firstStateStartKey [firstStateEndKey]",
Use: "dumpdb srcdb destdb",
Short: "dump a snapshot db.",
Long: "dump a snapshot db.",
Example: "harmony dumpdb /srcDir/harmony_db_0 /destDir/harmony_db_0",
Args: cobra.RangeArgs(2, 6),
Run: func(cmd *cobra.Command, args []string) {
srcDBDir, destDBDir := args[0], args[1]
var batchLimitMB int
var startKey []byte
var endKey []byte
var firstStateStartKey []byte
var firstStateEndKey []byte
if len(args) > 2 {
_startKey, err := hexutil.Decode(args[2])
if err != nil {
fmt.Println("invalid startKey:", err)
os.Exit(-1)
}
startKey = _startKey
}
if len(args) > 3 {
_endKey, err := hexutil.Decode(args[3])
if err != nil {
fmt.Println("invalid endKey:", err)
os.Exit(-1)
}
endKey = _endKey
}
if len(args) > 4 {
_startKey, err := hexutil.Decode(args[4])
if err != nil {
fmt.Println("invalid stateStartKey:", err)
os.Exit(-1)
}
firstStateStartKey = _startKey
}
if len(args) > 5 {
_endKey, err := hexutil.Decode(args[5])
if err != nil {
fmt.Println("invalid stateEndKey:", err)
os.Exit(-1)
}
firstStateEndKey = _endKey
}
batchLimitMB = cli.GetIntFlagValue(cmd, batchFlag)
batchLimitMB := cli.GetIntFlagValue(cmd, batchFlag)
networkType := getNetworkType(cmd)
shardSchedule = getShardSchedule(networkType)
if shardSchedule == nil {
fmt.Println("unsupported network type")
os.Exit(-1)
}
fmt.Println(srcDBDir, destDBDir, batchLimitMB, hexutil.Encode(startKey), hexutil.Encode(endKey), hexutil.Encode(firstStateStartKey), hexutil.Encode(firstStateEndKey))
dumpMain(srcDBDir, destDBDir, batchLimitMB*MB, startKey, endKey, firstStateStartKey, firstStateEndKey)
snapdbInfo.NetworkType = networkType
fmt.Println(srcDBDir, destDBDir, batchLimitMB)
dumpMain(srcDBDir, destDBDir, batchLimitMB*MB)
os.Exit(0)
},
}
@ -139,27 +105,24 @@ const (
)
var (
totalSize = 0 // current dump size
printSize = 0 // last print dump size
flushSize = 0 // size flushed into db
accountCount = 0 // number of accounts
lastAccount = state.DumpAccount{
printSize = uint64(0) // last print dump size
flushedSize = uint64(0) // size flushed into db
lastAccount = state.DumpAccount{
Address: &common.Address{},
}
savedStateKey hexutil.Bytes
accountState = NONE
emptyHash = common.Hash{}
shardSchedule shardingconfig.Schedule // init by cli flag
)
func dumpPrint(prefix string, showAccount bool) {
if totalSize-printSize > MB || showAccount {
if snapdbInfo.DumpedSize-printSize > MB || showAccount {
now := time.Now().Unix()
fmt.Println(now, prefix, accountCount, totalSize, printSize/MB, flushSize/MB)
fmt.Println(now, prefix, snapdbInfo.AccountCount, snapdbInfo.DumpedSize, printSize/MB, flushedSize/MB)
if showAccount {
fmt.Println("account:", lastAccount.Address.Hex(), lastAccount.Balance, len(lastAccount.Code), accountState, lastAccount.SecureKey.String(), savedStateKey.String())
fmt.Println("account:", lastAccount.Address.Hex(), lastAccount.Balance, len(lastAccount.Code), accountState, lastAccount.SecureKey.String(), snapdbInfo.LastAccountStateKey.String())
}
printSize = totalSize
printSize = snapdbInfo.DumpedSize
}
}
@ -182,15 +145,16 @@ func (db *KakashiDB) Delete(key []byte) error {
// copy key,value to toDB
func (db *KakashiDB) copyKV(key, value []byte) {
db.toDBBatch.Put(key, value)
totalSize += len(key) + len(value)
snapdbInfo.DumpedSize += uint64(len(key) + len(value))
dumpPrint("copyKV", false)
}
func (db *KakashiDB) flush() {
dumpPrint("KakashiDB batch writhing", true)
rawdb.WriteSnapdbInfo(db.toDBBatch, &snapdbInfo)
db.toDBBatch.Write()
db.toDBBatch.Reset()
flushSize = totalSize
flushedSize = snapdbInfo.DumpedSize
dumpPrint("KakashiDB flushed", false)
}
@ -208,22 +172,23 @@ func (db *KakashiDB) OnAccountStart(addr common.Address, acc state.DumpAccount)
accountState = ON_ACCOUNT_START
lastAccount = acc
lastAccount.Address = &addr
snapdbInfo.LastAccountKey = acc.SecureKey
}
// OnAccount implements DumpCollector interface
func (db *KakashiDB) OnAccountState(addr common.Address, StateSecureKey hexutil.Bytes, key, value []byte) {
accountState = ON_ACCOUNT_STATE
if totalSize-flushSize > int(db.batchLimit) {
savedStateKey = StateSecureKey
snapdbInfo.LastAccountStateKey = StateSecureKey
if snapdbInfo.DumpedSize-flushedSize > uint64(db.batchLimit) {
db.flush()
}
}
// OnAccount implements DumpCollector interface
func (db *KakashiDB) OnAccountEnd(addr common.Address, acc state.DumpAccount) {
accountCount++
snapdbInfo.AccountCount++
accountState = ON_ACCOUNT_END
if totalSize-flushSize > int(db.batchLimit) {
if snapdbInfo.DumpedSize-flushedSize > uint64(db.batchLimit) {
db.flush()
}
}
@ -283,6 +248,7 @@ func (db *KakashiDB) indexerDataDump(block *types.Block) {
for i := blkno; i <= block.NumberU64(); i++ {
db.GetHeaderByNumber(i)
}
snapdbInfo.IndexerDataDumped = true
db.flush()
}
@ -356,11 +322,12 @@ func (db *KakashiDB) offchainDataDump(block *types.Block) {
db.copyKV(it.Key(), it.Value())
return true
})
snapdbInfo.OffchainDataDumped = true
db.flush()
}
func (db *KakashiDB) stateDataDump(block *types.Block, startKey, endKey, firstStateStartKey, firstStateEndKey []byte) {
fmt.Println("stateDataDump:")
func (db *KakashiDB) stateDataDump(block *types.Block) {
fmt.Println("stateDataDump:", snapdbInfo.LastAccountKey.String(), snapdbInfo.LastAccountStateKey.String())
stateDB0 := state.NewDatabaseWithCache(db, STATEDB_CACHE_SIZE)
rootHash := block.Root()
stateDB, err := state.New(rootHash, stateDB0)
@ -368,15 +335,21 @@ func (db *KakashiDB) stateDataDump(block *types.Block, startKey, endKey, firstSt
panic(err)
}
config := new(state.DumpConfig)
config.Start = startKey
config.End = endKey
config.StateStart = firstStateStartKey
config.StateEnd = firstStateEndKey
config.Start = snapdbInfo.LastAccountKey
if len(snapdbInfo.LastAccountStateKey) > 0 {
stateKey := new(big.Int).SetBytes(snapdbInfo.LastAccountStateKey)
stateKey.Add(stateKey, big.NewInt(1))
config.StateStart = stateKey.Bytes()
if len(config.StateStart) != len(snapdbInfo.LastAccountStateKey) {
panic("statekey overflow")
}
}
stateDB.DumpToCollector(db, config)
snapdbInfo.StateDataDumped = true
db.flush()
}
func dumpMain(srcDBDir, destDBDir string, batchLimit int, startKey, endKey, firstStateStartKey, firstStateEndKey []byte) {
func dumpMain(srcDBDir, destDBDir string, batchLimit int) {
fmt.Println("===dumpMain===")
srcDB, err := ethRawDB.NewLevelDBDatabase(srcDBDir, LEVELDB_CACHE_SIZE, LEVELDB_HANDLES, "")
if err != nil {
@ -389,19 +362,30 @@ func dumpMain(srcDBDir, destDBDir string, batchLimit int, startKey, endKey, firs
os.Exit(-1)
}
headHash := rawdb.ReadHeadBlockHash(srcDB)
headNumber := *rawdb.ReadHeaderNumber(srcDB, headHash)
fmt.Println("head-block:", headNumber, headHash.Hex())
if lastSnapdbInfo := rawdb.ReadSnapdbInfo(destDB); lastSnapdbInfo != nil {
if lastSnapdbInfo.NetworkType != snapdbInfo.NetworkType {
fmt.Printf("different network type! last:%s cmd:%s\n", lastSnapdbInfo.NetworkType, snapdbInfo.NetworkType)
os.Exit(-1)
}
snapdbInfo = *lastSnapdbInfo
}
if headHash == emptyHash {
fmt.Println("empty head block hash")
os.Exit(-1)
var block *types.Block
if snapdbInfo.BlockHeader == nil {
headHash := rawdb.ReadHeadBlockHash(srcDB)
headNumber := rawdb.ReadHeaderNumber(srcDB, headHash)
block = rawdb.ReadBlock(srcDB, headHash, *headNumber)
} else {
block = rawdb.ReadBlock(destDB, snapdbInfo.BlockHeader.Hash(), snapdbInfo.BlockHeader.Number().Uint64())
}
block := rawdb.ReadBlock(srcDB, headHash, headNumber)
if block == nil {
fmt.Println("ReadBlock error:")
if block == nil || block.Hash() == emptyHash {
fmt.Println("empty head block")
os.Exit(-1)
}
fmt.Println("head-block:", block.Header().Number(), block.Hash().Hex())
snapdbInfo.BlockHeader = block.Header()
fmt.Println("start copying...")
cache, _ := lru.New(LRU_CACHE_SIZE)
copier := &KakashiDB{
@ -412,7 +396,13 @@ func dumpMain(srcDBDir, destDBDir string, batchLimit int, startKey, endKey, firs
cache: cache,
}
defer copier.Close()
copier.offchainDataDump(block)
copier.indexerDataDump(block)
copier.stateDataDump(block, startKey, endKey, firstStateStartKey, firstStateEndKey)
if !snapdbInfo.OffchainDataDumped {
copier.offchainDataDump(block)
}
if !snapdbInfo.IndexerDataDumped {
copier.indexerDataDump(block)
}
if !snapdbInfo.StateDataDumped {
copier.stateDataDump(block)
}
}

@ -0,0 +1,49 @@
package rawdb
import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
)
type SnapdbInfo struct {
NetworkType nodeconfig.NetworkType // network type
BlockHeader *block.Header // latest header at snapshot
AccountCount int // number of dumped account
OffchainDataDumped bool // is OffchainData dumped
IndexerDataDumped bool // is IndexerData dumped
StateDataDumped bool // is StateData dumped
DumpedSize uint64 // size of key-value already dumped
LastAccountKey hexutil.Bytes // MPT key of the account last dumped, use this to continue dumping
LastAccountStateKey hexutil.Bytes // MPT key of the account's state last dumped, use this to continue dumping
}
// ReadSnapdbInfo return the SnapdbInfo of the db
func ReadSnapdbInfo(db DatabaseReader) *SnapdbInfo {
data, _ := db.Get(snapdbInfoKey)
if len(data) == 0 {
return nil
}
info := &SnapdbInfo{}
if err := rlp.DecodeBytes(data, info); err != nil {
utils.Logger().Error().Err(err).Msg("Invalid SnapdbInfo RLP")
return nil
}
return info
}
// WriteSnapdbInfo write the SnapdbInfo into db
func WriteSnapdbInfo(db DatabaseWriter, info *SnapdbInfo) error {
data, err := rlp.EncodeToBytes(info)
if err != nil {
utils.Logger().Error().Msg("Failed to RLP encode SnapdbInfo")
return err
}
if err := db.Put(snapdbInfoKey, data); err != nil {
utils.Logger().Error().Msg("Failed to store SnapdbInfo")
return err
}
return nil
}

@ -72,6 +72,8 @@ var (
preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil)
preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
currentRewardGivenOutPrefix = []byte("blk-rwd-")
// key of SnapdbInfo
snapdbInfoKey = []byte("SnapdbInfo")
)
// TxLookupEntry is a positional metadata to help looking up the data content of

@ -35,6 +35,7 @@ type DumpConfig struct {
SkipCode bool
SkipStorage bool
OnlyWithAddresses bool
HoldStorage bool
Start []byte
End []byte
StateStart []byte
@ -211,12 +212,14 @@ func (s *DB) DumpToCollector(c DumpCollector, conf *DumpConfig) (nextKey []byte)
}
key := s.trie.GetKey(storageIt.Key)
c.OnAccountState(addr, storageIt.Key, key, storageIt.Value)
_, content, _, err := rlp.Split(storageIt.Value)
if err != nil {
log.Error("Failed to decode the value returned by iterator", "error", err)
continue
if conf.HoldStorage {
_, content, _, err := rlp.Split(storageIt.Value)
if err != nil {
log.Error("Failed to decode the value returned by iterator", "error", err)
continue
}
account.Storage[common.BytesToHash(s.trie.GetKey(storageIt.Key))] = common.Bytes2Hex(content)
}
account.Storage[common.BytesToHash(s.trie.GetKey(storageIt.Key))] = common.Bytes2Hex(content)
}
stateStart = nil
hasStateEnd = false

Loading…
Cancel
Save