From 30dfa66c1ebd01c53966f4274af626b2168e1ee2 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Thu, 25 Jul 2024 23:53:53 +0200 Subject: [PATCH] Penalize invalid transient pending transactions in the layered transaction pool (#7359) * Introduce score for pending transactions Signed-off-by: Fabio Di Fabio * Introduce score for pending transactions Signed-off-by: Fabio Di Fabio * Update package javadoc Signed-off-by: Fabio Di Fabio --------- Signed-off-by: Fabio Di Fabio --- .../txselection/BlockTransactionSelector.java | 21 +++- .../ProcessingResultTransactionSelector.java | 3 +- .../AbstractBlockTransactionSelectorTest.java | 37 +++--- .../eth/transactions/PendingTransaction.java | 48 +++++--- .../transactions/TransactionPoolMetrics.java | 20 ++++ .../AbstractPrioritizedTransactions.java | 65 +++++++++++ .../layered/AbstractTransactionsLayer.java | 12 ++ .../BaseFeePrioritizedTransactions.java | 30 +++-- .../eth/transactions/layered/EndLayer.java | 3 + .../GasPricePrioritizedTransactions.java | 29 +++-- .../layered/LayeredPendingTransactions.java | 99 ++++++++++------ .../layered/ReadyTransactions.java | 34 ++++-- .../layered/SparseTransactions.java | 5 + .../layered/TransactionsLayer.java | 13 +++ .../transactions/layered/package-info.java | 27 +++-- .../eth/transactions/layered/LayersTest.java | 106 +++++++++++++++++- plugin-api/build.gradle | 2 +- .../data/TransactionSelectionResult.java | 49 ++++++-- 18 files changed, 480 insertions(+), 123 deletions(-) diff --git a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java index c106b7aff1..ed028767d6 100644 --- a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java +++ b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.blockcreation.txselection; import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT; +import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG; import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED; import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG; @@ -419,11 +420,14 @@ public class BlockTransactionSelector { final var pendingTransaction = evaluationContext.getPendingTransaction(); - // check if this tx took too much to evaluate, and in case remove it from the pool + // check if this tx took too much to evaluate, and in case it was invalid remove it from the + // pool, otherwise penalize it. final TransactionSelectionResult actualResult = isTimeout.get() - ? transactionTookTooLong(evaluationContext) - ? TX_EVALUATION_TOO_LONG + ? transactionTookTooLong(evaluationContext, selectionResult) + ? selectionResult.discard() + ? INVALID_TX_EVALUATION_TOO_LONG + : TX_EVALUATION_TOO_LONG : BLOCK_SELECTION_TIMEOUT : selectionResult; @@ -441,16 +445,21 @@ public class BlockTransactionSelector { return actualResult; } - private boolean transactionTookTooLong(final TransactionEvaluationContext evaluationContext) { + private boolean transactionTookTooLong( + final TransactionEvaluationContext evaluationContext, + final TransactionSelectionResult selectionResult) { final var evaluationTimer = evaluationContext.getEvaluationTimer(); if (evaluationTimer.elapsed(TimeUnit.MILLISECONDS) > blockTxsSelectionMaxTime) { LOG.atWarn() .setMessage( - "Transaction {} is too late for inclusion, evaluated in {} that is over the max limit of {}ms" - + ", removing it from the pool") + "Transaction {} is too late for inclusion, with result {}, evaluated in {} that is over the max limit of {}ms" + + ", {}") .addArgument(evaluationContext.getPendingTransaction()::getHash) + .addArgument(selectionResult) .addArgument(evaluationTimer) .addArgument(blockTxsSelectionMaxTime) + .addArgument( + selectionResult.discard() ? "removing it from the pool" : "penalizing it in the pool") .log(); return true; } diff --git a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/selectors/ProcessingResultTransactionSelector.java b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/selectors/ProcessingResultTransactionSelector.java index 85d97bdf46..9eed2bff09 100644 --- a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/selectors/ProcessingResultTransactionSelector.java +++ b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/selectors/ProcessingResultTransactionSelector.java @@ -110,7 +110,8 @@ public class ProcessingResultTransactionSelector extends AbstractTransactionSele * @return True if the invalid reason is transient, false otherwise. */ private boolean isTransientValidationError(final TransactionInvalidReason invalidReason) { - return invalidReason.equals(TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE) + return invalidReason.equals(TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE) + || invalidReason.equals(TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE) || invalidReason.equals(TransactionInvalidReason.NONCE_TOO_HIGH); } } diff --git a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractBlockTransactionSelectorTest.java b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractBlockTransactionSelectorTest.java index c6ee454241..adcc3ee1e2 100644 --- a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractBlockTransactionSelectorTest.java +++ b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractBlockTransactionSelectorTest.java @@ -18,8 +18,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; import static org.awaitility.Awaitility.await; import static org.hyperledger.besu.ethereum.core.MiningParameters.DEFAULT_NON_POA_BLOCK_TXS_SELECTION_MAX_TIME; -import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE; +import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.NONCE_TOO_LOW; import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT; +import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG; import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.PRIORITY_FEE_PER_GAS_BELOW_CURRENT_MIN; import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED; import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG; @@ -296,7 +297,7 @@ public abstract class AbstractBlockTransactionSelectorTest { final Transaction tx = createTransaction(i, Wei.of(7), 100_000); transactionsToInject.add(tx); if (i == 1) { - ensureTransactionIsInvalid(tx, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE); + ensureTransactionIsInvalid(tx, TransactionInvalidReason.NONCE_TOO_LOW); } else { ensureTransactionIsValid(tx); } @@ -311,8 +312,7 @@ public abstract class AbstractBlockTransactionSelectorTest { .containsOnly( entry( invalidTx, - TransactionSelectionResult.invalid( - TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name()))); + TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name()))); assertThat(results.getSelectedTransactions().size()).isEqualTo(4); assertThat(results.getSelectedTransactions().contains(invalidTx)).isFalse(); assertThat(results.getReceipts().size()).isEqualTo(4); @@ -568,8 +568,7 @@ public abstract class AbstractBlockTransactionSelectorTest { ensureTransactionIsValid(validTransaction, 21_000, 0); final Transaction invalidTransaction = createTransaction(3, Wei.of(10), 21_000); - ensureTransactionIsInvalid( - invalidTransaction, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE); + ensureTransactionIsInvalid(invalidTransaction, TransactionInvalidReason.NONCE_TOO_LOW); transactionPool.addRemoteTransactions(List.of(validTransaction, invalidTransaction)); @@ -582,8 +581,7 @@ public abstract class AbstractBlockTransactionSelectorTest { .containsOnly( entry( invalidTransaction, - TransactionSelectionResult.invalid( - TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name()))); + TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name()))); } @Test @@ -948,7 +946,7 @@ public abstract class AbstractBlockTransactionSelectorTest { @ParameterizedTest @MethodSource("subsetOfPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOver") - public void pendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThePool( + public void pendingTransactionsThatTakesTooLongToEvaluateIsPenalized( final boolean isPoa, final boolean preProcessingTooLate, final boolean processingTooLate, @@ -961,7 +959,7 @@ public abstract class AbstractBlockTransactionSelectorTest { postProcessingTooLate, 900, TX_EVALUATION_TOO_LONG, - true); + false); } private void internalBlockSelectionTimeoutSimulation( @@ -1085,7 +1083,7 @@ public abstract class AbstractBlockTransactionSelectorTest { 500, BLOCK_SELECTION_TIMEOUT, false, - UPFRONT_COST_EXCEEDS_BALANCE); + NONCE_TOO_LOW); } @ParameterizedTest @@ -1102,9 +1100,9 @@ public abstract class AbstractBlockTransactionSelectorTest { processingTooLate, postProcessingTooLate, 900, - TX_EVALUATION_TOO_LONG, + INVALID_TX_EVALUATION_TOO_LONG, true, - UPFRONT_COST_EXCEEDS_BALANCE); + NONCE_TOO_LOW); } private void internalBlockSelectionTimeoutSimulationInvalidTxs( @@ -1423,15 +1421,17 @@ public abstract class AbstractBlockTransactionSelectorTest { private static class PluginTransactionSelectionResult extends TransactionSelectionResult { private enum PluginStatus implements Status { - PLUGIN_INVALID(false, true), - PLUGIN_INVALID_TRANSIENT(false, false); + PLUGIN_INVALID(false, true, false), + PLUGIN_INVALID_TRANSIENT(false, false, true); private final boolean stop; private final boolean discard; + private final boolean penalize; - PluginStatus(final boolean stop, final boolean discard) { + PluginStatus(final boolean stop, final boolean discard, final boolean penalize) { this.stop = stop; this.discard = discard; + this.penalize = penalize; } @Override @@ -1443,6 +1443,11 @@ public abstract class AbstractBlockTransactionSelectorTest { public boolean discard() { return discard; } + + @Override + public boolean penalize() { + return penalize; + } } public static final TransactionSelectionResult GENERIC_PLUGIN_INVALID_TRANSIENT = diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransaction.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransaction.java index 6ce8f47a7a..bd98bee5ad 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransaction.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransaction.java @@ -50,18 +50,20 @@ public abstract class PendingTransaction private final Transaction transaction; private final long addedAt; private final long sequence; // Allows prioritization based on order transactions are added + private volatile byte score; private int memorySize = NOT_INITIALIZED; private PendingTransaction( - final Transaction transaction, final long addedAt, final long sequence) { + final Transaction transaction, final long addedAt, final long sequence, final byte score) { this.transaction = transaction; this.addedAt = addedAt; this.sequence = sequence; + this.score = score; } private PendingTransaction(final Transaction transaction, final long addedAt) { - this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement()); + this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement(), Byte.MAX_VALUE); } public static PendingTransaction newPendingTransaction( @@ -123,6 +125,20 @@ public abstract class PendingTransaction return memorySize; } + public byte getScore() { + return score; + } + + public void decrementScore() { + // use temp var to avoid non-atomic update of volatile var + final byte newScore = (byte) (score - 1); + + // check to avoid underflow + if (newScore < score) { + score = newScore; + } + } + public abstract PendingTransaction detachedCopy(); private int computeMemorySize() { @@ -255,6 +271,8 @@ public abstract class PendingTransaction + isReceivedFromLocalSource() + ", hasPriority=" + hasPriority() + + ", score=" + + score + '}'; } @@ -267,6 +285,8 @@ public abstract class PendingTransaction + isReceivedFromLocalSource() + ", hasPriority=" + hasPriority() + + ", score=" + + score + ", " + transaction.toTraceLog() + "}"; @@ -282,13 +302,13 @@ public abstract class PendingTransaction this(transaction, System.currentTimeMillis()); } - private Local(final long sequence, final Transaction transaction) { - super(transaction, System.currentTimeMillis(), sequence); + private Local(final long sequence, final byte score, final Transaction transaction) { + super(transaction, System.currentTimeMillis(), sequence, score); } @Override public PendingTransaction detachedCopy() { - return new Local(getSequence(), getTransaction().detachedCopy()); + return new Local(getSequence(), getScore(), getTransaction().detachedCopy()); } @Override @@ -310,13 +330,13 @@ public abstract class PendingTransaction super(transaction, addedAt); } - public Priority(final long sequence, final Transaction transaction) { - super(sequence, transaction); + public Priority(final long sequence, final byte score, final Transaction transaction) { + super(sequence, score, transaction); } @Override public PendingTransaction detachedCopy() { - return new Priority(getSequence(), getTransaction().detachedCopy()); + return new Priority(getSequence(), getScore(), getTransaction().detachedCopy()); } @Override @@ -336,13 +356,13 @@ public abstract class PendingTransaction this(transaction, System.currentTimeMillis()); } - private Remote(final long sequence, final Transaction transaction) { - super(transaction, System.currentTimeMillis(), sequence); + private Remote(final long sequence, final byte score, final Transaction transaction) { + super(transaction, System.currentTimeMillis(), sequence, score); } @Override public PendingTransaction detachedCopy() { - return new Remote(getSequence(), getTransaction().detachedCopy()); + return new Remote(getSequence(), getScore(), getTransaction().detachedCopy()); } @Override @@ -364,13 +384,13 @@ public abstract class PendingTransaction super(transaction, addedAt); } - public Priority(final long sequence, final Transaction transaction) { - super(sequence, transaction); + public Priority(final long sequence, final byte score, final Transaction transaction) { + super(sequence, score, transaction); } @Override public PendingTransaction detachedCopy() { - return new Priority(getSequence(), getTransaction().detachedCopy()); + return new Priority(getSequence(), getScore(), getTransaction().detachedCopy()); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java index 812e7de9f4..90e9628e5c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java @@ -37,12 +37,14 @@ public class TransactionPoolMetrics { public static final String ADDED_COUNTER_NAME = "added_total"; public static final String REMOVED_COUNTER_NAME = "removed_total"; public static final String REJECTED_COUNTER_NAME = "rejected_total"; + public static final String PENALIZED_COUNTER_NAME = "penalized_total"; public static final String EXPIRED_MESSAGES_COUNTER_NAME = "messages_expired_total"; private static final int SKIPPED_MESSAGES_LOGGING_THRESHOLD = 1000; private final MetricsSystem metricsSystem; private final LabelledMetric addedCounter; private final LabelledMetric removedCounter; private final LabelledMetric rejectedCounter; + private final LabelledMetric penalizedCounter; private final LabelledGauge spaceUsed; private final LabelledGauge transactionCount; private final LabelledGauge transactionCountByType; @@ -88,6 +90,15 @@ public class TransactionPoolMetrics { "reason", "layer"); + penalizedCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.TRANSACTION_POOL, + PENALIZED_COUNTER_NAME, + "Count of penalized transactions in the transaction pool", + "source", + "priority", + "layer"); + spaceUsed = metricsSystem.createLabelledGauge( BesuMetricCategory.TRANSACTION_POOL, @@ -246,6 +257,15 @@ public class TransactionPoolMetrics { .inc(); } + public void incrementPenalized(final PendingTransaction pendingTransaction, final String layer) { + penalizedCounter + .labels( + location(pendingTransaction.isReceivedFromLocalSource()), + priority(pendingTransaction.hasPriority()), + layer) + .inc(); + } + public void incrementExpiredMessages(final String message) { expiredMessagesCounter.labels(message).inc(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java index 7a65b3febc..470e7f5b7b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java @@ -24,13 +24,17 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.TreeMap; import java.util.TreeSet; import java.util.function.BiFunction; import java.util.function.Predicate; +import java.util.stream.Collectors; /** * Holds the current set of executable pending transactions, that are candidate for inclusion on @@ -137,6 +141,13 @@ public abstract class AbstractPrioritizedTransactions extends AbstractSequential orderByFee.remove(removedTx); } + @Override + protected void internalPenalize(final PendingTransaction penalizedTx) { + orderByFee.remove(penalizedTx); + penalizedTx.decrementScore(); + orderByFee.add(penalizedTx); + } + @Override public List promote( final Predicate promotionFilter, @@ -188,6 +199,60 @@ public abstract class AbstractPrioritizedTransactions extends AbstractSequential .toList(); } + /** + * Returns pending txs by sender and ordered by score desc. In case a sender has pending txs with + * different scores, then in nonce sequence, every time there is a score decrease, his pending txs + * will be put in a new entry with that score. For example if a sender has 3 pending txs (where + * the first number is the nonce and the score is between parenthesis): 0(127), 1(126), 2(127), + * then for he there will be 2 entries: + * + *
    + *
  • 0(127) + *
  • 1(126), 2(127) + *
+ * + * @return pending txs by sender and ordered by score desc + */ + public NavigableMap> getByScore() { + final var sendersToAdd = new HashSet<>(txsBySender.keySet()); + return orderByFee.descendingSet().stream() + .map(PendingTransaction::getSender) + .filter(sendersToAdd::remove) + .flatMap(sender -> splitByScore(sender, txsBySender.get(sender)).entrySet().stream()) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (a, b) -> { + a.addAll(b); + return a; + }, + TreeMap::new)) + .descendingMap(); + } + + private Map> splitByScore( + final Address sender, final NavigableMap txsBySender) { + final var splitByScore = new HashMap>(); + byte currScore = txsBySender.firstEntry().getValue().getScore(); + var currSplit = new ArrayList(); + for (final var entry : txsBySender.entrySet()) { + if (entry.getValue().getScore() < currScore) { + // score decreased, we need to save current split and start a new one + splitByScore + .computeIfAbsent(currScore, k -> new ArrayList<>()) + .add(new SenderPendingTransactions(sender, currSplit)); + currSplit = new ArrayList<>(); + currScore = entry.getValue().getScore(); + } + currSplit.add(entry.getValue()); + } + splitByScore + .computeIfAbsent(currScore, k -> new ArrayList<>()) + .add(new SenderPendingTransactions(sender, currSplit)); + return splitByScore; + } + @Override protected long cacheFreeSpace() { return Integer.MAX_VALUE; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java index 5997fe6c18..b4f6e927c0 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java @@ -463,6 +463,18 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer { } } + @Override + public void penalize(final PendingTransaction penalizedTransaction) { + if (pendingTransactions.containsKey(penalizedTransaction.getHash())) { + internalPenalize(penalizedTransaction); + metrics.incrementPenalized(penalizedTransaction, name()); + } else { + nextLayer.penalize(penalizedTransaction); + } + } + + protected abstract void internalPenalize(final PendingTransaction pendingTransaction); + /** * How many txs of a specified type can be promoted? This make sense when a max number of txs of a * type can be included in a single block (ex. blob txs), to avoid filling the layer with more txs diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java index 8140cd5d7a..b3dec34b77 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java @@ -19,7 +19,6 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.Transaction import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.MiningParameters; -import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction; @@ -66,7 +65,8 @@ public class BaseFeePrioritizedTransactions extends AbstractPrioritizedTransacti @Override protected int compareByFee(final PendingTransaction pt1, final PendingTransaction pt2) { - return Comparator.comparing(PendingTransaction::hasPriority) + return Comparator.comparing(PendingTransaction::getScore) + .thenComparing(PendingTransaction::hasPriority) .thenComparing( (PendingTransaction pendingTransaction) -> pendingTransaction.getTransaction().getEffectivePriorityFeePerGas(nextBlockBaseFee)) @@ -195,8 +195,8 @@ public class BaseFeePrioritizedTransactions extends AbstractPrioritizedTransacti return "Basefee Prioritized: Empty"; } - final Transaction highest = orderByFee.last().getTransaction(); - final Transaction lowest = orderByFee.first().getTransaction(); + final PendingTransaction highest = orderByFee.last(); + final PendingTransaction lowest = orderByFee.first(); return "Basefee Prioritized: " + "count: " @@ -205,16 +205,26 @@ public class BaseFeePrioritizedTransactions extends AbstractPrioritizedTransacti + spaceUsed + ", unique senders: " + txsBySender.size() - + ", highest priority tx: [max fee: " - + highest.getMaxGasPrice().toHumanReadableString() + + ", highest priority tx: [score: " + + highest.getScore() + + ", max fee: " + + highest.getTransaction().getMaxGasPrice().toHumanReadableString() + ", curr prio fee: " - + highest.getEffectivePriorityFeePerGas(nextBlockBaseFee).toHumanReadableString() + + highest + .getTransaction() + .getEffectivePriorityFeePerGas(nextBlockBaseFee) + .toHumanReadableString() + ", hash: " + highest.getHash() - + "], lowest priority tx: [max fee: " - + lowest.getMaxGasPrice().toHumanReadableString() + + "], lowest priority tx: [score: " + + lowest.getScore() + + ", max fee: " + + lowest.getTransaction().getMaxGasPrice().toHumanReadableString() + ", curr prio fee: " - + lowest.getEffectivePriorityFeePerGas(nextBlockBaseFee).toHumanReadableString() + + lowest + .getTransaction() + .getEffectivePriorityFeePerGas(nextBlockBaseFee) + .toHumanReadableString() + ", hash: " + lowest.getHash() + "], next block base fee: " diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java index 0f5d9a6d15..f383f178c2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java @@ -85,6 +85,9 @@ public class EndLayer implements TransactionsLayer { @Override public void remove(final PendingTransaction pendingTransaction, final RemovalReason reason) {} + @Override + public void penalize(final PendingTransaction penalizedTx) {} + @Override public void blockAdded( final FeeMarket feeMarket, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/GasPricePrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/GasPricePrioritizedTransactions.java index 97bd3a88ee..504a453fa8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/GasPricePrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/GasPricePrioritizedTransactions.java @@ -56,7 +56,8 @@ public class GasPricePrioritizedTransactions extends AbstractPrioritizedTransact @Override protected int compareByFee(final PendingTransaction pt1, final PendingTransaction pt2) { - return comparing(PendingTransaction::hasPriority) + return comparing(PendingTransaction::getScore) + .thenComparing(PendingTransaction::hasPriority) .thenComparing(PendingTransaction::getGasPrice) .thenComparing(PendingTransaction::getSequence) .compare(pt1, pt2); @@ -78,21 +79,33 @@ public class GasPricePrioritizedTransactions extends AbstractPrioritizedTransact } @Override - public String internalLogStats() { + protected String internalLogStats() { if (orderByFee.isEmpty()) { return "GasPrice Prioritized: Empty"; } + final PendingTransaction highest = orderByFee.last(); + final PendingTransaction lowest = orderByFee.first(); + return "GasPrice Prioritized: " + "count: " + pendingTransactions.size() - + " space used: " + + ", space used: " + spaceUsed - + " unique senders: " + + ", unique senders: " + txsBySender.size() - + ", highest fee tx: " - + orderByFee.last().getTransaction().getGasPrice().get().toHumanReadableString() - + ", lowest fee tx: " - + orderByFee.first().getTransaction().getGasPrice().get().toHumanReadableString(); + + ", highest priority tx: [score: " + + highest.getScore() + + ", gas price: " + + highest.getTransaction().getGasPrice().get().toHumanReadableString() + + ", hash: " + + highest.getHash() + + "], lowest priority tx: [score: " + + lowest.getScore() + + ", gas price: " + + lowest.getTransaction().getGasPrice().get().toHumanReadableString() + + ", hash: " + + lowest.getHash() + + "]"; } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java index 9444885ebe..5297f08021 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java @@ -42,10 +42,12 @@ import org.hyperledger.besu.plugin.data.TransactionSelectionResult; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.Set; import java.util.stream.Collector; import java.util.stream.Collectors; @@ -314,55 +316,80 @@ public class LayeredPendingTransactions implements PendingTransactions { @Override public void selectTransactions(final PendingTransactions.TransactionSelector selector) { final List invalidTransactions = new ArrayList<>(); + final List penalizedTransactions = new ArrayList<>(); + final Set
skipSenders = new HashSet<>(); - final List candidateTxsBySender; + final Map> candidateTxsByScore; synchronized (this) { // since selecting transactions for block creation is a potential long operation // we want to avoid to keep the lock for all the process, but we just lock to get // the candidate transactions - candidateTxsBySender = prioritizedTransactions.getBySender(); + candidateTxsByScore = prioritizedTransactions.getByScore(); } selection: - for (final var senderTxs : candidateTxsBySender) { - LOG.trace("highPrioSenderTxs {}", senderTxs); - - for (final var candidatePendingTx : senderTxs.pendingTransactions()) { - final var selectionResult = selector.evaluateTransaction(candidatePendingTx); - - LOG.atTrace() - .setMessage("Selection result {} for transaction {}") - .addArgument(selectionResult) - .addArgument(candidatePendingTx::toTraceLog) - .log(); - - if (selectionResult.discard()) { - invalidTransactions.add(candidatePendingTx); - logDiscardedTransaction(candidatePendingTx, selectionResult); - } - - if (selectionResult.stop()) { - LOG.trace("Stopping selection"); - break selection; - } - - if (!selectionResult.selected()) { - // avoid processing other txs from this sender if this one is skipped - // since the following will not be selected due to the nonce gap - LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender()); - break; + for (final var entry : candidateTxsByScore.entrySet()) { + LOG.trace("Evaluating txs with score {}", entry.getKey()); + + for (final var senderTxs : entry.getValue()) { + LOG.trace("Evaluating sender txs {}", senderTxs); + + if (!skipSenders.contains(senderTxs.sender())) { + + for (final var candidatePendingTx : senderTxs.pendingTransactions()) { + final var selectionResult = selector.evaluateTransaction(candidatePendingTx); + + LOG.atTrace() + .setMessage("Selection result {} for transaction {}") + .addArgument(selectionResult) + .addArgument(candidatePendingTx::toTraceLog) + .log(); + + if (selectionResult.discard()) { + invalidTransactions.add(candidatePendingTx); + logDiscardedTransaction(candidatePendingTx, selectionResult); + } + + if (selectionResult.penalize()) { + penalizedTransactions.add(candidatePendingTx); + LOG.atTrace() + .setMessage("Transaction {} penalized") + .addArgument(candidatePendingTx::toTraceLog) + .log(); + } + + if (selectionResult.stop()) { + LOG.trace("Stopping selection"); + break selection; + } + + if (!selectionResult.selected()) { + // avoid processing other txs from this sender if this one is skipped + // since the following will not be selected due to the nonce gap + LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender()); + skipSenders.add(candidatePendingTx.getSender()); + break; + } + } } } } ethScheduler.scheduleTxWorkerTask( - () -> - invalidTransactions.forEach( - invalidTx -> { - synchronized (this) { - prioritizedTransactions.remove(invalidTx, INVALIDATED); - } - })); + () -> { + invalidTransactions.forEach( + invalidTx -> { + synchronized (this) { + prioritizedTransactions.remove(invalidTx, INVALIDATED); + } + }); + penalizedTransactions.forEach( + penalizedTx -> { + synchronized (this) { + prioritizedTransactions.internalPenalize(penalizedTx); + } + }); + }); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java index 1f9fc0ab8d..0f52e1c5c3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java @@ -18,7 +18,6 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.Transaction import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.ethereum.core.BlockHeader; -import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction; @@ -43,7 +42,8 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer { private final NavigableSet orderByMaxFee = new TreeSet<>( - Comparator.comparing(PendingTransaction::hasPriority) + Comparator.comparing(PendingTransaction::getScore) + .thenComparing(PendingTransaction::hasPriority) .thenComparing((PendingTransaction pt) -> pt.getTransaction().getMaxGasPrice()) .thenComparing(PendingTransaction::getSequence)); @@ -116,6 +116,20 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer { } } + @Override + protected void internalPenalize(final PendingTransaction penalizedTx) { + final var senderTxs = txsBySender.get(penalizedTx.getSender()); + if (senderTxs.firstKey() == penalizedTx.getNonce()) { + // since we only sort the first tx of sender, we only need to re-sort in this case + orderByMaxFee.remove(penalizedTx); + penalizedTx.decrementScore(); + orderByMaxFee.add(penalizedTx); + } else { + // otherwise we just decrement the score + penalizedTx.decrementScore(); + } + } + @Override protected void internalReplaced(final PendingTransaction replacedTx) { orderByMaxFee.remove(replacedTx); @@ -213,8 +227,8 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer { return "Ready: Empty"; } - final Transaction top = orderByMaxFee.last().getTransaction(); - final Transaction last = orderByMaxFee.first().getTransaction(); + final PendingTransaction top = orderByMaxFee.last(); + final PendingTransaction last = orderByMaxFee.first(); return "Ready: " + "count=" @@ -223,12 +237,16 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer { + spaceUsed + ", unique senders: " + txsBySender.size() - + ", top by max fee[max fee:" - + top.getMaxGasPrice().toHumanReadableString() + + ", top by score and max gas price[score: " + + top.getScore() + + ", max gas price:" + + top.getTransaction().getMaxGasPrice().toHumanReadableString() + ", hash: " + top.getHash() - + "], last by max fee [max fee: " - + last.getMaxGasPrice().toHumanReadableString() + + "], last by score and max gas price [score: " + + last.getScore() + + ", max fee: " + + last.getTransaction().getMaxGasPrice().toHumanReadableString() + ", hash: " + last.getHash() + "]"; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java index 52f598ba71..aef318e6df 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java @@ -303,6 +303,11 @@ public class SparseTransactions extends AbstractTransactionsLayer { } } + @Override + protected void internalPenalize(final PendingTransaction penalizedTx) { + // intentionally no-op + } + private void deleteGap(final Address sender) { orderByGap.get(gapBySender.remove(sender)).remove(sender); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java index d3c22aeef1..531add0af7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java @@ -45,6 +45,19 @@ public interface TransactionsLayer { void remove(PendingTransaction pendingTransaction, RemovalReason reason); + /** + * Penalize a pending transaction. Penalization could be applied to notify the txpool that this + * pending tx has some temporary issues that prevent it from being included in a block, and so it + * should be de-prioritized in some ways, so it will be re-evaluated only after non penalized + * pending txs. For example: if during the evaluation for block inclusion, the pending tx is + * excluded because the sender has not enough balance to send it, this could be a transient issue + * since later the sender could receive some funds, but in any case we penalize the pending tx, so + * it is pushed down in the order of prioritized pending txs. + * + * @param penalizedTransaction the tx to penalize + */ + void penalize(PendingTransaction penalizedTransaction); + void blockAdded( FeeMarket feeMarket, BlockHeader blockHeader, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/package-info.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/package-info.java index fbc9da9fe5..ff274ce4ea 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/package-info.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/package-info.java @@ -22,7 +22,8 @@ * transactions that could be selected for a future block proposal, and at the same time, without * penalizing legitimate unordered transactions, that are only temporary non-executable. * - *

It is disabled by default, to enable use the option {@code Xlayered-tx-pool=true} + *

It is enabled by default on public networks, to switch to another implementation use the + * option {@code tx-pool} * *

The main idea is to organize the txpool in an arbitrary number of layers, where each layer has * specific rules and constraints that determine if a transaction belong or not to that layer and @@ -38,6 +39,14 @@ * transactions are removed since confirmed in a block, transactions from the next layer are * promoted until there is space. * + *

Some layers could make use of the score of a pending transactions, to push back in the rank + * those pending transactions that have been penalized. + * + *

Layers are not thread safe, since they are not meant to be accessed directly, and all the + * synchronization is managed at the level of {@link + * org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredPendingTransactions + * LayeredPendingTransactions} class. + * *

The current implementation is based on 3 layers, plus the last one that just drop every * transaction when the previous layers are full. The 3 layers are, in order: * @@ -48,20 +57,20 @@ * * *

Prioritized: This is where candidate transactions are selected for creating a new block. - * Transactions ordered by the effective priority fee, and it is limited by size, 2000 by default, - * to reduce the overhead of the sorting and because that number is enough to fill any block, at the - * current gas limit. Does not allow nonce gaps, and the first transaction for each sender must be - * the next one for that sender. Eviction is done removing the transaction with the higher nonce for - * the sender of the less valuable transaction, to avoid creating nonce gaps, evicted transactions - * go into the next layer Ready. + * Transactions ordered by score and then effective priority fee, and it is limited by size, 2000 by + * default, to reduce the overhead of the sorting and because that number is enough to fill any + * block, at the current gas limit. Does not allow nonce gaps, and the first transaction for each + * sender must be the next one for that sender. Eviction is done removing the transaction with the + * higher nonce for the sender of the less score and less effective priority fee transaction, to + * avoid creating nonce gaps, evicted transactions go into the next layer Ready. * *

Ready: Similar to the Prioritized, it does not allow nonce gaps, and the first transaction for * each sender must be the next one for that sender, but it is limited by space instead of count, * thus allowing many more transactions, think about this layer like a buffer for the Prioritized. * Since it is meant to keep ten to hundreds of thousand of transactions, it does not have a full * ordering, like the previous, but only the first transaction for each sender is ordered using a - * stable value that is the max fee per gas. Eviction is the same as the Prioritized, and evicted - * transaction go into the next layer Sparse. + * stable value that is score and then max fee per gas. Eviction is the same as the Prioritized, and + * evicted transaction go into the next layer Sparse. * *

Sparse: This is the first layer where nonce gaps are allowed and where the first transaction * for a sender could not be the next expected one for that sender. The main purpose of this layer diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java index c5c2bdcf97..84ac759a79 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java @@ -159,6 +159,12 @@ public class LayersTest extends BaseTransactionPoolTest { assertScenario(scenario, BLOB_TX_POOL_CONFIG); } + @ParameterizedTest + @MethodSource("providerPenalized") + void penalized(final Scenario scenario) { + assertScenario(scenario); + } + private void assertScenario(final Scenario scenario) { assertScenario(scenario, DEFAULT_TX_POOL_CONFIG); } @@ -1256,6 +1262,79 @@ public class LayersTest extends BaseTransactionPoolTest { .expectedSparseForSenders())); } + static Stream providerPenalized() { + return Stream.of( + Arguments.of( + new Scenario("single sender, single tx") + .addForSender(S1, 0) + .expectedPrioritizedForSender(S1, 0) + .penalizeForSender(S1, 0) + .expectedPrioritizedForSender(S1, 0)), + Arguments.of( + new Scenario("single sender penalize last") + .addForSender(S1, 0, 1) + .expectedPrioritizedForSender(S1, 0, 1) + .penalizeForSender(S1, 1) + .expectedPrioritizedForSender(S1, 0, 1)), + Arguments.of( + new Scenario("single sender penalize first") + .addForSender(S1, 0, 1) + .expectedPrioritizedForSender(S1, 0, 1) + .penalizeForSender(S1, 0, 1) + // even if 0 has less score it is always the first for the sender + // since otherwise there is a nonce gap + .expectedPrioritizedForSender(S1, 0, 1)), + Arguments.of( + new Scenario("multiple senders, penalize top") + .addForSenders(S1, 0, S2, 0) + // remember S2 pays more fees + .expectedPrioritizedForSenders(S2, 0, S1, 0) + .penalizeForSender(S2, 0) + .expectedPrioritizedForSenders(S1, 0, S2, 0)), + Arguments.of( + new Scenario("multiple senders, penalize bottom") + .addForSenders(S1, 0, S2, 0) + .expectedPrioritizedForSenders(S2, 0, S1, 0) + .penalizeForSender(S1, 0) + .expectedPrioritizedForSenders(S2, 0, S1, 0)), + Arguments.of( + new Scenario("multiple senders, penalize middle") + .addForSenders(S1, 0, S2, 0, S3, 0) + .expectedPrioritizedForSenders(S3, 0, S2, 0, S1, 0) + .penalizeForSender(S2, 0) + .expectedPrioritizedForSenders(S3, 0, S1, 0, S2, 0)), + Arguments.of( + new Scenario("single sender, promote from ready") + .addForSender(S1, 0, 1, 2, 3, 4, 5) + .expectedPrioritizedForSender(S1, 0, 1, 2) + .expectedReadyForSender(S1, 3, 4, 5) + .penalizeForSender(S1, 3) + .confirmedForSenders(S1, 0) + // even if penalized 3 is promoted to avoid nonce gap + .expectedPrioritizedForSender(S1, 1, 2, 3) + .expectedReadyForSender(S1, 4, 5)), + Arguments.of( + new Scenario("multiple senders, overflow to ready") + .addForSenders(S1, 0, S2, 0, S3, 0) + .expectedPrioritizedForSenders(S3, 0, S2, 0, S1, 0) + .expectedReadyForSenders() + .penalizeForSender(S3, 0) + .addForSender(S1, 1) + .expectedPrioritizedForSenders(S2, 0, S1, 0, S1, 1) + // S3(0) is demoted to ready even if it is paying more fees, + // since has a lower score + .expectedReadyForSender(S3, 0)), + Arguments.of( + new Scenario("multiple senders, overflow to sparse") + .addForSenders(S1, 0, S2, 0, S3, 0, S1, 1, S2, 1, S3, 1) + .expectedPrioritizedForSenders(S3, 0, S3, 1, S2, 0) + .expectedReadyForSenders(S2, 1, S1, 0, S1, 1) + .penalizeForSender(S2, 1) + .addForSender(S2, 2) + .expectedReadyForSenders(S1, 0, S1, 1, S2, 1) + .expectedSparseForSender(S2, 2))); + } + private static BlockHeader mockBlockHeader() { final BlockHeader blockHeader = mock(BlockHeader.class); when(blockHeader.getBaseFee()).thenReturn(Optional.of(BASE_FEE)); @@ -1511,10 +1590,12 @@ public class LayersTest extends BaseTransactionPoolTest { private void assertExpectedPrioritized( final AbstractPrioritizedTransactions prioLayer, final List expected) { - assertThat(prioLayer.getBySender()) - .describedAs("Prioritized") - .flatExtracting(SenderPendingTransactions::pendingTransactions) - .containsExactlyElementsOf(expected); + final var flatOrder = + prioLayer.getByScore().values().stream() + .flatMap(List::stream) + .flatMap(spt -> spt.pendingTransactions().stream()) + .toList(); + assertThat(flatOrder).describedAs("Prioritized").containsExactlyElementsOf(expected); } private void assertExpectedReady( @@ -1582,6 +1663,23 @@ public class LayersTest extends BaseTransactionPoolTest { return this; } + public Scenario penalizeForSender(final Sender sender, final long... nonce) { + Arrays.stream(nonce) + .forEach( + n -> { + actions.add( + (pending, prio, ready, sparse, dropped) -> { + final var senderTxs = prio.getAllFor(sender.address); + Arrays.stream(nonce) + .mapToObj( + n2 -> senderTxs.stream().filter(pt -> pt.getNonce() == n2).findAny()) + .map(Optional::get) + .forEach(prio::penalize); + }); + }); + return this; + } + public Scenario expectedSelectedTransactions(final Object... args) { List expectedSelected = new ArrayList<>(); for (int i = 0; i < args.length; i = i + 2) { diff --git a/plugin-api/build.gradle b/plugin-api/build.gradle index 58c241509e..81e66a06a0 100644 --- a/plugin-api/build.gradle +++ b/plugin-api/build.gradle @@ -70,7 +70,7 @@ Calculated : ${currentHash} tasks.register('checkAPIChanges', FileStateChecker) { description = "Checks that the API for the Plugin-API project does not change without deliberate thought" files = sourceSets.main.allJava.files - knownHash = 'F07ix5Mkvycb2W2luprKmcMyrWcSLB4Xtou5Id10DW0=' + knownHash = '6L5dNJ975Ka/X7g4lTdpkBvPQrJgJu+vAf/m1dFCneU=' } check.dependsOn('checkAPIChanges') diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/data/TransactionSelectionResult.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/data/TransactionSelectionResult.java index b59fb26967..66b1c1f2d6 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/data/TransactionSelectionResult.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/data/TransactionSelectionResult.java @@ -42,6 +42,13 @@ public class TransactionSelectionResult { */ boolean discard(); + /** + * Should the score of this transaction be decremented? + * + * @return yes if the score of this transaction needs to be decremented + */ + boolean penalize(); + /** * Name of this status * @@ -52,30 +59,34 @@ public class TransactionSelectionResult { private enum BaseStatus implements Status { SELECTED, - BLOCK_FULL(true, false), - BLOBS_FULL(false, false), - BLOCK_OCCUPANCY_ABOVE_THRESHOLD(true, false), - BLOCK_SELECTION_TIMEOUT(true, false), - TX_EVALUATION_TOO_LONG(true, true), - INVALID_TRANSIENT(false, false), - INVALID(false, true); + BLOCK_FULL(true, false, false), + BLOBS_FULL(false, false, false), + BLOCK_OCCUPANCY_ABOVE_THRESHOLD(true, false, false), + BLOCK_SELECTION_TIMEOUT(true, false, false), + TX_EVALUATION_TOO_LONG(true, false, true), + INVALID_TX_EVALUATION_TOO_LONG(true, true, true), + INVALID_TRANSIENT(false, false, true), + INVALID(false, true, false); private final boolean stop; private final boolean discard; + private final boolean penalize; BaseStatus() { this.stop = false; this.discard = false; + this.penalize = false; } - BaseStatus(final boolean stop, final boolean discard) { + BaseStatus(final boolean stop, final boolean discard, final boolean penalize) { this.stop = stop; this.discard = discard; + this.penalize = penalize; } @Override public String toString() { - return name() + " (stop=" + stop + ", discard=" + discard + ")"; + return name() + " (stop=" + stop + ", discard=" + discard + ", penalize=" + penalize + ")"; } @Override @@ -87,6 +98,11 @@ public class TransactionSelectionResult { public boolean discard() { return discard; } + + @Override + public boolean penalize() { + return penalize; + } } /** The transaction has been selected to be included in the new block */ @@ -105,10 +121,14 @@ public class TransactionSelectionResult { public static final TransactionSelectionResult BLOCK_SELECTION_TIMEOUT = new TransactionSelectionResult(BaseStatus.BLOCK_SELECTION_TIMEOUT); - /** Transaction took too much to evaluate */ + /** Transaction took too much to evaluate, but it was not invalid */ public static final TransactionSelectionResult TX_EVALUATION_TOO_LONG = new TransactionSelectionResult(BaseStatus.TX_EVALUATION_TOO_LONG); + /** Transaction took too much to evaluate, and it was invalid */ + public static final TransactionSelectionResult INVALID_TX_EVALUATION_TOO_LONG = + new TransactionSelectionResult(BaseStatus.INVALID_TX_EVALUATION_TOO_LONG); + /** * The transaction has not been selected since too large and the occupancy of the block is enough * to stop the selection. @@ -215,6 +235,15 @@ public class TransactionSelectionResult { return status.discard(); } + /** + * Should the score of this transaction be decremented? + * + * @return yes if the score of this transaction needs to be decremented + */ + public boolean penalize() { + return status.penalize(); + } + /** * Is the candidate transaction selected for block inclusion? *