Fix error sink msg duplication & false positives (#2924)

* [types] Add TransactionErrorSink to report failed txs

[node] Create ErrorSink handler for unique error msgs

Implemented with a LRU cache.

[node] Rename ErrorSink to TransactionErrorSink

* Rename RPCTransactionError to TransactionError

[node] Make tx error sink not return err on Add & Remove

[node] Make tx errorSink Contains check take string as tx hash param

[errorsink] Move tx error sink into errorsink pkg

[errorsink] Rename Errors and Count methods

[errorsink] Rename NewTransactionErrorSink to NewTransactionSink

[types] Move error sink to core/types

* Rename NewTransactionSink to NewTransactionErrorSink

* [types] Fix log msg for unfound errors

* [types] Rename TransactionError to TransactionErrorReport

* [core] Remove RPCTransactionError & refactor tx_pool to use TxErrorSink

* [staking] Remove RPCTransactionError

* [node] Refactor tx_pool init to use new TxErrorSink

* [main] Construct transaction error sink before initing the node

* [node] Refactor error sink reporting at RPC layer

* [rpc] Refactor returned type of ErrorSink RPCs to 1 type

* [core] Remove tx from TxErrorSink on Add to tx_pool

* [types] Make NewTransactionErrorSink not return err

* [node] Make node.New create error sink
* [cmd] Revert to origin main.go

* [core] Add TxErrorSink unit test & fix bad ErrExcessiveBLSKeys in tests

* [testnet config] Change testnet config to allow for 5*4 external keys

* [cmd] Revert main.go to original node instantiation

* [rpc] Add GetPoolStats rpc to fetch pending and queued tx counts
pull/2934/head
Daniel Van Der Maden 5 years ago committed by GitHub
parent 7ed4f22dcf
commit 5d87fdb59e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 127
      core/tx_pool.go
  2. 139
      core/tx_pool_test.go
  3. 17
      core/types/transaction.go
  4. 160
      core/types/tx_errorsink.go
  5. 14
      hmy/api_backend.go
  6. 4
      hmy/backend.go
  7. 2
      internal/configs/sharding/testnet.go
  8. 6
      internal/hmyapi/apiv1/backend.go
  9. 13
      internal/hmyapi/apiv1/transactionpool.go
  10. 5
      internal/hmyapi/apiv2/backend.go
  11. 9
      internal/hmyapi/apiv2/transactionpool.go
  12. 5
      internal/hmyapi/backend.go
  13. 41
      node/node.go
  14. 29
      node/rpc.go
  15. 20
      staking/types/transaction.go
  16. 5
      test/chain/main.go

@ -250,33 +250,32 @@ type TxPool struct {
wg sync.WaitGroup // for shutdown sync
errorReporter *txPoolErrorReporter // The reporter for the tx error sinks
txErrorSink *types.TransactionErrorSink // All failed txs gets reported here
homestead bool
}
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain,
txnErrorSink func([]types.RPCTransactionError),
stakingTxnErrorSink func([]staking.RPCTransactionError),
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig,
chain blockChain, txErrorSink *types.TransactionErrorSink,
) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainID),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
errorReporter: newTxPoolErrorReporter(txnErrorSink, stakingTxnErrorSink),
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainID),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
txErrorSink: txErrorSink,
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
@ -864,6 +863,10 @@ func (pool *TxPool) validateStakingTx(tx *staking.StakingTransaction) error {
// the pool due to pricing constraints.
func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
logger := utils.Logger().With().Stack().Logger()
// If the transaction is in the error sink, remove it as it may succeed
if pool.txErrorSink.Contains(tx.Hash().String()) {
pool.txErrorSink.Remove(tx)
}
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
@ -1069,7 +1072,7 @@ func (pool *TxPool) addTx(tx types.PoolTransaction, local bool) error {
if err != nil {
errCause := errors.Cause(err)
if errCause != ErrKnownTransaction {
pool.errorReporter.add(tx, err)
pool.txErrorSink.Add(tx, err)
}
return errCause
}
@ -1078,10 +1081,6 @@ func (pool *TxPool) addTx(tx types.PoolTransaction, local bool) error {
from, _ := types.PoolTransactionSender(pool.signer, tx) // already validated
pool.promoteExecutables([]common.Address{from})
}
if err := pool.errorReporter.report(); err != nil {
utils.Logger().Error().Err(err).
Msg("could not report failed transactions in tx pool when adding 1 tx")
}
return nil
}
@ -1108,7 +1107,7 @@ func (pool *TxPool) addTxsLocked(txs types.PoolTransactions, local bool) []error
}
errCause := errors.Cause(err)
if err != nil && errCause != ErrKnownTransaction {
pool.errorReporter.add(tx, err)
pool.txErrorSink.Add(tx, err)
}
errs[i] = errCause
}
@ -1122,11 +1121,6 @@ func (pool *TxPool) addTxsLocked(txs types.PoolTransactions, local bool) []error
}
pool.promoteExecutables(addrs)
}
if err := pool.errorReporter.report(); err != nil {
utils.Logger().Error().Err(err).
Msg("could not report failed transactions in tx pool when adding txs")
}
return errs
}
@ -1182,7 +1176,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Postpone any invalidated transactions
for _, tx := range invalids {
if _, err := pool.enqueueTx(tx.Hash(), tx); err != nil {
pool.errorReporter.add(tx, err)
pool.txErrorSink.Add(tx, err)
}
}
// Update the account nonce if needed
@ -1199,11 +1193,6 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
delete(pool.queue, addr)
}
}
if err := pool.errorReporter.report(); err != nil {
utils.Logger().Error().Err(err).
Msg("could not report failed transactions in tx pool when removing tx from queue")
}
}
// promoteExecutables moves transactions that have become processable from the
@ -1259,7 +1248,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed cap-exceeding queued transaction")
pool.errorReporter.add(tx, fmt.Errorf("exceeds cap for queued transactions for account %s", addr.String()))
pool.txErrorSink.Add(tx, fmt.Errorf("exceeds cap for queued transactions for account %s", addr.String()))
pool.all.Remove(hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
@ -1308,7 +1297,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.errorReporter.add(tx, fmt.Errorf("fairness-exceeding pending transaction"))
pool.txErrorSink.Add(tx, fmt.Errorf("fairness-exceeding pending transaction"))
pool.all.Remove(hash)
pool.priced.Removed()
@ -1331,7 +1320,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.errorReporter.add(tx, fmt.Errorf("fairness-exceeding pending transaction"))
pool.txErrorSink.Add(tx, fmt.Errorf("fairness-exceeding pending transaction"))
pool.all.Remove(hash)
pool.priced.Removed()
@ -1372,7 +1361,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.errorReporter.add(tx, fmt.Errorf("exceeds global cap for queued transactions"))
pool.txErrorSink.Add(tx, fmt.Errorf("exceeds global cap for queued transactions"))
pool.removeTx(tx.Hash(), true)
}
drop -= size
@ -1382,18 +1371,13 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.errorReporter.add(txs[i], fmt.Errorf("exceeds global cap for queued transactions"))
pool.txErrorSink.Add(txs[i], fmt.Errorf("exceeds global cap for queued transactions"))
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitCounter.Inc(1)
}
}
}
if err := pool.errorReporter.report(); err != nil {
logger.Error().Err(err).
Msg("could not report failed transactions in tx pool when promoting executables")
}
}
// demoteUnexecutables removes invalid and processed transactions from the pools
@ -1426,7 +1410,7 @@ func (pool *TxPool) demoteUnexecutables() {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Demoting pending transaction")
if _, err := pool.enqueueTx(hash, tx); err != nil {
pool.errorReporter.add(tx, err)
pool.txErrorSink.Add(tx, err)
}
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
@ -1435,7 +1419,7 @@ func (pool *TxPool) demoteUnexecutables() {
hash := tx.Hash()
logger.Error().Str("hash", hash.Hex()).Msg("Demoting invalidated transaction")
if _, err := pool.enqueueTx(hash, tx); err != nil {
pool.errorReporter.add(tx, err)
pool.txErrorSink.Add(tx, err)
}
}
}
@ -1444,64 +1428,9 @@ func (pool *TxPool) demoteUnexecutables() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
if err := pool.errorReporter.report(); err != nil {
logger.Error().Err(err).
Msg("could not report failed transactions in tx pool when demoting unexecutables")
}
}
}
// txPoolErrorReporter holds and reports transaction errors in the tx-pool.
// Format assumes that error i in errors corresponds to transaction i in transactions.
type txPoolErrorReporter struct {
transactions types.PoolTransactions
errors []error
txnErrorReportSink func([]types.RPCTransactionError)
stkTxnErrorReportSink func([]staking.RPCTransactionError)
}
func newTxPoolErrorReporter(txnErrorSink func([]types.RPCTransactionError),
stakingTxnErrorSink func([]staking.RPCTransactionError),
) *txPoolErrorReporter {
return &txPoolErrorReporter{
transactions: types.PoolTransactions{},
errors: []error{},
txnErrorReportSink: txnErrorSink,
stkTxnErrorReportSink: stakingTxnErrorSink,
}
}
func (txErrs *txPoolErrorReporter) add(tx types.PoolTransaction, err error) {
txErrs.transactions = append(txErrs.transactions, tx)
txErrs.errors = append(txErrs.errors, err)
}
func (txErrs *txPoolErrorReporter) reset() {
txErrs.transactions = types.PoolTransactions{}
txErrs.errors = []error{}
}
// report errors thrown in the tx pool to the appropriate error sink.
// It resets the held errors after the errors are reported to the sink.
func (txErrs *txPoolErrorReporter) report() error {
plainTxErrors := []types.RPCTransactionError{}
stakingTxErrors := []staking.RPCTransactionError{}
for i, tx := range txErrs.transactions {
if plainTx, ok := tx.(*types.Transaction); ok {
plainTxErrors = append(plainTxErrors, types.NewRPCTransactionError(plainTx.Hash(), txErrs.errors[i]))
} else if stakingTx, ok := tx.(*staking.StakingTransaction); ok {
stakingTxErrors = append(stakingTxErrors, staking.NewRPCTransactionError(stakingTx.Hash(), stakingTx.StakingType(), txErrs.errors[i]))
} else {
return types.ErrUnknownPoolTxType
}
}
txErrs.txnErrorReportSink(plainTxErrors)
txErrs.stkTxnErrorReportSink(stakingTxErrors)
txErrs.reset()
return nil
}
// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address

@ -50,9 +50,10 @@ var (
testBLSPubKey = "30b2c38b1316da91e068ac3bd8751c0901ef6c02a1d58bc712104918302c6ed03d5894671d0c816dad2b4d303320f202"
testBLSPrvKey = "c6d7603520311f7a4e6aac0b26701fc433b75b38df504cd416ef2b900cd66205"
gasPrice = big.NewInt(1e9)
gasLimit = big.NewInt(int64(params.TxGasValidatorCreation))
cost = big.NewInt(1).Mul(gasPrice, gasLimit)
gasPrice = big.NewInt(1e9)
gasLimit = big.NewInt(int64(params.TxGasValidatorCreation))
cost = big.NewInt(1).Mul(gasPrice, gasLimit)
dummyErrorSink = types.NewTransactionErrorSink()
)
func init() {
@ -163,8 +164,7 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
blockchain := &testBlockChain{statedb, 1e18, new(event.Feed)}
key, _ := crypto.GenerateKey()
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, dummyErrorSink)
return pool, key
}
@ -246,8 +246,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
tx0 := transaction(0, 0, 100000, key)
tx1 := transaction(0, 1, 100000, key)
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, dummyErrorSink)
defer pool.Stop()
nonce := pool.State().GetNonce(address)
@ -319,6 +318,78 @@ func TestInvalidTransactions(t *testing.T) {
}
}
func TestErrorSink(t *testing.T) {
t.Parallel()
pool, key := setupTxPool()
pool.chain = createBlockChain()
defer pool.Stop()
testTxErrorSink := types.NewTransactionErrorSink()
pool.txErrorSink = testTxErrorSink
tx := transaction(0, 0, 100, key)
from, _ := deriveSender(tx)
stxKey, _ := crypto.GenerateKey()
stx, err := stakingCreateValidatorTransaction(stxKey)
if err != nil {
t.Errorf("cannot create new staking transaction, %v\n", err)
}
fromStx, _ := stx.SenderAddress()
pool.currentState.SetNonce(from, 1)
pool.currentState.AddBalance(from, big.NewInt(0xffffffffffffff))
tx = transaction(0, 0, 100000, key)
if err := pool.AddRemote(tx); err != ErrNonceTooLow {
t.Error("expected", ErrNonceTooLow)
}
if !testTxErrorSink.Contains(tx.Hash().String()) {
t.Error("expected errored transaction in tx pool")
}
pool.currentState.SetNonce(from, 0)
tx = transaction(0, 0, 100000, key)
if err := pool.AddRemote(tx); err != nil {
t.Error("expected successful transaction got", err)
}
if testTxErrorSink.Contains(tx.Hash().String()) {
t.Error("expected successful transaction to not be in error sink")
}
pool.currentState.SetNonce(from, 2)
tx = transaction(0, 2, 100000, key)
pool.currentState.SetBalance(from, big.NewInt(0x0))
pool.currentState.SetBalance(fromStx, big.NewInt(0x0))
if err := pool.AddRemote(tx); err != ErrInsufficientFunds {
t.Error("expected", ErrInsufficientFunds)
}
if err := pool.AddRemote(stx); err != ErrInsufficientFunds {
t.Error("expected", ErrInsufficientFunds)
}
if !testTxErrorSink.Contains(tx.Hash().String()) {
t.Error("expected errored transaction in tx pool")
}
if !testTxErrorSink.Contains(stx.Hash().String()) {
t.Error("expected errored transaction in tx pool")
}
pool.currentState.SetBalance(from, twelveK)
pool.currentState.SetBalance(fromStx, twelveK)
if err := pool.AddRemote(tx); err != nil {
t.Error("expected successful transaction got", err)
}
if err := pool.AddRemote(stx); err != nil {
t.Error("expected successful transaction got", err)
}
if testTxErrorSink.Contains(tx.Hash().String()) {
t.Error("expected successful transaction to not be in error sink")
}
if testTxErrorSink.Contains(stx.Hash().String()) {
t.Error("expected successful transaction to not be in error sink")
}
}
func TestCreateValidatorTransaction(t *testing.T) {
t.Parallel()
@ -336,15 +407,13 @@ func TestCreateValidatorTransaction(t *testing.T) {
// Add additional create validator tx cost
pool.currentState.AddBalance(senderAddr, cost)
// TODO remove the exception on more slot keys than allowed
if err = pool.AddRemote(stx); err != nil && err != staking.ErrExcessiveBLSKeys {
if err = pool.AddRemote(stx); err != nil {
t.Error(err.Error())
}
// TODO Comment back in after the fix of previous TODO
// if pool.pending[senderAddr] == nil || pool.pending[senderAddr].Len() != 1 {
// t.Error("Expected 1 pending transaction")
// }
if pool.pending[senderAddr] == nil || pool.pending[senderAddr].Len() != 1 {
t.Error("Expected 1 pending transaction")
}
}
func TestMixedTransactions(t *testing.T) {
@ -371,15 +440,14 @@ func TestMixedTransactions(t *testing.T) {
errs := pool.AddRemotes(types.PoolTransactions{stx, tx})
for _, err := range errs {
// TODO remove the exception on more slot keys than allowed
if err != nil && err != staking.ErrExcessiveBLSKeys {
if err != nil {
t.Error(err)
}
}
// TODO Comment back in after the fix of previous TODO
// if pool.pending[stxAddr] == nil || pool.pending[stxAddr].Len() != 0 {
// t.Error("Expected 1 pending transaction")
// }
if pool.pending[stxAddr] == nil || pool.pending[stxAddr].Len() != 1 {
t.Error("Expected 1 pending transaction")
}
}
func TestBlacklistedTransactions(t *testing.T) {
@ -733,8 +801,7 @@ func TestTransactionPostponing(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, dummyErrorSink)
defer pool.Stop()
// Create two test accounts to produce different gap profiles with
@ -896,8 +963,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
config.NoLocals = nolocals
config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
pool := NewTxPool(config, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(config, params.TestChainConfig, blockchain, dummyErrorSink)
defer pool.Stop()
// Create a number of test accounts and fund them (last one will be the local)
@ -987,8 +1053,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
config.Lifetime = time.Second
config.NoLocals = nolocals
pool := NewTxPool(config, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(config, params.TestChainConfig, blockchain, dummyErrorSink)
defer pool.Stop()
// Create two test accounts to ensure remotes expire but locals do not
@ -1102,8 +1167,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
config := testTxPoolConfig
config.GlobalSlots = config.AccountSlots * 10
pool := NewTxPool(config, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(config, params.TestChainConfig, blockchain, dummyErrorSink)
defer pool.Stop()
// Create a number of test accounts and fund them
@ -1151,8 +1215,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
config.AccountQueue = 2
config.GlobalSlots = 8
pool := NewTxPool(config, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(config, params.TestChainConfig, blockchain, dummyErrorSink)
defer pool.Stop()
// Create a number of test accounts and fund them
@ -1184,8 +1247,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
config := testTxPoolConfig
config.GlobalSlots = 0
pool := NewTxPool(config, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(config, params.TestChainConfig, blockchain, dummyErrorSink)
defer pool.Stop()
// Create a number of test accounts and fund them
@ -1227,8 +1289,7 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, dummyErrorSink)
defer pool.Stop()
// Create a number of test accounts and fund them
@ -1307,8 +1368,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
config.Journal = journal
config.Rejournal = time.Second
pool := NewTxPool(config, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(config, params.TestChainConfig, blockchain, dummyErrorSink)
// Create two test accounts to ensure remotes expire but locals do not
local, _ := crypto.GenerateKey()
@ -1345,8 +1405,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool = NewTxPool(config, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool = NewTxPool(config, params.TestChainConfig, blockchain, dummyErrorSink)
pending, queued = pool.Stats()
if queued != 0 {
@ -1372,8 +1431,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool = NewTxPool(config, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool = NewTxPool(config, params.TestChainConfig, blockchain, dummyErrorSink)
pending, queued = pool.Stats()
if pending != 0 {
@ -1403,8 +1461,7 @@ func TestTransactionStatusCheck(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, dummyErrorSink)
defer pool.Stop()
// Create the test accounts to check various transaction statuses with

@ -23,7 +23,6 @@ import (
"io"
"math/big"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -71,22 +70,6 @@ type Transaction struct {
from atomic.Value
}
// RPCTransactionError ..
type RPCTransactionError struct {
TxHashID string `json:"tx-hash-id"`
TimestampOfRejection int64 `json:"time-at-rejection"`
ErrMessage string `json:"error-message"`
}
// NewRPCTransactionError ...
func NewRPCTransactionError(hash common.Hash, err error) RPCTransactionError {
return RPCTransactionError{
TxHashID: hash.Hex(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
}
}
//String print mode string
func (txType TransactionType) String() string {
if txType == SameShardTx {

@ -0,0 +1,160 @@
package types
import (
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/harmony-one/harmony/internal/utils"
staking "github.com/harmony-one/harmony/staking/types"
)
const (
plainTxSinkLimit = 1024
stakingTxSinkLimit = 1024
logTag = "[TransactionErrorSink]"
)
// TransactionErrorReport ..
type TransactionErrorReport struct {
TxHashID string `json:"tx-hash-id"`
StakingDirective string `json:"directive-kind,omitempty"`
TimestampOfRejection int64 `json:"time-at-rejection"`
ErrMessage string `json:"error-message"`
}
// TransactionErrorReports ..
type TransactionErrorReports []*TransactionErrorReport
// TransactionErrorSink is where all failed transactions get reported.
// Note that the keys of the lru caches are tx-hash strings.
type TransactionErrorSink struct {
failedPlainTxs *lru.Cache
failedStakingTxs *lru.Cache
}
// NewTransactionErrorSink ..
func NewTransactionErrorSink() *TransactionErrorSink {
failedPlainTx, _ := lru.New(plainTxSinkLimit)
failedStakingTx, _ := lru.New(stakingTxSinkLimit)
return &TransactionErrorSink{
failedPlainTxs: failedPlainTx,
failedStakingTxs: failedStakingTx,
}
}
// Add a transaction to the error sink with the given error
func (sink *TransactionErrorSink) Add(tx PoolTransaction, err error) {
// no-op if no error is provided
if err == nil {
return
}
if plainTx, ok := tx.(*Transaction); ok {
hash := plainTx.Hash().String()
sink.failedPlainTxs.Add(hash, &TransactionErrorReport{
TxHashID: hash,
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
})
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Msgf("Added plain transaction error message")
} else if stakingTx, ok := tx.(*staking.StakingTransaction); ok {
hash := stakingTx.Hash().String()
sink.failedStakingTxs.Add(hash, &TransactionErrorReport{
TxHashID: hash,
StakingDirective: stakingTx.StakingType().String(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
})
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Msgf("Added staking transaction error message")
} else {
utils.Logger().Error().
Str("tag", logTag).
Interface("tx", tx).
Msg("Attempted to add an unknown transaction type")
}
}
// Contains checks if there is an error associated with the given hash
// Note that the keys of the lru caches are tx-hash strings.
func (sink *TransactionErrorSink) Contains(hash string) bool {
return sink.failedPlainTxs.Contains(hash) || sink.failedStakingTxs.Contains(hash)
}
// Remove a transaction's error from the error sink
func (sink *TransactionErrorSink) Remove(tx PoolTransaction) {
if plainTx, ok := tx.(*Transaction); ok {
hash := plainTx.Hash().String()
present := sink.failedPlainTxs.Remove(hash)
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Bool("was-in-error-sink", present).
Msgf("Removed plain transaction error message")
} else if stakingTx, ok := tx.(*staking.StakingTransaction); ok {
hash := stakingTx.Hash().String()
present := sink.failedStakingTxs.Remove(hash)
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Bool("was-in-error-sink", present).
Msgf("Removed staking transaction error message")
} else {
utils.Logger().Error().
Str("tag", logTag).
Interface("tx", tx).
Msg("Attempted to remove an unknown transaction type")
}
}
// PlainReport ..
func (sink *TransactionErrorSink) PlainReport() TransactionErrorReports {
return reportErrorsFromLruCache(sink.failedPlainTxs)
}
// StakingReport ..
func (sink *TransactionErrorSink) StakingReport() TransactionErrorReports {
return reportErrorsFromLruCache(sink.failedStakingTxs)
}
// PlainCount ..
func (sink *TransactionErrorSink) PlainCount() int {
return sink.failedPlainTxs.Len()
}
// StakingCount ..
func (sink *TransactionErrorSink) StakingCount() int {
return sink.failedStakingTxs.Len()
}
// reportErrorsFromLruCache is a helper for reporting errors
// from the TransactionErrorSink's lru cache. Do not use this function directly,
// use the respective public methods of TransactionErrorSink.
func reportErrorsFromLruCache(lruCache *lru.Cache) TransactionErrorReports {
rpcErrors := TransactionErrorReports{}
for _, txHash := range lruCache.Keys() {
rpcErrorFetch, ok := lruCache.Get(txHash)
if !ok {
utils.Logger().Warn().
Str("tag", logTag).
Interface("tx-hash-id", txHash).
Msgf("Error not found in sink")
continue
}
rpcError, ok := rpcErrorFetch.(*TransactionErrorReport)
if !ok {
utils.Logger().Error().
Str("tag", logTag).
Interface("tx-hash-id", txHash).
Msgf("Invalid type of value in sink")
continue
}
rpcErrors = append(rpcErrors, rpcError)
}
return rpcErrors
}

@ -229,7 +229,6 @@ func (b *APIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscripti
}
// GetPoolTransactions returns pool transactions.
// TODO: this is not implemented or verified yet for harmony.
func (b *APIBackend) GetPoolTransactions() (types.PoolTransactions, error) {
pending, err := b.hmy.txPool.Pending()
if err != nil {
@ -242,6 +241,11 @@ func (b *APIBackend) GetPoolTransactions() (types.PoolTransactions, error) {
return txs, nil
}
// GetPoolStats returns the number of pending and queued transactions
func (b *APIBackend) GetPoolStats() (pendingCount, queuedCount int) {
return b.hmy.txPool.Stats()
}
// GetAccountNonce returns the nonce value of the given address for the given block number
func (b *APIBackend) GetAccountNonce(
ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (uint64, error) {
@ -617,13 +621,13 @@ func (b *APIBackend) GetShardState() (*shard.State, error) {
}
// GetCurrentStakingErrorSink ..
func (b *APIBackend) GetCurrentStakingErrorSink() []staking.RPCTransactionError {
return b.hmy.nodeAPI.ErroredStakingTransactionSink()
func (b *APIBackend) GetCurrentStakingErrorSink() types.TransactionErrorReports {
return b.hmy.nodeAPI.ReportStakingErrorSink()
}
// GetCurrentTransactionErrorSink ..
func (b *APIBackend) GetCurrentTransactionErrorSink() []types.RPCTransactionError {
return b.hmy.nodeAPI.ErroredTransactionSink()
func (b *APIBackend) GetCurrentTransactionErrorSink() types.TransactionErrorReports {
return b.hmy.nodeAPI.ReportPlainErrorSink()
}
// GetPendingCXReceipts ..

@ -48,8 +48,8 @@ type NodeAPI interface {
GetTransactionsCount(address, txType string) (uint64, error)
GetStakingTransactionsCount(address, txType string) (uint64, error)
IsCurrentlyLeader() bool
ErroredStakingTransactionSink() []staking.RPCTransactionError
ErroredTransactionSink() []types.RPCTransactionError
ReportStakingErrorSink() types.TransactionErrorReports
ReportPlainErrorSink() types.TransactionErrorReports
PendingCXReceipts() []*types.CXReceiptsProof
GetNodeBootTime() int64
}

@ -82,5 +82,5 @@ var testnetReshardingEpoch = []*big.Int{
params.TestnetChainConfig.StakingEpoch,
}
var testnetV0 = MustNewInstance(4, 25, 25, numeric.OneDec(), genesis.TNHarmonyAccounts, genesis.TNFoundationalAccounts, testnetReshardingEpoch, TestnetSchedule.BlocksPerEpoch())
var testnetV0 = MustNewInstance(4, 30, 25, numeric.OneDec(), genesis.TNHarmonyAccounts, genesis.TNFoundationalAccounts, testnetReshardingEpoch, TestnetSchedule.BlocksPerEpoch())
var testnetV1 = MustNewInstance(4, 50, 25, numeric.MustNewDecFromStr("0.68"), genesis.TNHarmonyAccounts, genesis.TNFoundationalAccounts, testnetReshardingEpoch, TestnetSchedule.BlocksPerEpoch())

@ -51,10 +51,10 @@ type Backend interface {
// GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
GetPoolTransactions() (types.PoolTransactions, error)
GetPoolTransaction(txHash common.Hash) types.PoolTransaction
GetPoolStats() (pendingCount, queuedCount int)
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
// Get account nonce
GetAccountNonce(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (uint64, error)
// Stats() (pending int, queued int)
// TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions)
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
ChainConfig() *params.ChainConfig
@ -79,8 +79,8 @@ type Backend interface {
GetDelegationsByDelegator(delegator common.Address) ([]common.Address, []*staking.Delegation)
GetValidatorSelfDelegation(addr common.Address) *big.Int
GetShardState() (*shard.State, error)
GetCurrentStakingErrorSink() []staking.RPCTransactionError
GetCurrentTransactionErrorSink() []types.RPCTransactionError
GetCurrentStakingErrorSink() types.TransactionErrorReports
GetCurrentTransactionErrorSink() types.TransactionErrorReports
GetMedianRawStakeSnapshot() (*committee.CompletedEPoSRound, error)
GetPendingCXReceipts() []*types.CXReceiptsProof
GetCurrentUtilityMetrics() (*network.UtilityMetric, error)

@ -337,6 +337,15 @@ func (s *PublicTransactionPoolAPI) GetTransactionReceipt(ctx context.Context, ha
return fields, nil
}
// GetPoolStats returns stats for the tx-pool
func (s *PublicTransactionPoolAPI) GetPoolStats() map[string]interface{} {
pendingCount, queuedCount := s.b.GetPoolStats()
return map[string]interface{}{
"executable-count": pendingCount,
"non-executable-count": queuedCount,
}
}
// PendingTransactions returns the plain transactions that are in the transaction pool
func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, error) {
pending, err := s.b.GetPoolTransactions()
@ -376,12 +385,12 @@ func (s *PublicTransactionPoolAPI) PendingStakingTransactions() ([]*RPCStakingTr
}
// GetCurrentTransactionErrorSink ..
func (s *PublicTransactionPoolAPI) GetCurrentTransactionErrorSink() []types.RPCTransactionError {
func (s *PublicTransactionPoolAPI) GetCurrentTransactionErrorSink() types.TransactionErrorReports {
return s.b.GetCurrentTransactionErrorSink()
}
// GetCurrentStakingErrorSink ..
func (s *PublicTransactionPoolAPI) GetCurrentStakingErrorSink() []staking.RPCTransactionError {
func (s *PublicTransactionPoolAPI) GetCurrentStakingErrorSink() types.TransactionErrorReports {
return s.b.GetCurrentStakingErrorSink()
}

@ -51,6 +51,7 @@ type Backend interface {
SendTx(ctx context.Context, signedTx *types.Transaction) error
GetPoolTransactions() (types.PoolTransactions, error)
GetPoolTransaction(txHash common.Hash) types.PoolTransaction
GetPoolStats() (pendingCount, queuedCount int)
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
GetAccountNonce(ctx context.Context, addr common.Address, blockNr rpc.BlockNumber) (uint64, error)
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
@ -74,8 +75,8 @@ type Backend interface {
GetDelegationsByDelegator(delegator common.Address) ([]common.Address, []*staking.Delegation)
GetValidatorSelfDelegation(addr common.Address) *big.Int
GetShardState() (*shard.State, error)
GetCurrentStakingErrorSink() []staking.RPCTransactionError
GetCurrentTransactionErrorSink() []types.RPCTransactionError
GetCurrentStakingErrorSink() types.TransactionErrorReports
GetCurrentTransactionErrorSink() types.TransactionErrorReports
GetMedianRawStakeSnapshot() (*committee.CompletedEPoSRound, error)
GetPendingCXReceipts() []*types.CXReceiptsProof
GetCurrentUtilityMetrics() (*network.UtilityMetric, error)

@ -361,6 +361,11 @@ func (s *PublicTransactionPoolAPI) GetTransactionReceipt(ctx context.Context, ha
return fields, nil
}
// GetPoolStats returns stats for the tx-pool
func (s *PublicTransactionPoolAPI) GetPoolStats() (pendingCount, queuedCount int) {
return s.b.GetPoolStats()
}
// PendingTransactions returns the plain transactions that are in the transaction pool
// and have a from address that is one of the accounts this node manages.
func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, error) {
@ -418,12 +423,12 @@ func (s *PublicTransactionPoolAPI) PendingStakingTransactions() ([]*RPCStakingTr
}
// GetCurrentTransactionErrorSink ..
func (s *PublicTransactionPoolAPI) GetCurrentTransactionErrorSink() []types.RPCTransactionError {
func (s *PublicTransactionPoolAPI) GetCurrentTransactionErrorSink() types.TransactionErrorReports {
return s.b.GetCurrentTransactionErrorSink()
}
// GetCurrentStakingErrorSink ..
func (s *PublicTransactionPoolAPI) GetCurrentStakingErrorSink() []staking.RPCTransactionError {
func (s *PublicTransactionPoolAPI) GetCurrentStakingErrorSink() types.TransactionErrorReports {
return s.b.GetCurrentStakingErrorSink()
}

@ -46,6 +46,7 @@ type Backend interface {
SendTx(ctx context.Context, signedTx *types.Transaction) error
GetPoolTransactions() (types.PoolTransactions, error)
GetPoolTransaction(txHash common.Hash) types.PoolTransaction
GetPoolStats() (pendingCount, queuedCount int)
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
GetAccountNonce(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (uint64, error)
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
@ -68,8 +69,8 @@ type Backend interface {
GetDelegationsByDelegator(delegator common.Address) ([]common.Address, []*staking.Delegation)
GetValidatorSelfDelegation(addr common.Address) *big.Int
GetShardState() (*shard.State, error)
GetCurrentStakingErrorSink() []staking.RPCTransactionError
GetCurrentTransactionErrorSink() []types.RPCTransactionError
GetCurrentStakingErrorSink() types.TransactionErrorReports
GetCurrentTransactionErrorSink() types.TransactionErrorReports
GetMedianRawStakeSnapshot() (*committee.CompletedEPoSRound, error)
GetPendingCXReceipts() []*types.CXReceiptsProof
GetCurrentUtilityMetrics() (*network.UtilityMetric, error)

@ -1,7 +1,6 @@
package node
import (
"container/ring"
"context"
"crypto/ecdsa"
"fmt"
@ -148,19 +147,15 @@ type Node struct {
// Chain configuration.
chainConfig params.ChainConfig
// map of service type to its message channel.
serviceMessageChan map[service.Type]chan *msg_pb.Message
isFirstTime bool // the node was started with a fresh database
// Last 1024 staking transaction error, only in memory
errorSink struct {
sync.Mutex
failedStakingTxns *ring.Ring
failedTxns *ring.Ring
}
serviceMessageChan map[service.Type]chan *msg_pb.Message
isFirstTime bool // the node was started with a fresh database
unixTimeAtNodeStart int64
// KeysToAddrs holds the addresses of bls keys run by the node
KeysToAddrs map[string]common.Address
keysToAddrsEpoch *big.Int
keysToAddrsMutex sync.Mutex
// TransactionErrorSink contains error messages for any failed transaction, in memory only
TransactionErrorSink *types.TransactionErrorSink
}
// Blockchain returns the blockchain for the node's current shard.
@ -432,12 +427,7 @@ func New(
) *Node {
node := Node{}
node.unixTimeAtNodeStart = time.Now().Unix()
const sinkSize = 4096
node.errorSink = struct {
sync.Mutex
failedStakingTxns *ring.Ring
failedTxns *ring.Ring
}{sync.Mutex{}, ring.New(sinkSize), ring.New(sinkSize)}
node.TransactionErrorSink = types.NewTransactionErrorSink()
// Get the node config that's created in the harmony.go program.
if consensusObj != nil {
node.NodeConfig = nodeconfig.GetShardConfig(consensusObj.ShardID)
@ -489,26 +479,7 @@ func New(
node.BeaconBlockChannel = make(chan *types.Block)
txPoolConfig := core.DefaultTxPoolConfig
txPoolConfig.Blacklist = blacklist
node.TxPool = core.NewTxPool(txPoolConfig, node.Blockchain().Config(), blockchain,
func(payload []types.RPCTransactionError) {
if len(payload) > 0 {
node.errorSink.Lock()
for i := range payload {
node.errorSink.failedTxns.Value = payload[i]
node.errorSink.failedTxns = node.errorSink.failedTxns.Next()
}
node.errorSink.Unlock()
}
},
func(payload []staking.RPCTransactionError) {
node.errorSink.Lock()
for i := range payload {
node.errorSink.failedStakingTxns.Value = payload[i]
node.errorSink.failedStakingTxns = node.errorSink.failedStakingTxns.Next()
}
node.errorSink.Unlock()
},
)
node.TxPool = core.NewTxPool(txPoolConfig, node.Blockchain().Config(), blockchain, node.TransactionErrorSink)
node.CxPool = core.NewCxPool(core.CxPoolSize)
node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine)

@ -16,7 +16,6 @@ import (
"github.com/harmony-one/harmony/internal/hmyapi/apiv2"
"github.com/harmony-one/harmony/internal/hmyapi/filters"
"github.com/harmony-one/harmony/internal/utils"
staking "github.com/harmony-one/harmony/staking/types"
)
const (
@ -55,17 +54,9 @@ func (node *Node) PendingCXReceipts() []*types.CXReceiptsProof {
return cxReceipts
}
// ErroredStakingTransactionSink is the inmemory failed staking transactions this node has
func (node *Node) ErroredStakingTransactionSink() []staking.RPCTransactionError {
node.errorSink.Lock()
defer node.errorSink.Unlock()
result := []staking.RPCTransactionError{}
node.errorSink.failedStakingTxns.Do(func(d interface{}) {
if d != nil {
result = append(result, d.(staking.RPCTransactionError))
}
})
return result
// ReportStakingErrorSink is the report of failed staking transactions this node has (held inmemory only)
func (node *Node) ReportStakingErrorSink() types.TransactionErrorReports {
return node.TransactionErrorSink.StakingReport()
}
// GetNodeBootTime ..
@ -73,17 +64,9 @@ func (node *Node) GetNodeBootTime() int64 {
return node.unixTimeAtNodeStart
}
// ErroredTransactionSink is the inmemory failed transactions this node has
func (node *Node) ErroredTransactionSink() []types.RPCTransactionError {
node.errorSink.Lock()
defer node.errorSink.Unlock()
result := []types.RPCTransactionError{}
node.errorSink.failedTxns.Do(func(d interface{}) {
if d != nil {
result = append(result, d.(types.RPCTransactionError))
}
})
return result
// ReportPlainErrorSink is the report of failed transactions this node has (held inmemory only)
func (node *Node) ReportPlainErrorSink() types.TransactionErrorReports {
return node.TransactionErrorSink.PlainReport()
}
// StartRPC start RPC service

@ -5,10 +5,10 @@ import (
"io"
"math/big"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
@ -74,24 +74,6 @@ type StakingTransaction struct {
from atomic.Value
}
// RPCTransactionError ..
type RPCTransactionError struct {
TxHashID string `json:"tx-hash-id"`
StakingDirective string `json:"directive-kind"`
TimestampOfRejection int64 `json:"time-at-rejection"`
ErrMessage string `json:"error-message"`
}
// NewRPCTransactionError ...
func NewRPCTransactionError(hash common.Hash, directive Directive, err error) RPCTransactionError {
return RPCTransactionError{
TxHashID: hash.Hex(),
StakingDirective: directive.String(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
}
}
// StakeMsgFulfiller is signature of callback intended to produce the StakeMsg
type StakeMsgFulfiller func() (Directive, interface{})

@ -17,7 +17,6 @@ import (
"github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/params"
pkgworker "github.com/harmony-one/harmony/node/worker"
staking "github.com/harmony-one/harmony/staking/types"
)
const (
@ -197,9 +196,7 @@ func playFaucetContract(chain *core.BlockChain) {
func main() {
genesis := gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, nil, gspec.Config, chain.Engine(), vm.Config{}, nil)
txpool := core.NewTxPool(core.DefaultTxPoolConfig, chainConfig, chain,
func([]types.RPCTransactionError) {}, func([]staking.RPCTransactionError) {})
txpool := core.NewTxPool(core.DefaultTxPoolConfig, chainConfig, chain, types.NewTransactionErrorSink())
backend := &testWorkerBackend{
db: database,

Loading…
Cancel
Save