@ -369,8 +369,13 @@ func (pool *TxPool) loop() {
}
}
// Any non-locals old enough should be removed
// Any non-locals old enough should be removed
if time . Since ( pool . beats [ addr ] ) > pool . config . Lifetime {
if time . Since ( pool . beats [ addr ] ) > pool . config . Lifetime {
b32addr , err := hmyCommon . AddressToBech32 ( addr )
if err != nil {
b32addr = "unknown"
}
for _ , tx := range pool . queue [ addr ] . Flatten ( ) {
for _ , tx := range pool . queue [ addr ] . Flatten ( ) {
pool . removeTx ( tx . Hash ( ) , true )
pool . removeTx ( tx . Hash ( ) , true )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "removed transaction for inactive account %v" , b32addr ) )
}
}
}
}
}
}
@ -557,6 +562,8 @@ func (pool *TxPool) SetGasPrice(price *big.Int) {
pool . gasPrice = price
pool . gasPrice = price
for _ , tx := range pool . priced . Cap ( price , pool . locals ) {
for _ , tx := range pool . priced . Cap ( price , pool . locals ) {
pool . removeTx ( tx . Hash ( ) , false )
pool . removeTx ( tx . Hash ( ) , false )
pool . txErrorSink . Add ( tx ,
fmt . Errorf ( "dropped transaction below new gas price threshold of %v" , price . String ( ) ) )
}
}
utils . Logger ( ) . Info ( ) . Str ( "price" , price . String ( ) ) . Msg ( "Transaction pool price threshold updated" )
utils . Logger ( ) . Info ( ) . Str ( "price" , price . String ( ) ) . Msg ( "Transaction pool price threshold updated" )
}
}
@ -868,7 +875,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
// If the transaction is already known, discard it
// If the transaction is already known, discard it
hash := tx . Hash ( )
hash := tx . Hash ( )
if pool . all . Get ( hash ) != nil {
if pool . all . Get ( hash ) != nil {
logger . Warn ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Discarding already known transaction" )
logger . Info ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Discarding already known transaction" )
return false , errors . WithMessagef ( ErrKnownTransaction , "transaction hash %x" , hash )
return false , errors . WithMessagef ( ErrKnownTransaction , "transaction hash %x" , hash )
}
}
// If the transaction fails basic validation, discard it
// If the transaction fails basic validation, discard it
@ -893,12 +900,16 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
// New transaction is better than our worse ones, make room for it
// New transaction is better than our worse ones, make room for it
drop := pool . priced . Discard ( pool . all . Count ( ) - int ( pool . config . GlobalSlots + pool . config . GlobalQueue - 1 ) , pool . locals )
drop := pool . priced . Discard ( pool . all . Count ( ) - int ( pool . config . GlobalSlots + pool . config . GlobalQueue - 1 ) , pool . locals )
for _ , tx := range drop {
for _ , tx := range drop {
gasPrice := new ( big . Float ) . SetInt64 ( tx . GasPrice ( ) . Int64 ( ) )
gasPrice = gasPrice . Mul ( gasPrice , new ( big . Float ) . SetFloat64 ( 1e-9 ) ) // Gas-price is in Nano
pool . removeTx ( tx . Hash ( ) , false )
underpricedTxCounter . Inc ( 1 )
pool . txErrorSink . Add ( tx ,
errors . WithMessagef ( ErrUnderpriced , "transaction gas-price is %.18f ONE in full transaction pool" , gasPrice ) )
logger . Warn ( ) .
logger . Warn ( ) .
Str ( "hash" , tx . Hash ( ) . Hex ( ) ) .
Str ( "hash" , tx . Hash ( ) . Hex ( ) ) .
Str ( "price" , tx . GasPrice ( ) . String ( ) ) .
Str ( "price" , tx . GasPrice ( ) . String ( ) ) .
Msg ( "Discarding freshly underpriced transaction" )
Msg ( "Discarding freshly underpriced transaction" )
underpricedTxCounter . Inc ( 1 )
pool . removeTx ( tx . Hash ( ) , false )
}
}
}
}
// If the transaction is replacing an already pending one, do directly
// If the transaction is replacing an already pending one, do directly
@ -915,15 +926,23 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
pool . all . Remove ( old . Hash ( ) )
pool . all . Remove ( old . Hash ( ) )
pool . priced . Removed ( )
pool . priced . Removed ( )
pendingReplaceCounter . Inc ( 1 )
pendingReplaceCounter . Inc ( 1 )
pool . txErrorSink . Add ( old ,
fmt . Errorf ( "replaced transaction, new transaction %v has same nonce & higher price" , tx . Hash ( ) . String ( ) ) )
logger . Info ( ) .
Str ( "hash" , old . Hash ( ) . String ( ) ) .
Str ( "new-tx-hash" , tx . Hash ( ) . String ( ) ) .
Str ( "price" , old . GasPrice ( ) . String ( ) ) .
Msg ( "Replaced transaction" )
}
}
pool . all . Add ( tx )
pool . all . Add ( tx )
pool . priced . Put ( tx )
pool . priced . Put ( tx )
pool . journalTx ( from , tx )
pool . journalTx ( from , tx )
logger . Info ( ) .
logger . Warn ( ) .
Str ( "hash" , tx . Hash ( ) . Hex ( ) ) .
Str ( "hash" , tx . Hash ( ) . Hex ( ) ) .
Interface ( "from" , from ) .
Interface ( "from" , from ) .
Interface ( "to" , tx . To ( ) ) .
Interface ( "to" , tx . To ( ) ) .
Str ( "price" , tx . GasPrice ( ) . String ( ) ) .
Msg ( "Pooled new executable transaction" )
Msg ( "Pooled new executable transaction" )
// We've directly injected a replacement transaction, notify subsystems
// We've directly injected a replacement transaction, notify subsystems
@ -932,7 +951,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
return old != nil , nil
return old != nil , nil
}
}
// New transaction isn't replacing a pending one, push into queue
// New transaction isn't replacing a pending one, push into queue
replace , err := pool . enqueueTx ( hash , tx )
replace , err := pool . enqueueTx ( tx )
if err != nil {
if err != nil {
return false , err
return false , err
}
}
@ -945,7 +964,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
}
}
pool . journalTx ( from , tx )
pool . journalTx ( from , tx )
logger . Info ( ) .
logger . Warn ( ) .
Str ( "hash" , hash . Hex ( ) ) .
Str ( "hash" , hash . Hex ( ) ) .
Interface ( "from" , from ) .
Interface ( "from" , from ) .
Interface ( "to" , tx . To ( ) ) .
Interface ( "to" , tx . To ( ) ) .
@ -956,7 +975,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
// enqueueTx inserts a new transaction into the non-executable transaction queue.
// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
//
// Note, this method assumes the pool lock is held!
// Note, this method assumes the pool lock is held!
func ( pool * TxPool ) enqueueTx ( hash common . Hash , tx types . PoolTransaction ) ( bool , error ) {
func ( pool * TxPool ) enqueueTx ( tx types . PoolTransaction ) ( bool , error ) {
// Try to insert the transaction into the future queue
// Try to insert the transaction into the future queue
from , _ := tx . SenderAddress ( ) // already validated
from , _ := tx . SenderAddress ( ) // already validated
if pool . queue [ from ] == nil {
if pool . queue [ from ] == nil {
@ -973,8 +992,15 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx types.PoolTransaction) (bool,
pool . all . Remove ( old . Hash ( ) )
pool . all . Remove ( old . Hash ( ) )
pool . priced . Removed ( )
pool . priced . Removed ( )
queuedReplaceCounter . Inc ( 1 )
queuedReplaceCounter . Inc ( 1 )
}
pool . txErrorSink . Add ( old ,
if pool . all . Get ( hash ) == nil {
fmt . Errorf ( "replaced enqueued non-executable transaction, new transaction %v has same nonce & higher price" , tx . Hash ( ) . String ( ) ) )
utils . Logger ( ) . Info ( ) .
Str ( "hash" , old . Hash ( ) . String ( ) ) .
Str ( "new-tx-hash" , tx . Hash ( ) . String ( ) ) .
Str ( "price" , old . GasPrice ( ) . String ( ) ) .
Msg ( "Replaced enqueued non-executable transaction" )
}
if pool . all . Get ( tx . Hash ( ) ) == nil {
pool . all . Add ( tx )
pool . all . Add ( tx )
pool . priced . Put ( tx )
pool . priced . Put ( tx )
}
}
@ -997,7 +1023,7 @@ func (pool *TxPool) journalTx(from common.Address, tx types.PoolTransaction) {
// and returns whether it was inserted or an older was better.
// and returns whether it was inserted or an older was better.
//
//
// Note, this method assumes the pool lock is held!
// Note, this method assumes the pool lock is held!
func ( pool * TxPool ) promoteTx ( addr common . Address , hash common . Hash , tx types . PoolTransaction ) bool {
func ( pool * TxPool ) promoteTx ( addr common . Address , tx types . PoolTransaction ) bool {
// Try to insert the transaction into the pending queue
// Try to insert the transaction into the pending queue
if pool . pending [ addr ] == nil {
if pool . pending [ addr ] == nil {
pool . pending [ addr ] = newTxList ( true )
pool . pending [ addr ] = newTxList ( true )
@ -1007,21 +1033,29 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx types.Po
inserted , old := list . Add ( tx , pool . config . PriceBump )
inserted , old := list . Add ( tx , pool . config . PriceBump )
if ! inserted {
if ! inserted {
// An older transaction was better, discard this
// An older transaction was better, discard this
pool . all . Remove ( hash )
pool . all . Remove ( tx . Hash ( ) )
pool . priced . Removed ( )
pool . priced . Removed ( )
pendingDiscardCounter . Inc ( 1 )
pendingDiscardCounter . Inc ( 1 )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "could not promote to executable" ) )
utils . Logger ( ) . Info ( ) .
Str ( "hash" , tx . Hash ( ) . String ( ) ) .
Msg ( "Could not promote to executable" )
return false
return false
}
}
// Otherwise discard any previous transaction and mark this
// Otherwise discard any previous transaction and mark this
if old != nil {
if old != nil {
pool . all . Remove ( old . Hash ( ) )
pool . all . Remove ( old . Hash ( ) )
pool . priced . Removed ( )
pool . priced . Removed ( )
pendingReplaceCounter . Inc ( 1 )
pendingReplaceCounter . Inc ( 1 )
pool . txErrorSink . Add ( old ,
fmt . Errorf ( "did not promote to executable, existing transaction %v has same nonce & higher price" , tx . Hash ( ) . String ( ) ) )
utils . Logger ( ) . Info ( ) .
Str ( "hash" , old . Hash ( ) . String ( ) ) .
Str ( "existing-tx-hash" , tx . Hash ( ) . String ( ) ) .
Msg ( "Did not promote to executable, new transaction has higher price" )
}
}
// Failsafe to work around direct pending inserts (tests)
// Failsafe to work around direct pending inserts (tests)
if pool . all . Get ( hash ) == nil {
if pool . all . Get ( tx . Hash ( ) ) == nil {
pool . all . Add ( tx )
pool . all . Add ( tx )
pool . priced . Put ( tx )
pool . priced . Put ( tx )
}
}
@ -1069,6 +1103,7 @@ func (pool *TxPool) addTx(tx types.PoolTransaction, local bool) error {
replace , err := pool . add ( tx , local )
replace , err := pool . add ( tx , local )
if err != nil {
if err != nil {
errCause := errors . Cause ( err )
errCause := errors . Cause ( err )
// Ignore known transaction for tx rebroadcast case.
if errCause != ErrKnownTransaction {
if errCause != ErrKnownTransaction {
pool . txErrorSink . Add ( tx , err )
pool . txErrorSink . Add ( tx , err )
}
}
@ -1104,6 +1139,7 @@ func (pool *TxPool) addTxsLocked(txs types.PoolTransactions, local bool) []error
dirty [ from ] = struct { } { }
dirty [ from ] = struct { } { }
}
}
errCause := errors . Cause ( err )
errCause := errors . Cause ( err )
// Ignore known transaction for tx rebroadcast case.
if err != nil && errCause != ErrKnownTransaction {
if err != nil && errCause != ErrKnownTransaction {
pool . txErrorSink . Add ( tx , err )
pool . txErrorSink . Add ( tx , err )
}
}
@ -1173,7 +1209,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
}
}
// Postpone any invalidated transactions
// Postpone any invalidated transactions
for _ , tx := range invalids {
for _ , tx := range invalids {
if _ , err := pool . enqueueTx ( tx . Hash ( ) , tx ) ; err != nil {
if _ , err := pool . enqueueTx ( tx ) ; err != nil {
pool . txErrorSink . Add ( tx , err )
pool . txErrorSink . Add ( tx , err )
}
}
}
}
@ -1220,24 +1256,26 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
nonce := pool . currentState . GetNonce ( addr )
nonce := pool . currentState . GetNonce ( addr )
for _ , tx := range list . Forward ( nonce ) {
for _ , tx := range list . Forward ( nonce ) {
hash := tx . Hash ( )
hash := tx . Hash ( )
logger . Info ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed old queued transaction" )
pool . all . Remove ( hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
pool . priced . Removed ( )
logger . Info ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed old queued transaction" )
// Do not report to error sink as old txs are on chain or meaningful error caught elsewhere.
}
}
// Drop all transactions that are too costly (low balance or out of gas)
// Drop all transactions that are too costly (low balance or out of gas)
drops , _ := list . Filter ( pool . currentState . GetBalance ( addr ) , pool . currentMaxGas )
drops , _ := list . Filter ( pool . currentState . GetBalance ( addr ) , pool . currentMaxGas )
for _ , tx := range drops {
for _ , tx := range drops {
hash := tx . Hash ( )
hash := tx . Hash ( )
logger . Warn ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed unpayable queued transaction" )
pool . all . Remove ( hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
pool . priced . Removed ( )
queuedNofundsCounter . Inc ( 1 )
queuedNofundsCounter . Inc ( 1 )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "removed unpayable queued transaction" ) )
logger . Warn ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed unpayable queued transaction" )
}
}
// Gather all executable transactions and promote them
// Gather all executable transactions and promote them
for _ , tx := range list . Ready ( pool . pendingState . GetNonce ( addr ) ) {
for _ , tx := range list . Ready ( pool . pendingState . GetNonce ( addr ) ) {
hash := tx . Hash ( )
hash := tx . Hash ( )
if pool . promoteTx ( addr , hash , tx ) {
if pool . promoteTx ( addr , tx ) {
logger . Info ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Promoting queued transaction" )
logger . Warn ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Promoting queued transaction" )
promoted = append ( promoted , tx )
promoted = append ( promoted , tx )
}
}
}
}
@ -1245,11 +1283,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
if ! pool . locals . contains ( addr ) {
if ! pool . locals . contains ( addr ) {
for _ , tx := range list . Cap ( int ( pool . config . AccountQueue ) ) {
for _ , tx := range list . Cap ( int ( pool . config . AccountQueue ) ) {
hash := tx . Hash ( )
hash := tx . Hash ( )
logger . Warn ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed cap-exceeding queued transaction" )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "exceeds cap for queued transactions for account %s" , addr . String ( ) ) )
pool . all . Remove ( hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
pool . priced . Removed ( )
queuedRateLimitCounter . Inc ( 1 )
queuedRateLimitCounter . Inc ( 1 )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "exceeds cap for queued transactions for account %s" , addr . String ( ) ) )
logger . Warn ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed cap-exceeding queued transaction" )
}
}
}
}
// Delete the entire queue entry if it became empty.
// Delete the entire queue entry if it became empty.
@ -1295,9 +1333,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _ , tx := range list . Cap ( list . Len ( ) - 1 ) {
for _ , tx := range list . Cap ( list . Len ( ) - 1 ) {
// Drop the transaction from the global pools too
// Drop the transaction from the global pools too
hash := tx . Hash ( )
hash := tx . Hash ( )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "fairness-exceeding pending transaction" ) )
pool . all . Remove ( hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
pool . priced . Removed ( )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "fairness-exceeding pending transaction" ) )
// Update the account nonce to the dropped transaction
// Update the account nonce to the dropped transaction
if nonce := tx . Nonce ( ) ; pool . pendingState . GetNonce ( offenders [ i ] ) > nonce {
if nonce := tx . Nonce ( ) ; pool . pendingState . GetNonce ( offenders [ i ] ) > nonce {
@ -1318,9 +1356,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _ , tx := range list . Cap ( list . Len ( ) - 1 ) {
for _ , tx := range list . Cap ( list . Len ( ) - 1 ) {
// Drop the transaction from the global pools too
// Drop the transaction from the global pools too
hash := tx . Hash ( )
hash := tx . Hash ( )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "fairness-exceeding pending transaction" ) )
pool . all . Remove ( hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
pool . priced . Removed ( )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "fairness-exceeding pending transaction" ) )
// Update the account nonce to the dropped transaction
// Update the account nonce to the dropped transaction
if nonce := tx . Nonce ( ) ; pool . pendingState . GetNonce ( addr ) > nonce {
if nonce := tx . Nonce ( ) ; pool . pendingState . GetNonce ( addr ) > nonce {
@ -1391,23 +1429,25 @@ func (pool *TxPool) demoteUnexecutables() {
// Drop all transactions that are deemed too old (low nonce)
// Drop all transactions that are deemed too old (low nonce)
for _ , tx := range list . Forward ( nonce ) {
for _ , tx := range list . Forward ( nonce ) {
hash := tx . Hash ( )
hash := tx . Hash ( )
logger . Info ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed old pending transaction" )
pool . all . Remove ( hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
pool . priced . Removed ( )
logger . Info ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed old pending transaction" )
// Do not report to error sink as old txs are on chain or meaningful error caught elsewhere.
}
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops , invalids := list . Filter ( pool . currentState . GetBalance ( addr ) , pool . currentMaxGas )
drops , invalids := list . Filter ( pool . currentState . GetBalance ( addr ) , pool . currentMaxGas )
for _ , tx := range drops {
for _ , tx := range drops {
hash := tx . Hash ( )
hash := tx . Hash ( )
logger . Warn ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed unpayable pending transaction" )
pool . all . Remove ( hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
pool . priced . Removed ( )
pendingNofundsCounter . Inc ( 1 )
pendingNofundsCounter . Inc ( 1 )
pool . txErrorSink . Add ( tx , fmt . Errorf ( "removed unpayable pending transaction" ) )
logger . Warn ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Removed unpayable pending transaction" )
}
}
for _ , tx := range invalids {
for _ , tx := range invalids {
hash := tx . Hash ( )
hash := tx . Hash ( )
logger . Info ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Demoting pending transaction" )
logger . Warn ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Demoting pending transaction" )
if _ , err := pool . enqueueTx ( hash , tx ) ; err != nil {
if _ , err := pool . enqueueTx ( tx ) ; err != nil {
pool . txErrorSink . Add ( tx , err )
pool . txErrorSink . Add ( tx , err )
}
}
}
}
@ -1416,7 +1456,7 @@ func (pool *TxPool) demoteUnexecutables() {
for _ , tx := range list . Cap ( 0 ) {
for _ , tx := range list . Cap ( 0 ) {
hash := tx . Hash ( )
hash := tx . Hash ( )
logger . Error ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Demoting invalidated transaction" )
logger . Error ( ) . Str ( "hash" , hash . Hex ( ) ) . Msg ( "Demoting invalidated transaction" )
if _ , err := pool . enqueueTx ( hash , tx ) ; err != nil {
if _ , err := pool . enqueueTx ( tx ) ; err != nil {
pool . txErrorSink . Add ( tx , err )
pool . txErrorSink . Add ( tx , err )
}
}
}
}