Add cache to state validator modification; refactor snapshot read/write (#2844)

* Add cache to state validator access; refactor snapshot read/write

* make validator update happen at commit

* fix lint

* make sure slashing applies on validator wrapper in state

* Revert live update of validtor wrapper in staking txn

* add init population of validator cache logic

* Fix validator cache commit logic

* Separate validatorWrapper func

* fix build

* Fix lint
pull/2866/head
Rongjian Lan 5 years ago committed by GitHub
parent e0ccc7b71c
commit 614f528f2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 65
      core/blockchain.go
  2. 13
      core/staking_verifier.go
  3. 43
      core/state/statedb.go
  4. 5
      core/state_transition.go
  5. 14
      core/tx_pool.go
  6. 2
      core/vm/interface.go
  7. 12
      internal/chain/engine.go
  8. 3
      node/worker/worker.go
  9. 12
      staking/availability/measure.go
  10. 7
      staking/slash/double-sign.go
  11. 2
      staking/types/validator.go
  12. 2
      test/chain/reward/main.go

@ -148,7 +148,7 @@ type BlockChain struct {
lastCommitsCache *lru.Cache
epochCache *lru.Cache // Cache epoch number → first block number
randomnessCache *lru.Cache // Cache for vrf/vdf
validatorCache *lru.Cache // Cache for validator info
validatorSnapshotCache *lru.Cache // Cache for validator snapshot
validatorStatsCache *lru.Cache // Cache for validator stats
validatorListCache *lru.Cache // Cache of validator list
validatorListByDelegatorCache *lru.Cache // Cache of validator list by delegator
@ -217,7 +217,7 @@ func NewBlockChain(
lastCommitsCache: commitsCache,
epochCache: epochCache,
randomnessCache: randomnessCache,
validatorCache: validatorCache,
validatorSnapshotCache: validatorCache,
validatorStatsCache: validatorStatsCache,
validatorListCache: validatorListCache,
validatorListByDelegatorCache: validatorListByDelegatorCache,
@ -2196,48 +2196,25 @@ func (bc *BlockChain) ReadValidatorSnapshot(
addr common.Address,
) (*staking.ValidatorSnapshot, error) {
epoch := bc.CurrentBlock().Epoch()
if cached, ok := bc.validatorCache.Get("validator-snapshot-" + string(addr.Bytes()) + epoch.String()); ok {
by := cached.([]byte)
v := staking.ValidatorWrapper{}
if err := rlp.DecodeBytes(by, &v); err != nil {
return nil, err
}
s := staking.ValidatorSnapshot{&v, epoch}
return &s, nil
key := addr.Hex() + epoch.String()
if cached, ok := bc.validatorSnapshotCache.Get(key); ok {
return cached.(*staking.ValidatorSnapshot), nil
}
return rawdb.ReadValidatorSnapshot(bc.db, addr, epoch)
}
// writeValidatorSnapshots writes the snapshot of provided list of validators
func (bc *BlockChain) writeValidatorSnapshots(
batch rawdb.DatabaseWriter, addrs []common.Address, epoch *big.Int, state *state.DB,
// WriteValidatorSnapshot writes the snapshot of provided validator
func (bc *BlockChain) WriteValidatorSnapshot(
batch rawdb.DatabaseWriter, snapshot *staking.ValidatorSnapshot,
) error {
// Read all validator's current data
validators := []*staking.ValidatorWrapper{}
for i := range addrs {
// The snapshot will be captured in the state after the last epoch block is finalized
validator, err := state.ValidatorWrapper(addrs[i])
if err != nil {
return err
}
validators = append(validators, validator)
}
// Batch write the current data as snapshot
for i := range validators {
if err := rawdb.WriteValidatorSnapshot(batch, validators[i], epoch); err != nil {
if err := rawdb.WriteValidatorSnapshot(batch, snapshot.Validator, snapshot.Epoch); err != nil {
return err
}
}
// Update cache
for i := range validators {
by, err := rlp.EncodeToBytes(validators[i])
if err == nil {
key := "validator-snapshot-" + string(validators[i].Address.Bytes()) + epoch.String()
bc.validatorCache.Add(key, by)
}
}
key := snapshot.Validator.Address.Hex() + snapshot.Epoch.String()
bc.validatorSnapshotCache.Add(key, snapshot)
return nil
}
@ -2381,7 +2358,21 @@ func (bc *BlockChain) UpdateValidatorSnapshots(
allValidators = append(allValidators, newValidators...)
return bc.writeValidatorSnapshots(batch, allValidators, epoch, state)
// Read all validator's current data and snapshot them
for i := range allValidators {
// The snapshot will be captured in the state after the last epoch block is finalized
validator, err := state.ValidatorWrapper(allValidators[i])
if err != nil {
return err
}
snapshot := &staking.ValidatorSnapshot{validator, epoch}
if err := bc.WriteValidatorSnapshot(batch, snapshot); err != nil {
return err
}
}
return nil
}
// ReadValidatorList reads the addresses of current all validators
@ -2494,13 +2485,13 @@ func (bc *BlockChain) UpdateStakingMetaData(
return newValidators, err
}
if err := rawdb.WriteValidatorSnapshot(batch, validator, epoch); err != nil {
if err := bc.WriteValidatorSnapshot(batch, &staking.ValidatorSnapshot{validator, epoch}); err != nil {
return newValidators, err
}
// For validator created at exactly the last block of an epoch, we should create the snapshot
// for next epoch too.
if newEpoch.Cmp(epoch) > 0 {
if err := rawdb.WriteValidatorSnapshot(batch, validator, newEpoch); err != nil {
if err := bc.WriteValidatorSnapshot(batch, &staking.ValidatorSnapshot{validator, newEpoch}); err != nil {
return newValidators, err
}
}

@ -95,7 +95,7 @@ func VerifyAndEditValidatorFromMsg(
if !stateDB.IsValidator(msg.ValidatorAddress) {
return nil, errValidatorNotExist
}
wrapper, err := stateDB.ValidatorWrapper(msg.ValidatorAddress)
wrapper, err := stateDB.ValidatorWrapperCopy(msg.ValidatorAddress)
if err != nil {
return nil, err
}
@ -158,7 +158,7 @@ func VerifyAndDelegateFromMsg(
if !stateDB.IsValidator(msg.ValidatorAddress) {
return nil, nil, errValidatorNotExist
}
wrapper, err := stateDB.ValidatorWrapper(msg.ValidatorAddress)
wrapper, err := stateDB.ValidatorWrapperCopy(msg.ValidatorAddress)
if err != nil {
return nil, nil, err
}
@ -252,7 +252,7 @@ func VerifyAndUndelegateFromMsg(
return nil, errValidatorNotExist
}
wrapper, err := stateDB.ValidatorWrapper(msg.ValidatorAddress)
wrapper, err := stateDB.ValidatorWrapperCopy(msg.ValidatorAddress)
if err != nil {
return nil, err
}
@ -295,7 +295,7 @@ func VerifyAndCollectRewardsFromDelegation(
totalRewards := big.NewInt(0)
for i := range delegations {
delegation := &delegations[i]
wrapper, err := stateDB.ValidatorWrapper(delegation.ValidatorAddress)
wrapper, err := stateDB.ValidatorWrapperCopy(delegation.ValidatorAddress)
if err != nil {
return nil, nil, err
}
@ -313,11 +313,6 @@ func VerifyAndCollectRewardsFromDelegation(
Msg("Delegation index out of bound")
return nil, nil, errors.New("Delegation index out of bound")
}
if err := wrapper.SanityCheck(
staking.DoNotEnforceMaxBLS,
); err != nil {
return nil, nil, err
}
updatedValidatorWrappers = append(updatedValidatorWrappers, wrapper)
}
if totalRewards.Int64() == 0 {

@ -69,6 +69,7 @@ type DB struct {
// This map holds 'live' objects, which will get modified while processing a state transition.
stateObjects map[common.Address]*Object
stateObjectsDirty map[common.Address]struct{}
stateValidators map[common.Address]*stk.ValidatorWrapper
// DB error.
// State objects are used by the consensus core and VM which are
@ -105,6 +106,7 @@ func New(root common.Hash, db Database) (*DB, error) {
trie: tr,
stateObjects: make(map[common.Address]*Object),
stateObjectsDirty: make(map[common.Address]struct{}),
stateValidators: make(map[common.Address]*stk.ValidatorWrapper),
logs: make(map[common.Hash][]*types.Log),
preimages: make(map[common.Hash][]byte),
journal: newJournal(),
@ -132,6 +134,7 @@ func (db *DB) Reset(root common.Hash) error {
db.trie = tr
db.stateObjects = make(map[common.Address]*Object)
db.stateObjectsDirty = make(map[common.Address]struct{})
db.stateValidators = make(map[common.Address]*stk.ValidatorWrapper)
db.thash = common.Hash{}
db.bhash = common.Hash{}
db.txIndex = 0
@ -520,6 +523,7 @@ func (db *DB) Copy() *DB {
trie: db.db.CopyTrie(db.trie),
stateObjects: make(map[common.Address]*Object, len(db.journal.dirties)),
stateObjectsDirty: make(map[common.Address]struct{}, len(db.journal.dirties)),
stateValidators: make(map[common.Address]*stk.ValidatorWrapper),
refund: db.refund,
logs: make(map[common.Hash][]*types.Log, len(db.logs)),
logSize: db.logSize,
@ -546,6 +550,7 @@ func (db *DB) Copy() *DB {
state.stateObjectsDirty[addr] = struct{}{}
}
}
for hash, logs := range db.logs {
cpy := make([]*types.Log, len(logs))
for i, l := range logs {
@ -592,6 +597,11 @@ func (db *DB) GetRefund() uint64 {
// Finalise finalises the state by removing the db destructed objects
// and clears the journal as well as the refunds.
func (db *DB) Finalise(deleteEmptyObjects bool) {
// Commit validator changes in cache to stateObjects
for addr, val := range db.stateValidators {
db.UpdateValidatorWrapper(addr, val)
}
for addr := range db.journal.dirties {
stateObject, exist := db.stateObjects[addr]
if !exist {
@ -691,9 +701,34 @@ var (
errAddressNotPresent = errors.New("address not present in state")
)
// ValidatorWrapper ..
// ValidatorWrapper retrieves the existing validator in the cache.
// The return value is a reference to the actual validator object in state.
// The modification on it will be committed to the state object when Finalize()
// is called.
func (db *DB) ValidatorWrapper(
addr common.Address,
) (*stk.ValidatorWrapper, error) {
// Read cache first
cached, ok := db.stateValidators[addr]
if ok {
return cached, nil
}
val, err := db.ValidatorWrapperCopy(addr)
if err != nil {
return nil, err
}
// populate cache if the validator is not in it
db.stateValidators[addr] = val
return val, nil
}
// ValidatorWrapperCopy retrieves the existing validator as a copy from state object.
// Changes on the copy has to be explicitly commited with UpdateValidatorWrapper()
// to take effect.
func (db *DB) ValidatorWrapperCopy(
addr common.Address,
) (*stk.ValidatorWrapper, error) {
by := db.GetCode(addr)
if len(by) == 0 {
@ -726,6 +761,8 @@ func (db *DB) UpdateValidatorWrapper(
return err
}
db.SetCode(addr, by)
// update cache
db.stateValidators[addr] = val
return nil
}
@ -791,7 +828,7 @@ func (db *DB) AddReward(snapshot *stk.ValidatorWrapper, reward *big.Int, shareLo
percentage, ok := shareLookup[delegation.DelegatorAddress]
if !ok {
continue
return errors.Wrapf(err, "missing delegation shares for reward distribution")
}
rewardInt := percentage.MulInt(totalRewardForDelegators).RoundInt()
@ -806,5 +843,5 @@ func (db *DB) AddReward(snapshot *stk.ValidatorWrapper, reward *big.Int, shareLo
curValidator.Delegations[0].Reward.Add(curValidator.Delegations[0].Reward, rewardPool)
}
return db.UpdateValidatorWrapper(curValidator.Address, curValidator)
return nil
}

@ -387,7 +387,6 @@ func (st *StateTransition) StakingTransitionDb() (usedGas uint64, err error) {
func (st *StateTransition) verifyAndApplyCreateValidatorTx(
createValidator *staking.CreateValidator, blockNum *big.Int,
) error {
wrapper, err := VerifyAndCreateValidatorFromMsg(
st.state, st.evm.EpochNumber, blockNum, createValidator,
)
@ -397,8 +396,8 @@ func (st *StateTransition) verifyAndApplyCreateValidatorTx(
if err := st.state.UpdateValidatorWrapper(wrapper.Address, wrapper); err != nil {
return err
}
st.state.SetValidatorFlag(wrapper.Address)
st.state.SubBalance(wrapper.Address, createValidator.Amount)
st.state.SetValidatorFlag(createValidator.ValidatorAddress)
st.state.SubBalance(createValidator.ValidatorAddress, createValidator.Amount)
return nil
}

@ -17,7 +17,6 @@
package core
import (
"context"
"fmt"
"math"
"math/big"
@ -763,6 +762,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error {
if shard.Schedule.IsLastBlock(currentBlockNumber.Uint64()) {
pendingEpoch = new(big.Int).Add(pendingEpoch, big.NewInt(1))
}
_, err = VerifyAndCreateValidatorFromMsg(pool.currentState, pendingEpoch, pendingBlockNumber, stkMsg)
return err
case staking.DirectiveEditValidator:
@ -782,6 +782,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error {
chainContext = nil // might use testing blockchain, set to nil for verifier to handle.
}
pendingBlockNumber := new(big.Int).Add(pool.chain.CurrentBlock().Number(), big.NewInt(1))
_, err = VerifyAndEditValidatorFromMsg(
pool.currentState, chainContext,
pool.chain.CurrentBlock().Epoch(),
@ -800,6 +801,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error {
if from != stkMsg.DelegatorAddress {
return errors.WithMessagef(ErrInvalidSender, "staking transaction sender is %s", b32)
}
_, _, err = VerifyAndDelegateFromMsg(pool.currentState, stkMsg)
return err
case staking.DirectiveUndelegate:
@ -818,6 +820,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error {
if shard.Schedule.IsLastBlock(pool.chain.CurrentBlock().Number().Uint64()) {
pendingEpoch = new(big.Int).Add(pendingEpoch, big.NewInt(1))
}
_, err = VerifyAndUndelegateFromMsg(pool.currentState, pendingEpoch, stkMsg)
return err
case staking.DirectiveCollectRewards:
@ -840,6 +843,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error {
if err != nil {
return err
}
_, _, err = VerifyAndCollectRewardsFromDelegation(pool.currentState, delegations)
return err
default:
@ -945,14 +949,6 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
return replace, nil
}
// Add adds a transaction to the pool if valid and passes it to the tx relay
// backend
func (pool *TxPool) Add(ctx context.Context, tx *types.PoolTransaction) error {
// TODO(ricl): placeholder
// TODO(minhdoan): follow with richard why we need this. As of now TxPool is not used now.
return nil
}
// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!

@ -42,7 +42,7 @@ type StateDB interface {
SetCode(common.Address, []byte)
GetCodeSize(common.Address) int
ValidatorWrapper(common.Address) (*staking.ValidatorWrapper, error)
ValidatorWrapperCopy(common.Address) (*staking.ValidatorWrapper, error)
UpdateValidatorWrapper(common.Address, *staking.ValidatorWrapper) error
SetValidatorFlag(common.Address)
UnsetValidatorFlag(common.Address)

@ -336,12 +336,6 @@ func payoutUndelegations(
state.AddBalance(delegation.DelegatorAddress, totalWithdraw)
}
countTrack[validator] = len(wrapper.Delegations)
if err := state.UpdateValidatorWrapper(
validator, wrapper,
); err != nil {
const msg = "[Finalize] failed update validator info"
return errors.New(msg)
}
}
utils.Logger().Info().
@ -367,12 +361,6 @@ func setLastEpochInCommittee(header *block.Header, state *state.DB) error {
)
}
wrapper.LastEpochInCommittee = newShardState.Epoch
if err := state.UpdateValidatorWrapper(
addr, wrapper,
); err != nil {
const msg = "[Finalize] failed update validator info"
return errors.New(msg)
}
}
return nil
}

@ -145,6 +145,7 @@ func (w *Worker) CommitTransactions(
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs))
// THESE CODE ARE DUPLICATED AS ABOVE>>
// TODO(audit): add staking txn revert functionality
if _, err := w.commitStakingTransaction(tx, coinbase); err != nil {
txID := tx.Hash().Hex()
utils.Logger().Error().Err(err).
@ -460,10 +461,8 @@ func (w *Worker) FinalizeNewBlock(
return nil, err
}
}
state := w.current.state.Copy()
copyHeader := types.CopyHeader(w.current.header)
// TODO: feed coinbase into here so the proposer gets extra rewards.
block, _, err := w.engine.Finalize(
w.chain, copyHeader, state, w.current.txs, w.current.receipts,
w.current.outcxs, w.current.incxs, w.current.stakingTxs,

@ -108,12 +108,6 @@ func bumpCount(
wrapper.Counters.NumBlocksSigned, common.Big1,
)
}
if err := state.UpdateValidatorWrapper(
addr, wrapper,
); err != nil {
return err
}
}
}
@ -228,11 +222,5 @@ func ComputeAndMutateEPOSStatus(
// to leave the committee can actually leave.
}
if err := state.UpdateValidatorWrapper(
addr, wrapper,
); err != nil {
return err
}
return nil
}

@ -334,7 +334,8 @@ func delegatorSlashApply(
slashDebt := applySlashRate(delegationSnapshot.Amount, rate)
slashDiff := &Application{big.NewInt(0), big.NewInt(0)}
snapshotAddr := delegationSnapshot.DelegatorAddress
for _, delegationNow := range current.Delegations {
for i := range current.Delegations {
delegationNow := current.Delegations[i]
if nowAmt := delegationNow.Amount; delegationNow.DelegatorAddress == snapshotAddr {
utils.Logger().Info().
RawJSON("delegation-snapshot", []byte(delegationSnapshot.String())).
@ -473,9 +474,7 @@ func Apply(
RawJSON("slash", []byte(slash.String())).
Msg("about to update staking info for a validator after a slash")
if err := state.UpdateValidatorWrapper(
snapshot.Validator.Address, current,
); err != nil {
if err := current.SanityCheck(staking.DoNotEnforceMaxBLS); err != nil {
return nil, err
}
}

@ -26,7 +26,7 @@ const (
MaxSecurityContactLength = 140
MaxDetailsLength = 280
BLSVerificationStr = "harmony-one"
TenThousand = 10000
TenThousand = 10
)
var (

@ -101,8 +101,6 @@ func main() {
})
}
statedb.UpdateValidatorWrapper(validator.Address, validator)
startTime := time.Now()
validator, _ = statedb.ValidatorWrapper(msg.ValidatorAddress)
endTime := time.Now()

Loading…
Cancel
Save