Report failed (non-staking) transactions (#2160)

* [node/worker] Remove regular txn error sink

* [tests] Remove error sink fn field

* [core/types] Add `NewRPCTransactionError` fn

* [core] Report failed txs from tx_pool via error sink & wrap errs with useful msgs

* [core/test] Update tests for tx_pool error sink

* [node] Make all nodes record txs & add tx error sink RPC callback to `NewTxPool`

Co-authored-by: Edgar Aroutiounian <edgar.factorial@gmail.com>
pull/2171/head
Daniel Van Der Maden 5 years ago committed by Edgar Aroutiounian
parent c4f36484ea
commit 9fe8792c02
  1. 105
      core/tx_pool.go
  2. 26
      core/tx_pool_test.go
  3. 10
      core/types/transaction.go
  4. 23
      node/node.go
  5. 2
      node/node_handler_test.go
  6. 8
      node/node_newblock.go
  7. 9
      node/worker/worker.go
  8. 1
      node/worker/worker_test.go
  9. 4
      test/chain/main.go

@ -18,7 +18,6 @@ package core
import (
"context"
"errors"
"fmt"
"math"
"math/big"
@ -31,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/metrics"
"github.com/harmony-one/harmony/internal/params"
"github.com/pkg/errors"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core/state"
@ -79,6 +79,10 @@ var (
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")
// ErrKnownTransaction is returned if a transaction that is already in the pool
// attempting to be added to the pool.
ErrKnownTransaction = errors.New("known transaction")
)
var (
@ -222,27 +226,30 @@ type TxPool struct {
wg sync.WaitGroup // for shutdown sync
txnErrorSink func([]types.RPCTransactionError)
homestead bool
}
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain, txnErrorSink func([]types.RPCTransactionError)) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainID),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainID),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
txnErrorSink: txnErrorSink,
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
@ -601,30 +608,32 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
if tx.Size() > 32*1024 {
return ErrOversizedData
return errors.WithMessagef(ErrOversizedData, "transaction size is %s", tx.Size().String())
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
return ErrNegativeValue
return errors.WithMessagef(ErrNegativeValue, "transaction value is %s", tx.Value().String())
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas < tx.Gas() {
return ErrGasLimit
return errors.WithMessagef(ErrGasLimit, "transaction gas is %d", tx.Gas())
}
// Make sure the transaction is signed properly
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
return errors.WithMessagef(ErrInvalidSender, "transaction sender is %v", from)
}
// Drop non-local transactions under our own minimal accepted gas price
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
gasPrice := new(big.Float).SetInt64(tx.GasPrice().Int64())
gasPrice = gasPrice.Mul(gasPrice, new(big.Float).SetFloat64(1e-9)) // Gas-price is in Nano
return errors.WithMessagef(ErrUnderpriced, "transaction gas-price is %.18f ONE", gasPrice)
}
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
return errors.WithMessagef(ErrNonceTooLow, "transaction nonce is %d", tx.Nonce())
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
@ -636,7 +645,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return err
}
if tx.Gas() < intrGas {
return ErrIntrinsicGas
return errors.WithMessagef(ErrIntrinsicGas, "transaction gas is %d", tx.Gas())
}
return nil
}
@ -655,7 +664,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
hash := tx.Hash()
if pool.all.Get(hash) != nil {
logger.Warn().Str("hash", hash.Hex()).Msg("Discarding already known transaction")
return false, fmt.Errorf("known transaction: %x", hash)
return false, errors.WithMessagef(ErrKnownTransaction, "transaction hash %x", hash)
}
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
@ -667,12 +676,14 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
if !local && pool.priced.Underpriced(tx, pool.locals) {
gasPrice := new(big.Float).SetInt64(tx.GasPrice().Int64())
gasPrice = gasPrice.Mul(gasPrice, new(big.Float).SetFloat64(1e-9)) // Gas-price is in Nano
logger.Warn().
Str("hash", hash.Hex()).
Str("price", tx.GasPrice().String()).
Msg("Discarding underpriced transaction")
underpricedTxCounter.Inc(1)
return false, ErrUnderpriced
return false, errors.WithMessagef(ErrUnderpriced, "transaction gas-price is %.18f ONE in full transaction pool", gasPrice)
}
// New transaction is better than our worse ones, make room for it
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
@ -692,7 +703,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
return false, errors.WithMessage(ErrReplaceUnderpriced, "existing transaction price was not bumped enough")
}
// New transaction is better, replace old one
if old != nil {
@ -828,14 +839,14 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// the sender as a local one in the mean time, ensuring it goes around the local
// pricing constraints.
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
return pool.addTx(tx, !pool.config.NoLocals)
return errors.Cause(pool.addTx(tx, !pool.config.NoLocals))
}
// AddRemote enqueues a single transaction into the pool if it is valid. If the
// sender is not among the locally tracked ones, full pricing constraints will
// apply.
func (pool *TxPool) AddRemote(tx *types.Transaction) error {
return pool.addTx(tx, false)
return errors.Cause(pool.addTx(tx, false))
}
// AddLocals enqueues a batch of transactions into the pool if they are valid,
@ -860,6 +871,9 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
// Try to inject the transaction and update any state
replace, err := pool.add(tx, local)
if err != nil {
if errors.Cause(err) != ErrKnownTransaction {
pool.txnErrorSink([]types.RPCTransactionError{*types.NewRPCTransactionError(tx.Hash(), err)})
}
return err
}
// If we added a new transaction, run promotion checks and return
@ -884,6 +898,7 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
// Add the batch of transaction, tracking the accepted ones
dirty := make(map[common.Address]struct{})
errs := make([]error, len(txs))
erroredTxns := []types.RPCTransactionError{}
for i, tx := range txs {
var replace bool
@ -891,6 +906,9 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
from, _ := types.Sender(pool.signer, tx) // already validated
dirty[from] = struct{}{}
}
if errs[i] != nil && errors.Cause(errs[i]) != ErrKnownTransaction {
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), errs[i]))
}
}
// Only reprocess the internal state if something was actually added
if len(dirty) > 0 {
@ -900,6 +918,8 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
}
pool.promoteExecutables(addrs)
}
pool.txnErrorSink(erroredTxns)
return errs
}
@ -979,6 +999,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
logger := utils.Logger().With().Stack().Logger()
erroredTxns := []types.RPCTransactionError{}
// Gather all the accounts potentially needing updates
if accounts == nil {
@ -994,9 +1015,14 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
nonce := pool.currentState.GetNonce(addr)
for _, tx := range list.Forward(nonce) {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed old queued transaction")
if pool.chain.CurrentBlock().Transaction(hash) == nil {
err := fmt.Errorf("old transaction, nonce %d is too low", nonce)
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
}
pool.all.Remove(hash)
pool.priced.Removed()
}
@ -1005,6 +1031,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range drops {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable queued transaction")
err := fmt.Errorf("unpayable transaction, out of gas or balance of %d cannot pay cost of %d", tx.Value(), tx.Cost())
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
@ -1021,10 +1049,12 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed cap-exceeding queued transaction")
err := fmt.Errorf("exceeds cap for queued transactions for account %s", addr.String())
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
logger.Warn().Str("hash", hash.Hex()).Msg("Removed cap-exceeding queued transaction")
}
}
// Delete the entire queue entry if it became empty.
@ -1070,6 +1100,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
err := fmt.Errorf("fairness-exceeding pending transaction")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()
@ -1092,6 +1124,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
err := fmt.Errorf("fairness-exceeding pending transaction")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()
@ -1132,6 +1166,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
err := fmt.Errorf("exceeds global cap for queued transactions")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.removeTx(tx.Hash(), true)
}
drop -= size
@ -1141,12 +1177,16 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
err := fmt.Errorf("exceeds global cap for queued transactions")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(txs[i].Hash(), err))
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitCounter.Inc(1)
}
}
}
pool.txnErrorSink(erroredTxns)
}
// demoteUnexecutables removes invalid and processed transactions from the pools
@ -1155,6 +1195,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
func (pool *TxPool) demoteUnexecutables() {
// Iterate over all accounts and demote any non-executable transactions
logger := utils.Logger().With().Stack().Logger()
erroredTxns := []types.RPCTransactionError{}
for addr, list := range pool.pending {
nonce := pool.currentState.GetNonce(addr)
@ -1163,6 +1204,10 @@ func (pool *TxPool) demoteUnexecutables() {
for _, tx := range list.Forward(nonce) {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed old pending transaction")
if pool.chain.CurrentBlock().Transaction(hash) == nil {
err := fmt.Errorf("old transaction, nonce %d is too low", nonce)
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
}
pool.all.Remove(hash)
pool.priced.Removed()
}
@ -1171,6 +1216,8 @@ func (pool *TxPool) demoteUnexecutables() {
for _, tx := range drops {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable pending transaction")
err := fmt.Errorf("unpayable transaction, out of gas or balance of %d cannot pay cost of %d", tx.Value(), tx.Cost())
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()
pendingNofundsCounter.Inc(1)
@ -1178,6 +1225,8 @@ func (pool *TxPool) demoteUnexecutables() {
for _, tx := range invalids {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Demoting pending transaction")
err := fmt.Errorf("demoting pending transaction")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.enqueueTx(hash, tx)
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
@ -1185,6 +1234,8 @@ func (pool *TxPool) demoteUnexecutables() {
for _, tx := range list.Cap(0) {
hash := tx.Hash()
logger.Error().Str("hash", hash.Hex()).Msg("Demoting invalidated transaction")
err := fmt.Errorf("demoting invalid transaction")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.enqueueTx(hash, tx)
}
}
@ -1193,6 +1244,8 @@ func (pool *TxPool) demoteUnexecutables() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
pool.txnErrorSink(erroredTxns)
}
}

@ -84,7 +84,7 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
key, _ := crypto.GenerateKey()
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
return pool, key
}
@ -194,7 +194,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
tx0 := transaction(0, 100000, key)
tx1 := transaction(1, 100000, key)
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
defer pool.Stop()
nonce := pool.State().GetNonce(address)
@ -559,7 +559,7 @@ func TestTransactionPostponing(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
defer pool.Stop()
// Create two test accounts to produce different gap profiles with
@ -721,7 +721,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
config.NoLocals = nolocals
config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
pool := NewTxPool(config, params.TestChainConfig, blockchain)
pool := NewTxPool(config, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
defer pool.Stop()
// Create a number of test accounts and fund them (last one will be the local)
@ -809,7 +809,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
config.Lifetime = time.Second
config.NoLocals = nolocals
pool := NewTxPool(config, params.TestChainConfig, blockchain)
pool := NewTxPool(config, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
defer pool.Stop()
// Create two test accounts to ensure remotes expire but locals do not
@ -921,7 +921,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
config := testTxPoolConfig
config.GlobalSlots = config.AccountSlots * 10
pool := NewTxPool(config, params.TestChainConfig, blockchain)
pool := NewTxPool(config, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
defer pool.Stop()
// Create a number of test accounts and fund them
@ -969,7 +969,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
config.AccountQueue = 2
config.GlobalSlots = 8
pool := NewTxPool(config, params.TestChainConfig, blockchain)
pool := NewTxPool(config, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
defer pool.Stop()
// Create a number of test accounts and fund them
@ -1001,7 +1001,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
config := testTxPoolConfig
config.GlobalSlots = 0
pool := NewTxPool(config, params.TestChainConfig, blockchain)
pool := NewTxPool(config, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
defer pool.Stop()
// Create a number of test accounts and fund them
@ -1043,7 +1043,7 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
defer pool.Stop()
// Create a number of test accounts and fund them
@ -1122,7 +1122,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
config.Journal = journal
config.Rejournal = time.Second
pool := NewTxPool(config, params.TestChainConfig, blockchain)
pool := NewTxPool(config, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
// Create two test accounts to ensure remotes expire but locals do not
local, _ := crypto.GenerateKey()
@ -1159,7 +1159,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool = NewTxPool(config, params.TestChainConfig, blockchain)
pool = NewTxPool(config, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
pending, queued = pool.Stats()
if queued != 0 {
@ -1185,7 +1185,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool = NewTxPool(config, params.TestChainConfig, blockchain)
pool = NewTxPool(config, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
pending, queued = pool.Stats()
if pending != 0 {
@ -1215,7 +1215,7 @@ func TestTransactionStatusCheck(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, func([]types.RPCTransactionError) {})
defer pool.Stop()
// Create the test accounts to check various transaction statuses with

@ -23,6 +23,7 @@ import (
"io"
"math/big"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -77,6 +78,15 @@ type RPCTransactionError struct {
ErrMessage string `json:"error-message"`
}
// NewRPCTransactionError ...
func NewRPCTransactionError(hash common.Hash, err error) *RPCTransactionError {
return &RPCTransactionError{
TxHashID: hash.Hex(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
}
}
//String print mode string
func (txType TransactionType) String() string {
if txType == SameShardTx {

@ -330,13 +330,14 @@ func (node *Node) AddPendingStakingTransaction(
// AddPendingTransaction adds one new transaction to the pending transaction list.
// This is only called from SDK.
func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
// TODO: everyone should record txns, not just leader
if node.Consensus.IsLeader() && newTx.ShardID() == node.NodeConfig.ShardID {
if newTx.ShardID() == node.NodeConfig.ShardID {
node.addPendingTransactions(types.Transactions{newTx})
} else {
utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx")
node.tryBroadcast(newTx)
if node.NodeConfig.Role() != nodeconfig.ExplorerNode {
return
}
}
utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx")
node.tryBroadcast(newTx)
}
// AddPendingReceipts adds one receipt message to pending list.
@ -501,7 +502,17 @@ func New(host p2p.Host, consensusObj *consensus.Consensus,
node.BlockChannel = make(chan *types.Block)
node.ConfirmedBlockChannel = make(chan *types.Block)
node.BeaconBlockChannel = make(chan *types.Block)
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain)
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain,
func(payload []types.RPCTransactionError) {
if len(payload) > 0 {
node.errorSink.Lock()
for i := range payload {
node.errorSink.failedTxns.Value = payload[i]
node.errorSink.failedTxns = node.errorSink.failedTxns.Next()
}
node.errorSink.Unlock()
}
})
node.CxPool = core.NewCxPool(core.CxPoolSize)
node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine)

@ -40,7 +40,6 @@ func TestAddNewBlock(t *testing.T) {
node.Worker.CommitTransactions(
txs, stks, common.Address{},
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
@ -77,7 +76,6 @@ func TestVerifyNewBlock(t *testing.T) {
node.Worker.CommitTransactions(
txs, stks, common.Address{},
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)

@ -131,14 +131,6 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
}
node.errorSink.Unlock()
},
func(payload []types.RPCTransactionError) {
node.errorSink.Lock()
for i := range payload {
node.errorSink.failedTxns.Value = payload[i]
node.errorSink.failedTxns = node.errorSink.failedTxns.Next()
}
node.errorSink.Unlock()
},
); err != nil {
utils.Logger().Error().Err(err).Msg("cannot commit transactions")
return nil, err

@ -53,7 +53,6 @@ func (w *Worker) CommitTransactions(
pendingNormal map[common.Address]types.Transactions,
pendingStaking staking.StakingTransactions, coinbase common.Address,
stkingTxErrorSink func([]staking.RPCTransactionError),
txnErrorSink func([]types.RPCTransactionError),
) error {
if w.current.gasPool == nil {
@ -62,7 +61,6 @@ func (w *Worker) CommitTransactions(
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pendingNormal)
coalescedLogs := []*types.Log{}
erroredTxns := []types.RPCTransactionError{}
erroredStakingTxns := []staking.RPCTransactionError{}
// NORMAL
for {
@ -96,11 +94,7 @@ func (w *Worker) CommitTransactions(
}
logs, err := w.commitTransaction(tx, coinbase)
if err != nil {
erroredTxns = append(erroredTxns, types.RPCTransactionError{
tx.Hash().Hex(), time.Now().Unix(), err.Error(),
})
}
sender, _ := common2.AddressToBech32(from)
switch err {
case core.ErrGasLimitReached:
@ -156,7 +150,6 @@ func (w *Worker) CommitTransactions(
}
// Here call the error functions
stkingTxErrorSink(erroredStakingTxns)
txnErrorSink(erroredTxns)
utils.Logger().Info().
Int("newTxns", len(w.current.txs)).

@ -81,7 +81,6 @@ func TestCommitTransactions(t *testing.T) {
err := worker.CommitTransactions(
txs, nil, testBankAddress,
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
if err != nil {
t.Error(err)

@ -130,7 +130,6 @@ func fundFaucetContract(chain *core.BlockChain) {
err := contractworker.CommitTransactions(
txmap, nil, testUserAddress,
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
if err != nil {
fmt.Println(err)
@ -173,7 +172,6 @@ func callFaucetContractToFundAnAddress(chain *core.BlockChain) {
err = contractworker.CommitTransactions(
txmap, nil, testUserAddress,
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
if err != nil {
fmt.Println(err)
@ -205,7 +203,7 @@ func main() {
genesis := gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, nil, gspec.Config, chain.Engine(), vm.Config{}, nil)
txpool := core.NewTxPool(core.DefaultTxPoolConfig, chainConfig, chain)
txpool := core.NewTxPool(core.DefaultTxPoolConfig, chainConfig, chain, func([]types.RPCTransactionError) {})
backend := &testWorkerBackend{
db: database,

Loading…
Cancel
Save