add prune beacon chain feature

pull/4037/head^2
Lutty 3 years ago committed by Leo Chen
parent 8f49e4f8de
commit 622c0c6a44
  1. 50
      core/blockchain.go
  2. 227
      core/blockchain_pruner.go
  3. 73
      core/blockchain_pruner_meric.go
  4. 24
      core/rawdb/accessors_chain.go
  5. 21
      core/rawdb/accessors_offchain.go
  6. 6
      core/rawdb/interfaces.go
  7. 5
      core/rawdb/schema.go
  8. 17
      internal/configs/harmony/harmony.go
  9. 6
      node/node.go

@ -115,8 +115,9 @@ type CacheConfig struct {
// included in the canonical one where as GetBlockByNumber always represents the
// canonical chain.
type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning
pruneBeaconChainEnable bool // pruneBeaconChainEnable is enable prune BeaconChain feature
db ethdb.Database // Low level persistent database to store final content in
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
@ -148,16 +149,17 @@ type BlockChain struct {
futureBlocks *lru.Cache // future blocks are blocks added for later processing
shardStateCache *lru.Cache
lastCommitsCache *lru.Cache
epochCache *lru.Cache // Cache epoch number → first block number
randomnessCache *lru.Cache // Cache for vrf/vdf
validatorSnapshotCache *lru.Cache // Cache for validator snapshot
validatorStatsCache *lru.Cache // Cache for validator stats
validatorListCache *lru.Cache // Cache of validator list
validatorListByDelegatorCache *lru.Cache // Cache of validator list by delegator
pendingCrossLinksCache *lru.Cache // Cache of last pending crosslinks
blockAccumulatorCache *lru.Cache // Cache of block accumulators
quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
epochCache *lru.Cache // Cache epoch number → first block number
randomnessCache *lru.Cache // Cache for vrf/vdf
validatorSnapshotCache *lru.Cache // Cache for validator snapshot
validatorStatsCache *lru.Cache // Cache for validator stats
validatorListCache *lru.Cache // Cache of validator list
validatorListByDelegatorCache *lru.Cache // Cache of validator list by delegator
pendingCrossLinksCache *lru.Cache // Cache of last pending crosslinks
blockAccumulatorCache *lru.Cache // Cache of block accumulators
quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
blockchainPruner *blockchainPruner // use to prune beacon chain
// procInterrupt must be atomically called
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup // chain processing wait group for shutting down
@ -226,6 +228,7 @@ func NewBlockChain(
validatorListByDelegatorCache: validatorListByDelegatorCache,
pendingCrossLinksCache: pendingCrossLinksCache,
blockAccumulatorCache: blockAccumulatorCache,
blockchainPruner: newBlockchainPruner(db),
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
@ -1255,6 +1258,20 @@ func (bc *BlockChain) WriteBlockWithState(
return NonStatTy, err
}
if bc.IsEnablePruneBeaconChainFeature() {
if block.Number().Cmp(big.NewInt(pruneBeaconChainBlockBefore)) > 0 && block.Epoch().Cmp(big.NewInt(pruneBeaconChainBeforeEpoch)) > 0 {
maxBlockNum := big.NewInt(0).Sub(block.Number(), big.NewInt(pruneBeaconChainBlockBefore)).Uint64()
maxEpoch := big.NewInt(0).Sub(block.Epoch(), big.NewInt(pruneBeaconChainBeforeEpoch))
go func() {
err := bc.blockchainPruner.Start(maxBlockNum, maxEpoch)
if err != nil {
utils.Logger().Info().Err(err).Msg("pruneBeaconChain init error")
return
}
}()
}
}
if err := batch.Write(); err != nil {
if isUnrecoverableErr(err) {
fmt.Printf("Unrecoverable error when writing leveldb: %v\nExitting\n", err)
@ -3031,6 +3048,15 @@ func (bc *BlockChain) SuperCommitteeForNextEpoch(
return nextCommittee, err
}
func (bc *BlockChain) EnablePruneBeaconChainFeature() {
bc.pruneBeaconChainEnable = true
}
// IsEnablePruneBeaconChainFeature returns is enable prune BeaconChain feature
func (bc *BlockChain) IsEnablePruneBeaconChainFeature() bool {
return bc.pruneBeaconChainEnable
}
var (
leveldbErrSpec = "leveldb"
tooManyOpenFilesErrStr = "Too many open files"

@ -0,0 +1,227 @@
package core
import (
"math/big"
"math/rand"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
)
const (
// pruneBeaconChainBeforeEpoch Keep 16 epoch
pruneBeaconChainBeforeEpoch = 16
// pruneBeaconChainBlockBefore Keep 16 epoch blocks
pruneBeaconChainBlockBefore = 32768 * pruneBeaconChainBeforeEpoch
// maxDeleteBlockOnce max delete block once
maxDeleteBlockOnce = 10000
// compactProportion compact proportion is thousandth
compactProportion = 1000
)
type blockchainPruner struct {
isPruneBeaconChainRunning int32 // isPruneBeaconChainRunning must be called atomically
db ethdb.Database
batchWriter ethdb.Batch
compactRange [][2][]byte
startTime time.Time
deletedBlockCount int
deletedValidatorSnapshot int
skipValidatorSnapshot int
}
func newBlockchainPruner(db ethdb.Database) *blockchainPruner {
return &blockchainPruner{
db: db,
}
}
func (bp *blockchainPruner) Delete(key []byte) error {
err := bp.batchWriter.Delete(key)
if err != nil {
return err
}
if bp.batchWriter.ValueSize() >= ethdb.IdealBatchSize {
err = bp.batchWriter.Write()
if err != nil {
return err
}
bp.batchWriter.Reset()
}
return nil
}
func (bp *blockchainPruner) resetBatchWriter() error {
err := bp.batchWriter.Write()
if err != nil {
return err
}
bp.batchWriter = nil
return nil
}
func (bp *blockchainPruner) runWhenNotPruning(cb func() error) error {
// check is start
if atomic.CompareAndSwapInt32(&bp.isPruneBeaconChainRunning, 0, 1) {
defer atomic.StoreInt32(&bp.isPruneBeaconChainRunning, 0)
return cb()
}
return nil
}
func (bp *blockchainPruner) Start(maxBlockNum uint64, maxEpoch *big.Int) error {
return bp.runWhenNotPruning(func() error {
// init
bp.compactRange = make([][2][]byte, 0)
bp.startTime = time.Now()
bp.deletedBlockCount = 0
bp.deletedValidatorSnapshot = 0
bp.skipValidatorSnapshot = 0
bp.batchWriter = bp.db.NewBatch()
// prune data
bp.addToCompactRange(bp.pruneBeaconChainBlock(maxBlockNum))
bp.addToCompactRange(bp.pruneBeaconChainValidatorSnapshot(maxEpoch))
// batch write data and reset
err := bp.resetBatchWriter()
if err != nil {
return err
}
prunerMaxBlock.Set(float64(maxBlockNum))
deletedValidatorSnapshot.Add(float64(bp.deletedValidatorSnapshot))
skipValidatorSnapshot.Add(float64(bp.skipValidatorSnapshot))
deletedBlockCount.Add(float64(bp.deletedBlockCount))
deletedBlockCountUsedTime.Add(float64(time.Now().Sub(bp.startTime).Milliseconds()))
utils.Logger().Info().
Uint64("maxBlockNum", maxBlockNum).
Uint64("maxEpoch", maxEpoch.Uint64()).
Int("deletedBlockCount", bp.deletedBlockCount).
Int("deletedValidatorSnapshot", bp.deletedValidatorSnapshot).
Int("skipValidatorSnapshot", bp.skipValidatorSnapshot).
Dur("cost", time.Now().Sub(bp.startTime)).
Msg("pruneBeaconChain delete block success")
// probability of 1 in 1000 blocks to Compact, It consumes a lot of IO
if rand.Intn(compactProportion) == 1 {
startTime := time.Now()
for _, compactStartEnd := range bp.compactRange {
err := bp.db.Compact(compactStartEnd[0], compactStartEnd[1])
if err != nil {
return err
}
}
compactBlockCountUsedTime.Add(float64(time.Now().Sub(startTime).Milliseconds()))
utils.Logger().Info().
Uint64("maxBlockNum", maxBlockNum).
Uint64("maxEpoch", maxEpoch.Uint64()).
Dur("cost", time.Now().Sub(startTime)).
Msg("pruneBeaconChain compact db success")
}
return nil
})
}
func (bp *blockchainPruner) pruneBeaconChainBlock(maxBlockNum uint64) (minKey []byte, maxKey []byte) {
return rawdb.IteratorBlocks(bp.db, func(blockNum uint64, hash common.Hash) bool {
if blockNum >= maxBlockNum {
return false
}
// skip genesis block
if blockNum == 0 {
return true
}
blockInfo := rawdb.ReadBlock(bp.db, hash, blockNum)
err := bp.deleteBlockInfo(blockInfo)
if err != nil {
utils.Logger().Error().
Uint64("blockNum", blockNum).
AnErr("err", err).
Msg("pruneBeaconChain delete block info error")
return false
}
err = rawdb.DeleteBlock(bp, hash, blockNum)
if err != nil {
utils.Logger().Error().
Uint64("blockNum", blockNum).
AnErr("err", err).
Msg("pruneBeaconChain delete block error")
return false
}
// limit time spent
bp.deletedBlockCount++
return bp.deletedBlockCount < maxDeleteBlockOnce
})
}
func (bp *blockchainPruner) deleteBlockInfo(info *types.Block) error {
for _, cx := range info.IncomingReceipts() {
for _, cxReceipt := range cx.Receipts {
err := rawdb.DeleteCxLookupEntry(bp, cxReceipt.TxHash)
if err != nil {
return err
}
}
}
for _, tx := range info.Transactions() {
err := rawdb.DeleteTxLookupEntry(bp, tx.Hash())
if err != nil {
return err
}
}
for _, stx := range info.StakingTransactions() {
err := rawdb.DeleteTxLookupEntry(bp, stx.Hash())
if err != nil {
return err
}
}
return nil
}
func (bp *blockchainPruner) pruneBeaconChainValidatorSnapshot(maxEpoch *big.Int) (minKey []byte, maxKey []byte) {
return rawdb.IteratorValidatorSnapshot(bp.db, func(addr common.Address, epoch *big.Int) bool {
if epoch.Cmp(maxEpoch) < 0 {
err := rawdb.DeleteValidatorSnapshot(bp, addr, epoch)
if err != nil {
utils.Logger().Error().
Str("addr", addr.Hex()).
Uint64("epoch", epoch.Uint64()).
AnErr("err", err).
Msg("pruneBeaconChain delete validator snapshot error")
return false
}
// limit time spent
bp.deletedValidatorSnapshot++
} else {
bp.skipValidatorSnapshot++
}
return bp.deletedValidatorSnapshot < maxDeleteBlockOnce
})
}
func (bp *blockchainPruner) addToCompactRange(minKey []byte, maxKey []byte) {
bp.compactRange = append(bp.compactRange, [2][]byte{minKey, maxKey})
}

@ -0,0 +1,73 @@
package core
import (
prom "github.com/harmony-one/harmony/api/service/prometheus"
"github.com/prometheus/client_golang/prometheus"
)
func init() {
prom.PromRegistry().MustRegister(
deletedValidatorSnapshot,
skipValidatorSnapshot,
deletedBlockCount,
prunerMaxBlock,
deletedBlockCountUsedTime,
compactBlockCountUsedTime,
)
}
var (
deletedValidatorSnapshot = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "blockchain_pruner",
Name: "deleted_validator_snapshot",
Help: "number of deleted validator snapshot count",
},
)
skipValidatorSnapshot = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "stream",
Name: "skip_validator_snapshot",
Help: "number of skip validator snapshot count",
},
)
deletedBlockCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "blockchain_pruner",
Name: "deleted_block_count",
Help: "number of deleted block count",
},
)
prunerMaxBlock = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "stream",
Name: "pruner_max_block",
Help: "number of largest pruner block",
},
)
deletedBlockCountUsedTime = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "blockchain_pruner",
Name: "deleted_block_count_used_time",
Help: "sum of deleted block used time in ms",
},
)
compactBlockCountUsedTime = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "blockchain_pruner",
Name: "compact_block_count_used_time",
Help: "sum of compact block time in ms",
},
)
)

@ -419,3 +419,27 @@ func FindCommonAncestor(db DatabaseReader, a, b *block.Header) *block.Header {
}
return a
}
func IteratorBlocks(iterator DatabaseIterator, cb func(blockNum uint64, hash common.Hash) bool) (minKey []byte, maxKey []byte) {
iter := iterator.NewIteratorWithPrefix(headerPrefix)
defer iter.Release()
minKey = headerPrefix
for iter.Next() {
// headerKey = headerPrefix + num (uint64 big endian) + hash
key := iter.Key()
if len(key) != len(headerPrefix)+8+32 {
continue
}
maxKey = key
blockNum := decodeBlockNumber(key[len(headerPrefix) : len(headerPrefix)+8])
hash := common.BytesToHash(key[len(headerPrefix)+8:])
if !cb(blockNum, hash) {
return
}
}
return
}

@ -184,6 +184,27 @@ func DeleteValidatorSnapshot(db DatabaseDeleter, addr common.Address, epoch *big
return nil
}
func IteratorValidatorSnapshot(iterator DatabaseIterator, cb func(addr common.Address, epoch *big.Int) bool) (minKey []byte, maxKey []byte) {
iter := iterator.NewIteratorWithPrefix(validatorSnapshotPrefix)
defer iter.Release()
minKey = headerPrefix
for iter.Next() {
// validatorSnapshotKey = validatorSnapshotPrefix + addr bytes (20 bytes) + epoch bytes
key := iter.Key()
maxKey = key
addressBytes := key[len(validatorSnapshotPrefix) : len(validatorSnapshotPrefix)+20]
epochBytes := key[len(validatorSnapshotPrefix)+20:]
if !cb(common.BytesToAddress(addressBytes), big.NewInt(0).SetBytes(epochBytes)) {
return
}
}
return
}
// DeleteValidatorStats ..
func DeleteValidatorStats(db DatabaseDeleter, addr common.Address) error {
if err := db.Delete(validatorStatsKey(addr)); err != nil {

@ -16,6 +16,8 @@
package rawdb
import "github.com/ethereum/go-ethereum/ethdb"
// DatabaseReader wraps the Has and Get method of a backing data store.
type DatabaseReader interface {
Has(key []byte) (bool, error)
@ -31,3 +33,7 @@ type DatabaseWriter interface {
type DatabaseDeleter interface {
Delete(key []byte) error
}
type DatabaseIterator interface {
NewIteratorWithPrefix(prefix []byte) ethdb.Iterator
}

@ -89,6 +89,11 @@ func encodeBlockNumber(number uint64) []byte {
return enc
}
// decodeBlockNumber decodes a block number as big endian uint64
func decodeBlockNumber(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
// headerKey = headerPrefix + num (uint64 big endian) + hash
func headerKey(number uint64, hash common.Hash) []byte {
return append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...)

@ -54,14 +54,15 @@ type P2pConfig struct {
}
type GeneralConfig struct {
NodeType string
NoStaking bool
ShardID int
IsArchival bool
IsBackup bool
IsBeaconArchival bool
IsOffline bool
DataDir string
NodeType string
NoStaking bool
ShardID int
IsArchival bool
IsBackup bool
IsBeaconArchival bool
IsOffline bool
DataDir string
EnablePruneBeaconChain bool
}
type ConsensusConfig struct {

@ -155,6 +155,12 @@ func (node *Node) Beaconchain() *core.BlockChain {
if err != nil {
utils.Logger().Error().Err(err).Msg("cannot get beaconchain")
}
// only available in validator node and shard 1-3
isEnablePruneBeaconChain := node.HarmonyConfig != nil && node.HarmonyConfig.General.EnablePruneBeaconChain
isNotBeaconChainValidator := node.NodeConfig.Role() == nodeconfig.Validator && node.NodeConfig.ShardID != shard.BeaconChainShardID
if isEnablePruneBeaconChain && isNotBeaconChainValidator {
bc.EnablePruneBeaconChainFeature()
}
return bc
}

Loading…
Cancel
Save