Optimize delegation reward distribution with caching (#2839)

* Optimize delegation reward distribution with caching

* fix build

* fix lint
pull/2841/head
Rongjian Lan 5 years ago committed by GitHub
parent cc37995a31
commit db8f1a05b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      core/blockchain.go
  2. 4
      core/chain_makers.go
  3. 2
      core/evm.go
  4. 5
      core/rawdb/accessors_offchain.go
  5. 18
      core/state/statedb.go
  6. 4
      core/vm/interface.go
  7. 4
      hmy/api_backend.go
  8. 2
      internal/chain/engine.go
  9. 81
      internal/chain/reward.go
  10. 2
      node/node_handler.go
  11. 10
      shard/committee/assignment.go
  12. 4
      staking/apr/compute.go
  13. 4
      staking/availability/measure.go
  14. 4
      staking/slash/double-sign.go
  15. 6
      staking/slash/double-sign_test.go
  16. 8
      staking/types/validator.go
  17. 49
      test/chain/reward/main.go

@ -2187,14 +2187,14 @@ func (bc *BlockChain) ReadValidatorInformation(
func (bc *BlockChain) ReadValidatorSnapshotAtEpoch(
epoch *big.Int,
addr common.Address,
) (*staking.ValidatorWrapper, error) {
) (*staking.ValidatorSnapshot, error) {
return rawdb.ReadValidatorSnapshot(bc.db, addr, epoch)
}
// ReadValidatorSnapshot reads the snapshot staking information of given validator address
func (bc *BlockChain) ReadValidatorSnapshot(
addr common.Address,
) (*staking.ValidatorWrapper, error) {
) (*staking.ValidatorSnapshot, error) {
epoch := bc.CurrentBlock().Epoch()
if cached, ok := bc.validatorCache.Get("validator-snapshot-" + string(addr.Bytes()) + epoch.String()); ok {
by := cached.([]byte)
@ -2202,7 +2202,8 @@ func (bc *BlockChain) ReadValidatorSnapshot(
if err := rlp.DecodeBytes(by, &v); err != nil {
return nil, err
}
return &v, nil
s := staking.ValidatorSnapshot{&v, epoch}
return &s, nil
}
return rawdb.ReadValidatorSnapshot(bc.db, addr, epoch)
}

@ -275,12 +275,12 @@ func (cr *fakeChainReader) ReadValidatorInformation(
}
func (cr *fakeChainReader) ReadValidatorSnapshot(
addr common.Address,
) (*staking.ValidatorWrapper, error) {
) (*staking.ValidatorSnapshot, error) {
return nil, nil
}
func (cr *fakeChainReader) ReadValidatorSnapshotAtEpoch(
epoch *big.Int, addr common.Address,
) (*staking.ValidatorWrapper, error) {
) (*staking.ValidatorSnapshot, error) {
return nil, nil
}

@ -40,7 +40,7 @@ type ChainContext interface {
ReadDelegationsByDelegator(common.Address) (staking.DelegationIndexes, error)
// ReadValidatorSnapshot returns the snapshot of validator at the beginning of current epoch.
ReadValidatorSnapshot(common.Address) (*staking.ValidatorWrapper, error)
ReadValidatorSnapshot(common.Address) (*staking.ValidatorSnapshot, error)
}
// NewEVMContext creates a new context for use in the EVM.

@ -171,7 +171,7 @@ func DeleteCXReceiptsProofSpent(db DatabaseDeleter, shardID uint32, number uint6
// ReadValidatorSnapshot retrieves validator's snapshot by its address
func ReadValidatorSnapshot(
db DatabaseReader, addr common.Address, epoch *big.Int,
) (*staking.ValidatorWrapper, error) {
) (*staking.ValidatorSnapshot, error) {
data, err := db.Get(validatorSnapshotKey(addr, epoch))
if err != nil || len(data) == 0 {
utils.Logger().Info().Err(err).Msg("ReadValidatorSnapshot")
@ -184,7 +184,8 @@ func ReadValidatorSnapshot(
Msg("Unable to decode validator snapshot from database")
return nil, err
}
return &v, nil
s := staking.ValidatorSnapshot{&v, epoch}
return &s, nil
}
// WriteValidatorSnapshot stores validator's snapshot by its address

@ -753,7 +753,7 @@ var (
)
// AddReward distributes the reward to all the delegators based on stake percentage.
func (db *DB) AddReward(snapshot *stk.ValidatorWrapper, reward *big.Int) error {
func (db *DB) AddReward(snapshot *stk.ValidatorWrapper, reward *big.Int, shareLookup map[common.Address]numeric.Dec) error {
if reward.Cmp(common.Big0) == 0 {
utils.Logger().Info().RawJSON("validator", []byte(snapshot.String())).
Msg("0 given as reward")
@ -783,19 +783,17 @@ func (db *DB) AddReward(snapshot *stk.ValidatorWrapper, reward *big.Int) error {
)
rewardPool.Sub(rewardPool, commissionInt)
}
totalRewardForDelegators := big.NewInt(0).Set(rewardPool)
// Payout each delegator's reward pro-rata
totalDelegationDec := numeric.NewDecFromBigInt(snapshot.TotalDelegation())
totalRewardForDelegators := big.NewInt(0).Set(rewardPool)
for i := range snapshot.Delegations {
delegation := snapshot.Delegations[i]
// NOTE percentage = <this_delegator_amount>/<total_delegation>
if totalDelegationDec.Equal(zero) {
utils.Logger().Info().
RawJSON("validator-snapshot", []byte(snapshot.String())).
Msg("zero total delegation during AddReward delegation payout")
return nil
percentage, ok := shareLookup[delegation.DelegatorAddress]
if !ok {
continue
}
percentage := numeric.NewDecFromBigInt(delegation.Amount).Quo(totalDelegationDec)
rewardInt := percentage.MulInt(totalRewardForDelegators).RoundInt()
curDelegation := curValidator.Delegations[i]
curDelegation.Reward.Add(curDelegation.Reward, rewardInt)

@ -19,6 +19,8 @@ package vm
import (
"math/big"
"github.com/harmony-one/harmony/numeric"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/types"
staking "github.com/harmony-one/harmony/staking/types"
@ -45,7 +47,7 @@ type StateDB interface {
SetValidatorFlag(common.Address)
UnsetValidatorFlag(common.Address)
IsValidator(common.Address) bool
AddReward(*staking.ValidatorWrapper, *big.Int) error
AddReward(*staking.ValidatorWrapper, *big.Int, map[common.Address]numeric.Dec) error
AddRefund(uint64)
SubRefund(uint64)

@ -406,7 +406,7 @@ func (b *APIBackend) GetValidatorInformation(
}
computed := availability.ComputeCurrentSigning(
snapshot, wrapper,
snapshot.Validator, wrapper,
)
beaconChainBlocks := uint64(
b.hmy.BeaconChain().CurrentBlock().Header().Number().Int64(),
@ -470,7 +470,7 @@ func (b *APIBackend) GetTotalStakingSnapshot() *big.Int {
snapshot, _ := b.hmy.BlockChain().ReadValidatorSnapshot(candidates[i])
validator, _ := b.hmy.BlockChain().ReadValidatorInformation(candidates[i])
if !committee.IsEligibleForEPoSAuction(
snapshot, validator, b.hmy.BlockChain().CurrentBlock().Epoch(),
snapshot, validator,
) {
continue
}

@ -444,7 +444,7 @@ func applySlashes(
// Apply the slashes, invariant: assume been verified as legit slash by this point
var slashApplied *slash.Application
votingPower, err := lookupVotingPower(
header.Epoch(), new(big.Int).SetUint64(key.epoch), subComm,
big.NewInt(int64(key.epoch)), subComm,
)
if err != nil {
return errors.Wrapf(err, "could not lookup cached voting power in slash application")

@ -5,6 +5,9 @@ import (
"math/big"
"sort"
"github.com/harmony-one/harmony/numeric"
types2 "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block"
@ -42,11 +45,12 @@ func ballotResultBeaconchain(
}
var (
votingPowerCache singleflight.Group
votingPowerCache singleflight.Group
delegateShareCache singleflight.Group
)
func lookupVotingPower(
epoch, beaconCurrentEpoch *big.Int, subComm *shard.Committee,
epoch *big.Int, subComm *shard.Committee,
) (*votepower.Roster, error) {
key := fmt.Sprintf("%s-%d", epoch.String(), subComm.ShardID)
results, err, _ := votingPowerCache.Do(
@ -55,6 +59,12 @@ func lookupVotingPower(
if err != nil {
return nil, err
}
// For new calc, remove old data from 3 epochs ago
deleteEpoch := big.NewInt(0).Sub(epoch, big.NewInt(3))
deleteKey := fmt.Sprintf("%s-%d", deleteEpoch.String(), subComm.ShardID)
votingPowerCache.Forget(deleteKey)
return votingPower, nil
},
)
@ -62,14 +72,49 @@ func lookupVotingPower(
return nil, err
}
// TODO consider if this is the best way to clear the cache
if new(big.Int).Sub(beaconCurrentEpoch, epoch).Cmp(common.Big3) == 1 {
go func() {
votingPowerCache.Forget(key)
}()
return results.(*votepower.Roster), nil
}
// Lookup or compute the shares of stake for all delegators in a validator
func lookupDelegatorShares(
snapshot *types2.ValidatorSnapshot,
) (map[common.Address]numeric.Dec, error) {
epoch := snapshot.Epoch
validatorSnapshot := snapshot.Validator
key := fmt.Sprintf("%s-%s", epoch.String(), validatorSnapshot.Address.Hex())
shares, err, _ := delegateShareCache.Do(
key, func() (interface{}, error) {
result := map[common.Address]numeric.Dec{}
totalDelegationDec := numeric.NewDecFromBigInt(validatorSnapshot.TotalDelegation())
if totalDelegationDec.IsZero() {
utils.Logger().Info().
RawJSON("validator-snapshot", []byte(validatorSnapshot.String())).
Msg("zero total delegation during AddReward delegation payout")
return result, nil
}
for i := range validatorSnapshot.Delegations {
delegation := validatorSnapshot.Delegations[i]
// NOTE percentage = <this_delegator_amount>/<total_delegation>
percentage := numeric.NewDecFromBigInt(delegation.Amount).Quo(totalDelegationDec)
result[delegation.DelegatorAddress] = percentage
}
// For new calc, remove old data from 3 epochs ago
deleteEpoch := big.NewInt(0).Sub(epoch, big.NewInt(3))
deleteKey := fmt.Sprintf("%s-%s", deleteEpoch.String(), validatorSnapshot.Address.Hex())
votingPowerCache.Forget(deleteKey)
return result, nil
},
)
if err != nil {
return nil, err
}
return results.(*votepower.Roster), nil
return shares.(map[common.Address]numeric.Dec), nil
}
// AccumulateRewardsAndCountSigs credits the coinbase of the given block with the mining
@ -91,6 +136,7 @@ func AccumulateRewardsAndCountSigs(
if bc.Config().IsStaking(header.Epoch()) &&
bc.CurrentHeader().ShardID() != shard.BeaconChainShardID {
return network.EmptyPayout, nil
}
// After staking
@ -142,9 +188,8 @@ func AccumulateRewardsAndCountSigs(
); err != nil {
return network.EmptyPayout, err
}
beaconCurrentEpoch := beaconChain.CurrentHeader().Epoch()
votingPower, err := lookupVotingPower(
headerE, beaconCurrentEpoch, &subComm,
headerE, &subComm,
)
if err != nil {
return network.EmptyPayout, err
@ -167,7 +212,12 @@ func AccumulateRewardsAndCountSigs(
voter.OverallPercent.Quo(beaconExternalShare),
).RoundInt()
newRewards.Add(newRewards, due)
if err := state.AddReward(snapshot, due); err != nil {
shares, err := lookupDelegatorShares(snapshot)
if err != nil {
return network.EmptyPayout, err
}
if err := state.AddReward(snapshot.Validator, due, shares); err != nil {
return network.EmptyPayout, err
}
beaconP = append(beaconP, reward.Payout{
@ -237,7 +287,7 @@ func AccumulateRewardsAndCountSigs(
}
votingPower, err := lookupVotingPower(
epoch, beaconCurrentEpoch, subComm,
epoch, subComm,
)
if err != nil {
@ -303,7 +353,12 @@ func AccumulateRewardsAndCountSigs(
}
due := resultsHandle[bucket][payThem].payout
newRewards.Add(newRewards, due)
if err := state.AddReward(snapshot, due); err != nil {
shares, err := lookupDelegatorShares(snapshot)
if err != nil {
return network.EmptyPayout, err
}
if err := state.AddReward(snapshot.Validator, due, shares); err != nil {
return network.EmptyPayout, err
}
shardP = append(shardP, reward.Payout{

@ -464,7 +464,7 @@ func (node *Node) PostConsensusProcessing(
return
}
computed := availability.ComputeCurrentSigning(
snapshot, wrapper,
snapshot.Validator, wrapper,
)
beaconChainBlocks := uint64(node.Beaconchain().CurrentBlock().Header().Number().Int64()) %
shard.Schedule.BlocksPerEpoch()

@ -37,7 +37,7 @@ type Reader interface {
type StakingCandidatesReader interface {
CurrentBlock() *types.Block
ReadValidatorInformation(addr common.Address) (*staking.ValidatorWrapper, error)
ReadValidatorSnapshot(addr common.Address) (*staking.ValidatorWrapper, error)
ReadValidatorSnapshot(addr common.Address) (*staking.ValidatorSnapshot, error)
ValidatorCandidates() []common.Address
}
@ -142,7 +142,7 @@ func prepareOrders(
if err != nil {
return nil, err
}
if !IsEligibleForEPoSAuction(snapshot, validator, stakedReader.CurrentBlock().Epoch()) {
if !IsEligibleForEPoSAuction(snapshot, validator) {
continue
}
@ -184,18 +184,18 @@ func prepareOrders(
}
// IsEligibleForEPoSAuction ..
func IsEligibleForEPoSAuction(snapshot, validator *staking.ValidatorWrapper, curEpoch *big.Int) bool {
func IsEligibleForEPoSAuction(snapshot *staking.ValidatorSnapshot, validator *staking.ValidatorWrapper) bool {
// This original condition to check whether a validator is in last committee is not stable
// because cross-links may arrive after the epoch ends and it still got counted into the
// NumBlocksToSign, making this condition to be true when the validator is actually not in committee
//if snapshot.Counters.NumBlocksToSign.Cmp(validator.Counters.NumBlocksToSign) != 0 {
// Check whether the validator is in current committee
if validator.LastEpochInCommittee.Cmp(curEpoch) == 0 {
if validator.LastEpochInCommittee.Cmp(snapshot.Epoch) == 0 {
// validator was in last epoch's committee
// validator with below-threshold signing activity won't be considered for next epoch
// and their status will be turned to inactive in FinalizeNewBlock
computed := availability.ComputeCurrentSigning(snapshot, validator)
computed := availability.ComputeCurrentSigning(snapshot.Validator, validator)
if computed.IsBelowThreshold {
return false
}

@ -33,7 +33,7 @@ type Reader interface {
ReadValidatorSnapshotAtEpoch(
epoch *big.Int,
addr common.Address,
) (*staking.ValidatorWrapper, error)
) (*staking.ValidatorSnapshot, error)
}
const (
@ -121,7 +121,7 @@ func ComputeForValidator(
estimatedRewardPerYear, err := expectedRewardPerYear(
block.Header(), headerOneEpochAgo,
validatorNow, oneSnapshotAgo,
validatorNow, oneSnapshotAgo.Validator,
)
if err != nil {

@ -137,7 +137,7 @@ func IncrementValidatorSigningCounts(
type Reader interface {
ReadValidatorSnapshot(
addr common.Address,
) (*staking.ValidatorWrapper, error)
) (*staking.ValidatorSnapshot, error)
}
// ComputeCurrentSigning returns (signed, toSign, quotient, error)
@ -210,7 +210,7 @@ func ComputeAndMutateEPOSStatus(
return err
}
computed := ComputeCurrentSigning(snapshot, wrapper)
computed := ComputeCurrentSigning(snapshot.Validator, wrapper)
utils.Logger().
Info().Msg("check if signing percent is meeting required threshold")

@ -460,7 +460,7 @@ func Apply(
// stake, rest are external delegations.
// Bottom line: everyone will be slashed under the same rule.
if err := delegatorSlashApply(
snapshot, current, rate, state,
snapshot.Validator, current, rate, state,
slash.Reporter, slash.Evidence.Epoch, slashDiff,
); err != nil {
return nil, err
@ -474,7 +474,7 @@ func Apply(
Msg("about to update staking info for a validator after a slash")
if err := state.UpdateValidatorWrapper(
snapshot.Address, current,
snapshot.Validator.Address, current,
); err != nil {
return nil, err
}

@ -377,13 +377,13 @@ func exampleSlashRecords() Records {
}
type mockOutSnapshotReader struct {
snapshot staking.ValidatorWrapper
snapshot staking.ValidatorSnapshot
}
func (m mockOutSnapshotReader) ReadValidatorSnapshotAtEpoch(
epoch *big.Int,
addr common.Address,
) (*staking.ValidatorWrapper, error) {
) (*staking.ValidatorSnapshot, error) {
return &m.snapshot, nil
}
@ -448,7 +448,7 @@ func testScenario(
// state looks like as of this point
slashResult, err := Apply(
mockOutSnapshotReader{*s.snapshot},
mockOutSnapshotReader{staking.ValidatorSnapshot{s.snapshot, big.NewInt(0)}},
stateHandle,
slashes,
numeric.MustNewDecFromStr(

@ -69,7 +69,7 @@ type ValidatorSnapshotReader interface {
ReadValidatorSnapshotAtEpoch(
epoch *big.Int,
addr common.Address,
) (*ValidatorWrapper, error)
) (*ValidatorSnapshot, error)
}
type counters struct {
@ -91,6 +91,12 @@ type ValidatorWrapper struct {
BlockReward *big.Int `json:"-"`
}
// ValidatorSnapshot contains validator snapshot and the corresponding epoch
type ValidatorSnapshot struct {
Validator *ValidatorWrapper
Epoch *big.Int
}
// Computed represents current epoch
// availability measures, mostly for RPC
type Computed struct {

@ -6,6 +6,8 @@ import (
"math/rand"
"time"
"github.com/harmony-one/harmony/internal/utils"
common2 "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/harmony-one/bls/ffi/go/bls"
@ -27,7 +29,6 @@ var (
)
func init() {
bls.Init(bls.BLS12_381)
}
@ -62,12 +63,12 @@ func createValidator() *staking.CreateValidator {
MaxRate: maxRate,
MaxChangeRate: maxChangeRate,
}
minSelfDel := big.NewInt(1e18)
maxTotalDel := big.NewInt(9e18)
minSelfDel := new(big.Int).Mul(big.NewInt(5e18), big.NewInt(2000))
maxTotalDel := new(big.Int).Mul(big.NewInt(5e18), big.NewInt(100000))
pubKey, pubSig := generateBLSKeySigPair()
slotPubKeys := []shard.BLSPublicKey{pubKey}
slotKeySigs := []shard.BLSSignature{pubSig}
amount := big.NewInt(5e18)
amount := new(big.Int).Mul(big.NewInt(5e18), big.NewInt(2000))
v := staking.CreateValidator{
ValidatorAddress: validatorAddress,
Description: desc,
@ -84,10 +85,13 @@ func createValidator() *staking.CreateValidator {
func main() {
statedb, _ := state.New(common2.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
msg := createValidator()
statedb.AddBalance(msg.ValidatorAddress, big.NewInt(5e18))
validator, _ := core.VerifyAndCreateValidatorFromMsg(
statedb.AddBalance(msg.ValidatorAddress, new(big.Int).Mul(big.NewInt(5e18), big.NewInt(2000)))
validator, err := core.VerifyAndCreateValidatorFromMsg(
statedb, postStakingEpoch, big.NewInt(0), msg,
)
if err != nil {
fmt.Print(err)
}
for i := 0; i < 100000; i++ {
validator.Delegations = append(validator.Delegations, staking.Delegation{
common2.Address{},
@ -101,9 +105,36 @@ func main() {
startTime := time.Now()
validator, _ = statedb.ValidatorWrapper(msg.ValidatorAddress)
fmt.Printf("Total num delegations: %d\n", len(validator.Delegations))
statedb.AddReward(validator, big.NewInt(1000))
endTime := time.Now()
fmt.Printf("Time required to read validator: %f seconds\n", endTime.Sub(startTime).Seconds())
startTime = time.Now()
shares, _ := lookupDelegatorShares(validator)
endTime = time.Now()
fmt.Printf("Time required to calc percentage %d delegations: %f seconds\n", len(validator.Delegations), endTime.Sub(startTime).Seconds())
startTime = time.Now()
statedb.AddReward(validator, big.NewInt(1000), shares)
endTime = time.Now()
fmt.Printf("Time required to reward a validator with %d delegations: %f seconds\n", len(validator.Delegations), endTime.Sub(startTime).Seconds())
}
func lookupDelegatorShares(
snapshot *staking.ValidatorWrapper,
) (result map[common2.Address]numeric.Dec, err error) {
result = map[common2.Address]numeric.Dec{}
totalDelegationDec := numeric.NewDecFromBigInt(snapshot.TotalDelegation())
for i := range snapshot.Delegations {
delegation := snapshot.Delegations[i]
// NOTE percentage = <this_delegator_amount>/<total_delegation>
if totalDelegationDec.IsZero() {
utils.Logger().Info().
RawJSON("validator-snapshot", []byte(snapshot.String())).
Msg("zero total delegation during AddReward delegation payout")
return nil, nil
}
percentage := numeric.NewDecFromBigInt(delegation.Amount).Quo(totalDelegationDec)
result[delegation.DelegatorAddress] = percentage
}
return result, nil
}

Loading…
Cancel
Save