Penalize invalid transient pending transactions in the layered transaction pool (#7359)

* Introduce score for pending transactions

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Introduce score for pending transactions

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Update package javadoc

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

---------

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/7385/head
Fabio Di Fabio 4 months ago committed by GitHub
parent e57c811e47
commit 30dfa66c1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 21
      ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java
  2. 3
      ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/selectors/ProcessingResultTransactionSelector.java
  3. 37
      ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractBlockTransactionSelectorTest.java
  4. 48
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransaction.java
  5. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java
  6. 65
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java
  7. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java
  8. 30
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java
  9. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java
  10. 29
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/GasPricePrioritizedTransactions.java
  11. 39
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java
  12. 34
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java
  13. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java
  14. 13
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java
  15. 27
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/package-info.java
  16. 106
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java
  17. 2
      plugin-api/build.gradle
  18. 49
      plugin-api/src/main/java/org/hyperledger/besu/plugin/data/TransactionSelectionResult.java

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.blockcreation.txselection;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG;
@ -419,11 +420,14 @@ public class BlockTransactionSelector {
final var pendingTransaction = evaluationContext.getPendingTransaction();
// check if this tx took too much to evaluate, and in case remove it from the pool
// check if this tx took too much to evaluate, and in case it was invalid remove it from the
// pool, otherwise penalize it.
final TransactionSelectionResult actualResult =
isTimeout.get()
? transactionTookTooLong(evaluationContext)
? TX_EVALUATION_TOO_LONG
? transactionTookTooLong(evaluationContext, selectionResult)
? selectionResult.discard()
? INVALID_TX_EVALUATION_TOO_LONG
: TX_EVALUATION_TOO_LONG
: BLOCK_SELECTION_TIMEOUT
: selectionResult;
@ -441,16 +445,21 @@ public class BlockTransactionSelector {
return actualResult;
}
private boolean transactionTookTooLong(final TransactionEvaluationContext evaluationContext) {
private boolean transactionTookTooLong(
final TransactionEvaluationContext evaluationContext,
final TransactionSelectionResult selectionResult) {
final var evaluationTimer = evaluationContext.getEvaluationTimer();
if (evaluationTimer.elapsed(TimeUnit.MILLISECONDS) > blockTxsSelectionMaxTime) {
LOG.atWarn()
.setMessage(
"Transaction {} is too late for inclusion, evaluated in {} that is over the max limit of {}ms"
+ ", removing it from the pool")
"Transaction {} is too late for inclusion, with result {}, evaluated in {} that is over the max limit of {}ms"
+ ", {}")
.addArgument(evaluationContext.getPendingTransaction()::getHash)
.addArgument(selectionResult)
.addArgument(evaluationTimer)
.addArgument(blockTxsSelectionMaxTime)
.addArgument(
selectionResult.discard() ? "removing it from the pool" : "penalizing it in the pool")
.log();
return true;
}

@ -110,7 +110,8 @@ public class ProcessingResultTransactionSelector extends AbstractTransactionSele
* @return True if the invalid reason is transient, false otherwise.
*/
private boolean isTransientValidationError(final TransactionInvalidReason invalidReason) {
return invalidReason.equals(TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE)
return invalidReason.equals(TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE)
|| invalidReason.equals(TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE)
|| invalidReason.equals(TransactionInvalidReason.NONCE_TOO_HIGH);
}
}

@ -18,8 +18,9 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.awaitility.Awaitility.await;
import static org.hyperledger.besu.ethereum.core.MiningParameters.DEFAULT_NON_POA_BLOCK_TXS_SELECTION_MAX_TIME;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.NONCE_TOO_LOW;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.PRIORITY_FEE_PER_GAS_BELOW_CURRENT_MIN;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG;
@ -296,7 +297,7 @@ public abstract class AbstractBlockTransactionSelectorTest {
final Transaction tx = createTransaction(i, Wei.of(7), 100_000);
transactionsToInject.add(tx);
if (i == 1) {
ensureTransactionIsInvalid(tx, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE);
ensureTransactionIsInvalid(tx, TransactionInvalidReason.NONCE_TOO_LOW);
} else {
ensureTransactionIsValid(tx);
}
@ -311,8 +312,7 @@ public abstract class AbstractBlockTransactionSelectorTest {
.containsOnly(
entry(
invalidTx,
TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name())));
TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name())));
assertThat(results.getSelectedTransactions().size()).isEqualTo(4);
assertThat(results.getSelectedTransactions().contains(invalidTx)).isFalse();
assertThat(results.getReceipts().size()).isEqualTo(4);
@ -568,8 +568,7 @@ public abstract class AbstractBlockTransactionSelectorTest {
ensureTransactionIsValid(validTransaction, 21_000, 0);
final Transaction invalidTransaction = createTransaction(3, Wei.of(10), 21_000);
ensureTransactionIsInvalid(
invalidTransaction, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE);
ensureTransactionIsInvalid(invalidTransaction, TransactionInvalidReason.NONCE_TOO_LOW);
transactionPool.addRemoteTransactions(List.of(validTransaction, invalidTransaction));
@ -582,8 +581,7 @@ public abstract class AbstractBlockTransactionSelectorTest {
.containsOnly(
entry(
invalidTransaction,
TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name())));
TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name())));
}
@Test
@ -948,7 +946,7 @@ public abstract class AbstractBlockTransactionSelectorTest {
@ParameterizedTest
@MethodSource("subsetOfPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOver")
public void pendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThePool(
public void pendingTransactionsThatTakesTooLongToEvaluateIsPenalized(
final boolean isPoa,
final boolean preProcessingTooLate,
final boolean processingTooLate,
@ -961,7 +959,7 @@ public abstract class AbstractBlockTransactionSelectorTest {
postProcessingTooLate,
900,
TX_EVALUATION_TOO_LONG,
true);
false);
}
private void internalBlockSelectionTimeoutSimulation(
@ -1085,7 +1083,7 @@ public abstract class AbstractBlockTransactionSelectorTest {
500,
BLOCK_SELECTION_TIMEOUT,
false,
UPFRONT_COST_EXCEEDS_BALANCE);
NONCE_TOO_LOW);
}
@ParameterizedTest
@ -1102,9 +1100,9 @@ public abstract class AbstractBlockTransactionSelectorTest {
processingTooLate,
postProcessingTooLate,
900,
TX_EVALUATION_TOO_LONG,
INVALID_TX_EVALUATION_TOO_LONG,
true,
UPFRONT_COST_EXCEEDS_BALANCE);
NONCE_TOO_LOW);
}
private void internalBlockSelectionTimeoutSimulationInvalidTxs(
@ -1423,15 +1421,17 @@ public abstract class AbstractBlockTransactionSelectorTest {
private static class PluginTransactionSelectionResult extends TransactionSelectionResult {
private enum PluginStatus implements Status {
PLUGIN_INVALID(false, true),
PLUGIN_INVALID_TRANSIENT(false, false);
PLUGIN_INVALID(false, true, false),
PLUGIN_INVALID_TRANSIENT(false, false, true);
private final boolean stop;
private final boolean discard;
private final boolean penalize;
PluginStatus(final boolean stop, final boolean discard) {
PluginStatus(final boolean stop, final boolean discard, final boolean penalize) {
this.stop = stop;
this.discard = discard;
this.penalize = penalize;
}
@Override
@ -1443,6 +1443,11 @@ public abstract class AbstractBlockTransactionSelectorTest {
public boolean discard() {
return discard;
}
@Override
public boolean penalize() {
return penalize;
}
}
public static final TransactionSelectionResult GENERIC_PLUGIN_INVALID_TRANSIENT =

@ -50,18 +50,20 @@ public abstract class PendingTransaction
private final Transaction transaction;
private final long addedAt;
private final long sequence; // Allows prioritization based on order transactions are added
private volatile byte score;
private int memorySize = NOT_INITIALIZED;
private PendingTransaction(
final Transaction transaction, final long addedAt, final long sequence) {
final Transaction transaction, final long addedAt, final long sequence, final byte score) {
this.transaction = transaction;
this.addedAt = addedAt;
this.sequence = sequence;
this.score = score;
}
private PendingTransaction(final Transaction transaction, final long addedAt) {
this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement());
this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement(), Byte.MAX_VALUE);
}
public static PendingTransaction newPendingTransaction(
@ -123,6 +125,20 @@ public abstract class PendingTransaction
return memorySize;
}
public byte getScore() {
return score;
}
public void decrementScore() {
// use temp var to avoid non-atomic update of volatile var
final byte newScore = (byte) (score - 1);
// check to avoid underflow
if (newScore < score) {
score = newScore;
}
}
public abstract PendingTransaction detachedCopy();
private int computeMemorySize() {
@ -255,6 +271,8 @@ public abstract class PendingTransaction
+ isReceivedFromLocalSource()
+ ", hasPriority="
+ hasPriority()
+ ", score="
+ score
+ '}';
}
@ -267,6 +285,8 @@ public abstract class PendingTransaction
+ isReceivedFromLocalSource()
+ ", hasPriority="
+ hasPriority()
+ ", score="
+ score
+ ", "
+ transaction.toTraceLog()
+ "}";
@ -282,13 +302,13 @@ public abstract class PendingTransaction
this(transaction, System.currentTimeMillis());
}
private Local(final long sequence, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence);
private Local(final long sequence, final byte score, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence, score);
}
@Override
public PendingTransaction detachedCopy() {
return new Local(getSequence(), getTransaction().detachedCopy());
return new Local(getSequence(), getScore(), getTransaction().detachedCopy());
}
@Override
@ -310,13 +330,13 @@ public abstract class PendingTransaction
super(transaction, addedAt);
}
public Priority(final long sequence, final Transaction transaction) {
super(sequence, transaction);
public Priority(final long sequence, final byte score, final Transaction transaction) {
super(sequence, score, transaction);
}
@Override
public PendingTransaction detachedCopy() {
return new Priority(getSequence(), getTransaction().detachedCopy());
return new Priority(getSequence(), getScore(), getTransaction().detachedCopy());
}
@Override
@ -336,13 +356,13 @@ public abstract class PendingTransaction
this(transaction, System.currentTimeMillis());
}
private Remote(final long sequence, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence);
private Remote(final long sequence, final byte score, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence, score);
}
@Override
public PendingTransaction detachedCopy() {
return new Remote(getSequence(), getTransaction().detachedCopy());
return new Remote(getSequence(), getScore(), getTransaction().detachedCopy());
}
@Override
@ -364,13 +384,13 @@ public abstract class PendingTransaction
super(transaction, addedAt);
}
public Priority(final long sequence, final Transaction transaction) {
super(sequence, transaction);
public Priority(final long sequence, final byte score, final Transaction transaction) {
super(sequence, score, transaction);
}
@Override
public PendingTransaction detachedCopy() {
return new Priority(getSequence(), getTransaction().detachedCopy());
return new Priority(getSequence(), getScore(), getTransaction().detachedCopy());
}
@Override

@ -37,12 +37,14 @@ public class TransactionPoolMetrics {
public static final String ADDED_COUNTER_NAME = "added_total";
public static final String REMOVED_COUNTER_NAME = "removed_total";
public static final String REJECTED_COUNTER_NAME = "rejected_total";
public static final String PENALIZED_COUNTER_NAME = "penalized_total";
public static final String EXPIRED_MESSAGES_COUNTER_NAME = "messages_expired_total";
private static final int SKIPPED_MESSAGES_LOGGING_THRESHOLD = 1000;
private final MetricsSystem metricsSystem;
private final LabelledMetric<Counter> addedCounter;
private final LabelledMetric<Counter> removedCounter;
private final LabelledMetric<Counter> rejectedCounter;
private final LabelledMetric<Counter> penalizedCounter;
private final LabelledGauge spaceUsed;
private final LabelledGauge transactionCount;
private final LabelledGauge transactionCountByType;
@ -88,6 +90,15 @@ public class TransactionPoolMetrics {
"reason",
"layer");
penalizedCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.TRANSACTION_POOL,
PENALIZED_COUNTER_NAME,
"Count of penalized transactions in the transaction pool",
"source",
"priority",
"layer");
spaceUsed =
metricsSystem.createLabelledGauge(
BesuMetricCategory.TRANSACTION_POOL,
@ -246,6 +257,15 @@ public class TransactionPoolMetrics {
.inc();
}
public void incrementPenalized(final PendingTransaction pendingTransaction, final String layer) {
penalizedCounter
.labels(
location(pendingTransaction.isReceivedFromLocalSource()),
priority(pendingTransaction.hasPriority()),
layer)
.inc();
}
public void incrementExpiredMessages(final String message) {
expiredMessagesCounter.labels(message).inc();
}

@ -24,13 +24,17 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Holds the current set of executable pending transactions, that are candidate for inclusion on
@ -137,6 +141,13 @@ public abstract class AbstractPrioritizedTransactions extends AbstractSequential
orderByFee.remove(removedTx);
}
@Override
protected void internalPenalize(final PendingTransaction penalizedTx) {
orderByFee.remove(penalizedTx);
penalizedTx.decrementScore();
orderByFee.add(penalizedTx);
}
@Override
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
@ -188,6 +199,60 @@ public abstract class AbstractPrioritizedTransactions extends AbstractSequential
.toList();
}
/**
* Returns pending txs by sender and ordered by score desc. In case a sender has pending txs with
* different scores, then in nonce sequence, every time there is a score decrease, his pending txs
* will be put in a new entry with that score. For example if a sender has 3 pending txs (where
* the first number is the nonce and the score is between parenthesis): 0(127), 1(126), 2(127),
* then for he there will be 2 entries:
*
* <ul>
* <li>0(127)
* <li>1(126), 2(127)
* </ul>
*
* @return pending txs by sender and ordered by score desc
*/
public NavigableMap<Byte, List<SenderPendingTransactions>> getByScore() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return orderByFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.filter(sendersToAdd::remove)
.flatMap(sender -> splitByScore(sender, txsBySender.get(sender)).entrySet().stream())
.collect(
Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(a, b) -> {
a.addAll(b);
return a;
},
TreeMap::new))
.descendingMap();
}
private Map<Byte, List<SenderPendingTransactions>> splitByScore(
final Address sender, final NavigableMap<Long, PendingTransaction> txsBySender) {
final var splitByScore = new HashMap<Byte, List<SenderPendingTransactions>>();
byte currScore = txsBySender.firstEntry().getValue().getScore();
var currSplit = new ArrayList<PendingTransaction>();
for (final var entry : txsBySender.entrySet()) {
if (entry.getValue().getScore() < currScore) {
// score decreased, we need to save current split and start a new one
splitByScore
.computeIfAbsent(currScore, k -> new ArrayList<>())
.add(new SenderPendingTransactions(sender, currSplit));
currSplit = new ArrayList<>();
currScore = entry.getValue().getScore();
}
currSplit.add(entry.getValue());
}
splitByScore
.computeIfAbsent(currScore, k -> new ArrayList<>())
.add(new SenderPendingTransactions(sender, currSplit));
return splitByScore;
}
@Override
protected long cacheFreeSpace() {
return Integer.MAX_VALUE;

@ -463,6 +463,18 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
}
}
@Override
public void penalize(final PendingTransaction penalizedTransaction) {
if (pendingTransactions.containsKey(penalizedTransaction.getHash())) {
internalPenalize(penalizedTransaction);
metrics.incrementPenalized(penalizedTransaction, name());
} else {
nextLayer.penalize(penalizedTransaction);
}
}
protected abstract void internalPenalize(final PendingTransaction pendingTransaction);
/**
* How many txs of a specified type can be promoted? This make sense when a max number of txs of a
* type can be included in a single block (ex. blob txs), to avoid filling the layer with more txs

@ -19,7 +19,6 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.Transaction
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
@ -66,7 +65,8 @@ public class BaseFeePrioritizedTransactions extends AbstractPrioritizedTransacti
@Override
protected int compareByFee(final PendingTransaction pt1, final PendingTransaction pt2) {
return Comparator.comparing(PendingTransaction::hasPriority)
return Comparator.comparing(PendingTransaction::getScore)
.thenComparing(PendingTransaction::hasPriority)
.thenComparing(
(PendingTransaction pendingTransaction) ->
pendingTransaction.getTransaction().getEffectivePriorityFeePerGas(nextBlockBaseFee))
@ -195,8 +195,8 @@ public class BaseFeePrioritizedTransactions extends AbstractPrioritizedTransacti
return "Basefee Prioritized: Empty";
}
final Transaction highest = orderByFee.last().getTransaction();
final Transaction lowest = orderByFee.first().getTransaction();
final PendingTransaction highest = orderByFee.last();
final PendingTransaction lowest = orderByFee.first();
return "Basefee Prioritized: "
+ "count: "
@ -205,16 +205,26 @@ public class BaseFeePrioritizedTransactions extends AbstractPrioritizedTransacti
+ spaceUsed
+ ", unique senders: "
+ txsBySender.size()
+ ", highest priority tx: [max fee: "
+ highest.getMaxGasPrice().toHumanReadableString()
+ ", highest priority tx: [score: "
+ highest.getScore()
+ ", max fee: "
+ highest.getTransaction().getMaxGasPrice().toHumanReadableString()
+ ", curr prio fee: "
+ highest.getEffectivePriorityFeePerGas(nextBlockBaseFee).toHumanReadableString()
+ highest
.getTransaction()
.getEffectivePriorityFeePerGas(nextBlockBaseFee)
.toHumanReadableString()
+ ", hash: "
+ highest.getHash()
+ "], lowest priority tx: [max fee: "
+ lowest.getMaxGasPrice().toHumanReadableString()
+ "], lowest priority tx: [score: "
+ lowest.getScore()
+ ", max fee: "
+ lowest.getTransaction().getMaxGasPrice().toHumanReadableString()
+ ", curr prio fee: "
+ lowest.getEffectivePriorityFeePerGas(nextBlockBaseFee).toHumanReadableString()
+ lowest
.getTransaction()
.getEffectivePriorityFeePerGas(nextBlockBaseFee)
.toHumanReadableString()
+ ", hash: "
+ lowest.getHash()
+ "], next block base fee: "

@ -85,6 +85,9 @@ public class EndLayer implements TransactionsLayer {
@Override
public void remove(final PendingTransaction pendingTransaction, final RemovalReason reason) {}
@Override
public void penalize(final PendingTransaction penalizedTx) {}
@Override
public void blockAdded(
final FeeMarket feeMarket,

@ -56,7 +56,8 @@ public class GasPricePrioritizedTransactions extends AbstractPrioritizedTransact
@Override
protected int compareByFee(final PendingTransaction pt1, final PendingTransaction pt2) {
return comparing(PendingTransaction::hasPriority)
return comparing(PendingTransaction::getScore)
.thenComparing(PendingTransaction::hasPriority)
.thenComparing(PendingTransaction::getGasPrice)
.thenComparing(PendingTransaction::getSequence)
.compare(pt1, pt2);
@ -78,21 +79,33 @@ public class GasPricePrioritizedTransactions extends AbstractPrioritizedTransact
}
@Override
public String internalLogStats() {
protected String internalLogStats() {
if (orderByFee.isEmpty()) {
return "GasPrice Prioritized: Empty";
}
final PendingTransaction highest = orderByFee.last();
final PendingTransaction lowest = orderByFee.first();
return "GasPrice Prioritized: "
+ "count: "
+ pendingTransactions.size()
+ " space used: "
+ ", space used: "
+ spaceUsed
+ " unique senders: "
+ ", unique senders: "
+ txsBySender.size()
+ ", highest fee tx: "
+ orderByFee.last().getTransaction().getGasPrice().get().toHumanReadableString()
+ ", lowest fee tx: "
+ orderByFee.first().getTransaction().getGasPrice().get().toHumanReadableString();
+ ", highest priority tx: [score: "
+ highest.getScore()
+ ", gas price: "
+ highest.getTransaction().getGasPrice().get().toHumanReadableString()
+ ", hash: "
+ highest.getHash()
+ "], lowest priority tx: [score: "
+ lowest.getScore()
+ ", gas price: "
+ lowest.getTransaction().getGasPrice().get().toHumanReadableString()
+ ", hash: "
+ lowest.getHash()
+ "]";
}
}

@ -42,10 +42,12 @@ import org.hyperledger.besu.plugin.data.TransactionSelectionResult;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;
@ -314,18 +316,25 @@ public class LayeredPendingTransactions implements PendingTransactions {
@Override
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final List<PendingTransaction> penalizedTransactions = new ArrayList<>();
final Set<Address> skipSenders = new HashSet<>();
final List<SenderPendingTransactions> candidateTxsBySender;
final Map<Byte, List<SenderPendingTransactions>> candidateTxsByScore;
synchronized (this) {
// since selecting transactions for block creation is a potential long operation
// we want to avoid to keep the lock for all the process, but we just lock to get
// the candidate transactions
candidateTxsBySender = prioritizedTransactions.getBySender();
candidateTxsByScore = prioritizedTransactions.getByScore();
}
selection:
for (final var senderTxs : candidateTxsBySender) {
LOG.trace("highPrioSenderTxs {}", senderTxs);
for (final var entry : candidateTxsByScore.entrySet()) {
LOG.trace("Evaluating txs with score {}", entry.getKey());
for (final var senderTxs : entry.getValue()) {
LOG.trace("Evaluating sender txs {}", senderTxs);
if (!skipSenders.contains(senderTxs.sender())) {
for (final var candidatePendingTx : senderTxs.pendingTransactions()) {
final var selectionResult = selector.evaluateTransaction(candidatePendingTx);
@ -341,6 +350,14 @@ public class LayeredPendingTransactions implements PendingTransactions {
logDiscardedTransaction(candidatePendingTx, selectionResult);
}
if (selectionResult.penalize()) {
penalizedTransactions.add(candidatePendingTx);
LOG.atTrace()
.setMessage("Transaction {} penalized")
.addArgument(candidatePendingTx::toTraceLog)
.log();
}
if (selectionResult.stop()) {
LOG.trace("Stopping selection");
break selection;
@ -350,19 +367,29 @@ public class LayeredPendingTransactions implements PendingTransactions {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
skipSenders.add(candidatePendingTx.getSender());
break;
}
}
}
}
}
ethScheduler.scheduleTxWorkerTask(
() ->
() -> {
invalidTransactions.forEach(
invalidTx -> {
synchronized (this) {
prioritizedTransactions.remove(invalidTx, INVALIDATED);
}
}));
});
penalizedTransactions.forEach(
penalizedTx -> {
synchronized (this) {
prioritizedTransactions.internalPenalize(penalizedTx);
}
});
});
}
@Override

@ -18,7 +18,6 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.Transaction
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
@ -43,7 +42,8 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer {
private final NavigableSet<PendingTransaction> orderByMaxFee =
new TreeSet<>(
Comparator.comparing(PendingTransaction::hasPriority)
Comparator.comparing(PendingTransaction::getScore)
.thenComparing(PendingTransaction::hasPriority)
.thenComparing((PendingTransaction pt) -> pt.getTransaction().getMaxGasPrice())
.thenComparing(PendingTransaction::getSequence));
@ -116,6 +116,20 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer {
}
}
@Override
protected void internalPenalize(final PendingTransaction penalizedTx) {
final var senderTxs = txsBySender.get(penalizedTx.getSender());
if (senderTxs.firstKey() == penalizedTx.getNonce()) {
// since we only sort the first tx of sender, we only need to re-sort in this case
orderByMaxFee.remove(penalizedTx);
penalizedTx.decrementScore();
orderByMaxFee.add(penalizedTx);
} else {
// otherwise we just decrement the score
penalizedTx.decrementScore();
}
}
@Override
protected void internalReplaced(final PendingTransaction replacedTx) {
orderByMaxFee.remove(replacedTx);
@ -213,8 +227,8 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer {
return "Ready: Empty";
}
final Transaction top = orderByMaxFee.last().getTransaction();
final Transaction last = orderByMaxFee.first().getTransaction();
final PendingTransaction top = orderByMaxFee.last();
final PendingTransaction last = orderByMaxFee.first();
return "Ready: "
+ "count="
@ -223,12 +237,16 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer {
+ spaceUsed
+ ", unique senders: "
+ txsBySender.size()
+ ", top by max fee[max fee:"
+ top.getMaxGasPrice().toHumanReadableString()
+ ", top by score and max gas price[score: "
+ top.getScore()
+ ", max gas price:"
+ top.getTransaction().getMaxGasPrice().toHumanReadableString()
+ ", hash: "
+ top.getHash()
+ "], last by max fee [max fee: "
+ last.getMaxGasPrice().toHumanReadableString()
+ "], last by score and max gas price [score: "
+ last.getScore()
+ ", max fee: "
+ last.getTransaction().getMaxGasPrice().toHumanReadableString()
+ ", hash: "
+ last.getHash()
+ "]";

@ -303,6 +303,11 @@ public class SparseTransactions extends AbstractTransactionsLayer {
}
}
@Override
protected void internalPenalize(final PendingTransaction penalizedTx) {
// intentionally no-op
}
private void deleteGap(final Address sender) {
orderByGap.get(gapBySender.remove(sender)).remove(sender);
}

@ -45,6 +45,19 @@ public interface TransactionsLayer {
void remove(PendingTransaction pendingTransaction, RemovalReason reason);
/**
* Penalize a pending transaction. Penalization could be applied to notify the txpool that this
* pending tx has some temporary issues that prevent it from being included in a block, and so it
* should be de-prioritized in some ways, so it will be re-evaluated only after non penalized
* pending txs. For example: if during the evaluation for block inclusion, the pending tx is
* excluded because the sender has not enough balance to send it, this could be a transient issue
* since later the sender could receive some funds, but in any case we penalize the pending tx, so
* it is pushed down in the order of prioritized pending txs.
*
* @param penalizedTransaction the tx to penalize
*/
void penalize(PendingTransaction penalizedTransaction);
void blockAdded(
FeeMarket feeMarket,
BlockHeader blockHeader,

@ -22,7 +22,8 @@
* transactions that could be selected for a future block proposal, and at the same time, without
* penalizing legitimate unordered transactions, that are only temporary non-executable.
*
* <p>It is disabled by default, to enable use the option {@code Xlayered-tx-pool=true}
* <p>It is enabled by default on public networks, to switch to another implementation use the
* option {@code tx-pool}
*
* <p>The main idea is to organize the txpool in an arbitrary number of layers, where each layer has
* specific rules and constraints that determine if a transaction belong or not to that layer and
@ -38,6 +39,14 @@
* transactions are removed since confirmed in a block, transactions from the next layer are
* promoted until there is space.
*
* <p>Some layers could make use of the score of a pending transactions, to push back in the rank
* those pending transactions that have been penalized.
*
* <p>Layers are not thread safe, since they are not meant to be accessed directly, and all the
* synchronization is managed at the level of {@link
* org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredPendingTransactions
* LayeredPendingTransactions} class.
*
* <p>The current implementation is based on 3 layers, plus the last one that just drop every
* transaction when the previous layers are full. The 3 layers are, in order:
*
@ -48,20 +57,20 @@
* </ul>
*
* <p>Prioritized: This is where candidate transactions are selected for creating a new block.
* Transactions ordered by the effective priority fee, and it is limited by size, 2000 by default,
* to reduce the overhead of the sorting and because that number is enough to fill any block, at the
* current gas limit. Does not allow nonce gaps, and the first transaction for each sender must be
* the next one for that sender. Eviction is done removing the transaction with the higher nonce for
* the sender of the less valuable transaction, to avoid creating nonce gaps, evicted transactions
* go into the next layer Ready.
* Transactions ordered by score and then effective priority fee, and it is limited by size, 2000 by
* default, to reduce the overhead of the sorting and because that number is enough to fill any
* block, at the current gas limit. Does not allow nonce gaps, and the first transaction for each
* sender must be the next one for that sender. Eviction is done removing the transaction with the
* higher nonce for the sender of the less score and less effective priority fee transaction, to
* avoid creating nonce gaps, evicted transactions go into the next layer Ready.
*
* <p>Ready: Similar to the Prioritized, it does not allow nonce gaps, and the first transaction for
* each sender must be the next one for that sender, but it is limited by space instead of count,
* thus allowing many more transactions, think about this layer like a buffer for the Prioritized.
* Since it is meant to keep ten to hundreds of thousand of transactions, it does not have a full
* ordering, like the previous, but only the first transaction for each sender is ordered using a
* stable value that is the max fee per gas. Eviction is the same as the Prioritized, and evicted
* transaction go into the next layer Sparse.
* stable value that is score and then max fee per gas. Eviction is the same as the Prioritized, and
* evicted transaction go into the next layer Sparse.
*
* <p>Sparse: This is the first layer where nonce gaps are allowed and where the first transaction
* for a sender could not be the next expected one for that sender. The main purpose of this layer

@ -159,6 +159,12 @@ public class LayersTest extends BaseTransactionPoolTest {
assertScenario(scenario, BLOB_TX_POOL_CONFIG);
}
@ParameterizedTest
@MethodSource("providerPenalized")
void penalized(final Scenario scenario) {
assertScenario(scenario);
}
private void assertScenario(final Scenario scenario) {
assertScenario(scenario, DEFAULT_TX_POOL_CONFIG);
}
@ -1256,6 +1262,79 @@ public class LayersTest extends BaseTransactionPoolTest {
.expectedSparseForSenders()));
}
static Stream<Arguments> providerPenalized() {
return Stream.of(
Arguments.of(
new Scenario("single sender, single tx")
.addForSender(S1, 0)
.expectedPrioritizedForSender(S1, 0)
.penalizeForSender(S1, 0)
.expectedPrioritizedForSender(S1, 0)),
Arguments.of(
new Scenario("single sender penalize last")
.addForSender(S1, 0, 1)
.expectedPrioritizedForSender(S1, 0, 1)
.penalizeForSender(S1, 1)
.expectedPrioritizedForSender(S1, 0, 1)),
Arguments.of(
new Scenario("single sender penalize first")
.addForSender(S1, 0, 1)
.expectedPrioritizedForSender(S1, 0, 1)
.penalizeForSender(S1, 0, 1)
// even if 0 has less score it is always the first for the sender
// since otherwise there is a nonce gap
.expectedPrioritizedForSender(S1, 0, 1)),
Arguments.of(
new Scenario("multiple senders, penalize top")
.addForSenders(S1, 0, S2, 0)
// remember S2 pays more fees
.expectedPrioritizedForSenders(S2, 0, S1, 0)
.penalizeForSender(S2, 0)
.expectedPrioritizedForSenders(S1, 0, S2, 0)),
Arguments.of(
new Scenario("multiple senders, penalize bottom")
.addForSenders(S1, 0, S2, 0)
.expectedPrioritizedForSenders(S2, 0, S1, 0)
.penalizeForSender(S1, 0)
.expectedPrioritizedForSenders(S2, 0, S1, 0)),
Arguments.of(
new Scenario("multiple senders, penalize middle")
.addForSenders(S1, 0, S2, 0, S3, 0)
.expectedPrioritizedForSenders(S3, 0, S2, 0, S1, 0)
.penalizeForSender(S2, 0)
.expectedPrioritizedForSenders(S3, 0, S1, 0, S2, 0)),
Arguments.of(
new Scenario("single sender, promote from ready")
.addForSender(S1, 0, 1, 2, 3, 4, 5)
.expectedPrioritizedForSender(S1, 0, 1, 2)
.expectedReadyForSender(S1, 3, 4, 5)
.penalizeForSender(S1, 3)
.confirmedForSenders(S1, 0)
// even if penalized 3 is promoted to avoid nonce gap
.expectedPrioritizedForSender(S1, 1, 2, 3)
.expectedReadyForSender(S1, 4, 5)),
Arguments.of(
new Scenario("multiple senders, overflow to ready")
.addForSenders(S1, 0, S2, 0, S3, 0)
.expectedPrioritizedForSenders(S3, 0, S2, 0, S1, 0)
.expectedReadyForSenders()
.penalizeForSender(S3, 0)
.addForSender(S1, 1)
.expectedPrioritizedForSenders(S2, 0, S1, 0, S1, 1)
// S3(0) is demoted to ready even if it is paying more fees,
// since has a lower score
.expectedReadyForSender(S3, 0)),
Arguments.of(
new Scenario("multiple senders, overflow to sparse")
.addForSenders(S1, 0, S2, 0, S3, 0, S1, 1, S2, 1, S3, 1)
.expectedPrioritizedForSenders(S3, 0, S3, 1, S2, 0)
.expectedReadyForSenders(S2, 1, S1, 0, S1, 1)
.penalizeForSender(S2, 1)
.addForSender(S2, 2)
.expectedReadyForSenders(S1, 0, S1, 1, S2, 1)
.expectedSparseForSender(S2, 2)));
}
private static BlockHeader mockBlockHeader() {
final BlockHeader blockHeader = mock(BlockHeader.class);
when(blockHeader.getBaseFee()).thenReturn(Optional.of(BASE_FEE));
@ -1511,10 +1590,12 @@ public class LayersTest extends BaseTransactionPoolTest {
private void assertExpectedPrioritized(
final AbstractPrioritizedTransactions prioLayer, final List<PendingTransaction> expected) {
assertThat(prioLayer.getBySender())
.describedAs("Prioritized")
.flatExtracting(SenderPendingTransactions::pendingTransactions)
.containsExactlyElementsOf(expected);
final var flatOrder =
prioLayer.getByScore().values().stream()
.flatMap(List::stream)
.flatMap(spt -> spt.pendingTransactions().stream())
.toList();
assertThat(flatOrder).describedAs("Prioritized").containsExactlyElementsOf(expected);
}
private void assertExpectedReady(
@ -1582,6 +1663,23 @@ public class LayersTest extends BaseTransactionPoolTest {
return this;
}
public Scenario penalizeForSender(final Sender sender, final long... nonce) {
Arrays.stream(nonce)
.forEach(
n -> {
actions.add(
(pending, prio, ready, sparse, dropped) -> {
final var senderTxs = prio.getAllFor(sender.address);
Arrays.stream(nonce)
.mapToObj(
n2 -> senderTxs.stream().filter(pt -> pt.getNonce() == n2).findAny())
.map(Optional::get)
.forEach(prio::penalize);
});
});
return this;
}
public Scenario expectedSelectedTransactions(final Object... args) {
List<PendingTransaction> expectedSelected = new ArrayList<>();
for (int i = 0; i < args.length; i = i + 2) {

@ -70,7 +70,7 @@ Calculated : ${currentHash}
tasks.register('checkAPIChanges', FileStateChecker) {
description = "Checks that the API for the Plugin-API project does not change without deliberate thought"
files = sourceSets.main.allJava.files
knownHash = 'F07ix5Mkvycb2W2luprKmcMyrWcSLB4Xtou5Id10DW0='
knownHash = '6L5dNJ975Ka/X7g4lTdpkBvPQrJgJu+vAf/m1dFCneU='
}
check.dependsOn('checkAPIChanges')

@ -42,6 +42,13 @@ public class TransactionSelectionResult {
*/
boolean discard();
/**
* Should the score of this transaction be decremented?
*
* @return yes if the score of this transaction needs to be decremented
*/
boolean penalize();
/**
* Name of this status
*
@ -52,30 +59,34 @@ public class TransactionSelectionResult {
private enum BaseStatus implements Status {
SELECTED,
BLOCK_FULL(true, false),
BLOBS_FULL(false, false),
BLOCK_OCCUPANCY_ABOVE_THRESHOLD(true, false),
BLOCK_SELECTION_TIMEOUT(true, false),
TX_EVALUATION_TOO_LONG(true, true),
INVALID_TRANSIENT(false, false),
INVALID(false, true);
BLOCK_FULL(true, false, false),
BLOBS_FULL(false, false, false),
BLOCK_OCCUPANCY_ABOVE_THRESHOLD(true, false, false),
BLOCK_SELECTION_TIMEOUT(true, false, false),
TX_EVALUATION_TOO_LONG(true, false, true),
INVALID_TX_EVALUATION_TOO_LONG(true, true, true),
INVALID_TRANSIENT(false, false, true),
INVALID(false, true, false);
private final boolean stop;
private final boolean discard;
private final boolean penalize;
BaseStatus() {
this.stop = false;
this.discard = false;
this.penalize = false;
}
BaseStatus(final boolean stop, final boolean discard) {
BaseStatus(final boolean stop, final boolean discard, final boolean penalize) {
this.stop = stop;
this.discard = discard;
this.penalize = penalize;
}
@Override
public String toString() {
return name() + " (stop=" + stop + ", discard=" + discard + ")";
return name() + " (stop=" + stop + ", discard=" + discard + ", penalize=" + penalize + ")";
}
@Override
@ -87,6 +98,11 @@ public class TransactionSelectionResult {
public boolean discard() {
return discard;
}
@Override
public boolean penalize() {
return penalize;
}
}
/** The transaction has been selected to be included in the new block */
@ -105,10 +121,14 @@ public class TransactionSelectionResult {
public static final TransactionSelectionResult BLOCK_SELECTION_TIMEOUT =
new TransactionSelectionResult(BaseStatus.BLOCK_SELECTION_TIMEOUT);
/** Transaction took too much to evaluate */
/** Transaction took too much to evaluate, but it was not invalid */
public static final TransactionSelectionResult TX_EVALUATION_TOO_LONG =
new TransactionSelectionResult(BaseStatus.TX_EVALUATION_TOO_LONG);
/** Transaction took too much to evaluate, and it was invalid */
public static final TransactionSelectionResult INVALID_TX_EVALUATION_TOO_LONG =
new TransactionSelectionResult(BaseStatus.INVALID_TX_EVALUATION_TOO_LONG);
/**
* The transaction has not been selected since too large and the occupancy of the block is enough
* to stop the selection.
@ -215,6 +235,15 @@ public class TransactionSelectionResult {
return status.discard();
}
/**
* Should the score of this transaction be decremented?
*
* @return yes if the score of this transaction needs to be decremented
*/
public boolean penalize() {
return status.penalize();
}
/**
* Is the candidate transaction selected for block inclusion?
*

Loading…
Cancel
Save