@ -29,11 +29,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
)
const (
@ -163,15 +164,15 @@ var DefaultTxPoolConfig = TxPoolConfig{
func ( config * TxPoolConfig ) sanitize ( ) TxPoolConfig {
conf := * config
if conf . Rejournal < time . Second {
log . Warn ( "Sanitizing invalid txpool journal time" , "provided" , conf . Rejournal , "updated" , time . Second )
uti ls . GetL ogger ( ) . Warn ( "Sanitizing invalid txpool journal time" , "provided" , conf . Rejournal , "updated" , time . Second )
conf . Rejournal = time . Second
}
if conf . PriceLimit < 1 {
log . Warn ( "Sanitizing invalid txpool price limit" , "provided" , conf . PriceLimit , "updated" , DefaultTxPoolConfig . PriceLimit )
uti ls . GetL ogger ( ) . Warn ( "Sanitizing invalid txpool price limit" , "provided" , conf . PriceLimit , "updated" , DefaultTxPoolConfig . PriceLimit )
conf . PriceLimit = DefaultTxPoolConfig . PriceLimit
}
if conf . PriceBump < 1 {
log . Warn ( "Sanitizing invalid txpool price bump" , "provided" , conf . PriceBump , "updated" , DefaultTxPoolConfig . PriceBump )
uti ls . GetL ogger ( ) . Warn ( "Sanitizing invalid txpool price bump" , "provided" , conf . PriceBump , "updated" , DefaultTxPoolConfig . PriceBump )
conf . PriceBump = DefaultTxPoolConfig . PriceBump
}
return conf
@ -235,7 +236,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
}
pool . locals = newAccountSet ( pool . signer )
for _ , addr := range config . Locals {
log . Info ( "Setting new local account" , "address" , addr )
uti ls . GetL ogger ( ) . Info ( "Setting new local account" , "address" , addr )
pool . locals . add ( addr )
}
pool . priced = newTxPricedList ( pool . all )
@ -246,10 +247,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pool . journal = newTxJournal ( config . Journal )
if err := pool . journal . load ( pool . AddLocals ) ; err != nil {
log . Warn ( "Failed to load transaction journal" , "err" , err )
uti ls . GetL ogger ( ) . Warn ( "Failed to load transaction journal" , "err" , err )
}
if err := pool . journal . rotate ( pool . local ( ) ) ; err != nil {
log . Warn ( "Failed to rotate transaction journal" , "err" , err )
uti ls . GetL ogger ( ) . Warn ( "Failed to rotate transaction journal" , "err" , err )
}
}
// Subscribe events from blockchain
@ -310,7 +311,7 @@ func (pool *TxPool) loop() {
pool . mu . RUnlock ( )
if pending != prevPending || queued != prevQueued || stales != prevStales {
log . Debug ( "Transaction pool status report" , "executable" , pending , "queued" , queued , "stales" , stales )
uti ls . GetL ogger ( ) . Debug ( "Transaction pool status report" , "executable" , pending , "queued" , queued , "stales" , stales )
prevPending , prevQueued , prevStales = pending , queued , stales
}
@ -336,7 +337,7 @@ func (pool *TxPool) loop() {
if pool . journal != nil {
pool . mu . Lock ( )
if err := pool . journal . rotate ( pool . local ( ) ) ; err != nil {
log . Warn ( "Failed to rotate local tx journal" , "err" , err )
uti ls . GetL ogger ( ) . Warn ( "Failed to rotate local tx journal" , "err" , err )
}
pool . mu . Unlock ( )
}
@ -365,7 +366,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
newNum := newHead . Number . Uint64 ( )
if depth := uint64 ( math . Abs ( float64 ( oldNum ) - float64 ( newNum ) ) ) ; depth > 64 {
log . Debug ( "Skipping deep transaction reorg" , "depth" , depth )
uti ls . GetL ogger ( ) . Debug ( "Skipping deep transaction reorg" , "depth" , depth )
} else {
// Reorg seems shallow enough to pull in all transactions into memory
var discarded , included types . Transactions
@ -377,26 +378,26 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
for rem . NumberU64 ( ) > add . NumberU64 ( ) {
discarded = append ( discarded , rem . Transactions ( ) ... )
if rem = pool . chain . GetBlock ( rem . ParentHash ( ) , rem . NumberU64 ( ) - 1 ) ; rem == nil {
log . Error ( "Unrooted old chain seen by tx pool" , "block" , oldHead . Number , "hash" , oldHead . Hash ( ) )
uti ls . GetL ogger ( ) . Error ( "Unrooted old chain seen by tx pool" , "block" , oldHead . Number , "hash" , oldHead . Hash ( ) )
return
}
}
for add . NumberU64 ( ) > rem . NumberU64 ( ) {
included = append ( included , add . Transactions ( ) ... )
if add = pool . chain . GetBlock ( add . ParentHash ( ) , add . NumberU64 ( ) - 1 ) ; add == nil {
log . Error ( "Unrooted new chain seen by tx pool" , "block" , newHead . Number , "hash" , newHead . Hash ( ) )
uti ls . GetL ogger ( ) . Error ( "Unrooted new chain seen by tx pool" , "block" , newHead . Number , "hash" , newHead . Hash ( ) )
return
}
}
for rem . Hash ( ) != add . Hash ( ) {
discarded = append ( discarded , rem . Transactions ( ) ... )
if rem = pool . chain . GetBlock ( rem . ParentHash ( ) , rem . NumberU64 ( ) - 1 ) ; rem == nil {
log . Error ( "Unrooted old chain seen by tx pool" , "block" , oldHead . Number , "hash" , oldHead . Hash ( ) )
uti ls . GetL ogger ( ) . Error ( "Unrooted old chain seen by tx pool" , "block" , oldHead . Number , "hash" , oldHead . Hash ( ) )
return
}
included = append ( included , add . Transactions ( ) ... )
if add = pool . chain . GetBlock ( add . ParentHash ( ) , add . NumberU64 ( ) - 1 ) ; add == nil {
log . Error ( "Unrooted new chain seen by tx pool" , "block" , newHead . Number , "hash" , newHead . Hash ( ) )
uti ls . GetL ogger ( ) . Error ( "Unrooted new chain seen by tx pool" , "block" , newHead . Number , "hash" , newHead . Hash ( ) )
return
}
}
@ -409,7 +410,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
}
statedb , err := pool . chain . StateAt ( newHead . Root )
if err != nil {
log . Error ( "Failed to reset txpool state" , "err" , err )
uti ls . GetL ogger ( ) . Error ( "Failed to reset txpool state" , "err" , err )
return
}
pool . currentState = statedb
@ -417,7 +418,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
pool . currentMaxGas = newHead . GasLimit
// Inject any transactions discarded due to reorgs
log . Debug ( "Reinjecting stale transactions" , "count" , len ( reinject ) )
uti ls . GetL ogger ( ) . Debug ( "Reinjecting stale transactions" , "count" , len ( reinject ) )
//senderCacher.recover(pool.signer, reinject)
pool . addTxsLocked ( reinject , false )
@ -449,7 +450,7 @@ func (pool *TxPool) Stop() {
if pool . journal != nil {
pool . journal . close ( )
}
log . Info ( "Transaction pool stopped" )
uti ls . GetL ogger ( ) . Info ( "Transaction pool stopped" )
}
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
@ -476,7 +477,7 @@ func (pool *TxPool) SetGasPrice(price *big.Int) {
for _ , tx := range pool . priced . Cap ( price , pool . locals ) {
pool . removeTx ( tx . Hash ( ) , false )
}
log . Info ( "Transaction pool price threshold updated" , "price" , price )
uti ls . GetL ogger ( ) . Info ( "Transaction pool price threshold updated" , "price" , price )
}
// State returns the virtual managed state of the transaction pool.
@ -622,12 +623,12 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// If the transaction is already known, discard it
hash := tx . Hash ( )
if pool . all . Get ( hash ) != nil {
log . Trace ( "Discarding already known transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Discarding already known transaction" , "hash" , hash )
return false , fmt . Errorf ( "known transaction: %x" , hash )
}
// If the transaction fails basic validation, discard it
if err := pool . validateTx ( tx , local ) ; err != nil {
log . Trace ( "Discarding invalid transaction" , "hash" , hash , "err" , err )
uti ls . GetL ogger ( ) . Trace ( "Discarding invalid transaction" , "hash" , hash , "err" , err )
invalidTxCounter . Inc ( 1 )
return false , err
}
@ -635,14 +636,14 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
if uint64 ( pool . all . Count ( ) ) >= pool . config . GlobalSlots + pool . config . GlobalQueue {
// If the new transaction is underpriced, don't accept it
if ! local && pool . priced . Underpriced ( tx , pool . locals ) {
log . Trace ( "Discarding underpriced transaction" , "hash" , hash , "price" , tx . GasPrice ( ) )
uti ls . GetL ogger ( ) . Trace ( "Discarding underpriced transaction" , "hash" , hash , "price" , tx . GasPrice ( ) )
underpricedTxCounter . Inc ( 1 )
return false , ErrUnderpriced
}
// New transaction is better than our worse ones, make room for it
drop := pool . priced . Discard ( pool . all . Count ( ) - int ( pool . config . GlobalSlots + pool . config . GlobalQueue - 1 ) , pool . locals )
for _ , tx := range drop {
log . Trace ( "Discarding freshly underpriced transaction" , "hash" , tx . Hash ( ) , "price" , tx . GasPrice ( ) )
uti ls . GetL ogger ( ) . Trace ( "Discarding freshly underpriced transaction" , "hash" , tx . Hash ( ) , "price" , tx . GasPrice ( ) )
underpricedTxCounter . Inc ( 1 )
pool . removeTx ( tx . Hash ( ) , false )
}
@ -666,7 +667,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool . priced . Put ( tx )
pool . journalTx ( from , tx )
log . Trace ( "Pooled new executable transaction" , "hash" , hash , "from" , from , "to" , tx . To ( ) )
uti ls . GetL ogger ( ) . Trace ( "Pooled new executable transaction" , "hash" , hash , "from" , from , "to" , tx . To ( ) )
// We've directly injected a replacement transaction, notify subsystems
// go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
@ -681,13 +682,13 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// Mark local addresses and journal local transactions
if local {
if ! pool . locals . contains ( from ) {
log . Info ( "Setting new local account" , "address" , from )
uti ls . GetL ogger ( ) . Info ( "Setting new local account" , "address" , from )
pool . locals . add ( from )
}
}
pool . journalTx ( from , tx )
log . Trace ( "Pooled new future transaction" , "hash" , hash , "from" , from , "to" , tx . To ( ) )
uti ls . GetL ogger ( ) . Trace ( "Pooled new future transaction" , "hash" , hash , "from" , from , "to" , tx . To ( ) )
return replace , nil
}
@ -735,7 +736,7 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
return
}
if err := pool . journal . insert ( tx ) ; err != nil {
log . Warn ( "Failed to journal local transaction" , "err" , err )
uti ls . GetL ogger ( ) . Warn ( "Failed to journal local transaction" , "err" , err )
}
}
@ -949,7 +950,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Drop all transactions that are deemed too old (low nonce)
for _ , tx := range list . Forward ( pool . currentState . GetNonce ( addr ) ) {
hash := tx . Hash ( )
log . Trace ( "Removed old queued transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Removed old queued transaction" , "hash" , hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
}
@ -957,7 +958,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
drops , _ := list . Filter ( pool . currentState . GetBalance ( addr ) , pool . currentMaxGas )
for _ , tx := range drops {
hash := tx . Hash ( )
log . Trace ( "Removed unpayable queued transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Removed unpayable queued transaction" , "hash" , hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
queuedNofundsCounter . Inc ( 1 )
@ -966,7 +967,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _ , tx := range list . Ready ( pool . pendingState . GetNonce ( addr ) ) {
hash := tx . Hash ( )
if pool . promoteTx ( addr , hash , tx ) {
log . Trace ( "Promoting queued transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Promoting queued transaction" , "hash" , hash )
promoted = append ( promoted , tx )
}
}
@ -977,7 +978,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
pool . all . Remove ( hash )
pool . priced . Removed ( )
queuedRateLimitCounter . Inc ( 1 )
log . Trace ( "Removed cap-exceeding queued transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Removed cap-exceeding queued transaction" , "hash" , hash )
}
}
// Delete the entire queue entry if it became empty.
@ -1030,7 +1031,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
if nonce := tx . Nonce ( ) ; pool . pendingState . GetNonce ( offenders [ i ] ) > nonce {
pool . pendingState . SetNonce ( offenders [ i ] , nonce )
}
log . Trace ( "Removed fairness-exceeding pending transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Removed fairness-exceeding pending transaction" , "hash" , hash )
}
pending --
}
@ -1052,7 +1053,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
if nonce := tx . Nonce ( ) ; pool . pendingState . GetNonce ( addr ) > nonce {
pool . pendingState . SetNonce ( addr , nonce )
}
log . Trace ( "Removed fairness-exceeding pending transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Removed fairness-exceeding pending transaction" , "hash" , hash )
}
pending --
}
@ -1113,7 +1114,7 @@ func (pool *TxPool) demoteUnexecutables() {
// Drop all transactions that are deemed too old (low nonce)
for _ , tx := range list . Forward ( nonce ) {
hash := tx . Hash ( )
log . Trace ( "Removed old pending transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Removed old pending transaction" , "hash" , hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
}
@ -1121,21 +1122,21 @@ func (pool *TxPool) demoteUnexecutables() {
drops , invalids := list . Filter ( pool . currentState . GetBalance ( addr ) , pool . currentMaxGas )
for _ , tx := range drops {
hash := tx . Hash ( )
log . Trace ( "Removed unpayable pending transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Removed unpayable pending transaction" , "hash" , hash )
pool . all . Remove ( hash )
pool . priced . Removed ( )
pendingNofundsCounter . Inc ( 1 )
}
for _ , tx := range invalids {
hash := tx . Hash ( )
log . Trace ( "Demoting pending transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Trace ( "Demoting pending transaction" , "hash" , hash )
pool . enqueueTx ( hash , tx )
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
if list . Len ( ) > 0 && list . txs . Get ( nonce ) == nil {
for _ , tx := range list . Cap ( 0 ) {
hash := tx . Hash ( )
log . Error ( "Demoting invalidated transaction" , "hash" , hash )
uti ls . GetL ogger ( ) . Error ( "Demoting invalidated transaction" , "hash" , hash )
pool . enqueueTx ( hash , tx )
}
}