Add validator snapshot in local db

pull/1834/head
Rongjian Lan 5 years ago
parent 867fdb1f8d
commit 92f80b0445
  1. 4
      consensus/engine/consensus_engine.go
  2. 197
      core/blockchain.go
  3. 2
      core/chain_makers.go
  4. 75
      core/rawdb/accessors_chain.go
  5. 14
      core/rawdb/schema.go
  6. 2
      core/state_transition.go
  7. 6
      hmy/api_backend.go
  8. 2
      internal/chain/engine.go
  9. 2
      internal/hmyapi/backend.go
  10. 2
      node/node_handler.go
  11. 3
      staking/types/validator.go

@ -40,8 +40,8 @@ type ChainReader interface {
// Thus, only should be used to read the shard state of the current chain.
ReadShardState(epoch *big.Int) (shard.State, error)
// CurrentValidatorAddresses retrieves the current list of validators
CurrentValidatorAddresses() []common.Address
// ActiveValidatorAddresses retrieves the list of active validators
ActiveValidatorAddresses() []common.Address
}
// Engine is an algorithm agnostic consensus engine.

@ -70,9 +70,9 @@ const (
commitsCacheLimit = 10
epochCacheLimit = 10
randomnessCacheLimit = 10
stakingCacheLimit = 256
validatorListCacheLimit = 2
validatorListByDelegatorCacheLimit = 256
validatorCacheLimit = 1024
validatorListCacheLimit = 10
validatorListByDelegatorCacheLimit = 1024
// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
BlockChainVersion = 3
@ -135,7 +135,7 @@ type BlockChain struct {
lastCommitsCache *lru.Cache
epochCache *lru.Cache // Cache epoch number → first block number
randomnessCache *lru.Cache // Cache for vrf/vdf
stakingCache *lru.Cache // Cache for staking validator
validatorCache *lru.Cache // Cache for staking validator
validatorListCache *lru.Cache // Cache of validator list
validatorListByDelegatorCache *lru.Cache // Cache of validator list by delegator
@ -174,7 +174,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
commitsCache, _ := lru.New(commitsCacheLimit)
epochCache, _ := lru.New(epochCacheLimit)
randomnessCache, _ := lru.New(randomnessCacheLimit)
stakingCache, _ := lru.New(stakingCacheLimit)
stakingCache, _ := lru.New(validatorCacheLimit)
validatorListCache, _ := lru.New(validatorListCacheLimit)
validatorListByDelegatorCache, _ := lru.New(validatorListByDelegatorCacheLimit)
@ -195,7 +195,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
lastCommitsCache: commitsCache,
epochCache: epochCache,
randomnessCache: randomnessCache,
stakingCache: stakingCache,
validatorCache: stakingCache,
validatorListCache: validatorListCache,
validatorListByDelegatorCache: validatorListByDelegatorCache,
engine: engine,
@ -1078,6 +1078,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
batch := bc.db.NewBatch()
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
//// Cross-shard txns
epoch := block.Header().Epoch()
if bc.chainConfig.IsCrossTx(block.Epoch()) {
shardingConfig := shard.Schedule.InstanceForEpoch(epoch)
@ -1097,6 +1098,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
bc.WriteCXReceiptsProofSpent(block.IncomingReceipts())
}
//// VRF + VDF
//check non zero VRF field in header and add to local db
if len(block.Vrf()) > 0 {
vrfBlockNumbers, _ := bc.ReadEpochVrfBlockNums(block.Header().Epoch())
@ -1130,16 +1132,45 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
}
}
//// Shard State and Validator Update
header := block.Header()
if header.ShardStateHash() != (common.Hash{}) {
epoch := new(big.Int).Add(header.Epoch(), common.Big1)
err = bc.WriteShardStateBytes(batch, epoch, header.ShardState())
shardState, err := bc.WriteShardStateBytes(batch, epoch, header.ShardState())
if err != nil {
header.Logger(utils.Logger()).Warn().Err(err).Msg("cannot store shard state")
return NonStatTy, err
}
processed := make(map[common.Address]struct{})
allActiveValidators := []common.Address{}
for i := range *shardState {
shard := (*shardState)[i]
for j := range shard.NodeList {
if shard.NodeList[j].StakeWithDelegationApplied != nil { // For external validator
_, ok := processed[shard.NodeList[j].EcdsaAddress]
if !ok {
allActiveValidators = append(allActiveValidators, shard.NodeList[j].EcdsaAddress)
}
}
}
}
bc.UpdateActiveValidatorsSnapshot(allActiveValidators)
}
if bc.chainConfig.IsStaking(block.Epoch()) {
for _, tx := range block.StakingTransactions() {
err = bc.UpdateStakingMetaData(tx)
// keep offchain database consistency with onchain we need revert
// but it should not happend unless local database corrupted
if err != nil {
utils.Logger().Debug().Msgf("oops, UpdateStakingMetaData failed, err: %+v", err)
return NonStatTy, err
}
}
}
//// Cross-links
if len(header.CrossLinks()) > 0 {
crossLinks := &types.CrossLinks{}
err = rlp.DecodeBytes(header.CrossLinks(), crossLinks)
@ -1159,19 +1190,6 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
bc.WriteShardLastCrossLink(crossLink.ShardID(), crossLink)
}
}
if bc.chainConfig.IsStaking(block.Epoch()) {
for _, tx := range block.StakingTransactions() {
err = bc.UpdateStakingMetaData(tx)
// keep offchain database consistency with onchain we need revert
// but it should not happend unless local database corrupted
if err != nil {
utils.Logger().Debug().Msgf("oops, UpdateStakingMetaData failed, err: %+v", err)
return NonStatTy, err
}
}
}
/////////////////////////// END
// If the total difficulty is higher than our known, add it to the canonical chain
@ -1870,18 +1888,18 @@ func (bc *BlockChain) WriteShardState(
// WriteShardStateBytes saves the given sharding state under the given epoch number.
func (bc *BlockChain) WriteShardStateBytes(db rawdb.DatabaseWriter,
epoch *big.Int, shardState []byte,
) error {
) (*shard.State, error) {
decodeShardState := shard.State{}
if err := rlp.DecodeBytes(shardState, &decodeShardState); err != nil {
return err
return nil, err
}
err := rawdb.WriteShardStateBytes(db, epoch, shardState)
if err != nil {
return err
return nil, err
}
cacheKey := string(epoch.Bytes())
bc.shardStateCache.Add(cacheKey, decodeShardState)
return nil
return &decodeShardState, nil
}
// ReadLastCommits retrieves last commits.
@ -2276,9 +2294,9 @@ func (bc *BlockChain) ReadTxLookupEntry(txID common.Hash) (common.Hash, uint64,
return rawdb.ReadTxLookupEntry(bc.db, txID)
}
// ReadStakingValidator reads staking information of given validatorWrapper
func (bc *BlockChain) ReadStakingValidator(addr common.Address) (*staking.ValidatorWrapper, error) {
if cached, ok := bc.stakingCache.Get("staking-" + string(addr.Bytes())); ok {
// ReadValidatorData reads staking information of given validatorWrapper
func (bc *BlockChain) ReadValidatorData(addr common.Address) (*staking.ValidatorWrapper, error) {
if cached, ok := bc.validatorCache.Get("v-" + string(addr.Bytes())); ok {
by := cached.([]byte)
v := staking.ValidatorWrapper{}
if err := rlp.DecodeBytes(by, &v); err != nil {
@ -2287,12 +2305,12 @@ func (bc *BlockChain) ReadStakingValidator(addr common.Address) (*staking.Valida
return &v, nil
}
return rawdb.ReadStakingValidator(bc.db, addr)
return rawdb.ReadValidatorData(bc.db, addr)
}
// WriteStakingValidator reads staking information of given validatorWrapper
func (bc *BlockChain) WriteStakingValidator(v *staking.ValidatorWrapper) error {
err := rawdb.WriteStakingValidator(bc.db, v)
// WriteValidatorData writes staking information of given validatorWrapper
func (bc *BlockChain) WriteValidatorData(v *staking.ValidatorWrapper) error {
err := rawdb.WriteValidatorData(bc.db, v)
if err != nil {
return err
}
@ -2300,7 +2318,68 @@ func (bc *BlockChain) WriteStakingValidator(v *staking.ValidatorWrapper) error {
if err != nil {
return err
}
bc.stakingCache.Add("staking-"+string(v.Address.Bytes()), by)
bc.validatorCache.Add("v-"+string(v.Address.Bytes()), by)
return nil
}
// ReadValidatorSnapshot reads the snapshot staking information of given validator address
// TODO: put epoch number in to snapshot too.
func (bc *BlockChain) ReadValidatorSnapshot(addr common.Address) (*staking.ValidatorWrapper, error) {
if cached, ok := bc.validatorCache.Get("vs-" + string(addr.Bytes())); ok {
by := cached.([]byte)
v := staking.ValidatorWrapper{}
if err := rlp.DecodeBytes(by, &v); err != nil {
return nil, err
}
return &v, nil
}
return rawdb.ReadValidatorSnapshot(bc.db, addr)
}
// WriteValidatorSnapshots writes the snapshot of provided list of validators
func (bc *BlockChain) WriteValidatorsSnapshot(addrs []common.Address) error {
validators := []*staking.ValidatorWrapper{}
for _, addr := range addrs {
validator, err := bc.ReadValidatorData(addr)
if err != nil {
return err
}
validators = append(validators, validator)
}
batch := bc.db.NewBatch()
for i := range validators {
err := rawdb.WriteValidatorSnapshot(batch, validators[i])
if err != nil {
return err
}
}
if err := batch.Write(); err != nil {
return err
}
for i := range validators {
by, err := rlp.EncodeToBytes(validators[i])
if err == nil {
bc.validatorCache.Add("vs-"+string(validators[i].Address.Bytes()), by)
}
}
return nil
}
// DeleteValidatorData deletes the snapshot staking information of given validator address
func (bc *BlockChain) DeleteValidatorsSnapshot(addrs []common.Address) error {
batch := bc.db.NewBatch()
for i := range addrs {
rawdb.DeleteValidatorSnapshot(batch, addrs[i])
}
if err := batch.Write(); err != nil {
return err
}
for i := range addrs {
bc.validatorCache.Remove("vs-" + string(addrs[i].Bytes()))
}
return nil
}
@ -2314,12 +2393,12 @@ func (bc *BlockChain) ReadValidatorList() ([]common.Address, error) {
}
return m, nil
}
return rawdb.ReadValidatorList(bc.db)
return rawdb.ReadValidatorList(bc.db, false)
}
// WriteValidatorList writes the list of validator addresses to database
func (bc *BlockChain) WriteValidatorList(addrs []common.Address) error {
err := rawdb.WriteValidatorList(bc.db, addrs)
err := rawdb.WriteValidatorList(bc.db, addrs, false)
if err != nil {
return err
}
@ -2330,6 +2409,50 @@ func (bc *BlockChain) WriteValidatorList(addrs []common.Address) error {
return nil
}
// ReadActiveValidatorList reads the addresses of active validators
func (bc *BlockChain) ReadActiveValidatorList() ([]common.Address, error) {
if cached, ok := bc.validatorListCache.Get("activeValidatorList"); ok {
by := cached.([]byte)
m := []common.Address{}
if err := rlp.DecodeBytes(by, &m); err != nil {
return nil, err
}
return m, nil
}
return rawdb.ReadValidatorList(bc.db, true)
}
// WriteActiveValidatorList writes the list of active validator addresses to database
func (bc *BlockChain) WriteActiveValidatorList(addrs []common.Address) error {
err := rawdb.WriteValidatorList(bc.db, addrs, true)
if err != nil {
return err
}
bytes, err := rlp.EncodeToBytes(addrs)
if err == nil {
bc.validatorListCache.Add("activeValidatorList", bytes)
}
return nil
}
// UpdateActiveValidatorsSnapshot updates the list of active validators and updates the content snapshot of the active validators
func (bc *BlockChain) UpdateActiveValidatorsSnapshot(activeValidators []common.Address) error {
prevActiveValidators, err := bc.ReadActiveValidatorList()
if err != nil {
return err
}
err = bc.DeleteValidatorsSnapshot(prevActiveValidators)
if err != nil {
return err
}
if err = bc.WriteValidatorsSnapshot(activeValidators); err != nil {
return err
}
return nil
}
// ReadValidatorListByDelegator reads the addresses of validators delegated by a delegator
func (bc *BlockChain) ReadValidatorListByDelegator(delegator common.Address) ([]common.Address, error) {
if cached, ok := bc.validatorListByDelegatorCache.Get(delegator.Bytes()); ok {
@ -2413,8 +2536,9 @@ func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction) erro
return nil
}
// CurrentValidatorAddresses returns the address of active validators for current epoch
func (bc *BlockChain) CurrentValidatorAddresses() []common.Address {
// ActiveValidatorAddresses returns the address of active validators for current epoch
// TODO: should only return those that are selected by epos.
func (bc *BlockChain) ActiveValidatorAddresses() []common.Address {
list, err := bc.ReadValidatorList()
if err != nil {
return make([]common.Address, 0)
@ -2428,6 +2552,7 @@ func (bc *BlockChain) CurrentValidatorAddresses() []common.Address {
if err != nil {
continue
}
// TODO: double check this logic here.
epoch := shard.Schedule.CalcEpochNumber(val.CreationHeight.Uint64())
if epoch.Cmp(currentEpoch) >= 0 {
// wait for next epoch

@ -270,4 +270,4 @@ func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *block.Header
func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *block.Header { return nil }
func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil }
func (cr *fakeChainReader) ReadShardState(epoch *big.Int) (shard.State, error) { return nil, nil }
func (cr *fakeChainReader) CurrentValidatorAddresses() []common.Address { return nil }
func (cr *fakeChainReader) ActiveValidatorAddresses() []common.Address { return nil }

@ -614,11 +614,11 @@ func WriteCXReceiptsProofUnspentCheckpoint(db DatabaseWriter, shardID uint32, bl
return db.Put(cxReceiptUnspentCheckpointKey(shardID), by)
}
// ReadStakingValidator retrieves staking validator by its address
func ReadStakingValidator(db DatabaseReader, addr common.Address) (*staking.ValidatorWrapper, error) {
data, err := db.Get(stakingKey(addr))
// ReadValidatorData retrieves staking validator by its address
func ReadValidatorData(db DatabaseReader, addr common.Address) (*staking.ValidatorWrapper, error) {
data, err := db.Get(validatorKey(addr))
if len(data) == 0 || err != nil {
utils.Logger().Info().Err(err).Msg("ReadStakingValidator")
utils.Logger().Info().Err(err).Msg("ReadValidatorData")
return nil, err
}
v := staking.ValidatorWrapper{}
@ -629,21 +629,64 @@ func ReadStakingValidator(db DatabaseReader, addr common.Address) (*staking.Vali
return &v, nil
}
// WriteStakingValidator stores staking validator's information by its address
func WriteStakingValidator(db DatabaseWriter, v *staking.ValidatorWrapper) error {
// WriteValidatorData stores staking validator's information by its address
func WriteValidatorData(db DatabaseWriter, v *staking.ValidatorWrapper) error {
bytes, err := rlp.EncodeToBytes(v)
if err != nil {
utils.Logger().Error().Msg("[WriteStakingValidator] Failed to encode")
utils.Logger().Error().Msg("[WriteValidatorData] Failed to encode")
return err
}
if err := db.Put(stakingKey(v.Address), bytes); err != nil {
utils.Logger().Error().Msg("[WriteStakingValidator] Failed to store to database")
if err := db.Put(validatorKey(v.Address), bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorData] Failed to store to database")
return err
}
return err
}
// ReadValidatorSnapshot retrieves staking validator's snapshot by its address
func ReadValidatorSnapshot(db DatabaseReader, addr common.Address) (*staking.ValidatorWrapper, error) {
data, err := db.Get(validatorSnapshotKey(addr))
if len(data) == 0 || err != nil {
utils.Logger().Info().Err(err).Msg("ReadValidatorSnapshot")
return nil, err
}
v := staking.ValidatorWrapper{}
if err := rlp.DecodeBytes(data, &v); err != nil {
utils.Logger().Error().Err(err).Str("address", addr.Hex()).Msg("Unable to Decode staking validator from database")
return nil, err
}
return &v, nil
}
// WriteValidatorSnapshot stores staking validator's snapshot by its address
func WriteValidatorSnapshot(db DatabaseWriter, v *staking.ValidatorWrapper) error {
bytes, err := rlp.EncodeToBytes(v)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to encode")
return err
}
if err := db.Put(validatorSnapshotKey(v.Address), bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorSnapshot] Failed to store to database")
return err
}
return err
}
// DeleteValidatorSnapshot removes the validator's snapshot by its address
func DeleteValidatorSnapshot(db DatabaseDeleter, addr common.Address) {
if err := db.Delete(validatorSnapshotKey(addr)); err != nil {
utils.Logger().Error().Msg("Failed to delete snapshot of a validator")
}
}
// ReadValidatorList retrieves staking validator by its address
func ReadValidatorList(db DatabaseReader) ([]common.Address, error) {
data, err := db.Get([]byte("validatorList"))
// Return only active validators if isActive==true, otherwise, return all validators
func ReadValidatorList(db DatabaseReader, isActive bool) ([]common.Address, error) {
key := validatorListKey
if isActive {
key = activeValidatorListKey
}
data, err := db.Get(key)
if len(data) == 0 || err != nil {
return []common.Address{}, nil
}
@ -656,12 +699,18 @@ func ReadValidatorList(db DatabaseReader) ([]common.Address, error) {
}
// WriteValidatorList stores staking validator's information by its address
func WriteValidatorList(db DatabaseWriter, addrs []common.Address) error {
// Writes only for active validators if isActive==true, otherwise, writes for all validators
func WriteValidatorList(db DatabaseWriter, addrs []common.Address, isActive bool) error {
key := validatorListKey
if isActive {
key = activeValidatorListKey
}
bytes, err := rlp.EncodeToBytes(addrs)
if err != nil {
utils.Logger().Error().Msg("[WriteValidatorList] Failed to encode")
}
if err := db.Put([]byte("validatorList"), bytes); err != nil {
if err := db.Put(key, bytes); err != nil {
utils.Logger().Error().Msg("[WriteValidatorList] Failed to store to database")
}
return err

@ -74,7 +74,10 @@ var (
cxReceiptSpentPrefix = []byte("cxReceiptSpent") // prefix for indicator of unspent of cxReceiptsProof
cxReceiptUnspentCheckpointPrefix = []byte("cxReceiptUnspentCheckpoint") // prefix for cxReceiptsProof unspent checkpoint
stakingPrefix = []byte("staking") // prefix for staking validator information
validatorPrefix = []byte("validator-") // prefix for staking validator information
validatorSnapshotPrefix = []byte("validator-snapshot-") // prefix for staking validator's snapshot information
validatorListKey = []byte("validator-list") // key for all validators list
activeValidatorListKey = []byte("active-validator-list") // key for active validators list
// epochBlockNumberPrefix + epoch (big.Int.Bytes())
// -> epoch block number (big.Int.Bytes())
@ -237,7 +240,12 @@ func cxReceiptUnspentCheckpointKey(shardID uint32) []byte {
return append(prefix, sKey...)
}
func stakingKey(addr common.Address) []byte {
prefix := stakingPrefix
func validatorKey(addr common.Address) []byte {
prefix := validatorPrefix
return append(prefix, addr.Bytes()...)
}
func validatorSnapshotKey(addr common.Address) []byte {
prefix := validatorSnapshotPrefix
return append(prefix, addr.Bytes()...)
}

@ -354,7 +354,7 @@ func (st *StateTransition) applyCreateValidatorTx(createValidator *staking.Creat
delegations := []staking.Delegation{}
delegations = append(delegations, staking.NewDelegation(v.Address, createValidator.Amount))
wrapper := staking.ValidatorWrapper{*v, delegations, nil, nil}
wrapper := staking.ValidatorWrapper{*v, delegations}
if err := st.state.UpdateStakingInfo(v.Address, &wrapper); err != nil {
return err

@ -290,9 +290,9 @@ func (b *APIBackend) SendStakingTx(
return nil
}
// GetCurrentValidatorAddresses returns the address of active validators for current epoch
func (b *APIBackend) GetCurrentValidatorAddresses() []common.Address {
return b.hmy.BlockChain().CurrentValidatorAddresses()
// GetActiveValidatorAddresses returns the address of active validators for current epoch
func (b *APIBackend) GetActiveValidatorAddresses() []common.Address {
return b.hmy.BlockChain().ActiveValidatorAddresses()
}
// GetValidatorCandidates returns the up to date validator candidates for next epoch

@ -179,7 +179,7 @@ func (e *engineImpl) Finalize(
// Only do such at the last block of an epoch
if len(header.ShardState()) > 0 {
// TODO: make sure we are using the correct validator list
validators := chain.CurrentValidatorAddresses()
validators := chain.ActiveValidatorAddresses()
for _, validator := range validators {
wrapper := state.GetStakingInfo(validator)
if wrapper != nil {

@ -73,7 +73,7 @@ type Backend interface {
ResendCx(ctx context.Context, txID common.Hash) (uint64, bool)
IsLeader() bool
SendStakingTx(ctx context.Context, newStakingTx *staking.StakingTransaction) error
GetCurrentValidatorAddresses() []common.Address
GetActiveValidatorAddresses() []common.Address
GetValidatorCandidates() []common.Address
GetValidatorInformation(addr common.Address) *staking.Validator
GetDelegatorsInformation(addr common.Address) []*staking.Delegation

@ -411,7 +411,7 @@ func (node *Node) AddNewBlock(newBlock *types.Block) error {
}
utils.Logger().Debug().Msgf("ValidatorInformation %v: %v", i, val)
}
currAddrs := node.Blockchain().CurrentValidatorAddresses()
currAddrs := node.Blockchain().ActiveValidatorAddresses()
utils.Logger().Debug().Msgf("CurrentValidators : %v", currAddrs)
candidates := node.Blockchain().ValidatorCandidates()
utils.Logger().Debug().Msgf("CandidateValidators : %v", candidates)

@ -36,9 +36,6 @@ var (
type ValidatorWrapper struct {
Validator `json:"validator" yaml:"validator" rlp:"nil"`
Delegations []Delegation `json:"delegations" yaml:"delegations" rlp:"nil"`
// TODO: move snapshot into off-chain db.
SnapshotValidator *Validator `json:"snapshot_validator" yaml:"snaphost_validator" rlp:"nil"`
SnapshotDelegations []Delegation `json:"snapshot_delegations" yaml:"snapshot_delegations" rlp:"nil"`
}
// Validator - data fields for a validator

Loading…
Cancel
Save