The core protocol of WoopChain
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
woop/node/worker/worker.go

605 lines
19 KiB

package worker
import (
"bytes"
"fmt"
"math/big"
"sort"
"time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block"
blockfactory "github.com/harmony-one/harmony/block/factory"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
common2 "github.com/harmony-one/harmony/internal/common"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/staking/slash"
staking "github.com/harmony-one/harmony/staking/types"
"github.com/pkg/errors"
)
// environment is the worker's current environment and holds all of the current state information.
type environment struct {
signer types.Signer
state *state.DB // apply state changes here
gasPool *core.GasPool // available gas used to pack transactions
header *block.Header
ethTxs []*types.EthTransaction
txs []*types.Transaction
stakingTxs []*staking.StakingTransaction
receipts []*types.Receipt
outcxs []*types.CXReceipt // cross shard transaction receipts (source shard)
incxs []*types.CXReceiptsProof // cross shard receipts and its proof (desitinatin shard)
slashes slash.Records
}
// Worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type Worker struct {
config *params.ChainConfig
factory blockfactory.Factory
chain *core.BlockChain
current *environment // An environment for current running cycle.
engine consensus_engine.Engine
gasFloor uint64
gasCeil uint64
}
// CommitSortedTransactions commits transactions for new block.
func (w *Worker) CommitSortedTransactions(
txs *types.TransactionsByPriceAndNonce,
coinbase common.Address,
) {
for {
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
utils.Logger().Info().Uint64("have", w.current.gasPool.Gas()).Uint64("want", params.TxGas).Msg("Not enough gas for further transactions")
break
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
if tx == nil {
break
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(w.current.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.config.IsEIP155(w.current.header.Epoch()) {
utils.Logger().Info().Str("hash", tx.Hash().Hex()).Str("eip155Epoch", w.config.EIP155Epoch.String()).Msg("Ignoring reply protected transaction")
txs.Pop()
continue
}
if tx.ShardID() != w.chain.ShardID() {
txs.Shift()
continue
}
var err error
// Start executing the transaction
switch tx.(type) {
case *types.EthTransaction:
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.ethTxs))
_, err = w.commitEthTransaction(tx.(*types.EthTransaction), coinbase)
case *types.Transaction:
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.ethTxs)+len(w.current.txs))
_, err = w.commitHmyTransaction(tx.(*types.Transaction), coinbase)
default:
txs.Shift()
continue
}
sender, _ := common2.AddressToBech32(from)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
utils.Logger().Info().Str("sender", sender).Msg("Gas limit exceeded for current block")
txs.Pop()
case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
utils.Logger().Info().Str("sender", sender).Uint64("nonce", tx.Nonce()).Msg("Skipping transaction with low nonce")
txs.Shift()
case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
utils.Logger().Info().Str("sender", sender).Uint64("nonce", tx.Nonce()).Msg("Skipping account with high nonce")
txs.Pop()
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
txs.Shift()
default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
utils.Logger().Info().Str("hash", tx.Hash().Hex()).AnErr("err", err).Msg("Transaction failed, account skipped")
txs.Shift()
}
}
}
// CommitTransactions commits transactions for new block.
func (w *Worker) CommitTransactions(
pendingEth map[common.Address]types.InternalTransactions,
pendingNormal map[common.Address]types.InternalTransactions,
pendingStaking staking.StakingTransactions, coinbase common.Address,
) error {
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
}
// ETHEREUM COMPATIBLE TXNS
ethTxns := types.NewTransactionsByPriceAndNonce(w.current.signer, pendingEth)
w.CommitSortedTransactions(ethTxns, coinbase)
// HARMONY TXNS
normalTxns := types.NewTransactionsByPriceAndNonce(w.current.signer, pendingNormal)
w.CommitSortedTransactions(normalTxns, coinbase)
// STAKING - only beaconchain process staking transaction
if w.chain.ShardID() == shard.BeaconChainShardID {
for _, tx := range pendingStaking {
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
utils.Logger().Info().Uint64("have", w.current.gasPool.Gas()).Uint64("want", params.TxGas).Msg("Not enough gas for further transactions")
break
}
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.config.IsEIP155(w.current.header.Epoch()) {
utils.Logger().Info().Str("hash", tx.Hash().Hex()).Str("eip155Epoch", w.config.EIP155Epoch.String()).Msg("Ignoring reply protected transaction")
continue
}
// Start executing the transaction
w.current.state.Prepare(tx.Hash(), common.Hash{}, len(w.current.txs)+len(w.current.stakingTxs))
// THESE CODE ARE DUPLICATED AS ABOVE>>
if _, err := w.commitStakingTransaction(tx, coinbase); err != nil {
txID := tx.Hash().Hex()
utils.Logger().Error().Err(err).
Str("stakingTxID", txID).
Interface("stakingTx", tx).
Msg("Failed committing staking transaction")
} else {
utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).
Uint64("txGasLimit", tx.GasLimit()).
Msg("Successfully committed staking transaction")
}
}
}
utils.Logger().Info().
Int("newTxns", len(w.current.txs)).
Int("newStakingTxns", len(w.current.stakingTxs)).
Uint64("blockGasLimit", w.current.header.GasLimit()).
Uint64("blockGasUsed", w.current.header.GasUsed()).
Msg("Block gas limit and usage info")
return nil
}
func (w *Worker) commitStakingTransaction(
tx *staking.StakingTransaction, coinbase common.Address,
) ([]*types.Log, error) {
snap := w.current.state.Snapshot()
gasUsed := w.current.header.GasUsed()
receipt, _, err := core.ApplyStakingTransaction(
w.config, w.chain, &coinbase, w.current.gasPool,
w.current.state, w.current.header, tx, &gasUsed, vm.Config{},
)
w.current.header.SetGasUsed(gasUsed)
if err != nil {
w.current.state.RevertToSnapshot(snap)
utils.Logger().Error().
Err(err).Interface("stkTxn", tx).
Msg("Staking transaction failed commitment")
return nil, err
}
if receipt == nil {
return nil, fmt.Errorf("nil staking receipt")
}
w.current.stakingTxs = append(w.current.stakingTxs, tx)
w.current.receipts = append(w.current.receipts, receipt)
return receipt.Logs, nil
}
var (
errNilReceipt = errors.New("nil receipt")
)
func (w *Worker) commitEthTransaction(
tx *types.EthTransaction, coinbase common.Address,
) ([]*types.Log, error) {
receipt, cx, err := w.commitTransaction(tx, coinbase)
if err == nil {
w.current.ethTxs = append(w.current.ethTxs, tx)
w.current.receipts = append(w.current.receipts, receipt)
if cx != nil {
w.current.outcxs = append(w.current.outcxs, cx)
}
} else {
return nil, err
}
return receipt.Logs, nil
}
func (w *Worker) commitHmyTransaction(
tx *types.Transaction, coinbase common.Address,
) ([]*types.Log, error) {
receipt, cx, err := w.commitTransaction(tx, coinbase)
if err == nil {
w.current.txs = append(w.current.txs, tx)
w.current.receipts = append(w.current.receipts, receipt)
if cx != nil {
w.current.outcxs = append(w.current.outcxs, cx)
}
} else {
return nil, err
}
return receipt.Logs, nil
}
func (w *Worker) commitTransaction(
tx types.InternalTransaction, coinbase common.Address,
) (*types.Receipt, *types.CXReceipt, error) {
snap := w.current.state.Snapshot()
gasUsed := w.current.header.GasUsed()
receipt, cx, _, err := core.ApplyTransaction(
w.config,
w.chain,
&coinbase,
w.current.gasPool,
w.current.state,
w.current.header,
tx,
&gasUsed,
vm.Config{},
)
w.current.header.SetGasUsed(gasUsed)
if err != nil {
w.current.state.RevertToSnapshot(snap)
utils.Logger().Error().
Err(err).Interface("txn", tx).
Msg("Transaction failed commitment")
return nil, nil, errNilReceipt
}
if receipt == nil {
utils.Logger().Warn().Interface("tx", tx).Interface("cx", cx).Msg("Receipt is Nil!")
return nil, nil, errNilReceipt
}
return receipt, cx, nil
}
// CommitReceipts commits a list of already verified incoming cross shard receipts
func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error {
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit())
}
if len(receiptsList) == 0 {
w.current.header.SetIncomingReceiptHash(types.EmptyRootHash)
} else {
w.current.header.SetIncomingReceiptHash(
types.DeriveSha(types.CXReceiptsProofs(receiptsList)),
)
}
for _, cx := range receiptsList {
if err := core.ApplyIncomingReceipt(
w.config, w.current.state, w.current.header, cx,
); err != nil {
return errors.Wrapf(err, "Failed applying cross-shard receipts")
}
}
w.current.incxs = append(w.current.incxs, receiptsList...)
return nil
}
// UpdateCurrent updates the current environment with the current state and header.
func (w *Worker) UpdateCurrent() error {
parent := w.chain.CurrentBlock()
num := parent.Number()
timestamp := time.Now().Unix()
epoch := w.GetNewEpoch()
header := w.factory.NewHeader(epoch).With().
ParentHash(parent.Hash()).
Number(num.Add(num, common.Big1)).
GasLimit(core.CalcGasLimit(parent, w.gasFloor, w.gasCeil)).
Time(big.NewInt(timestamp)).
ShardID(w.chain.ShardID()).
Header()
return w.makeCurrent(parent, header)
}
// GetCurrentHeader returns the current header to propose
func (w *Worker) GetCurrentHeader() *block.Header {
return w.current.header
}
// makeCurrent creates a new environment for the current cycle.
func (w *Worker) makeCurrent(parent *types.Block, header *block.Header) error {
state, err := w.chain.StateAt(parent.Root())
if err != nil {
return err
}
env := &environment{
signer: types.NewEIP155Signer(w.config.ChainID),
state: state,
header: header,
}
w.current = env
return nil
}
// GetCurrentState gets the current state.
func (w *Worker) GetCurrentState() *state.DB {
return w.current.state
}
// GetNewEpoch gets the current epoch.
func (w *Worker) GetNewEpoch() *big.Int {
parent := w.chain.CurrentBlock()
epoch := new(big.Int).Set(parent.Header().Epoch())
shardState, err := parent.Header().GetShardState()
if err == nil &&
shardState.Epoch != nil &&
w.config.IsStaking(shardState.Epoch) {
// For shard state of staking epochs, the shard state will
// have an epoch and it will decide the next epoch for following blocks
epoch = new(big.Int).Set(shardState.Epoch)
} else {
if parent.IsLastBlockInEpoch() && parent.NumberU64() != 0 {
// if parent has proposed a new shard state it increases by 1, except for genesis block.
epoch = epoch.Add(epoch, common.Big1)
}
}
return epoch
}
// GetCurrentReceipts get the receipts generated starting from the last state.
func (w *Worker) GetCurrentReceipts() []*types.Receipt {
return w.current.receipts
}
// OutgoingReceipts get the receipts generated starting from the last state.
func (w *Worker) OutgoingReceipts() []*types.CXReceipt {
return w.current.outcxs
}
// IncomingReceipts get incoming receipts in destination shard that is received from source shard
func (w *Worker) IncomingReceipts() []*types.CXReceiptsProof {
return w.current.incxs
}
// CollectVerifiedSlashes sets w.current.slashes only to those that
// past verification
func (w *Worker) CollectVerifiedSlashes() error {
pending, failures :=
w.chain.ReadPendingSlashingCandidates(), slash.Records{}
if d := pending; len(d) > 0 {
pending, failures = w.verifySlashes(d)
}
if f := failures; len(f) > 0 {
if err := w.chain.DeleteFromPendingSlashingCandidates(f); err != nil {
return err
}
}
w.current.slashes = pending
return nil
}
// returns (successes, failures, error)
func (w *Worker) verifySlashes(
d slash.Records,
) (slash.Records, slash.Records) {
successes, failures := slash.Records{}, slash.Records{}
// Enforce order, reproducibility
sort.SliceStable(d,
func(i, j int) bool {
return bytes.Compare(
d[i].Reporter.Bytes(), d[j].Reporter.Bytes(),
) == -1
},
)
// Always base the state on current tip of the chain
workingState, err := w.chain.State()
if err != nil {
return successes, failures
}
seenEvidences := map[common.Hash]struct{}{}
for i := range d {
evidenceHash := hash.FromRLPNew256(d[i].Evidence)
if existing, ok := seenEvidences[evidenceHash]; ok {
utils.Logger().Warn().
Interface("slashRecord1", existing).
Interface("slashRecord2", d[i]).
Msg("Duplicate slash records with different reporters")
failures = append(failures, d[i])
} else {
seenEvidences[evidenceHash] = struct{}{}
// In addition, need to count the same evidence with first and second vote swapped as seen
swapVote := d[i].Evidence
tmp := swapVote.ConflictingVotes.FirstVote
swapVote.ConflictingVotes.FirstVote = swapVote.ConflictingVotes.SecondVote
swapVote.ConflictingVotes.SecondVote = tmp
swapHash := hash.FromRLPNew256(swapVote)
seenEvidences[swapHash] = struct{}{}
}
if err := slash.Verify(
w.chain, workingState, &d[i],
); err != nil {
utils.Logger().Warn().Err(err).
Interface("slashRecord", d[i]).
Msg("Slash failed verification")
failures = append(failures, d[i])
continue
}
successes = append(successes, d[i])
}
if f := len(failures); f > 0 {
utils.Logger().Debug().
Int("count", f).
Msg("invalid slash records passed over in block proposal")
}
return successes, failures
}
// FinalizeNewBlock generate a new block for the next consensus round.
func (w *Worker) FinalizeNewBlock(
commitSigs chan []byte, viewID func() uint64, coinbase common.Address,
crossLinks types.CrossLinks, shardState *shard.State,
) (*types.Block, error) {
w.current.header.SetCoinbase(coinbase)
// Put crosslinks into header
if len(crossLinks) > 0 {
crossLinks.Sort()
crossLinkData, err := rlp.EncodeToBytes(crossLinks)
if err == nil {
utils.Logger().Debug().
Uint64("blockNum", w.current.header.Number().Uint64()).
Int("numCrossLinks", len(crossLinks)).
Msg("Successfully proposed cross links into new block")
w.current.header.SetCrossLinks(crossLinkData)
} else {
utils.Logger().Debug().Err(err).Msg("Failed to encode proposed cross links")
return nil, err
}
} else {
utils.Logger().Debug().Msg("Zero crosslinks to finalize")
}
// Put slashes into header
if w.config.IsStaking(w.current.header.Epoch()) {
doubleSigners := w.current.slashes
if len(doubleSigners) > 0 {
if data, err := rlp.EncodeToBytes(doubleSigners); err == nil {
w.current.header.SetSlashes(data)
utils.Logger().Info().
Msg("encoded slashes into headers of proposed new block")
} else {
utils.Logger().Debug().Err(err).Msg("Failed to encode proposed slashes")
return nil, err
}
}
}
// Put shard state into header
if shardState != nil && len(shardState.Shards) != 0 {
//we store shardstatehash in header only before prestaking epoch (header v0,v1,v2)
if !w.config.IsPreStaking(w.current.header.Epoch()) {
w.current.header.SetShardStateHash(shardState.Hash())
}
isStaking := false
if shardState.Epoch != nil && w.config.IsStaking(shardState.Epoch) {
isStaking = true
}
// NOTE: Besides genesis, this is the only place where the shard state is encoded.
shardStateData, err := shard.EncodeWrapper(*shardState, isStaking)
if err == nil {
w.current.header.SetShardState(shardStateData)
} else {
utils.Logger().Debug().Err(err).Msg("Failed to encode proposed shard state")
return nil, err
}
}
state := w.current.state.Copy()
copyHeader := types.CopyHeader(w.current.header)
sigsReady := make(chan bool)
go func() {
select {
case sigs := <-commitSigs:
sig, signers, err := bls.SeparateSigAndMask(sigs)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to parse commit sigs")
sigsReady <- false
}
// Put sig, signers, viewID, coinbase into header
if len(sig) > 0 && len(signers) > 0 {
sig2 := copyHeader.LastCommitSignature()
copy(sig2[:], sig[:])
utils.Logger().Info().Hex("sigs", sig).Hex("bitmap", signers).Msg("Setting commit sigs")
copyHeader.SetLastCommitSignature(sig2)
copyHeader.SetLastCommitBitmap(signers)
}
sigsReady <- true
case <-time.After(consensus.CommitSigReceiverTimeout):
// Exit goroutine
utils.Logger().Warn().Msg("Timeout waiting for commit sigs")
}
}()
block, _, err := w.engine.Finalize(
w.chain, copyHeader, state, w.current.txs, w.current.receipts,
w.current.outcxs, w.current.incxs, w.current.stakingTxs,
w.current.slashes, sigsReady, viewID,
)
if err != nil {
return nil, errors.Wrapf(err, "cannot finalize block")
}
return block, nil
}
// New create a new worker object.
func New(
config *params.ChainConfig, chain *core.BlockChain, engine consensus_engine.Engine,
) *Worker {
worker := &Worker{
config: config,
factory: blockfactory.NewFactory(config),
chain: chain,
engine: engine,
}
worker.gasFloor = 80000000
worker.gasCeil = 120000000
parent := worker.chain.CurrentBlock()
num := parent.Number()
timestamp := time.Now().Unix()
epoch := worker.GetNewEpoch()
header := worker.factory.NewHeader(epoch).With().
ParentHash(parent.Hash()).
Number(num.Add(num, common.Big1)).
GasLimit(core.CalcGasLimit(parent, worker.gasFloor, worker.gasCeil)).
Time(big.NewInt(timestamp)).
ShardID(worker.chain.ShardID()).
Header()
worker.makeCurrent(parent, header)
return worker
}