[txPool] more fix in transaction pool

pull/3656/head
Jacky Wang 4 years ago
parent ee1644ce06
commit 0859ff37b4
No known key found for this signature in database
GPG Key ID: 1085CE5F4FF5842C
  1. 60
      core/tx_list.go
  2. 17
      core/tx_pool.go

@ -22,6 +22,8 @@ import (
"math/big" "math/big"
"sort" "sort"
"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
@ -123,6 +125,36 @@ func (m *txSortedMap) Filter(filter func(types.PoolTransaction) bool) types.Pool
return removed return removed
} }
// Filter iterates over the list of transactions and removes all of them for which
// the specified function evaluates to true. Return the filtered transactions and
// corresponding errors.
func (m *txSortedMap) FilterWithError(filter func(types.PoolTransaction) error) (types.PoolTransactions, []error) {
var (
removed types.PoolTransactions
errs []error
)
// Collect all the transactions to filter out
for nonce, tx := range m.items {
if err := filter(tx); err != nil {
removed = append(removed, tx)
errs = append(errs, err)
delete(m.items, nonce)
}
}
// If transactions were removed, the heap and cache are ruined
if len(removed) > 0 {
*m.index = make([]uint64, 0, len(m.items))
for nonce := range m.items {
*m.index = append(*m.index, nonce)
}
heap.Init(m.index)
m.cache = nil
}
return removed, errs
}
// Cap places a hard limit on the number of items, returning all transactions // Cap places a hard limit on the number of items, returning all transactions
// exceeding that limit. // exceeding that limit.
func (m *txSortedMap) Cap(threshold int) types.PoolTransactions { func (m *txSortedMap) Cap(threshold int) types.PoolTransactions {
@ -287,26 +319,34 @@ func (l *txList) Forward(threshold uint64) types.PoolTransactions {
// than the provided thresholds and all staking transactions that can not be validated. // than the provided thresholds and all staking transactions that can not be validated.
func (l *txList) FilterValid( func (l *txList) FilterValid(
txPool *TxPool, address common.Address, txPool *TxPool, address common.Address,
) (types.PoolTransactions, types.PoolTransactions) { ) (types.PoolTransactions, types.PoolTransactions, []error) {
costLimit := txPool.currentState.GetBalance(address) costLimit := txPool.currentState.GetBalance(address)
gasLimit := txPool.currentMaxGas gasLimit := txPool.currentMaxGas
// If all transactions are below the threshold, short circuit // If all transactions are below the threshold, short circuit
if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit { if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit {
return nil, nil return nil, nil, nil
} }
l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds
l.gascap = gasLimit l.gascap = gasLimit
return l.Filter(func(tx types.PoolTransaction) bool { return l.Filter(func(tx types.PoolTransaction) error {
cost, err := tx.Cost() cost, err := tx.Cost()
if err != nil { if err != nil {
return true // failure should lead to removal of the tx return err // failure should lead to removal of the tx
}
if cost.Cmp(costLimit) > 0 {
return errors.New("not enough balance")
}
if tx.GasLimit() > gasLimit {
return errors.New("transaction gas limit exceeding block limit")
} }
if _, ok := tx.(*staking.StakingTransaction); ok { if _, ok := tx.(*staking.StakingTransaction); ok {
err := txPool.validateTx(tx, false) err := txPool.validateTx(tx, false)
return err != nil if err != nil {
return err
}
} }
return cost.Cmp(costLimit) == 1 || tx.GasLimit() > gasLimit return nil
}) })
} }
@ -314,13 +354,13 @@ func (l *txList) FilterValid(
// the specified function evaluates to true. Moreover, it returns all transactions // the specified function evaluates to true. Moreover, it returns all transactions
// that were invalidated from the filter // that were invalidated from the filter
func (l *txList) Filter( func (l *txList) Filter(
filter func(types.PoolTransaction) bool, filter func(types.PoolTransaction) error,
) (types.PoolTransactions, types.PoolTransactions) { ) (types.PoolTransactions, types.PoolTransactions, []error) {
// If the list was strict, filter anything above the lowest nonce // If the list was strict, filter anything above the lowest nonce
var invalids types.PoolTransactions var invalids types.PoolTransactions
// Filter out all the transactions above the account's funds // Filter out all the transactions above the account's funds
removed := l.txs.Filter(filter) removed, errs := l.txs.FilterWithError(filter)
if l.strict && len(removed) > 0 { if l.strict && len(removed) > 0 {
lowest := uint64(math.MaxUint64) lowest := uint64(math.MaxUint64)
for _, tx := range removed { for _, tx := range removed {
@ -330,7 +370,7 @@ func (l *txList) Filter(
} }
invalids = l.txs.Filter(func(tx types.PoolTransaction) bool { return tx.Nonce() > lowest }) invalids = l.txs.Filter(func(tx types.PoolTransaction) bool { return tx.Nonce() > lowest })
} }
return removed, invalids return removed, invalids, errs
} }
// Cap places a hard limit on the number of items, returning all transactions // Cap places a hard limit on the number of items, returning all transactions

@ -1315,14 +1315,14 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Do not report to error sink as old txs are on chain or meaningful error caught elsewhere. // Do not report to error sink as old txs are on chain or meaningful error caught elsewhere.
} }
// Drop all transactions that are too costly (low balance or out of gas) // Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.FilterValid(pool, addr) drops, _, errs := list.FilterValid(pool, addr)
for _, tx := range drops { for i, tx := range drops {
hash := tx.Hash() hash := tx.Hash()
pool.all.Remove(hash) pool.all.Remove(hash)
pool.priced.Removed() pool.priced.Removed()
queuedNofundsCounter.Inc(1) queuedNofundsCounter.Inc(1)
pool.txErrorSink.Add(tx, fmt.Errorf("removed unpayable queued transaction")) pool.txErrorSink.Add(tx, errs[i])
logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable queued transaction") logger.Warn().Str("hash", hash.Hex()).Err(errs[i]).Msg("Removed unpayable queued transaction")
} }
// Gather all executable transactions and promote them // Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
@ -1488,14 +1488,15 @@ func (pool *TxPool) demoteUnexecutables() {
// Do not report to error sink as old txs are on chain or meaningful error caught elsewhere. // Do not report to error sink as old txs are on chain or meaningful error caught elsewhere.
} }
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.FilterValid(pool, addr) drops, invalids, errs := list.FilterValid(pool, addr)
for _, tx := range drops { for i, tx := range drops {
hash := tx.Hash() hash := tx.Hash()
pool.all.Remove(hash) pool.all.Remove(hash)
pool.priced.Removed() pool.priced.Removed()
pendingNofundsCounter.Inc(1) pendingNofundsCounter.Inc(1)
pool.txErrorSink.Add(tx, fmt.Errorf("removed unexecutable pending transaction")) pool.txErrorSink.Add(tx, errs[i])
logger.Warn().Str("hash", hash.Hex()).Msg("Removed unexecutable pending transaction") logger.Warn().Str("hash", hash.Hex()).Err(errs[i]).
Msg("Removed unexecutable pending transaction")
} }
for _, tx := range invalids { for _, tx := range invalids {
hash := tx.Hash() hash := tx.Hash()

Loading…
Cancel
Save