package worker import ( "fmt" "math/big" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/block" blockfactory "github.com/harmony-one/harmony/block/factory" consensus_engine "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/shard/committee" staking "github.com/harmony-one/harmony/staking/types" ) // environment is the worker's current environment and holds all of the current state information. type environment struct { state *state.DB // apply state changes here gasPool *core.GasPool // available gas used to pack transactions header *block.Header txs []*types.Transaction stkingTxs staking.StakingTransactions receipts []*types.Receipt outcxs []*types.CXReceipt // cross shard transaction receipts (source shard) incxs []*types.CXReceiptsProof // cross shard receipts and its proof (desitinatin shard) } // Worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type Worker struct { config *params.ChainConfig factory blockfactory.Factory chain *core.BlockChain current *environment // An environment for current running cycle. engine consensus_engine.Engine gasFloor uint64 gasCeil uint64 } // Returns a tuple where the first value is the txs sender account address, // the second is the throttling result enum for the transaction of interest. // Throttling happens based on the amount, frequency, etc. func (w *Worker) throttleTxs(selected types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, tx *types.Transaction) (common.Address, shardingconfig.TxThrottleFlag) { var sender common.Address msg, err := tx.AsMessage(types.MakeSigner(w.config, w.chain.CurrentBlock().Epoch())) if err != nil { utils.Logger().Error().Err(err).Str("txId", tx.Hash().Hex()).Msg("Error when parsing tx into message") } else { sender = msg.From() } // do not throttle transactions if disabled if !txsThrottleConfig.EnableTxnThrottling { return sender, shardingconfig.TxSelect } // already selected max num txs if len(selected) > txsThrottleConfig.MaxNumTxsPerBlockLimit { utils.Logger().Info().Str("txId", tx.Hash().Hex()).Int("MaxNumTxsPerBlockLimit", txsThrottleConfig.MaxNumTxsPerBlockLimit).Msg("Throttling tx with max num txs per block limit") return sender, shardingconfig.TxUnselect } // throttle a single sender sending too many transactions in one block if tx.Value().Cmp(txsThrottleConfig.MaxTxAmountLimit) > 0 { utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("MaxTxAmountLimit", txsThrottleConfig.MaxTxAmountLimit.Uint64()).Uint64("txAmount", tx.Value().Uint64()).Msg("Throttling tx with max amount limit") return sender, shardingconfig.TxInvalid } // throttle too large transaction var numTxsPastHour uint64 for _, blockTxsCounts := range recentTxsStats { numTxsPastHour += blockTxsCounts[sender] } if numTxsPastHour >= txsThrottleConfig.MaxNumRecentTxsPerAccountLimit { utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("MaxNumRecentTxsPerAccountLimit", txsThrottleConfig.MaxNumRecentTxsPerAccountLimit).Msg("Throttling tx with max txs per account in a single block limit") return sender, shardingconfig.TxInvalid } return sender, shardingconfig.TxSelect } // SelectTransactionsForNewBlock selects transactions for new block. func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions) { if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) } selected := types.Transactions{} unselected := types.Transactions{} invalid := types.Transactions{} for _, tx := range txs { if tx.ShardID() != w.chain.ShardID() { invalid = append(invalid, tx) continue } // If we don't have enough gas for any further transactions then we're done if w.current.gasPool.Gas() < params.TxGas { utils.Logger().Info().Str("Not enough gas for further transactions, have", w.current.gasPool.String()).Uint64("want", params.TxGas) unselected = append(unselected, tx) continue } sender, flag := w.throttleTxs(selected, recentTxsStats, txsThrottleConfig, tx) switch flag { case shardingconfig.TxUnselect: unselected = append(unselected, tx) case shardingconfig.TxInvalid: invalid = append(invalid, tx) case shardingconfig.TxSelect: if tx.GasPrice().Uint64() == 0 { invalid = append(invalid, tx) } else { snap := w.current.state.Snapshot() _, err := w.commitTransaction(tx, coinbase) if err != nil { w.current.state.RevertToSnapshot(snap) invalid = append(invalid, tx) utils.Logger().Error().Err(err).Str("txId", tx.Hash().Hex()).Msg("Commit transaction error") } else { selected = append(selected, tx) // handle the case when msg was not able to extracted from tx if len(sender.String()) > 0 { recentTxsStats[newBlockNum][sender]++ } } } } // log invalid or unselected txs if flag == shardingconfig.TxUnselect || flag == shardingconfig.TxInvalid { utils.Logger().Info().Str("txId", tx.Hash().Hex()).Str("txThrottleFlag", flag.String()).Msg("Transaction Throttle flag") } utils.Logger().Info().Str("txId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("Transaction gas limit info") } utils.Logger().Info().Uint64("newBlockNum", newBlockNum).Int("newTxns", len(selected)).Uint64("blockGasLimit", w.current.header.GasLimit()).Uint64("blockGasUsed", w.current.header.GasUsed()).Msg("Block gas limit and usage info") return selected, unselected, invalid } // SelectStakingTransactionsForNewBlock selects staking transactions for new block. func (w *Worker) SelectStakingTransactionsForNewBlock( newBlockNum uint64, txs staking.StakingTransactions, coinbase common.Address) (staking.StakingTransactions, staking.StakingTransactions, staking.StakingTransactions) { // only beaconchain process staking transaction if w.chain.ShardID() != shard.BeaconChainShardID { utils.Logger().Warn().Msgf("Invalid shardID: %v", w.chain.ShardID()) return nil, nil, nil } if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) } selected := staking.StakingTransactions{} // TODO: chao add total gas fee checking when needed unselected := staking.StakingTransactions{} invalid := staking.StakingTransactions{} for _, tx := range txs { snap := w.current.state.Snapshot() _, err := w.commitStakingTransaction(tx, coinbase) if err != nil { w.current.state.RevertToSnapshot(snap) invalid = append(invalid, tx) utils.Logger().Error().Err(err).Str("stakingTxId", tx.Hash().Hex()).Msg("Commit staking transaction error") } else { selected = append(selected, tx) utils.Logger().Info().Str("stakingTxId", tx.Hash().Hex()).Uint64("txGasLimit", tx.Gas()).Msg("StakingTransaction gas limit info") } } utils.Logger().Info().Uint64("newBlockNum", newBlockNum).Uint64("blockGasLimit", w.current.header.GasLimit()).Uint64("blockGasUsed", w.current.header.GasUsed()).Msg("[SelectStakingTransaction] Block gas limit and usage info") return selected, unselected, invalid } func (w *Worker) commitStakingTransaction(tx *staking.StakingTransaction, coinbase common.Address) ([]*types.Log, error) { snap := w.current.state.Snapshot() gasUsed := w.current.header.GasUsed() receipt, _, err := core.ApplyStakingTransaction(w.config, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &gasUsed, vm.Config{}) w.current.header.SetGasUsed(gasUsed) if err != nil { w.current.state.RevertToSnapshot(snap) return nil, err } if receipt == nil { return nil, fmt.Errorf("nil staking receipt") } w.current.stkingTxs = append(w.current.stkingTxs, tx) w.current.receipts = append(w.current.receipts, receipt) return receipt.Logs, nil } func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) { snap := w.current.state.Snapshot() gasUsed := w.current.header.GasUsed() receipt, cx, _, err := core.ApplyTransaction(w.config, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &gasUsed, vm.Config{}) w.current.header.SetGasUsed(gasUsed) if err != nil { w.current.state.RevertToSnapshot(snap) utils.Logger().Error().Err(err).Str("stakingTxId", tx.Hash().Hex()).Msg("Offchain ValidatorMap Read/Write Error") return nil, err } if receipt == nil { utils.Logger().Warn().Interface("tx", tx).Interface("cx", cx).Msg("Receipt is Nil!") return nil, fmt.Errorf("Receipt is Nil") } w.current.txs = append(w.current.txs, tx) w.current.receipts = append(w.current.receipts, receipt) if cx != nil { w.current.outcxs = append(w.current.outcxs, cx) } return receipt.Logs, nil } // CommitTransactions commits transactions. func (w *Worker) CommitTransactions(txs types.Transactions, stakingTxns staking.StakingTransactions, coinbase common.Address) error { // Must update to the correct current state before processing potential txns if err := w.UpdateCurrent(coinbase); err != nil { utils.Logger().Error(). Err(err). Msg("Failed updating worker's state before committing txns") return err } if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) } for _, tx := range txs { snap := w.current.state.Snapshot() _, err := w.commitTransaction(tx, coinbase) if err != nil { w.current.state.RevertToSnapshot(snap) return err } } for _, tx := range stakingTxns { snap := w.current.state.Snapshot() _, err := w.commitStakingTransaction(tx, coinbase) if err != nil { w.current.state.RevertToSnapshot(snap) return err } } return nil } // CommitReceipts commits a list of already verified incoming cross shard receipts func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error { if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit()) } if len(receiptsList) == 0 { w.current.header.SetIncomingReceiptHash(types.EmptyRootHash) } else { w.current.header.SetIncomingReceiptHash(types.DeriveSha(types.CXReceiptsProofs(receiptsList))) } for _, cx := range receiptsList { err := core.ApplyIncomingReceipt(w.config, w.current.state, w.current.header, cx) if err != nil { return ctxerror.New("cannot apply receiptsList").WithCause(err) } } for _, cx := range receiptsList { w.current.incxs = append(w.current.incxs, cx) } return nil } // UpdateCurrent updates the current environment with the current state and header. func (w *Worker) UpdateCurrent(coinbase common.Address) error { parent := w.chain.CurrentBlock() num := parent.Number() timestamp := time.Now().Unix() epoch := w.GetNewEpoch() header := w.factory.NewHeader(epoch).With(). ParentHash(parent.Hash()). Number(num.Add(num, common.Big1)). GasLimit(core.CalcGasLimit(parent, w.gasFloor, w.gasCeil)). Time(big.NewInt(timestamp)). ShardID(w.chain.ShardID()). Coinbase(coinbase). Header() return w.makeCurrent(parent, header) } // makeCurrent creates a new environment for the current cycle. func (w *Worker) makeCurrent(parent *types.Block, header *block.Header) error { state, err := w.chain.StateAt(parent.Root()) if err != nil { return err } env := &environment{ state: state, header: header, } w.current = env return nil } // GetCurrentState gets the current state. func (w *Worker) GetCurrentState() *state.DB { return w.current.state } // GetNewEpoch gets the current epoch. func (w *Worker) GetNewEpoch() *big.Int { parent := w.chain.CurrentBlock() epoch := new(big.Int).Set(parent.Header().Epoch()) // TODO: Don't depend on sharding state for epoch change. if len(parent.Header().ShardState()) > 0 && parent.NumberU64() != 0 { // ... except if parent has a resharding assignment it increases by 1. epoch = epoch.Add(epoch, common.Big1) } return epoch } // GetCurrentReceipts get the receipts generated starting from the last state. func (w *Worker) GetCurrentReceipts() []*types.Receipt { return w.current.receipts } // OutgoingReceipts get the receipts generated starting from the last state. func (w *Worker) OutgoingReceipts() []*types.CXReceipt { return w.current.outcxs } // IncomingReceipts get incoming receipts in destination shard that is received from source shard func (w *Worker) IncomingReceipts() []*types.CXReceiptsProof { return w.current.incxs } // ProposeShardStateWithoutBeaconSync proposes the next shard state for next epoch. func (w *Worker) ProposeShardStateWithoutBeaconSync() shard.State { if !shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) { return nil } shardState, _ := committee.WithStakingEnabled.ReadFromComputation( new(big.Int).Add(w.current.header.Epoch(), common.Big1), *w.config, nil, ) return shardState } // FinalizeNewBlock generate a new block for the next consensus round. func (w *Worker) FinalizeNewBlock(sig []byte, signers []byte, viewID uint64, coinbase common.Address, crossLinks types.CrossLinks, shardState shard.State) (*types.Block, error) { if len(sig) > 0 && len(signers) > 0 { sig2 := w.current.header.LastCommitSignature() copy(sig2[:], sig[:]) w.current.header.SetLastCommitSignature(sig2) w.current.header.SetLastCommitBitmap(signers) } w.current.header.SetCoinbase(coinbase) w.current.header.SetViewID(new(big.Int).SetUint64(viewID)) // Cross Links if crossLinks != nil && len(crossLinks) != 0 { crossLinkData, err := rlp.EncodeToBytes(crossLinks) if err == nil { utils.Logger().Debug(). Uint64("blockNum", w.current.header.Number().Uint64()). Int("numCrossLinks", len(crossLinks)). Msg("Successfully proposed cross links into new block") w.current.header.SetCrossLinks(crossLinkData) } else { utils.Logger().Debug().Err(err).Msg("Failed to encode proposed cross links") return nil, err } } // Shard State if shardState != nil && len(shardState) != 0 { w.current.header.SetShardStateHash(shardState.Hash()) shardStateData, err := rlp.EncodeToBytes(shardState) if err == nil { w.current.header.SetShardState(shardStateData) } else { utils.Logger().Debug().Err(err).Msg("Failed to encode proposed shard state") return nil, err } } s := w.current.state.Copy() copyHeader := types.CopyHeader(w.current.header) block, err := w.engine.Finalize(w.chain, copyHeader, s, w.current.txs, w.current.receipts, w.current.outcxs, w.current.incxs, w.current.stkingTxs) if err != nil { return nil, ctxerror.New("cannot finalize block").WithCause(err) } return block, nil } // New create a new worker object. func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus_engine.Engine) *Worker { worker := &Worker{ config: config, factory: blockfactory.NewFactory(config), chain: chain, engine: engine, } worker.gasFloor = 80000000 worker.gasCeil = 120000000 parent := worker.chain.CurrentBlock() num := parent.Number() timestamp := time.Now().Unix() epoch := worker.GetNewEpoch() header := worker.factory.NewHeader(epoch).With(). ParentHash(parent.Hash()). Number(num.Add(num, common.Big1)). GasLimit(core.CalcGasLimit(parent, worker.gasFloor, worker.gasCeil)). Time(big.NewInt(timestamp)). ShardID(worker.chain.ShardID()). Header() worker.makeCurrent(parent, header) return worker }