From 614f528f2ceebd1d4e340a82e33f77f9624541b3 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 21 Apr 2020 09:28:45 -0700 Subject: [PATCH] Add cache to state validator modification; refactor snapshot read/write (#2844) * Add cache to state validator access; refactor snapshot read/write * make validator update happen at commit * fix lint * make sure slashing applies on validator wrapper in state * Revert live update of validtor wrapper in staking txn * add init population of validator cache logic * Fix validator cache commit logic * Separate validatorWrapper func * fix build * Fix lint --- core/blockchain.go | 67 ++++++++++++++------------------- core/staking_verifier.go | 13 ++----- core/state/statedb.go | 43 +++++++++++++++++++-- core/state_transition.go | 5 +-- core/tx_pool.go | 14 +++---- core/vm/interface.go | 2 +- internal/chain/engine.go | 12 ------ node/worker/worker.go | 3 +- staking/availability/measure.go | 12 ------ staking/slash/double-sign.go | 7 ++-- staking/types/validator.go | 2 +- test/chain/reward/main.go | 2 - 12 files changed, 86 insertions(+), 96 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 87d0790c3..d41d61e5d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -148,7 +148,7 @@ type BlockChain struct { lastCommitsCache *lru.Cache epochCache *lru.Cache // Cache epoch number → first block number randomnessCache *lru.Cache // Cache for vrf/vdf - validatorCache *lru.Cache // Cache for validator info + validatorSnapshotCache *lru.Cache // Cache for validator snapshot validatorStatsCache *lru.Cache // Cache for validator stats validatorListCache *lru.Cache // Cache of validator list validatorListByDelegatorCache *lru.Cache // Cache of validator list by delegator @@ -217,7 +217,7 @@ func NewBlockChain( lastCommitsCache: commitsCache, epochCache: epochCache, randomnessCache: randomnessCache, - validatorCache: validatorCache, + validatorSnapshotCache: validatorCache, validatorStatsCache: validatorStatsCache, validatorListCache: validatorListCache, validatorListByDelegatorCache: validatorListByDelegatorCache, @@ -2196,48 +2196,25 @@ func (bc *BlockChain) ReadValidatorSnapshot( addr common.Address, ) (*staking.ValidatorSnapshot, error) { epoch := bc.CurrentBlock().Epoch() - if cached, ok := bc.validatorCache.Get("validator-snapshot-" + string(addr.Bytes()) + epoch.String()); ok { - by := cached.([]byte) - v := staking.ValidatorWrapper{} - if err := rlp.DecodeBytes(by, &v); err != nil { - return nil, err - } - s := staking.ValidatorSnapshot{&v, epoch} - return &s, nil + key := addr.Hex() + epoch.String() + if cached, ok := bc.validatorSnapshotCache.Get(key); ok { + return cached.(*staking.ValidatorSnapshot), nil } return rawdb.ReadValidatorSnapshot(bc.db, addr, epoch) } -// writeValidatorSnapshots writes the snapshot of provided list of validators -func (bc *BlockChain) writeValidatorSnapshots( - batch rawdb.DatabaseWriter, addrs []common.Address, epoch *big.Int, state *state.DB, +// WriteValidatorSnapshot writes the snapshot of provided validator +func (bc *BlockChain) WriteValidatorSnapshot( + batch rawdb.DatabaseWriter, snapshot *staking.ValidatorSnapshot, ) error { - // Read all validator's current data - validators := []*staking.ValidatorWrapper{} - for i := range addrs { - // The snapshot will be captured in the state after the last epoch block is finalized - validator, err := state.ValidatorWrapper(addrs[i]) - if err != nil { - return err - } - validators = append(validators, validator) - } - // Batch write the current data as snapshot - for i := range validators { - if err := rawdb.WriteValidatorSnapshot(batch, validators[i], epoch); err != nil { - return err - } + if err := rawdb.WriteValidatorSnapshot(batch, snapshot.Validator, snapshot.Epoch); err != nil { + return err } // Update cache - for i := range validators { - by, err := rlp.EncodeToBytes(validators[i]) - if err == nil { - key := "validator-snapshot-" + string(validators[i].Address.Bytes()) + epoch.String() - bc.validatorCache.Add(key, by) - } - } + key := snapshot.Validator.Address.Hex() + snapshot.Epoch.String() + bc.validatorSnapshotCache.Add(key, snapshot) return nil } @@ -2381,7 +2358,21 @@ func (bc *BlockChain) UpdateValidatorSnapshots( allValidators = append(allValidators, newValidators...) - return bc.writeValidatorSnapshots(batch, allValidators, epoch, state) + // Read all validator's current data and snapshot them + for i := range allValidators { + // The snapshot will be captured in the state after the last epoch block is finalized + validator, err := state.ValidatorWrapper(allValidators[i]) + if err != nil { + return err + } + + snapshot := &staking.ValidatorSnapshot{validator, epoch} + if err := bc.WriteValidatorSnapshot(batch, snapshot); err != nil { + return err + } + } + + return nil } // ReadValidatorList reads the addresses of current all validators @@ -2494,13 +2485,13 @@ func (bc *BlockChain) UpdateStakingMetaData( return newValidators, err } - if err := rawdb.WriteValidatorSnapshot(batch, validator, epoch); err != nil { + if err := bc.WriteValidatorSnapshot(batch, &staking.ValidatorSnapshot{validator, epoch}); err != nil { return newValidators, err } // For validator created at exactly the last block of an epoch, we should create the snapshot // for next epoch too. if newEpoch.Cmp(epoch) > 0 { - if err := rawdb.WriteValidatorSnapshot(batch, validator, newEpoch); err != nil { + if err := bc.WriteValidatorSnapshot(batch, &staking.ValidatorSnapshot{validator, newEpoch}); err != nil { return newValidators, err } } diff --git a/core/staking_verifier.go b/core/staking_verifier.go index 3002ba98e..2c6b7bfd4 100644 --- a/core/staking_verifier.go +++ b/core/staking_verifier.go @@ -95,7 +95,7 @@ func VerifyAndEditValidatorFromMsg( if !stateDB.IsValidator(msg.ValidatorAddress) { return nil, errValidatorNotExist } - wrapper, err := stateDB.ValidatorWrapper(msg.ValidatorAddress) + wrapper, err := stateDB.ValidatorWrapperCopy(msg.ValidatorAddress) if err != nil { return nil, err } @@ -158,7 +158,7 @@ func VerifyAndDelegateFromMsg( if !stateDB.IsValidator(msg.ValidatorAddress) { return nil, nil, errValidatorNotExist } - wrapper, err := stateDB.ValidatorWrapper(msg.ValidatorAddress) + wrapper, err := stateDB.ValidatorWrapperCopy(msg.ValidatorAddress) if err != nil { return nil, nil, err } @@ -252,7 +252,7 @@ func VerifyAndUndelegateFromMsg( return nil, errValidatorNotExist } - wrapper, err := stateDB.ValidatorWrapper(msg.ValidatorAddress) + wrapper, err := stateDB.ValidatorWrapperCopy(msg.ValidatorAddress) if err != nil { return nil, err } @@ -295,7 +295,7 @@ func VerifyAndCollectRewardsFromDelegation( totalRewards := big.NewInt(0) for i := range delegations { delegation := &delegations[i] - wrapper, err := stateDB.ValidatorWrapper(delegation.ValidatorAddress) + wrapper, err := stateDB.ValidatorWrapperCopy(delegation.ValidatorAddress) if err != nil { return nil, nil, err } @@ -313,11 +313,6 @@ func VerifyAndCollectRewardsFromDelegation( Msg("Delegation index out of bound") return nil, nil, errors.New("Delegation index out of bound") } - if err := wrapper.SanityCheck( - staking.DoNotEnforceMaxBLS, - ); err != nil { - return nil, nil, err - } updatedValidatorWrappers = append(updatedValidatorWrappers, wrapper) } if totalRewards.Int64() == 0 { diff --git a/core/state/statedb.go b/core/state/statedb.go index 17bafbcc9..989f1c33e 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -69,6 +69,7 @@ type DB struct { // This map holds 'live' objects, which will get modified while processing a state transition. stateObjects map[common.Address]*Object stateObjectsDirty map[common.Address]struct{} + stateValidators map[common.Address]*stk.ValidatorWrapper // DB error. // State objects are used by the consensus core and VM which are @@ -105,6 +106,7 @@ func New(root common.Hash, db Database) (*DB, error) { trie: tr, stateObjects: make(map[common.Address]*Object), stateObjectsDirty: make(map[common.Address]struct{}), + stateValidators: make(map[common.Address]*stk.ValidatorWrapper), logs: make(map[common.Hash][]*types.Log), preimages: make(map[common.Hash][]byte), journal: newJournal(), @@ -132,6 +134,7 @@ func (db *DB) Reset(root common.Hash) error { db.trie = tr db.stateObjects = make(map[common.Address]*Object) db.stateObjectsDirty = make(map[common.Address]struct{}) + db.stateValidators = make(map[common.Address]*stk.ValidatorWrapper) db.thash = common.Hash{} db.bhash = common.Hash{} db.txIndex = 0 @@ -520,6 +523,7 @@ func (db *DB) Copy() *DB { trie: db.db.CopyTrie(db.trie), stateObjects: make(map[common.Address]*Object, len(db.journal.dirties)), stateObjectsDirty: make(map[common.Address]struct{}, len(db.journal.dirties)), + stateValidators: make(map[common.Address]*stk.ValidatorWrapper), refund: db.refund, logs: make(map[common.Hash][]*types.Log, len(db.logs)), logSize: db.logSize, @@ -546,6 +550,7 @@ func (db *DB) Copy() *DB { state.stateObjectsDirty[addr] = struct{}{} } } + for hash, logs := range db.logs { cpy := make([]*types.Log, len(logs)) for i, l := range logs { @@ -592,6 +597,11 @@ func (db *DB) GetRefund() uint64 { // Finalise finalises the state by removing the db destructed objects // and clears the journal as well as the refunds. func (db *DB) Finalise(deleteEmptyObjects bool) { + // Commit validator changes in cache to stateObjects + for addr, val := range db.stateValidators { + db.UpdateValidatorWrapper(addr, val) + } + for addr := range db.journal.dirties { stateObject, exist := db.stateObjects[addr] if !exist { @@ -691,9 +701,34 @@ var ( errAddressNotPresent = errors.New("address not present in state") ) -// ValidatorWrapper .. +// ValidatorWrapper retrieves the existing validator in the cache. +// The return value is a reference to the actual validator object in state. +// The modification on it will be committed to the state object when Finalize() +// is called. func (db *DB) ValidatorWrapper( addr common.Address, +) (*stk.ValidatorWrapper, error) { + // Read cache first + cached, ok := db.stateValidators[addr] + if ok { + return cached, nil + } + + val, err := db.ValidatorWrapperCopy(addr) + if err != nil { + return nil, err + } + // populate cache if the validator is not in it + db.stateValidators[addr] = val + return val, nil + +} + +// ValidatorWrapperCopy retrieves the existing validator as a copy from state object. +// Changes on the copy has to be explicitly commited with UpdateValidatorWrapper() +// to take effect. +func (db *DB) ValidatorWrapperCopy( + addr common.Address, ) (*stk.ValidatorWrapper, error) { by := db.GetCode(addr) if len(by) == 0 { @@ -726,6 +761,8 @@ func (db *DB) UpdateValidatorWrapper( return err } db.SetCode(addr, by) + // update cache + db.stateValidators[addr] = val return nil } @@ -791,7 +828,7 @@ func (db *DB) AddReward(snapshot *stk.ValidatorWrapper, reward *big.Int, shareLo percentage, ok := shareLookup[delegation.DelegatorAddress] if !ok { - continue + return errors.Wrapf(err, "missing delegation shares for reward distribution") } rewardInt := percentage.MulInt(totalRewardForDelegators).RoundInt() @@ -806,5 +843,5 @@ func (db *DB) AddReward(snapshot *stk.ValidatorWrapper, reward *big.Int, shareLo curValidator.Delegations[0].Reward.Add(curValidator.Delegations[0].Reward, rewardPool) } - return db.UpdateValidatorWrapper(curValidator.Address, curValidator) + return nil } diff --git a/core/state_transition.go b/core/state_transition.go index 3d579fe08..247f588ee 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -387,7 +387,6 @@ func (st *StateTransition) StakingTransitionDb() (usedGas uint64, err error) { func (st *StateTransition) verifyAndApplyCreateValidatorTx( createValidator *staking.CreateValidator, blockNum *big.Int, ) error { - wrapper, err := VerifyAndCreateValidatorFromMsg( st.state, st.evm.EpochNumber, blockNum, createValidator, ) @@ -397,8 +396,8 @@ func (st *StateTransition) verifyAndApplyCreateValidatorTx( if err := st.state.UpdateValidatorWrapper(wrapper.Address, wrapper); err != nil { return err } - st.state.SetValidatorFlag(wrapper.Address) - st.state.SubBalance(wrapper.Address, createValidator.Amount) + st.state.SetValidatorFlag(createValidator.ValidatorAddress) + st.state.SubBalance(createValidator.ValidatorAddress, createValidator.Amount) return nil } diff --git a/core/tx_pool.go b/core/tx_pool.go index 8c57e4072..9c86eabf7 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -17,7 +17,6 @@ package core import ( - "context" "fmt" "math" "math/big" @@ -763,6 +762,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { if shard.Schedule.IsLastBlock(currentBlockNumber.Uint64()) { pendingEpoch = new(big.Int).Add(pendingEpoch, big.NewInt(1)) } + _, err = VerifyAndCreateValidatorFromMsg(pool.currentState, pendingEpoch, pendingBlockNumber, stkMsg) return err case staking.DirectiveEditValidator: @@ -782,6 +782,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { chainContext = nil // might use testing blockchain, set to nil for verifier to handle. } pendingBlockNumber := new(big.Int).Add(pool.chain.CurrentBlock().Number(), big.NewInt(1)) + _, err = VerifyAndEditValidatorFromMsg( pool.currentState, chainContext, pool.chain.CurrentBlock().Epoch(), @@ -800,6 +801,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { if from != stkMsg.DelegatorAddress { return errors.WithMessagef(ErrInvalidSender, "staking transaction sender is %s", b32) } + _, _, err = VerifyAndDelegateFromMsg(pool.currentState, stkMsg) return err case staking.DirectiveUndelegate: @@ -818,6 +820,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { if shard.Schedule.IsLastBlock(pool.chain.CurrentBlock().Number().Uint64()) { pendingEpoch = new(big.Int).Add(pendingEpoch, big.NewInt(1)) } + _, err = VerifyAndUndelegateFromMsg(pool.currentState, pendingEpoch, stkMsg) return err case staking.DirectiveCollectRewards: @@ -840,6 +843,7 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error { if err != nil { return err } + _, _, err = VerifyAndCollectRewardsFromDelegation(pool.currentState, delegations) return err default: @@ -945,14 +949,6 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { return replace, nil } -// Add adds a transaction to the pool if valid and passes it to the tx relay -// backend -func (pool *TxPool) Add(ctx context.Context, tx *types.PoolTransaction) error { - // TODO(ricl): placeholder - // TODO(minhdoan): follow with richard why we need this. As of now TxPool is not used now. - return nil -} - // enqueueTx inserts a new transaction into the non-executable transaction queue. // // Note, this method assumes the pool lock is held! diff --git a/core/vm/interface.go b/core/vm/interface.go index 17e5b20a4..cc3eae5bb 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -42,7 +42,7 @@ type StateDB interface { SetCode(common.Address, []byte) GetCodeSize(common.Address) int - ValidatorWrapper(common.Address) (*staking.ValidatorWrapper, error) + ValidatorWrapperCopy(common.Address) (*staking.ValidatorWrapper, error) UpdateValidatorWrapper(common.Address, *staking.ValidatorWrapper) error SetValidatorFlag(common.Address) UnsetValidatorFlag(common.Address) diff --git a/internal/chain/engine.go b/internal/chain/engine.go index 4f0521a55..fddec42be 100644 --- a/internal/chain/engine.go +++ b/internal/chain/engine.go @@ -336,12 +336,6 @@ func payoutUndelegations( state.AddBalance(delegation.DelegatorAddress, totalWithdraw) } countTrack[validator] = len(wrapper.Delegations) - if err := state.UpdateValidatorWrapper( - validator, wrapper, - ); err != nil { - const msg = "[Finalize] failed update validator info" - return errors.New(msg) - } } utils.Logger().Info(). @@ -367,12 +361,6 @@ func setLastEpochInCommittee(header *block.Header, state *state.DB) error { ) } wrapper.LastEpochInCommittee = newShardState.Epoch - if err := state.UpdateValidatorWrapper( - addr, wrapper, - ); err != nil { - const msg = "[Finalize] failed update validator info" - return errors.New(msg) - } } return nil } diff --git a/node/worker/worker.go b/node/worker/worker.go index ef5177964..f9aee4bd3 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -145,6 +145,7 @@ func (w *Worker) CommitTransactions( // Start executing the transaction w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs)) // THESE CODE ARE DUPLICATED AS ABOVE>> + // TODO(audit): add staking txn revert functionality if _, err := w.commitStakingTransaction(tx, coinbase); err != nil { txID := tx.Hash().Hex() utils.Logger().Error().Err(err). @@ -460,10 +461,8 @@ func (w *Worker) FinalizeNewBlock( return nil, err } } - state := w.current.state.Copy() copyHeader := types.CopyHeader(w.current.header) - // TODO: feed coinbase into here so the proposer gets extra rewards. block, _, err := w.engine.Finalize( w.chain, copyHeader, state, w.current.txs, w.current.receipts, w.current.outcxs, w.current.incxs, w.current.stakingTxs, diff --git a/staking/availability/measure.go b/staking/availability/measure.go index 459df0798..324f8a874 100644 --- a/staking/availability/measure.go +++ b/staking/availability/measure.go @@ -108,12 +108,6 @@ func bumpCount( wrapper.Counters.NumBlocksSigned, common.Big1, ) } - - if err := state.UpdateValidatorWrapper( - addr, wrapper, - ); err != nil { - return err - } } } @@ -228,11 +222,5 @@ func ComputeAndMutateEPOSStatus( // to leave the committee can actually leave. } - if err := state.UpdateValidatorWrapper( - addr, wrapper, - ); err != nil { - return err - } - return nil } diff --git a/staking/slash/double-sign.go b/staking/slash/double-sign.go index 290695f82..c28977d16 100644 --- a/staking/slash/double-sign.go +++ b/staking/slash/double-sign.go @@ -334,7 +334,8 @@ func delegatorSlashApply( slashDebt := applySlashRate(delegationSnapshot.Amount, rate) slashDiff := &Application{big.NewInt(0), big.NewInt(0)} snapshotAddr := delegationSnapshot.DelegatorAddress - for _, delegationNow := range current.Delegations { + for i := range current.Delegations { + delegationNow := current.Delegations[i] if nowAmt := delegationNow.Amount; delegationNow.DelegatorAddress == snapshotAddr { utils.Logger().Info(). RawJSON("delegation-snapshot", []byte(delegationSnapshot.String())). @@ -473,9 +474,7 @@ func Apply( RawJSON("slash", []byte(slash.String())). Msg("about to update staking info for a validator after a slash") - if err := state.UpdateValidatorWrapper( - snapshot.Validator.Address, current, - ); err != nil { + if err := current.SanityCheck(staking.DoNotEnforceMaxBLS); err != nil { return nil, err } } diff --git a/staking/types/validator.go b/staking/types/validator.go index b5cf915cc..d70d227c5 100644 --- a/staking/types/validator.go +++ b/staking/types/validator.go @@ -26,7 +26,7 @@ const ( MaxSecurityContactLength = 140 MaxDetailsLength = 280 BLSVerificationStr = "harmony-one" - TenThousand = 10000 + TenThousand = 10 ) var ( diff --git a/test/chain/reward/main.go b/test/chain/reward/main.go index 158bcd1bc..6ef7b2a79 100644 --- a/test/chain/reward/main.go +++ b/test/chain/reward/main.go @@ -101,8 +101,6 @@ func main() { }) } - statedb.UpdateValidatorWrapper(validator.Address, validator) - startTime := time.Now() validator, _ = statedb.ValidatorWrapper(msg.ValidatorAddress) endTime := time.Now()