From 1d48c7c42c018fb1003f7d53de184ed7e59255e3 Mon Sep 17 00:00:00 2001 From: Dennis Won Date: Fri, 9 Aug 2019 08:45:42 -0700 Subject: [PATCH] added throttling by num transactions per account past hour --- node/node.go | 16 ++++++++++------ node/worker/worker.go | 19 ++++++++++--------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/node/node.go b/node/node.go index d7da00799..1c8b2df35 100644 --- a/node/node.go +++ b/node/node.go @@ -255,17 +255,21 @@ func (node *Node) AddPendingTransaction(newTx *types.Transaction) { // Note the pending transaction list will then contain the rest of the txs func (node *Node) getTransactionsForNewBlock(coinbase common.Address) types.Transactions { node.pendingTxMutex.Lock() - selected, unselected, invalid, blockTxsCounts := node.Worker.SelectTransactionsForNewBlock(node.pendingTransactions, core.ShardingSchedule.TxsThrottleConfig(), coinbase) - node.pendingTransactions = unselected - node.reducePendingTransactions() - node.recentTxsStats[node.Consensus.ChainReader.CurrentHeader().Number.Uint64()+1] = blockTxsCounts + // update recentTxsStats and initiailize for the new block + newBlockNum := node.Consensus.ChainReader.CurrentHeader().Number.Uint64() + 1 for blockNum := range node.recentTxsStats { - blockNumPastHour := (time.Hour / time.Second) / node.BlockPeriod - if blockNum < node.Consensus.ChainReader.CurrentHeader().Number.Uint64()-uint64(blockNumPastHour) { + blockNumHourAgo := (time.Hour / time.Second) / node.BlockPeriod + if blockNum < node.Consensus.ChainReader.CurrentHeader().Number.Uint64()-uint64(blockNumHourAgo) { delete(node.recentTxsStats, blockNum) } } + node.recentTxsStats[newBlockNum] = types.BlockTxsCounts{} + + selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, core.ShardingSchedule.TxsThrottleConfig(), coinbase) + + node.pendingTransactions = unselected + node.reducePendingTransactions() utils.Logger().Info(). Int("remainPending", len(node.pendingTransactions)). diff --git a/node/worker/worker.go b/node/worker/worker.go index 2279c309e..76032387d 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -46,7 +46,7 @@ type Worker struct { // Returns a tuple where the first value is the txs sender account address, // the second is the throttling result enum for the transaction of interest. // Throttling happens based on the amount, frequency, etc. -func (w *Worker) throttleTxs(selected types.Transactions, txsThrottleConfig *shardingconfig.TxsThrottleConfig, txnCnts types.BlockTxsCounts, tx *types.Transaction) (common.Address, shardingconfig.TxThrottleFlag) { +func (w *Worker) throttleTxs(selected types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, tx *types.Transaction) (common.Address, shardingconfig.TxThrottleFlag) { chainID := tx.ChainID() // Depending on the presence of the chain ID, sign with EIP155 or homestead var s types.Signer @@ -82,7 +82,11 @@ func (w *Worker) throttleTxs(selected types.Transactions, txsThrottleConfig *sha } // throttle too large transaction - if txnCnts[sender] >= (*txsThrottleConfig).MaxTxsPerAccountInBlockLimit { + var numTxsPastHour uint64 + for _, blockTxsCounts := range recentTxsStats { + numTxsPastHour += blockTxsCounts[sender] + } + if numTxsPastHour >= (*txsThrottleConfig).MaxTxsPerAccountInBlockLimit { utils.Logger().Debug(). Uint64("MaxTxsPerAccountInBlockLimit", (*txsThrottleConfig).MaxTxsPerAccountInBlockLimit). Msg("Throttling tx with max txs per account in a single block limit") @@ -93,14 +97,11 @@ func (w *Worker) throttleTxs(selected types.Transactions, txsThrottleConfig *sha } // SelectTransactionsForNewBlock selects transactions for new block. -func (w *Worker) SelectTransactionsForNewBlock(txs types.Transactions, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions, types.BlockTxsCounts) { +func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions) { if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit) } - // used for per account transaction frequency throttling - txnCnts := types.BlockTxsCounts{} - selected := types.Transactions{} unselected := types.Transactions{} invalid := types.Transactions{} @@ -109,7 +110,7 @@ func (w *Worker) SelectTransactionsForNewBlock(txs types.Transactions, txsThrott invalid = append(invalid, tx) } - sender, flag := w.throttleTxs(selected, txsThrottleConfig, txnCnts, tx) + sender, flag := w.throttleTxs(selected, recentTxsStats, txsThrottleConfig, tx) switch flag { case shardingconfig.Unselect: unselected = append(unselected, tx) @@ -138,11 +139,11 @@ func (w *Worker) SelectTransactionsForNewBlock(txs types.Transactions, txsThrott Msg("Transaction Throttle flag") } else { selected = append(selected, tx) - txnCnts[sender]++ + recentTxsStats[newBlockNum][sender]++ } } } - return selected, unselected, invalid, txnCnts + return selected, unselected, invalid } func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {