added throttling by num transactions per account past hour

pull/1319/head
Dennis Won 5 years ago
parent c632fd6c3b
commit 1d48c7c42c
  1. 16
      node/node.go
  2. 19
      node/worker/worker.go

@ -255,17 +255,21 @@ func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
// Note the pending transaction list will then contain the rest of the txs // Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlock(coinbase common.Address) types.Transactions { func (node *Node) getTransactionsForNewBlock(coinbase common.Address) types.Transactions {
node.pendingTxMutex.Lock() node.pendingTxMutex.Lock()
selected, unselected, invalid, blockTxsCounts := node.Worker.SelectTransactionsForNewBlock(node.pendingTransactions, core.ShardingSchedule.TxsThrottleConfig(), coinbase)
node.pendingTransactions = unselected // update recentTxsStats and initiailize for the new block
node.reducePendingTransactions() newBlockNum := node.Consensus.ChainReader.CurrentHeader().Number.Uint64() + 1
node.recentTxsStats[node.Consensus.ChainReader.CurrentHeader().Number.Uint64()+1] = blockTxsCounts
for blockNum := range node.recentTxsStats { for blockNum := range node.recentTxsStats {
blockNumPastHour := (time.Hour / time.Second) / node.BlockPeriod blockNumHourAgo := (time.Hour / time.Second) / node.BlockPeriod
if blockNum < node.Consensus.ChainReader.CurrentHeader().Number.Uint64()-uint64(blockNumPastHour) { if blockNum < node.Consensus.ChainReader.CurrentHeader().Number.Uint64()-uint64(blockNumHourAgo) {
delete(node.recentTxsStats, blockNum) delete(node.recentTxsStats, blockNum)
} }
} }
node.recentTxsStats[newBlockNum] = types.BlockTxsCounts{}
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, node.pendingTransactions, node.recentTxsStats, core.ShardingSchedule.TxsThrottleConfig(), coinbase)
node.pendingTransactions = unselected
node.reducePendingTransactions()
utils.Logger().Info(). utils.Logger().Info().
Int("remainPending", len(node.pendingTransactions)). Int("remainPending", len(node.pendingTransactions)).

@ -46,7 +46,7 @@ type Worker struct {
// Returns a tuple where the first value is the txs sender account address, // Returns a tuple where the first value is the txs sender account address,
// the second is the throttling result enum for the transaction of interest. // the second is the throttling result enum for the transaction of interest.
// Throttling happens based on the amount, frequency, etc. // Throttling happens based on the amount, frequency, etc.
func (w *Worker) throttleTxs(selected types.Transactions, txsThrottleConfig *shardingconfig.TxsThrottleConfig, txnCnts types.BlockTxsCounts, tx *types.Transaction) (common.Address, shardingconfig.TxThrottleFlag) { func (w *Worker) throttleTxs(selected types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, tx *types.Transaction) (common.Address, shardingconfig.TxThrottleFlag) {
chainID := tx.ChainID() chainID := tx.ChainID()
// Depending on the presence of the chain ID, sign with EIP155 or homestead // Depending on the presence of the chain ID, sign with EIP155 or homestead
var s types.Signer var s types.Signer
@ -82,7 +82,11 @@ func (w *Worker) throttleTxs(selected types.Transactions, txsThrottleConfig *sha
} }
// throttle too large transaction // throttle too large transaction
if txnCnts[sender] >= (*txsThrottleConfig).MaxTxsPerAccountInBlockLimit { var numTxsPastHour uint64
for _, blockTxsCounts := range recentTxsStats {
numTxsPastHour += blockTxsCounts[sender]
}
if numTxsPastHour >= (*txsThrottleConfig).MaxTxsPerAccountInBlockLimit {
utils.Logger().Debug(). utils.Logger().Debug().
Uint64("MaxTxsPerAccountInBlockLimit", (*txsThrottleConfig).MaxTxsPerAccountInBlockLimit). Uint64("MaxTxsPerAccountInBlockLimit", (*txsThrottleConfig).MaxTxsPerAccountInBlockLimit).
Msg("Throttling tx with max txs per account in a single block limit") Msg("Throttling tx with max txs per account in a single block limit")
@ -93,14 +97,11 @@ func (w *Worker) throttleTxs(selected types.Transactions, txsThrottleConfig *sha
} }
// SelectTransactionsForNewBlock selects transactions for new block. // SelectTransactionsForNewBlock selects transactions for new block.
func (w *Worker) SelectTransactionsForNewBlock(txs types.Transactions, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions, types.BlockTxsCounts) { func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Transactions, recentTxsStats types.RecentTxsStats, txsThrottleConfig *shardingconfig.TxsThrottleConfig, coinbase common.Address) (types.Transactions, types.Transactions, types.Transactions) {
if w.current.gasPool == nil { if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit) w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
} }
// used for per account transaction frequency throttling
txnCnts := types.BlockTxsCounts{}
selected := types.Transactions{} selected := types.Transactions{}
unselected := types.Transactions{} unselected := types.Transactions{}
invalid := types.Transactions{} invalid := types.Transactions{}
@ -109,7 +110,7 @@ func (w *Worker) SelectTransactionsForNewBlock(txs types.Transactions, txsThrott
invalid = append(invalid, tx) invalid = append(invalid, tx)
} }
sender, flag := w.throttleTxs(selected, txsThrottleConfig, txnCnts, tx) sender, flag := w.throttleTxs(selected, recentTxsStats, txsThrottleConfig, tx)
switch flag { switch flag {
case shardingconfig.Unselect: case shardingconfig.Unselect:
unselected = append(unselected, tx) unselected = append(unselected, tx)
@ -138,11 +139,11 @@ func (w *Worker) SelectTransactionsForNewBlock(txs types.Transactions, txsThrott
Msg("Transaction Throttle flag") Msg("Transaction Throttle flag")
} else { } else {
selected = append(selected, tx) selected = append(selected, tx)
txnCnts[sender]++ recentTxsStats[newBlockNum][sender]++
} }
} }
} }
return selected, unselected, invalid, txnCnts return selected, unselected, invalid
} }
func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) { func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {

Loading…
Cancel
Save