[reward] Do not update state in reward, thread through payout, write payout on each block

pull/1942/head
Edgar Aroutiounian 5 years ago
parent d907421b93
commit c30886b1d4
  1. 10
      consensus/engine/consensus_engine.go
  2. 41
      core/blockchain.go
  3. 9
      core/chain_makers.go
  4. 8
      core/rawdb/accessors_chain.go
  5. 11
      core/rawdb/schema.go
  6. 16
      core/state_processor.go
  7. 6
      core/types.go
  8. 18
      internal/chain/engine.go
  9. 43
      internal/chain/reward.go
  10. 10
      node/worker/worker.go

@ -48,12 +48,10 @@ type ChainReader interface {
// Methods needed for EPoS committee assignment calculation
committee.StakingCandidatesReader
//BlockRewardAccumulator is the current accumulator
BlockRewardAccumulator() (*big.Int, error)
// UpdateBlockRewardAccumulator => accumulator = accumulator + diff
UpdateBlockRewardAccumulator(diff *big.Int) error
//ReadBlockRewardAccumulator is the block-reward given for block number
ReadBlockRewardAccumulator(uint64) (*big.Int, error)
// WriteBlockRewardAccumulator writes directly to the accumulator field
WriteBlockRewardAccumulator(reward *big.Int) error
WriteBlockRewardAccumulator(reward *big.Int, number uint64) error
}
// Engine is an algorithm agnostic consensus engine.
@ -115,7 +113,7 @@ type Engine interface {
state *state.DB, txs []*types.Transaction,
receipts []*types.Receipt, outcxs []*types.CXReceipt,
incxs []*types.CXReceiptsProof, stks []*staking.StakingTransaction,
) (*types.Block, error)
) (*types.Block, *big.Int, error)
// Seal generates a new sealing request for the given input block and pushes
// the result into the given channel.

@ -241,7 +241,7 @@ func (bc *BlockChain) ValidateNewBlock(block *types.Block) error {
}
// Process block using the parent state as reference point.
receipts, cxReceipts, _, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
receipts, cxReceipts, _, usedGas, _, err := bc.processor.Process(block, state, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
return err
@ -1016,7 +1016,10 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
}
// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, cxReceipts []*types.CXReceipt, state *state.DB) (status WriteStatus, err error) {
func (bc *BlockChain) WriteBlockWithState(
block *types.Block, receipts []*types.Receipt,
cxReceipts []*types.CXReceipt, payout *big.Int, state *state.DB,
) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()
@ -1305,11 +1308,17 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
len(block.Header().ShardState()) > 0 &&
!bc.chainConfig.IsStaking(block.Epoch())
if curHeader := bc.CurrentHeader(); isFirstTimeStaking &&
curHeader := bc.CurrentHeader()
if isFirstTimeStaking &&
curHeader.ShardID() == shard.BeaconChainShardID {
bc.WriteBlockRewardAccumulator(big.NewInt(0))
bc.WriteBlockRewardAccumulator(big.NewInt(0), curHeader.Number().Uint64())
}
if payout != nil &&
curHeader.ShardID() == shard.BeaconChainShardID &&
bc.chainConfig.IsStaking(block.Epoch()) {
bc.WriteBlockRewardAccumulator(payout, block.Number().Uint64())
}
/////////////////////////// END
// If the total difficulty is higher than our known, add it to the canonical chain
@ -1518,7 +1527,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
}
// Process block using the parent state as reference point.
receipts, cxReceipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
receipts, cxReceipts, logs, usedGas, payout, err := bc.processor.Process(block, state, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
return i, events, coalescedLogs, err
@ -1533,7 +1542,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
proctime := time.Since(bstart)
// Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(block, receipts, cxReceipts, state)
status, err := bc.WriteBlockWithState(block, receipts, cxReceipts, payout, state)
if err != nil {
return i, events, coalescedLogs, err
}
@ -2850,25 +2859,15 @@ func (bc *BlockChain) UpdateStakingMetaData(tx *staking.StakingTransaction, root
return nil
}
// BlockRewardAccumulator ..
func (bc *BlockChain) BlockRewardAccumulator() (*big.Int, error) {
return rawdb.ReadBlockRewardAccumulator(bc.db)
// ReadBlockRewardAccumulator ..
func (bc *BlockChain) ReadBlockRewardAccumulator(number uint64) (*big.Int, error) {
return rawdb.ReadBlockRewardAccumulator(bc.db, number)
}
// WriteBlockRewardAccumulator directly writes the BlockRewardAccumulator value
// Note: this should only be called once during staking launch.
func (bc *BlockChain) WriteBlockRewardAccumulator(reward *big.Int) error {
return rawdb.WriteBlockRewardAccumulator(bc.db, reward)
}
//UpdateBlockRewardAccumulator ..
// Note: this should only be called within the blockchain insertBlock process.
func (bc *BlockChain) UpdateBlockRewardAccumulator(diff *big.Int) error {
current, err := bc.BlockRewardAccumulator()
if err != nil {
return err
}
return bc.WriteBlockRewardAccumulator(new(big.Int).Add(current, diff))
func (bc *BlockChain) WriteBlockRewardAccumulator(reward *big.Int, number uint64) error {
return rawdb.WriteBlockRewardAccumulator(bc.db, reward, number)
}
// Note this should read from the state of current block in concern (root == newBlock.root)

@ -188,7 +188,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
if b.engine != nil {
// Finalize and seal the block
block, err := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts, nil, nil, nil)
block, _, err := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts, nil, nil, nil)
if err != nil {
panic(err)
}
@ -278,7 +278,8 @@ func (cr *fakeChainReader) ReadValidatorInformation(addr common.Address) (*staki
func (cr *fakeChainReader) ReadValidatorSnapshot(addr common.Address) (*staking.ValidatorWrapper, error) {
return nil, nil
}
func (cr *fakeChainReader) BlockRewardAccumulator() (*big.Int, error) { return nil, nil }
func (cr *fakeChainReader) UpdateBlockRewardAccumulator(diff *big.Int) error { return nil }
func (cr *fakeChainReader) ReadBlockRewardAccumulator(uint64) (*big.Int, error) { return nil, nil }
func (cr *fakeChainReader) ValidatorStakingWithDelegation(addr common.Address) *big.Int { return nil }
func (cr *fakeChainReader) WriteBlockRewardAccumulator(reward *big.Int) error { return nil }
func (cr *fakeChainReader) WriteBlockRewardAccumulator(reward *big.Int, number uint64) error {
return nil
}

@ -766,8 +766,8 @@ func WriteDelegationsByDelegator(db DatabaseWriter, delegator common.Address, in
}
// ReadBlockRewardAccumulator ..
func ReadBlockRewardAccumulator(db DatabaseReader) (*big.Int, error) {
data, err := db.Get(CurrentRewardGivenOut)
func ReadBlockRewardAccumulator(db DatabaseReader, number uint64) (*big.Int, error) {
data, err := db.Get(blockRewardAccumKey(number))
if err != nil {
return nil, err
}
@ -775,6 +775,6 @@ func ReadBlockRewardAccumulator(db DatabaseReader) (*big.Int, error) {
}
// WriteBlockRewardAccumulator ..
func WriteBlockRewardAccumulator(db DatabaseWriter, newAccum *big.Int) error {
return db.Put(CurrentRewardGivenOut, newAccum.Bytes())
func WriteBlockRewardAccumulator(db DatabaseWriter, newAccum *big.Int, number uint64) error {
return db.Put(blockRewardAccumKey(number), newAccum.Bytes())
}

@ -93,10 +93,9 @@ var (
// Chain index prefixes (use `i` + single byte to avoid mixing data types).
BloomBitsIndexPrefix = []byte("iB") // BloomBitsIndexPrefix is the data table of a chain indexer to track its progress
preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil)
preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
// CurrentRewardGivenOut ..
CurrentRewardGivenOut = []byte("total-reward-given-out")
preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil)
preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
currentRewardGivenOutPrefix = []byte("blk-rwd-")
)
// TxLookupEntry is a positional metadata to help looking up the data content of
@ -251,3 +250,7 @@ func validatorStatsKey(addr common.Address) []byte {
prefix := validatorStatsPrefix
return append(prefix, addr.Bytes()...)
}
func blockRewardAccumKey(number uint64) []byte {
return append(currentRewardGivenOutPrefix, encodeBlockNumber(number)...)
}

@ -60,7 +60,9 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
// Process returns the receipts and logs accumulated during the process and
// returns the amount of gas that was used in the process. If any of the
// transactions failed to execute due to insufficient gas it will return an error.
func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.Config) (types.Receipts, types.CXReceipts, []*types.Log, uint64, error) {
func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.Config) (
types.Receipts, types.CXReceipts, []*types.Log, uint64, *big.Int, error,
) {
var (
receipts types.Receipts
outcxs types.CXReceipts
@ -78,7 +80,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.C
statedb.Prepare(tx.Hash(), block.Hash(), i)
receipt, cxReceipt, _, err := ApplyTransaction(p.config, p.bc, &coinbase, gp, statedb, header, tx, usedGas, cfg)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, 0, nil, err
}
receipts = append(receipts, receipt)
if cxReceipt != nil {
@ -95,7 +97,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.C
ApplyStakingTransaction(p.config, p.bc, &coinbase, gp, statedb, header, tx, usedGas, cfg)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, 0, nil, err
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
@ -105,17 +107,17 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.C
for _, cx := range block.IncomingReceipts() {
err := ApplyIncomingReceipt(p.config, statedb, header, cx)
if err != nil {
return nil, nil, nil, 0, ctxerror.New("cannot apply incoming receipts").WithCause(err)
return nil, nil, nil, 0, nil, ctxerror.New("cannot apply incoming receipts").WithCause(err)
}
}
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
_, err := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), receipts, outcxs, incxs, block.StakingTransactions())
_, payout, err := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), receipts, outcxs, incxs, block.StakingTransactions())
if err != nil {
return nil, nil, nil, 0, ctxerror.New("cannot finalize block").WithCause(err)
return nil, nil, nil, 0, nil, ctxerror.New("cannot finalize block").WithCause(err)
}
return receipts, outcxs, allLogs, *usedGas, nil
return receipts, outcxs, allLogs, *usedGas, payout, nil
}
// return true if it is valid

@ -17,6 +17,8 @@
package core
import (
"math/big"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
@ -54,5 +56,7 @@ type Validator interface {
// of gas used in the process and return an error if any of the internal rules
// failed.
type Processor interface {
Process(block *types.Block, statedb *state.DB, cfg vm.Config) (types.Receipts, types.CXReceipts, []*types.Log, uint64, error)
Process(block *types.Block, statedb *state.DB, cfg vm.Config) (
types.Receipts, types.CXReceipts, []*types.Log, uint64, *big.Int, error,
)
}

@ -2,6 +2,7 @@ package chain
import (
"encoding/binary"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
@ -191,14 +192,15 @@ func (e *engineImpl) Finalize(
state *state.DB, txs []*types.Transaction,
receipts []*types.Receipt, outcxs []*types.CXReceipt,
incxs []*types.CXReceiptsProof, stks []*staking.StakingTransaction,
) (*types.Block, error) {
) (*types.Block, *big.Int, error) {
// Accumulate any block and uncle rewards and commit the final state root
// Header seems complete, assemble into a block and return
if err := AccumulateRewards(
payout, err := AccumulateRewards(
chain, state, header, e.Rewarder(), e.Slasher(), e.Beaconchain(),
); err != nil {
return nil, ctxerror.New("cannot pay block reward").WithCause(err)
)
if err != nil {
return nil, nil, ctxerror.New("cannot pay block reward").WithCause(err)
}
// TODO Shouldnt this logic only apply to beaconchain, right?
@ -208,7 +210,7 @@ func (e *engineImpl) Finalize(
// TODO: make sure we are using the correct validator list
validators, err := chain.ReadActiveValidatorList()
if err != nil {
return nil, ctxerror.New("failed to read active validators").WithCause(err)
return nil, nil, ctxerror.New("failed to read active validators").WithCause(err)
}
for _, validator := range validators {
wrapper := state.GetStakingInfo(validator)
@ -219,16 +221,16 @@ func (e *engineImpl) Finalize(
state.AddBalance(delegation.DelegatorAddress, totalWithdraw)
}
if err := state.UpdateStakingInfo(validator, wrapper); err != nil {
return nil, ctxerror.New("failed update validator info").WithCause(err)
return nil, nil, ctxerror.New("failed update validator info").WithCause(err)
}
} else {
err = errors.New("validator came back empty " + common2.MustAddressToBech32(validator))
return nil, ctxerror.New("failed getting validator info").WithCause(err)
return nil, nil, ctxerror.New("failed getting validator info").WithCause(err)
}
}
}
header.SetRoot(state.IntermediateRoot(chain.Config().IsS3(header.Epoch())))
return types.NewBlock(header, txs, receipts, outcxs, incxs, stks), nil
return types.NewBlock(header, txs, receipts, outcxs, incxs, stks), payout, nil
}
// QuorumForBlock returns the quorum for the given block header.

@ -140,7 +140,9 @@ func whatPercentStakedNow(
return nil, err
}
soFarDoledOut, err := beaconchain.BlockRewardAccumulator()
soFarDoledOut, err := beaconchain.ReadBlockRewardAccumulator(
beaconchain.CurrentHeader().Number().Uint64() - 1,
)
if err != nil {
return nil, err
@ -169,17 +171,17 @@ func AccumulateRewards(
bc engine.ChainReader, state *state.DB, header *block.Header,
rewarder reward.Distributor, slasher slash.Slasher,
beaconChain engine.ChainReader,
) error {
) (*big.Int, error) {
blockNum := header.Number().Uint64()
if blockNum == 0 {
// genesis block has no parent to reward.
return nil
return nil, nil
}
if bc.Config().IsStaking(header.Epoch()) &&
bc.CurrentHeader().ShardID() != shard.BeaconChainShardID {
return nil
return nil, nil
}
//// After staking
@ -191,7 +193,7 @@ func AccumulateRewards(
// TODO Use cached result in off-chain db instead of full computation
percentageStaked, err := whatPercentStakedNow(beaconChain, header.Time().Int64())
if err != nil {
return err
return nil, err
}
howMuchOff := targetStakedPercentage.Sub(*percentageStaked)
adjustBy := adjust(
@ -207,7 +209,7 @@ func AccumulateRewards(
// If too much is staked, then possible to have negative reward,
// not an error, just a possible economic situation, hence we return
if defaultReward.IsNegative() {
return nil
return nil, nil
}
newRewards := big.NewInt(0)
@ -215,12 +217,12 @@ func AccumulateRewards(
// Take care of my own beacon chain committee, _ is missing, for slashing
members, payable, _, err := ballotResultBeaconchain(beaconChain, header)
if err != nil {
return err
return nil, err
}
votingPower, err := votepower.Compute(members)
if err != nil {
return err
return nil, err
}
for beaconMember := range payable {
@ -230,7 +232,7 @@ func AccumulateRewards(
if !voter.IsHarmonyNode {
snapshot, err := bc.ReadValidatorSnapshot(voter.EarningAccount)
if err != nil {
return err
return nil, err
}
due := defaultReward.Mul(
voter.EffectivePercent.Quo(votepower.StakersShare),
@ -245,7 +247,7 @@ func AccumulateRewards(
crossLinks := types.CrossLinks{}
err := rlp.DecodeBytes(cxLinks, &crossLinks)
if err != nil {
return err
return nil, err
}
type slotPayable struct {
@ -262,7 +264,7 @@ func AccumulateRewards(
shardState, err := bc.ReadShardState(cxLink.Epoch())
if err != nil {
return err
return nil, err
}
subComm := shardState.FindCommitteeByID(cxLink.ShardID())
@ -270,12 +272,12 @@ func AccumulateRewards(
payableSigners, _, err := blockSigners(cxLink.Bitmap(), subComm)
if err != nil {
return err
return nil, err
}
votingPower, err := votepower.Compute(payableSigners)
if err != nil {
return err
return nil, err
}
for j := range payableSigners {
voter := votingPower.Voters[payableSigners[j].BlsPublicKey]
@ -319,7 +321,7 @@ func AccumulateRewards(
for payThem := range resultsHandle[bucket] {
snapshot, err := bc.ReadValidatorSnapshot(resultsHandle[bucket][payThem].payee)
if err != nil {
return err
return nil, err
}
due := resultsHandle[bucket][payThem].payout.TruncateInt()
newRewards = new(big.Int).Add(newRewards, due)
@ -327,9 +329,10 @@ func AccumulateRewards(
}
}
return bc.UpdateBlockRewardAccumulator(newRewards)
return newRewards, nil
// return bc.UpdateBlockRewardAccumulator(newRewards)
}
return nil
return nil, nil
}
//// Before staking
@ -343,13 +346,13 @@ func AccumulateRewards(
if parentHeader.Number().Cmp(common.Big0) == 0 {
// Parent is an epoch block,
// which is not signed in the usual manner therefore rewards nothing.
return nil
return nil, nil
}
_, signers, _, err := ballotResult(bc, header, header.ShardID())
if err != nil {
return err
return nil, err
}
totalAmount := rewarder.Award(
@ -368,7 +371,7 @@ func AccumulateRewards(
Int64("block-reward", BlockReward.Int64()).
Int64("total-amount-paid-out", totalAmount.Int64()).
Msg("Total paid out was not equal to block-reward")
return errors.Wrapf(
return nil, errors.Wrapf(
errPayoutNotEqualBlockReward, "payout "+totalAmount.String(),
)
}
@ -382,5 +385,5 @@ func AccumulateRewards(
Str("TotalAmount", totalAmount.String()).
Msg("[Block Reward] Successfully paid out block reward")
return nil
return totalAmount, nil
}

@ -360,7 +360,10 @@ func (w *Worker) SuperCommitteeForNextEpoch(
}
// FinalizeNewBlock generate a new block for the next consensus round.
func (w *Worker) FinalizeNewBlock(sig []byte, signers []byte, viewID uint64, coinbase common.Address, crossLinks types.CrossLinks, shardState *shard.State) (*types.Block, error) {
func (w *Worker) FinalizeNewBlock(
sig []byte, signers []byte, viewID uint64, coinbase common.Address,
crossLinks types.CrossLinks, shardState *shard.State,
) (*types.Block, error) {
if len(sig) > 0 && len(signers) > 0 {
sig2 := w.current.header.LastCommitSignature()
copy(sig2[:], sig[:])
@ -412,7 +415,10 @@ func (w *Worker) FinalizeNewBlock(sig []byte, signers []byte, viewID uint64, coi
copyHeader := types.CopyHeader(w.current.header)
// TODO: feed coinbase into here so the proposer gets extra rewards.
block, err := w.engine.Finalize(w.chain, copyHeader, state, w.current.txs, w.current.receipts, w.current.outcxs, w.current.incxs, w.current.stakingTxs)
block, _, err := w.engine.Finalize(
w.chain, copyHeader, state, w.current.txs, w.current.receipts,
w.current.outcxs, w.current.incxs, w.current.stakingTxs,
)
if err != nil {
return nil, ctxerror.New("cannot finalize block").WithCause(err)
}

Loading…
Cancel
Save