From b787f0dcd54997abd2532902e26a13d2cbc30165 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Wed, 14 Apr 2021 00:37:07 -0700 Subject: [PATCH] [txpool] more fixes --- core/tx_list.go | 96 +++++++++++++++++++++++++----------- core/tx_pool.go | 22 +++++---- core/tx_pool_test.go | 2 +- staking/types/transaction.go | 13 ++--- 4 files changed, 83 insertions(+), 50 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index eb1685b1d..be4e78af0 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -18,10 +18,13 @@ package core import ( "container/heap" + "errors" "math" "math/big" "sort" + staking "github.com/harmony-one/harmony/staking/types" + "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" @@ -245,6 +248,8 @@ func (m *txSortedMap) Flatten() types.PoolTransactions { return txs } +const stakingTxCheckThreshold = 10 // check staking transaction validation every 10 blocks + // txList is a "list" of transactions belonging to an account, sorted by account // nonce. The same type can be used both for storing contiguous transactions for // the executable/pending queue; and for storing gapped transactions for the non- @@ -253,8 +258,9 @@ type txList struct { strict bool // Whether nonces are strictly continuous or not txs *txSortedMap // Heap indexed sorted hash map of the transactions - costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) - gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit) + lastStkCheck uint64 // Check all staking transaction validation every 10 blocks + costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) + gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit) } // newTxList create a new transaction list for maintaining nonce-indexable fast, @@ -312,11 +318,39 @@ func (l *txList) Forward(threshold uint64) types.PoolTransactions { return l.txs.Forward(threshold) } -// FilterPrice returns all regular transactions from the list with a cost or gas limit higher -// than the provided thresholds and all staking transactions that can not be validated. -func (l *txList) FilterPrice( - txPool *TxPool, address common.Address, -) (types.PoolTransactions, types.PoolTransactions) { +// Filter out all invalid transactions from the txList. Return removed transactions, errors +// why the removed transactions are failed, and trailing transactions that are enqueued due +// to high nonce because removing the invalid transaction. +// Invalid transaction check as follows: +// 1. Not enough balance transactions. +// 2. Checked every 10 blocks for staking transaction validation. +func (l *txList) FilterValid(txPool *TxPool, address common.Address, bn uint64) (types.PoolTransactions, []error, types.PoolTransactions) { + var invalids types.PoolTransactions + + removed, errs := l.filterPrice(txPool, address) + if bn > l.lastStkCheck+stakingTxCheckThreshold { + l.lastStkCheck = bn + + stkRemoved, errRemoved := l.filterStaking(txPool) + removed = append(removed, stkRemoved...) + errs = append(errs, errRemoved...) + } + + if l.strict && len(removed) > 0 { + lowest := uint64(math.MaxUint64) + for _, tx := range removed { + if nonce := tx.Nonce(); lowest > nonce { + lowest = nonce + } + } + invalids = l.txs.Filter(func(tx types.PoolTransaction) bool { return tx.Nonce() > lowest }) + } + return removed, errs, invalids +} + +// filterPrice returns all regular transactions from the list with a cost or gas limit higher +// than the provided thresholds. +func (l *txList) filterPrice(txPool *TxPool, address common.Address) (types.PoolTransactions, []error) { costLimit := txPool.currentState.GetBalance(address) gasLimit := txPool.currentMaxGas // If all transactions are below the threshold, short circuit @@ -326,36 +360,38 @@ func (l *txList) FilterPrice( l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds l.gascap = gasLimit - return l.Filter(func(tx types.PoolTransaction) bool { + return l.filterWithError(func(tx types.PoolTransaction) error { cost, err := tx.Cost() if err != nil { - return true // failure should lead to removal of the tx + return err // failure should lead to removal of the tx } - return cost.Cmp(costLimit) == 1 || tx.GasLimit() > gasLimit + if cost.Cmp(costLimit) > 0 { + return errors.New("not enough balance") + } + if tx.GasLimit() > gasLimit { + return errors.New("exceed max transaction gas limit") + } + return nil }) } -// Filter iterates over the list of transactions and removes all of them for which -// the specified function evaluates to true. Moreover, it returns all transactions -// that were invalidated from the filter -func (l *txList) Filter( - filter func(types.PoolTransaction) bool, -) (types.PoolTransactions, types.PoolTransactions) { - // If the list was strict, filter anything above the lowest nonce - var invalids types.PoolTransactions - - // Filter out all the transactions above the account's funds - removed := l.txs.Filter(filter) - if l.strict && len(removed) > 0 { - lowest := uint64(math.MaxUint64) - for _, tx := range removed { - if nonce := tx.Nonce(); lowest > nonce { - lowest = nonce - } +// filterStaking validates and return invalid staking transaction with error, and trending +// invalid transactions for high nonce. +func (l *txList) filterStaking(txPool *TxPool) (types.PoolTransactions, []error) { + return l.filterWithError(func(tx types.PoolTransaction) error { + stkTxn, ok := tx.(*staking.StakingTransaction) + if !ok { + return nil } - invalids = l.txs.Filter(func(tx types.PoolTransaction) bool { return tx.Nonce() > lowest }) - } - return removed, invalids + return txPool.validateStakingTx(stkTxn) + }) +} + +// Filter iterates over the list of transactions and removes all of them for which +// the specified function invalidates the tx. Moreover, it returns all transactions +// that were invalidated from the filter. +func (l *txList) filterWithError(filter func(types.PoolTransaction) error) (types.PoolTransactions, []error) { + return l.txs.FilterWithError(filter) } // Cap places a hard limit on the number of items, returning all transactions diff --git a/core/tx_pool.go b/core/tx_pool.go index 3bc79b668..44ec81952 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -511,7 +511,7 @@ func (pool *TxPool) reset(oldHead, newHead *block.Header) { // any transactions that have been included in the block or // have been invalidated because of another transaction (e.g. // higher gas price) - pool.demoteUnexecutables() + pool.demoteUnexecutables(newHead.Number().Uint64()) // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { @@ -1315,14 +1315,15 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Do not report to error sink as old txs are on chain or meaningful error caught elsewhere. } // Drop all transactions that are too costly (low balance or out of gas) - drops, _ := list.FilterPrice(pool, addr) - for _, tx := range drops { + drops, errs, _ := list.FilterValid(pool, addr, 0) + for i, tx := range drops { hash := tx.Hash() pool.all.Remove(hash) pool.priced.Removed() queuedNofundsCounter.Inc(1) - pool.txErrorSink.Add(tx, fmt.Errorf("removed unpayable queued transaction")) - logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable queued transaction") + pool.txErrorSink.Add(tx, errs[i]) + logger.Warn().Str("hash", hash.Hex()).Err(errs[i]). + Msg("Removed unpayable queued transaction") } // Gather all executable transactions and promote them for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { @@ -1472,7 +1473,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // demoteUnexecutables removes invalid and processed transactions from the pools // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. -func (pool *TxPool) demoteUnexecutables() { +func (pool *TxPool) demoteUnexecutables(bn uint64) { // Iterate over all accounts and demote any non-executable transactions logger := utils.Logger().With().Stack().Logger() @@ -1488,14 +1489,15 @@ func (pool *TxPool) demoteUnexecutables() { // Do not report to error sink as old txs are on chain or meaningful error caught elsewhere. } // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later - drops, invalids := list.FilterPrice(pool, addr) - for _, tx := range drops { + drops, errs, invalids := list.FilterValid(pool, addr, bn) + for i, tx := range drops { hash := tx.Hash() pool.all.Remove(hash) pool.priced.Removed() pendingNofundsCounter.Inc(1) - pool.txErrorSink.Add(tx, fmt.Errorf("removed unexecutable pending transaction")) - logger.Warn().Str("hash", hash.Hex()).Msg("Removed unexecutable pending transaction") + pool.txErrorSink.Add(tx, errs[i]) + logger.Warn().Str("hash", hash.Hex()).Err(errs[i]). + Msg("Removed unexecutable pending transaction") } for _, tx := range invalids { hash := tx.Hash() diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 83186410b..aa8b2223d 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -1532,7 +1532,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.demoteUnexecutables() + pool.demoteUnexecutables(0) } } diff --git a/staking/types/transaction.go b/staking/types/transaction.go index a878da4c5..77671838f 100644 --- a/staking/types/transaction.go +++ b/staking/types/transaction.go @@ -168,15 +168,10 @@ func (tx *StakingTransaction) Cost() (*big.Int, error) { } total.Add(total, stkMsg.Amount) case DirectiveDelegate: - msg, err := RLPDecodeStakeMsg(tx.Data(), DirectiveDelegate) - if err != nil { - return nil, err - } - stkMsg, ok := msg.(*Delegate) - if !ok { - return nil, errStakingTransactionTypeCastErr - } - total.Add(total, stkMsg.Amount) + // Temporary hack: Cost function is not accurate for delegate transaction. + // Thus the cost validation is done in `txPool.validateTx`. + // TODO: refactor this hack. + default: } return total, nil }