[txpool] more fixes

pull/3656/head
Jacky Wang 4 years ago
parent 9b7990888e
commit b787f0dcd5
No known key found for this signature in database
GPG Key ID: 1085CE5F4FF5842C
  1. 96
      core/tx_list.go
  2. 22
      core/tx_pool.go
  3. 2
      core/tx_pool_test.go
  4. 13
      staking/types/transaction.go

@ -18,10 +18,13 @@ package core
import ( import (
"container/heap" "container/heap"
"errors"
"math" "math"
"math/big" "math/big"
"sort" "sort"
staking "github.com/harmony-one/harmony/staking/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
@ -245,6 +248,8 @@ func (m *txSortedMap) Flatten() types.PoolTransactions {
return txs return txs
} }
const stakingTxCheckThreshold = 10 // check staking transaction validation every 10 blocks
// txList is a "list" of transactions belonging to an account, sorted by account // txList is a "list" of transactions belonging to an account, sorted by account
// nonce. The same type can be used both for storing contiguous transactions for // nonce. The same type can be used both for storing contiguous transactions for
// the executable/pending queue; and for storing gapped transactions for the non- // the executable/pending queue; and for storing gapped transactions for the non-
@ -253,8 +258,9 @@ type txList struct {
strict bool // Whether nonces are strictly continuous or not strict bool // Whether nonces are strictly continuous or not
txs *txSortedMap // Heap indexed sorted hash map of the transactions txs *txSortedMap // Heap indexed sorted hash map of the transactions
costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) lastStkCheck uint64 // Check all staking transaction validation every 10 blocks
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit) costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
} }
// newTxList create a new transaction list for maintaining nonce-indexable fast, // newTxList create a new transaction list for maintaining nonce-indexable fast,
@ -312,11 +318,39 @@ func (l *txList) Forward(threshold uint64) types.PoolTransactions {
return l.txs.Forward(threshold) return l.txs.Forward(threshold)
} }
// FilterPrice returns all regular transactions from the list with a cost or gas limit higher // Filter out all invalid transactions from the txList. Return removed transactions, errors
// than the provided thresholds and all staking transactions that can not be validated. // why the removed transactions are failed, and trailing transactions that are enqueued due
func (l *txList) FilterPrice( // to high nonce because removing the invalid transaction.
txPool *TxPool, address common.Address, // Invalid transaction check as follows:
) (types.PoolTransactions, types.PoolTransactions) { // 1. Not enough balance transactions.
// 2. Checked every 10 blocks for staking transaction validation.
func (l *txList) FilterValid(txPool *TxPool, address common.Address, bn uint64) (types.PoolTransactions, []error, types.PoolTransactions) {
var invalids types.PoolTransactions
removed, errs := l.filterPrice(txPool, address)
if bn > l.lastStkCheck+stakingTxCheckThreshold {
l.lastStkCheck = bn
stkRemoved, errRemoved := l.filterStaking(txPool)
removed = append(removed, stkRemoved...)
errs = append(errs, errRemoved...)
}
if l.strict && len(removed) > 0 {
lowest := uint64(math.MaxUint64)
for _, tx := range removed {
if nonce := tx.Nonce(); lowest > nonce {
lowest = nonce
}
}
invalids = l.txs.Filter(func(tx types.PoolTransaction) bool { return tx.Nonce() > lowest })
}
return removed, errs, invalids
}
// filterPrice returns all regular transactions from the list with a cost or gas limit higher
// than the provided thresholds.
func (l *txList) filterPrice(txPool *TxPool, address common.Address) (types.PoolTransactions, []error) {
costLimit := txPool.currentState.GetBalance(address) costLimit := txPool.currentState.GetBalance(address)
gasLimit := txPool.currentMaxGas gasLimit := txPool.currentMaxGas
// If all transactions are below the threshold, short circuit // If all transactions are below the threshold, short circuit
@ -326,36 +360,38 @@ func (l *txList) FilterPrice(
l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds
l.gascap = gasLimit l.gascap = gasLimit
return l.Filter(func(tx types.PoolTransaction) bool { return l.filterWithError(func(tx types.PoolTransaction) error {
cost, err := tx.Cost() cost, err := tx.Cost()
if err != nil { if err != nil {
return true // failure should lead to removal of the tx return err // failure should lead to removal of the tx
} }
return cost.Cmp(costLimit) == 1 || tx.GasLimit() > gasLimit if cost.Cmp(costLimit) > 0 {
return errors.New("not enough balance")
}
if tx.GasLimit() > gasLimit {
return errors.New("exceed max transaction gas limit")
}
return nil
}) })
} }
// Filter iterates over the list of transactions and removes all of them for which // filterStaking validates and return invalid staking transaction with error, and trending
// the specified function evaluates to true. Moreover, it returns all transactions // invalid transactions for high nonce.
// that were invalidated from the filter func (l *txList) filterStaking(txPool *TxPool) (types.PoolTransactions, []error) {
func (l *txList) Filter( return l.filterWithError(func(tx types.PoolTransaction) error {
filter func(types.PoolTransaction) bool, stkTxn, ok := tx.(*staking.StakingTransaction)
) (types.PoolTransactions, types.PoolTransactions) { if !ok {
// If the list was strict, filter anything above the lowest nonce return nil
var invalids types.PoolTransactions
// Filter out all the transactions above the account's funds
removed := l.txs.Filter(filter)
if l.strict && len(removed) > 0 {
lowest := uint64(math.MaxUint64)
for _, tx := range removed {
if nonce := tx.Nonce(); lowest > nonce {
lowest = nonce
}
} }
invalids = l.txs.Filter(func(tx types.PoolTransaction) bool { return tx.Nonce() > lowest }) return txPool.validateStakingTx(stkTxn)
} })
return removed, invalids }
// Filter iterates over the list of transactions and removes all of them for which
// the specified function invalidates the tx. Moreover, it returns all transactions
// that were invalidated from the filter.
func (l *txList) filterWithError(filter func(types.PoolTransaction) error) (types.PoolTransactions, []error) {
return l.txs.FilterWithError(filter)
} }
// Cap places a hard limit on the number of items, returning all transactions // Cap places a hard limit on the number of items, returning all transactions

@ -511,7 +511,7 @@ func (pool *TxPool) reset(oldHead, newHead *block.Header) {
// any transactions that have been included in the block or // any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g. // have been invalidated because of another transaction (e.g.
// higher gas price) // higher gas price)
pool.demoteUnexecutables() pool.demoteUnexecutables(newHead.Number().Uint64())
// Update all accounts to the latest known pending nonce // Update all accounts to the latest known pending nonce
for addr, list := range pool.pending { for addr, list := range pool.pending {
@ -1315,14 +1315,15 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Do not report to error sink as old txs are on chain or meaningful error caught elsewhere. // Do not report to error sink as old txs are on chain or meaningful error caught elsewhere.
} }
// Drop all transactions that are too costly (low balance or out of gas) // Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.FilterPrice(pool, addr) drops, errs, _ := list.FilterValid(pool, addr, 0)
for _, tx := range drops { for i, tx := range drops {
hash := tx.Hash() hash := tx.Hash()
pool.all.Remove(hash) pool.all.Remove(hash)
pool.priced.Removed() pool.priced.Removed()
queuedNofundsCounter.Inc(1) queuedNofundsCounter.Inc(1)
pool.txErrorSink.Add(tx, fmt.Errorf("removed unpayable queued transaction")) pool.txErrorSink.Add(tx, errs[i])
logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable queued transaction") logger.Warn().Str("hash", hash.Hex()).Err(errs[i]).
Msg("Removed unpayable queued transaction")
} }
// Gather all executable transactions and promote them // Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
@ -1472,7 +1473,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// demoteUnexecutables removes invalid and processed transactions from the pools // demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable // executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue. // are moved back into the future queue.
func (pool *TxPool) demoteUnexecutables() { func (pool *TxPool) demoteUnexecutables(bn uint64) {
// Iterate over all accounts and demote any non-executable transactions // Iterate over all accounts and demote any non-executable transactions
logger := utils.Logger().With().Stack().Logger() logger := utils.Logger().With().Stack().Logger()
@ -1488,14 +1489,15 @@ func (pool *TxPool) demoteUnexecutables() {
// Do not report to error sink as old txs are on chain or meaningful error caught elsewhere. // Do not report to error sink as old txs are on chain or meaningful error caught elsewhere.
} }
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.FilterPrice(pool, addr) drops, errs, invalids := list.FilterValid(pool, addr, bn)
for _, tx := range drops { for i, tx := range drops {
hash := tx.Hash() hash := tx.Hash()
pool.all.Remove(hash) pool.all.Remove(hash)
pool.priced.Removed() pool.priced.Removed()
pendingNofundsCounter.Inc(1) pendingNofundsCounter.Inc(1)
pool.txErrorSink.Add(tx, fmt.Errorf("removed unexecutable pending transaction")) pool.txErrorSink.Add(tx, errs[i])
logger.Warn().Str("hash", hash.Hex()).Msg("Removed unexecutable pending transaction") logger.Warn().Str("hash", hash.Hex()).Err(errs[i]).
Msg("Removed unexecutable pending transaction")
} }
for _, tx := range invalids { for _, tx := range invalids {
hash := tx.Hash() hash := tx.Hash()

@ -1532,7 +1532,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
// Benchmark the speed of pool validation // Benchmark the speed of pool validation
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
pool.demoteUnexecutables() pool.demoteUnexecutables(0)
} }
} }

@ -168,15 +168,10 @@ func (tx *StakingTransaction) Cost() (*big.Int, error) {
} }
total.Add(total, stkMsg.Amount) total.Add(total, stkMsg.Amount)
case DirectiveDelegate: case DirectiveDelegate:
msg, err := RLPDecodeStakeMsg(tx.Data(), DirectiveDelegate) // Temporary hack: Cost function is not accurate for delegate transaction.
if err != nil { // Thus the cost validation is done in `txPool.validateTx`.
return nil, err // TODO: refactor this hack.
} default:
stkMsg, ok := msg.(*Delegate)
if !ok {
return nil, errStakingTransactionTypeCastErr
}
total.Add(total, stkMsg.Amount)
} }
return total, nil return total, nil
} }

Loading…
Cancel
Save