Layered txpool: fix for unsent drop notifications on remove (#7538)

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/7586/head
Fabio Di Fabio 3 months ago committed by GitHub
parent edd3c4f0a0
commit dc6324c8d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java
  3. 9
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractSequentialTransactionsLayer.java
  4. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java
  5. 65
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AddReason.java
  6. 18
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java
  7. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java
  8. 35
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java
  9. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java
  10. 131
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/RemovalReason.java
  11. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java
  12. 76
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java
  13. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java
  14. 1
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java
  15. 8
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java
  16. 609
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java
  17. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java

@ -11,6 +11,7 @@
### Bug fixes
- Layered txpool: do not send notifications when moving tx between layers [#7539](https://github.com/hyperledger/besu/pull/7539)
- Layered txpool: fix for unsent drop notifications on remove [#7538](https://github.com/hyperledger/besu/pull/7538)
## 24.9.0

@ -15,7 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason;
import org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason;
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier;

@ -14,9 +14,9 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.FOLLOW_INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.FOLLOW_INVALIDATED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
@ -24,6 +24,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import java.util.Map;
import java.util.NavigableMap;
@ -45,7 +46,7 @@ public abstract class AbstractSequentialTransactionsLayer extends AbstractTransa
}
@Override
public void remove(final PendingTransaction invalidatedTx, final RemovalReason reason) {
public void remove(final PendingTransaction invalidatedTx, final PoolRemovalReason reason) {
nextLayer.remove(invalidatedTx, reason);
final var senderTxs = txsBySender.get(invalidatedTx.getSender());

@ -19,11 +19,12 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedRes
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.TRY_NEXT_LAYER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.RemovedFrom.POOL;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
@ -292,7 +293,8 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
if (remainingPromotionsPerType[txType.ordinal()] > 0) {
senderTxs.pollFirstEntry();
processRemove(senderTxs, candidateTx.getTransaction(), RemovalReason.PROMOTED);
processRemove(
senderTxs, candidateTx.getTransaction(), RemovalReason.LayerMoveReason.PROMOTED);
metrics.incrementRemoved(candidateTx, "promoted", name());
if (senderTxs.isEmpty()) {
@ -419,6 +421,9 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
decreaseCounters(removedTx);
metrics.incrementRemoved(removedTx, removalReason.label(), name());
internalRemove(senderTxs, removedTx, removalReason);
if (removalReason.removedFrom().equals(POOL)) {
notifyTransactionDropped(removedTx);
}
}
return removedTx;
}

@ -0,0 +1,65 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import java.util.Locale;
/** Describe why we are trying to add a tx to a layer. */
public enum AddReason {
/** When adding a tx, that is not present in the pool. */
NEW(true, true),
/** When adding a tx as result of an internal move between layers. */
MOVE(false, false),
/** When adding a tx as result of a promotion from a lower layer. */
PROMOTED(false, false);
private final boolean sendNotification;
private final boolean makeCopy;
private final String label;
AddReason(final boolean sendNotification, final boolean makeCopy) {
this.sendNotification = sendNotification;
this.makeCopy = makeCopy;
this.label = name().toLowerCase(Locale.ROOT);
}
/**
* Should we send add notification for this reason?
*
* @return true if notification should be sent
*/
public boolean sendNotification() {
return sendNotification;
}
/**
* Should the layer make a copy of the pending tx before adding it, to avoid keeping reference to
* potentially large underlying byte buffers?
*
* @return true is a copy is necessary
*/
public boolean makeCopy() {
return makeCopy;
}
/**
* Return a label that identify this reason to be used in the metric system.
*
* @return a label
*/
public String label() {
return label;
}
}

@ -14,8 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.BELOW_BASE_FEE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.DEMOTED;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@ -105,7 +105,7 @@ public class BaseFeePrioritizedTransactions extends AbstractPrioritizedTransacti
while (itTxsBySender.hasNext()) {
final var senderTxs = itTxsBySender.next().getValue();
Optional<Long> maybeFirstUnderpricedNonce = Optional.empty();
Optional<Long> maybeFirstDemotedNonce = Optional.empty();
for (final var e : senderTxs.entrySet()) {
final PendingTransaction tx = e.getValue();
@ -115,25 +115,27 @@ public class BaseFeePrioritizedTransactions extends AbstractPrioritizedTransacti
} else {
// otherwise sender txs starting from this nonce need to be demoted to next layer,
// and we can go to next sender
maybeFirstUnderpricedNonce = Optional.of(e.getKey());
maybeFirstDemotedNonce = Optional.of(e.getKey());
break;
}
}
maybeFirstUnderpricedNonce.ifPresent(
maybeFirstDemotedNonce.ifPresent(
nonce -> {
// demote all txs after the first underpriced to the next layer, because none of them is
// demote all txs after the first demoted to the next layer, because none of them is
// executable now, and we can avoid sorting them until they are candidate for execution
// again
final var demoteTxs = senderTxs.tailMap(nonce, true);
while (!demoteTxs.isEmpty()) {
final PendingTransaction demoteTx = demoteTxs.pollLastEntry().getValue();
LOG.atTrace()
.setMessage("Demoting tx {} with max gas price below next block base fee {}")
.setMessage(
"Demoting tx {} since it does not respect anymore the requisites to stay in this layer."
+ " Next block base fee {}")
.addArgument(demoteTx::toTraceLog)
.addArgument(newNextBlockBaseFee::toHumanReadableString)
.log();
processEvict(senderTxs, demoteTx, BELOW_BASE_FEE);
processEvict(senderTxs, demoteTx, DEMOTED);
addToNextLayer(senderTxs, demoteTx, 0, MOVE);
}
});

@ -14,7 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.DROPPED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedLis
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.util.Subscribers;
@ -84,7 +85,7 @@ public class EndLayer implements TransactionsLayer {
}
@Override
public void remove(final PendingTransaction pendingTransaction, final RemovalReason reason) {}
public void remove(final PendingTransaction pendingTransaction, final PoolRemovalReason reason) {}
@Override
public void penalize(final PendingTransaction penalizedTx) {}

@ -20,9 +20,9 @@ import static java.util.stream.Collectors.reducing;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.INTERNAL_ERROR;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.RECONCILED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.RECONCILED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
@ -316,7 +316,6 @@ public class LayeredPendingTransactions implements PendingTransactions {
@Override
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final List<PendingTransaction> penalizedTransactions = new ArrayList<>();
final Set<Address> skipSenders = new HashSet<>();
@ -347,7 +346,12 @@ public class LayeredPendingTransactions implements PendingTransactions {
.log();
if (selectionResult.discard()) {
invalidTransactions.add(candidatePendingTx);
ethScheduler.scheduleTxWorkerTask(
() -> {
synchronized (this) {
prioritizedTransactions.remove(candidatePendingTx, INVALIDATED);
}
});
logDiscardedTransaction(candidatePendingTx, selectionResult);
}
@ -377,20 +381,13 @@ public class LayeredPendingTransactions implements PendingTransactions {
}
ethScheduler.scheduleTxWorkerTask(
() -> {
invalidTransactions.forEach(
invalidTx -> {
synchronized (this) {
prioritizedTransactions.remove(invalidTx, INVALIDATED);
}
});
penalizedTransactions.forEach(
penalizedTx -> {
synchronized (this) {
prioritizedTransactions.internalPenalize(penalizedTx);
}
});
});
() ->
penalizedTransactions.forEach(
penalizedTx -> {
synchronized (this) {
prioritizedTransactions.penalize(penalizedTx);
}
}));
}
@Override

@ -14,7 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.PROMOTED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.core.BlockHeader;

@ -0,0 +1,131 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import java.util.Locale;
/** The reason why a pending tx has been removed */
public interface RemovalReason {
/**
* From where the tx has been removed
*
* @return removed from item
*/
RemovedFrom removedFrom();
/**
* Return a label that identify this reason to be used in the metric system.
*
* @return a label
*/
String label();
/** There are 2 kinds of removals, from a layer and from the pool. */
enum RemovedFrom {
/**
* Removing from a layer, can be also seen as a <i>move</i> between layers, since it is removed
* from the current layer and added to another layer, for example in the case the layer is full
* and some txs need to be moved to the next layer, or in the opposite case when some txs are
* promoted to the upper layer.
*/
LAYER,
/**
* Removing from the pool, instead means that the tx is directly removed from the pool, and it
* will not be present in any layer, for example, when it is added to an imported block, or it
* is replaced by another tx.
*/
POOL
}
/** The reason why the tx has been removed from the pool */
enum PoolRemovalReason implements RemovalReason {
/** Tx removed since it is confirmed on chain, as part of an imported block. */
CONFIRMED(),
/** Tx removed since it has been replaced by another one added in the same layer. */
REPLACED(),
/** Tx removed since it has been replaced by another one added in another layer. */
CROSS_LAYER_REPLACED(),
/** Tx removed when the pool is full, to make space for new incoming txs. */
DROPPED(),
/**
* Tx removed since found invalid after it was added to the pool, for example during txs
* selection for a new block proposal.
*/
INVALIDATED(),
/**
* Special case, when for a sender, discrepancies are found between the world state view and the
* pool view, then all the txs for this sender are removed and added again. Discrepancies, are
* rare, and can happen during a short windows when a new block is being imported and the world
* state being updated.
*/
RECONCILED();
private final String label;
PoolRemovalReason() {
this.label = name().toLowerCase(Locale.ROOT);
}
@Override
public RemovedFrom removedFrom() {
return RemovedFrom.POOL;
}
@Override
public String label() {
return label;
}
}
/** The reason why the tx has been moved across layers */
enum LayerMoveReason implements RemovalReason {
/**
* When the current layer is full, and this tx needs to be moved to the lower layer, in order to
* free space.
*/
EVICTED(),
/**
* Specific to sequential layers, when a tx is removed because found invalid, then if the sender
* has other txs with higher nonce, then a gap is created, and since sequential layers do not
* permit gaps, txs following the invalid one need to be moved to lower layers.
*/
FOLLOW_INVALIDATED(),
/**
* When a tx is moved to the upper layer, since it satisfies all the requirement to be promoted.
*/
PROMOTED(),
/**
* When a tx is moved to the lower layer, since it, or a preceding one from the same sender,
* does not respect anymore the requisites to stay in this layer.
*/
DEMOTED();
private final String label;
LayerMoveReason() {
this.label = name().toLowerCase(Locale.ROOT);
}
@Override
public RemovedFrom removedFrom() {
return RemovedFrom.LAYER;
}
@Override
public String label() {
return label;
}
}
}

@ -14,8 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import java.util.ArrayList;
@ -225,7 +226,7 @@ public class SparseTransactions extends AbstractTransactionsLayer {
@Override
public synchronized void remove(
final PendingTransaction invalidatedTx, final RemovalReason reason) {
final PendingTransaction invalidatedTx, final PoolRemovalReason reason) {
final var senderTxs = txsBySender.get(invalidatedTx.getSender());
if (senderTxs != null && senderTxs.containsKey(invalidatedTx.getNonce())) {

@ -22,10 +22,10 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
@ -55,7 +55,13 @@ public interface TransactionsLayer {
*/
TransactionAddedResult add(PendingTransaction pendingTransaction, int gap, AddReason addReason);
void remove(PendingTransaction pendingTransaction, RemovalReason reason);
/**
* Remove the pending tx from the pool
*
* @param pendingTransaction the pending tx
* @param reason the reason it is removed from the pool
*/
void remove(PendingTransaction pendingTransaction, PoolRemovalReason reason);
/**
* Penalize a pending transaction. Penalization could be applied to notify the txpool that this
@ -119,70 +125,4 @@ public interface TransactionsLayer {
String logStats();
String logSender(Address sender);
/** Describe why we are trying to add a tx to a layer. */
enum AddReason {
/** When adding a tx, that is not present in the pool. */
NEW(true, true),
/** When adding a tx as result of an internal move between layers. */
MOVE(false, false),
/** When adding a tx as result of a promotion from a lower layer. */
PROMOTED(false, false);
private final boolean sendNotification;
private final boolean makeCopy;
private final String label;
AddReason(final boolean sendNotification, final boolean makeCopy) {
this.sendNotification = sendNotification;
this.makeCopy = makeCopy;
this.label = name().toLowerCase(Locale.ROOT);
}
/**
* Should we send add notification for this reason?
*
* @return true if notification should be sent
*/
public boolean sendNotification() {
return sendNotification;
}
/**
* Should the layer make a copy of the pending tx before adding it, to avoid keeping reference
* to potentially large underlying byte buffers?
*
* @return true is a copy is necessary
*/
public boolean makeCopy() {
return makeCopy;
}
public String label() {
return label;
}
}
enum RemovalReason {
CONFIRMED,
CROSS_LAYER_REPLACED,
EVICTED,
DROPPED,
FOLLOW_INVALIDATED,
INVALIDATED,
PROMOTED,
REPLACED,
RECONCILED,
BELOW_BASE_FEE;
private final String label;
RemovalReason() {
this.label = name().toLowerCase(Locale.ROOT);
}
public String label() {
return label;
}
}
}

@ -17,7 +17,7 @@ package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.datatypes.Wei;

@ -35,7 +35,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactions;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.metrics.StubMetricsSystem;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;

@ -20,10 +20,10 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedRes
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOB_PRICE_BELOW_CURRENT_MIN;

@ -15,6 +15,8 @@
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;
import static org.hyperledger.besu.datatypes.TransactionType.ACCESS_LIST;
import static org.hyperledger.besu.datatypes.TransactionType.BLOB;
import static org.hyperledger.besu.datatypes.TransactionType.EIP1559;
@ -25,7 +27,7 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.S4;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.SP1;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.SP2;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -35,6 +37,7 @@ import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.Util;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
@ -51,11 +54,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
@ -156,7 +162,7 @@ public class LayersTest extends BaseTransactionPoolTest {
@ParameterizedTest
@MethodSource("providerMaxPrioritizedByType")
void maxPrioritizedByType(final Scenario scenario) {
assertScenario(scenario, BLOB_TX_POOL_CONFIG);
assertScenario(scenario);
}
@ParameterizedTest
@ -166,54 +172,7 @@ public class LayersTest extends BaseTransactionPoolTest {
}
private void assertScenario(final Scenario scenario) {
assertScenario(scenario, DEFAULT_TX_POOL_CONFIG);
}
private void assertScenario(
final Scenario scenario, final TransactionPoolConfiguration poolConfig) {
final TransactionPoolMetrics txPoolMetrics = new TransactionPoolMetrics(metricsSystem);
final EvictCollectorLayer evictCollector = new EvictCollectorLayer(txPoolMetrics);
final EthScheduler ethScheduler = new DeterministicEthScheduler();
final SparseTransactions sparseTransactions =
new SparseTransactions(
poolConfig,
ethScheduler,
evictCollector,
txPoolMetrics,
(pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2),
new BlobCache());
final ReadyTransactions readyTransactions =
new ReadyTransactions(
poolConfig,
ethScheduler,
sparseTransactions,
txPoolMetrics,
(pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2),
new BlobCache());
final BaseFeePrioritizedTransactions prioritizedTransactions =
new BaseFeePrioritizedTransactions(
poolConfig,
LayersTest::mockBlockHeader,
ethScheduler,
readyTransactions,
txPoolMetrics,
(pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2),
FeeMarket.london(0L),
new BlobCache(),
MiningParameters.newDefault().setMinTransactionGasPrice(MIN_GAS_PRICE));
final LayeredPendingTransactions pendingTransactions =
new LayeredPendingTransactions(poolConfig, prioritizedTransactions, ethScheduler);
scenario.execute(
pendingTransactions,
prioritizedTransactions,
readyTransactions,
sparseTransactions,
evictCollector);
scenario.run();
}
static Stream<Arguments> providerAddTransactions() {
@ -452,7 +411,7 @@ public class LayersTest extends BaseTransactionPoolTest {
.expectedReadyForSenders(S1, 0, S1, 1)
.expectedSparseForSender(S3, 2)
.addForSenders(S3, 1)
// ToDo: only S3[1] is prioritized because there is no space to try to fill gaps
// only S3[1] is prioritized because there is no space to try to fill gaps
.expectedPrioritizedForSenders(S3, 0, S3, 1, S2, 0)
.expectedReadyForSenders(S2, 1, S1, 0, S1, 1)
.expectedSparseForSender(S3, 2)
@ -465,11 +424,11 @@ public class LayersTest extends BaseTransactionPoolTest {
Arguments.of(
new Scenario("replacement cross layer")
.addForSenders(S2, 0, S3, 2, S1, 1, S2, 1, S3, 0, S1, 0, S3, 1)
// ToDo: only S3[1] is prioritized because there is no space to try to fill gaps
// only S3[1] is prioritized because there is no space to try to fill gaps
.expectedPrioritizedForSenders(S3, 0, S3, 1, S2, 0)
.expectedReadyForSenders(S2, 1, S1, 0, S1, 1)
.expectedSparseForSender(S3, 2)
.addForSenders(S3, 2) // added in prioritized, but replacement in sparse
.replaceForSenders(S3, 2) // added in prioritized, but replacement in sparse
.expectedPrioritizedForSenders(S3, 0, S3, 1, S3, 2)
.expectedReadyForSenders(S2, 0, S2, 1, S1, 0)
.expectedSparseForSender(S1, 1)));
@ -477,6 +436,8 @@ public class LayersTest extends BaseTransactionPoolTest {
static Stream<Arguments> providerRemoveTransactions() {
return Stream.of(
// when expected*ForSender(s) is not present, by default there is a check that the layers
// are empty
Arguments.of(new Scenario("remove not existing").removeForSender(S1, 0)),
Arguments.of(new Scenario("add/remove first").addForSender(S1, 0).removeForSender(S1, 0)),
Arguments.of(
@ -1060,10 +1021,10 @@ public class LayersTest extends BaseTransactionPoolTest {
.expectedNextNonceForSenders(S1, 3)
.addForSender(S1, 3)
.expectedPrioritizedForSender(S1, 2, 3)
.setAccountNonce(S1, 0) // rewind nonce due to reorg
.addForSender(S1, 0)
.expectedPrioritizedForSender(S1, 0)
.expectedSparseForSender(S1, 2, 3)));
.reorgForSenders(S1, 0) // rewind nonce due to reorg
.addForSender(S1, 0, 1) // re-add reorged txs
.expectedPrioritizedForSender(S1, 0, 1, 2)
.expectedReadyForSender(S1, 3)));
}
static Stream<Arguments> providerAsyncWorldStateUpdates() {
@ -1221,22 +1182,22 @@ public class LayersTest extends BaseTransactionPoolTest {
static Stream<Arguments> providerMaxPrioritizedByType() {
return Stream.of(
Arguments.of(
new Scenario("first blob tx is prioritized")
new Scenario("first blob tx is prioritized", BLOB_TX_POOL_CONFIG)
.addForSender(S1, BLOB, 0)
.expectedPrioritizedForSender(S1, 0)),
Arguments.of(
new Scenario("multiple senders only first blob tx is prioritized")
new Scenario("multiple senders only first blob tx is prioritized", BLOB_TX_POOL_CONFIG)
.addForSender(S1, BLOB, 0)
.addForSender(S2, BLOB, 0)
.expectedPrioritizedForSender(S1, 0)
.expectedReadyForSender(S2, 0)),
Arguments.of(
new Scenario("same sender following blob txs are moved to ready")
new Scenario("same sender following blob txs are moved to ready", BLOB_TX_POOL_CONFIG)
.addForSender(S1, BLOB, 0, 1, 2)
.expectedPrioritizedForSender(S1, 0)
.expectedReadyForSender(S1, 1, 2)),
Arguments.of(
new Scenario("promoting txs respect prioritized count limit")
new Scenario("promoting txs respect prioritized count limit", BLOB_TX_POOL_CONFIG)
.addForSender(S1, BLOB, 0, 1, 2)
.expectedPrioritizedForSender(S1, 0)
.expectedReadyForSender(S1, 1, 2)
@ -1244,14 +1205,14 @@ public class LayersTest extends BaseTransactionPoolTest {
.expectedPrioritizedForSender(S1, 1)
.expectedReadyForSender(S1, 2)),
Arguments.of(
new Scenario("filling gaps respect prioritized count limit")
new Scenario("filling gaps respect prioritized count limit", BLOB_TX_POOL_CONFIG)
.addForSender(S1, BLOB, 1)
.expectedSparseForSender(S1, 1)
.addForSender(S1, BLOB, 0)
.expectedPrioritizedForSender(S1, 0)
.expectedSparseForSender(S1, 1)),
Arguments.of(
new Scenario("promoting to ready is unbounded")
new Scenario("promoting to ready is unbounded", BLOB_TX_POOL_CONFIG)
.addForSender(S1, BLOB, 0, 1, 2, 3, 4, 5, 6)
.expectedPrioritizedForSender(S1, 0)
.expectedReadyForSender(S1, 1, 2, 3)
@ -1351,18 +1312,18 @@ public class LayersTest extends BaseTransactionPoolTest {
return transactionReplacementHandler.shouldReplace(pt1, pt2, mockBlockHeader());
}
static class Scenario extends BaseTransactionPoolTest {
interface TransactionLayersConsumer {
void accept(
LayeredPendingTransactions pending,
AbstractPrioritizedTransactions prioritized,
ReadyTransactions ready,
SparseTransactions sparse,
EvictCollectorLayer dropped);
}
static class Scenario extends BaseTransactionPoolTest implements Runnable {
final String description;
final List<TransactionLayersConsumer> actions = new ArrayList<>();
final TransactionPoolConfiguration poolConfig;
final EvictCollectorLayer dropped;
final SparseTransactions sparse;
final ReadyTransactions ready;
final AbstractPrioritizedTransactions prio;
final LayeredPendingTransactions pending;
final NotificationsChecker notificationsChecker = new NotificationsChecker();
final List<Runnable> actions = new ArrayList<>();
List<PendingTransaction> lastExpectedPrioritized = new ArrayList<>();
List<PendingTransaction> lastExpectedReady = new ArrayList<>();
List<PendingTransaction> lastExpectedSparse = new ArrayList<>();
@ -1374,94 +1335,281 @@ public class LayersTest extends BaseTransactionPoolTest {
Arrays.stream(Sender.values()).forEach(e -> nonceBySender.put(e, 0L));
}
final EnumMap<Sender, Map<Long, PendingTransaction>> txsBySender = new EnumMap<>(Sender.class);
final EnumSet<Sender> sendersWithReorg = EnumSet.noneOf(Sender.class);
final EnumMap<Sender, NavigableMap<Long, PendingTransaction>> liveTxsBySender =
new EnumMap<>(Sender.class);
{
Arrays.stream(Sender.values()).forEach(e -> liveTxsBySender.put(e, new TreeMap<>()));
}
final EnumMap<Sender, NavigableMap<Long, PendingTransaction>> droppedTxsBySender =
new EnumMap<>(Sender.class);
{
Arrays.stream(Sender.values()).forEach(e -> txsBySender.put(e, new HashMap<>()));
Arrays.stream(Sender.values()).forEach(e -> droppedTxsBySender.put(e, new TreeMap<>()));
}
Scenario(final String description) {
this(description, DEFAULT_TX_POOL_CONFIG);
}
Scenario(final String description, final TransactionPoolConfiguration poolConfig) {
this.description = description;
this.poolConfig = poolConfig;
final TransactionPoolMetrics txPoolMetrics = new TransactionPoolMetrics(metricsSystem);
this.dropped = new EvictCollectorLayer(txPoolMetrics);
final EthScheduler ethScheduler = new DeterministicEthScheduler();
this.sparse =
new SparseTransactions(
poolConfig,
ethScheduler,
this.dropped,
txPoolMetrics,
(pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2),
new BlobCache());
this.ready =
new ReadyTransactions(
poolConfig,
ethScheduler,
this.sparse,
txPoolMetrics,
(pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2),
new BlobCache());
this.prio =
new BaseFeePrioritizedTransactions(
poolConfig,
LayersTest::mockBlockHeader,
ethScheduler,
this.ready,
txPoolMetrics,
(pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2),
FeeMarket.london(0L),
new BlobCache(),
MiningParameters.newDefault().setMinTransactionGasPrice(MIN_GAS_PRICE));
this.pending = new LayeredPendingTransactions(poolConfig, this.prio, ethScheduler);
this.pending.subscribePendingTransactions(notificationsChecker::collectAddNotification);
this.pending.subscribeDroppedTransactions(notificationsChecker::collectDropNotification);
}
Scenario addForSender(final Sender sender, final long... nonce) {
@Override
public void run() {
actions.forEach(Runnable::run);
assertExpectedPrioritized(prio, lastExpectedPrioritized);
assertExpectedReady(ready, lastExpectedReady);
assertExpectedSparse(sparse, lastExpectedSparse);
assertExpectedDropped(dropped, lastExpectedDropped);
}
public Scenario addForSender(final Sender sender, final long... nonce) {
return addForSender(sender, EIP1559, nonce);
}
Scenario addForSender(final Sender sender, final TransactionType type, final long... nonce) {
Arrays.stream(nonce)
.forEach(
n -> {
final var pendingTx = getOrCreate(sender, type, n);
actions.add(
(pending, prio, ready, sparse, dropped) -> {
public Scenario addForSender(
final Sender sender, final TransactionType type, final long... nonce) {
internalAddForSender(sender, type, nonce);
actions.add(notificationsChecker::assertExpectedNotifications);
return this;
}
private void internalAddForSender(
final Sender sender, final TransactionType type, final long... nonce) {
actions.add(
() -> {
Arrays.stream(nonce)
.forEach(
n -> {
final var pendingTx = create(sender, type, n);
final Account mockSender = mock(Account.class);
when(mockSender.getNonce()).thenReturn(nonceBySender.get(sender));
pending.addTransaction(pendingTx, Optional.of(mockSender));
notificationsChecker.addExpectedAddNotification(pendingTx);
});
// reorg case
if (sendersWithReorg.contains(sender)) {
// reorg is removing and re-adding all sender txs, so assert notifications accordingly
final var currentPendingTxs =
liveTxsBySender.get(sender).tailMap(nonce[nonce.length - 1], false).values();
currentPendingTxs.forEach(
pt -> {
notificationsChecker.addExpectedAddNotification(pt);
notificationsChecker.addExpectedDropNotification(pt);
});
sendersWithReorg.remove(sender);
}
// reconciliation case
final var txsRemovedByReconciliation =
liveTxsBySender.get(sender).headMap(nonceBySender.get(sender), false).values();
if (!txsRemovedByReconciliation.isEmpty()) {
// reconciliation is removing all sender txs, and re-adding only the ones with a
// larger nonce, so assert notifications accordingly
final var reconciledPendingTxs =
liveTxsBySender.get(sender).tailMap(nonce[nonce.length - 1], false).values();
txsRemovedByReconciliation.forEach(notificationsChecker::addExpectedDropNotification);
reconciledPendingTxs.forEach(
pt -> {
notificationsChecker.addExpectedDropNotification(pt);
notificationsChecker.addExpectedAddNotification(pt);
});
txsRemovedByReconciliation.clear();
}
handleDropped();
});
}
private void handleDropped() {
// handle dropped tx due to layer or pool full
final var droppedTxs = dropped.getEvictedTransactions();
droppedTxs.forEach(notificationsChecker::addExpectedDropNotification);
droppedTxs.stream()
.forEach(
pt -> {
liveTxsBySender.get(Sender.getByAddress(pt.getSender())).remove(pt.getNonce());
droppedTxsBySender.get(Sender.getByAddress(pt.getSender())).put(pt.getNonce(), pt);
});
return this;
}
Scenario addForSenders(final Object... args) {
public Scenario addForSenders(final Object... args) {
for (int i = 0; i < args.length; i = i + 2) {
final Sender sender = (Sender) args[i];
final long nonce = (int) args[i + 1];
addForSender(sender, nonce);
internalAddForSender(sender, EIP1559, nonce);
}
actions.add(notificationsChecker::assertExpectedNotifications);
return this;
}
public Scenario confirmedForSenders(final Object... args) {
final Map<Address, Long> maxConfirmedNonceBySender = new HashMap<>();
public Scenario replaceForSender(final Sender sender, final long... nonce) {
internalReplaceForSender(sender, nonce);
actions.add(notificationsChecker::assertExpectedNotifications);
return this;
}
private Scenario internalReplaceForSender(final Sender sender, final long... nonce) {
actions.add(
() -> {
Arrays.stream(nonce)
.forEach(
n -> {
final var maybeExistingTx = getMaybe(sender, n);
maybeExistingTx.ifPresentOrElse(
existingTx -> {
final var pendingTx = replace(sender, existingTx);
final Account mockSender = mock(Account.class);
when(mockSender.getNonce()).thenReturn(nonceBySender.get(sender));
pending.addTransaction(pendingTx, Optional.of(mockSender));
notificationsChecker.addExpectedAddNotification(pendingTx);
notificationsChecker.addExpectedDropNotification(existingTx);
},
() ->
fail(
"Could not replace non-existing transaction with nonce "
+ n
+ " for sender "
+ sender.name()));
});
});
return this;
}
public Scenario replaceForSenders(final Object... args) {
for (int i = 0; i < args.length; i = i + 2) {
final Sender sender = (Sender) args[i];
final long nonce = (int) args[i + 1];
maxConfirmedNonceBySender.put(sender.address, nonce);
setAccountNonce(sender, nonce + 1);
internalReplaceForSender(sender, nonce);
}
actions.add(notificationsChecker::assertExpectedNotifications);
return this;
}
public Scenario confirmedForSenders(final Object... args) {
actions.add(
(pending, prio, ready, sparse, dropped) ->
prio.blockAdded(FeeMarket.london(0L), mockBlockHeader(), maxConfirmedNonceBySender));
() -> {
final Map<Address, Long> maxConfirmedNonceBySender = new HashMap<>();
for (int i = 0; i < args.length; i = i + 2) {
final Sender sender = (Sender) args[i];
final long nonce = (int) args[i + 1];
maxConfirmedNonceBySender.put(sender.address, nonce);
nonceBySender.put(sender, nonce + 1);
for (final var pendingTx : getAll(sender)) {
if (pendingTx.getNonce() <= nonce) {
notificationsChecker.addExpectedDropNotification(
liveTxsBySender.get(sender).remove(pendingTx.getNonce()));
}
}
}
prio.blockAdded(FeeMarket.london(0L), mockBlockHeader(), maxConfirmedNonceBySender);
notificationsChecker.assertExpectedNotifications();
});
return this;
}
Scenario setAccountNonce(final Sender sender, final long nonce) {
actions.add((pending, prio, ready, sparse, dropped) -> nonceBySender.put(sender, nonce));
public Scenario setAccountNonce(final Sender sender, final long nonce) {
actions.add(() -> nonceBySender.put(sender, nonce));
return this;
}
void execute(
final LayeredPendingTransactions pending,
final AbstractPrioritizedTransactions prioritized,
final ReadyTransactions ready,
final SparseTransactions sparse,
final EvictCollectorLayer dropped) {
actions.forEach(action -> action.accept(pending, prioritized, ready, sparse, dropped));
assertExpectedPrioritized(prioritized, lastExpectedPrioritized);
assertExpectedReady(ready, lastExpectedReady);
assertExpectedSparse(sparse, lastExpectedSparse);
assertExpectedDropped(dropped, lastExpectedDropped);
public Scenario reorgForSenders(final Object... args) {
actions.add(
() -> {
for (int i = 0; i < args.length; i = i + 2) {
final Sender sender = (Sender) args[i];
final long nonce = (int) args[i + 1];
nonceBySender.put(sender, nonce);
sendersWithReorg.add(sender);
}
});
return this;
}
private PendingTransaction getOrCreate(
private PendingTransaction create(
final Sender sender, final TransactionType type, final long nonce) {
return txsBySender
.get(sender)
.computeIfAbsent(
nonce,
n ->
switch (type) {
case FRONTIER -> createFrontierPendingTransaction(sender, n);
case ACCESS_LIST -> createAccessListPendingTransaction(sender, n);
case EIP1559 -> createEIP1559PendingTransaction(sender, n);
case BLOB -> createBlobPendingTransaction(sender, n);
case SET_CODE -> throw new UnsupportedOperationException();
});
if (liveTxsBySender.get(sender).containsKey(nonce)) {
fail(
"Transaction for sender " + sender.name() + " with nonce " + nonce + " already exists");
}
final var newPendingTx =
switch (type) {
case FRONTIER -> createFrontierPendingTransaction(sender, nonce);
case ACCESS_LIST -> createAccessListPendingTransaction(sender, nonce);
case EIP1559 -> createEIP1559PendingTransaction(sender, nonce);
case BLOB -> createBlobPendingTransaction(sender, nonce);
case SET_CODE -> throw new UnsupportedOperationException();
};
liveTxsBySender.get(sender).put(nonce, newPendingTx);
return newPendingTx;
}
private PendingTransaction replace(final Sender sender, final PendingTransaction pendingTx) {
final var replaceTx =
createRemotePendingTransaction(
createTransactionReplacement(pendingTx.getTransaction(), sender.key),
sender.hasPriority);
liveTxsBySender.get(sender).replace(pendingTx.getNonce(), replaceTx);
return replaceTx;
}
private Optional<PendingTransaction> getMaybe(final Sender sender, final long nonce) {
return Optional.ofNullable(liveTxsBySender.get(sender).get(nonce));
}
private PendingTransaction get(final Sender sender, final long nonce) {
return txsBySender.get(sender).get(nonce);
return getMaybe(sender, nonce).get();
}
private List<PendingTransaction> getAll(final Sender sender) {
return List.copyOf(liveTxsBySender.get(sender).values());
}
private PendingTransaction createFrontierPendingTransaction(
@ -1489,102 +1637,114 @@ public class LayersTest extends BaseTransactionPoolTest {
}
public Scenario expectedPrioritizedForSender(final Sender sender, final long... nonce) {
lastExpectedPrioritized = expectedForSender(sender, nonce);
final var expectedCopy = List.copyOf(lastExpectedPrioritized);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedPrioritized(prio, expectedCopy));
() -> {
lastExpectedPrioritized = expectedForSender(sender, nonce);
assertExpectedPrioritized(prio, lastExpectedPrioritized);
});
return this;
}
public Scenario expectedReadyForSender(final Sender sender, final long... nonce) {
lastExpectedReady = expectedForSender(sender, nonce);
final var expectedCopy = List.copyOf(lastExpectedReady);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedReady(ready, expectedCopy));
() -> {
lastExpectedReady = expectedForSender(sender, nonce);
assertExpectedReady(ready, lastExpectedReady);
});
return this;
}
public Scenario expectedSparseForSender(final Sender sender, final long... nonce) {
lastExpectedSparse = expectedForSender(sender, nonce);
final var expectedCopy = List.copyOf(lastExpectedSparse);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedSparse(sparse, expectedCopy));
() -> {
lastExpectedSparse = expectedForSender(sender, nonce);
assertExpectedSparse(sparse, lastExpectedSparse);
});
return this;
}
public Scenario expectedDroppedForSender(final Sender sender, final long... nonce) {
lastExpectedDropped = expectedForSender(sender, nonce);
final var expectedCopy = List.copyOf(lastExpectedDropped);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedDropped(dropped, expectedCopy));
() -> {
lastExpectedDropped = droppedForSender(sender, nonce);
assertExpectedDropped(dropped, lastExpectedDropped);
});
return this;
}
public Scenario expectedPrioritizedForSenders(
final Sender sender1, final long nonce1, final Sender sender2, Object... args) {
lastExpectedPrioritized = expectedForSenders(sender1, nonce1, sender2, args);
final var expectedCopy = List.copyOf(lastExpectedPrioritized);
final Sender sender1, final long nonce1, final Sender sender2, final Object... args) {
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedPrioritized(prio, expectedCopy));
() -> {
lastExpectedPrioritized = expectedForSenders(sender1, nonce1, sender2, args);
assertExpectedPrioritized(prio, lastExpectedPrioritized);
});
return this;
}
public Scenario expectedPrioritizedForSenders() {
lastExpectedPrioritized = List.of();
final var expectedCopy = List.copyOf(lastExpectedPrioritized);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedPrioritized(prio, expectedCopy));
() -> {
lastExpectedPrioritized = List.of();
assertExpectedPrioritized(prio, lastExpectedPrioritized);
});
return this;
}
public Scenario expectedReadyForSenders(
final Sender sender1, final long nonce1, final Sender sender2, final Object... args) {
lastExpectedReady = expectedForSenders(sender1, nonce1, sender2, args);
final var expectedCopy = List.copyOf(lastExpectedReady);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedReady(ready, expectedCopy));
() -> {
lastExpectedReady = expectedForSenders(sender1, nonce1, sender2, args);
assertExpectedReady(ready, lastExpectedReady);
});
return this;
}
public Scenario expectedReadyForSenders() {
lastExpectedReady = List.of();
final var expectedCopy = List.copyOf(lastExpectedReady);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedReady(ready, expectedCopy));
() -> {
lastExpectedReady = List.of();
assertExpectedReady(ready, lastExpectedReady);
});
return this;
}
public Scenario expectedSparseForSenders(
final Sender sender1, final long nonce1, final Sender sender2, final Object... args) {
lastExpectedSparse = expectedForSenders(sender1, nonce1, sender2, args);
final var expectedCopy = List.copyOf(lastExpectedSparse);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedSparse(sparse, expectedCopy));
() -> {
lastExpectedSparse = expectedForSenders(sender1, nonce1, sender2, args);
assertExpectedSparse(sparse, lastExpectedSparse);
});
return this;
}
public Scenario expectedSparseForSenders() {
lastExpectedSparse = List.of();
final var expectedCopy = List.copyOf(lastExpectedSparse);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedSparse(sparse, expectedCopy));
() -> {
lastExpectedSparse = List.of();
assertExpectedSparse(sparse, lastExpectedSparse);
});
return this;
}
public Scenario expectedDroppedForSenders(
final Sender sender1, final long nonce1, final Sender sender2, final Object... args) {
lastExpectedDropped = expectedForSenders(sender1, nonce1, sender2, args);
final var expectedCopy = List.copyOf(lastExpectedDropped);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedDropped(dropped, expectedCopy));
() -> {
lastExpectedDropped = expectedForSenders(sender1, nonce1, sender2, args);
assertExpectedDropped(dropped, lastExpectedDropped);
});
return this;
}
public Scenario expectedDroppedForSenders() {
lastExpectedDropped = List.of();
final var expectedCopy = List.copyOf(lastExpectedDropped);
actions.add(
(pending, prio, ready, sparse, dropped) -> assertExpectedDropped(dropped, expectedCopy));
() -> {
lastExpectedDropped = List.of();
assertExpectedDropped(dropped, lastExpectedDropped);
});
return this;
}
@ -1639,59 +1799,74 @@ public class LayersTest extends BaseTransactionPoolTest {
return Arrays.stream(nonce).mapToObj(n -> get(sender, n)).toList();
}
private List<PendingTransaction> droppedForSender(final Sender sender, final long... nonce) {
return Arrays.stream(nonce).mapToObj(n -> droppedTxsBySender.get(sender).get(n)).toList();
}
public Scenario expectedNextNonceForSenders(final Object... args) {
for (int i = 0; i < args.length; i = i + 2) {
final Sender sender = (Sender) args[i];
final Integer nullableInt = (Integer) args[i + 1];
final OptionalLong nonce =
nullableInt == null ? OptionalLong.empty() : OptionalLong.of(nullableInt);
actions.add(
(pending, prio, ready, sparse, dropped) ->
assertThat(prio.getNextNonceFor(sender.address)).isEqualTo(nonce));
actions.add(() -> assertThat(prio.getNextNonceFor(sender.address)).isEqualTo(nonce));
}
return this;
}
public Scenario removeForSender(final Sender sender, final long... nonce) {
Arrays.stream(nonce)
.forEach(
n -> {
final var pendingTx = getOrCreate(sender, EIP1559, n);
actions.add(
(pending, prio, ready, sparse, dropped) -> prio.remove(pendingTx, INVALIDATED));
});
actions.add(
() -> {
Arrays.stream(nonce)
.forEach(
n -> {
final var maybeLiveTx = getMaybe(sender, n);
final var pendingTx = maybeLiveTx.orElseGet(() -> create(sender, EIP1559, n));
prio.remove(pendingTx, INVALIDATED);
maybeLiveTx.ifPresent(
liveTx -> {
notificationsChecker.addExpectedDropNotification(liveTx);
liveTxsBySender.get(sender).remove(liveTx.getNonce());
droppedTxsBySender.get(sender).put(liveTx.getNonce(), liveTx);
});
});
handleDropped();
notificationsChecker.assertExpectedNotifications();
});
return this;
}
public Scenario penalizeForSender(final Sender sender, final long... nonce) {
Arrays.stream(nonce)
.forEach(
n -> {
actions.add(
(pending, prio, ready, sparse, dropped) -> {
final var senderTxs = prio.getAllFor(sender.address);
Arrays.stream(nonce)
.mapToObj(
n2 -> senderTxs.stream().filter(pt -> pt.getNonce() == n2).findAny())
.map(Optional::get)
.forEach(prio::penalize);
});
});
actions.add(
() ->
Arrays.stream(nonce)
.forEach(
n -> {
final var senderTxs = prio.getAllFor(sender.address);
Arrays.stream(nonce)
.mapToObj(
n2 ->
senderTxs.stream().filter(pt -> pt.getNonce() == n2).findAny())
.map(Optional::get)
.forEach(prio::penalize);
}));
return this;
}
public Scenario expectedSelectedTransactions(final Object... args) {
List<PendingTransaction> expectedSelected = new ArrayList<>();
for (int i = 0; i < args.length; i = i + 2) {
final Sender sender = (Sender) args[i];
final long nonce = (int) args[i + 1];
expectedSelected.add(get(sender, nonce));
}
actions.add(
(pending, prio, ready, sparse, dropped) ->
assertThat(prio.getBySender())
.flatExtracting(SenderPendingTransactions::pendingTransactions)
.containsExactlyElementsOf(expectedSelected));
() -> {
List<PendingTransaction> expectedSelected = new ArrayList<>();
for (int i = 0; i < args.length; i = i + 2) {
final Sender sender = (Sender) args[i];
final long nonce = (int) args[i + 1];
expectedSelected.add(get(sender, nonce));
}
assertThat(prio.getBySender())
.flatExtracting(SenderPendingTransactions::pendingTransactions)
.containsExactlyElementsOf(expectedSelected);
});
return this;
}
@ -1721,6 +1896,62 @@ public class LayersTest extends BaseTransactionPoolTest {
this.gasFeeMultiplier = gasFeeMultiplier;
this.hasPriority = hasPriority;
}
static Sender getByAddress(final Address address) {
return Arrays.stream(values()).filter(s -> s.address.equals(address)).findAny().get();
}
}
static class NotificationsChecker {
private final List<Transaction> collectedAddNotifications =
Collections.synchronizedList(new ArrayList<>());
private final List<Transaction> collectedDropNotifications =
Collections.synchronizedList(new ArrayList<>());
private final List<Transaction> expectedAddNotifications = new ArrayList<>();
private final List<Transaction> expectedDropNotifications = new ArrayList<>();
void collectAddNotification(final Transaction tx) {
collectedAddNotifications.add(tx);
}
void collectDropNotification(final Transaction tx) {
collectedDropNotifications.add(tx);
}
void addExpectedAddNotification(final PendingTransaction tx) {
expectedAddNotifications.add(tx.getTransaction());
}
void addExpectedDropNotification(final PendingTransaction tx) {
expectedDropNotifications.add(tx.getTransaction());
}
void assertExpectedNotifications() {
assertAddNotifications(expectedAddNotifications);
assertDropNotifications(expectedDropNotifications);
}
private void assertAddNotifications(final List<Transaction> expectedAddedTxs) {
await()
.untilAsserted(
() ->
assertThat(collectedAddNotifications)
.describedAs("Added notifications")
.containsExactlyInAnyOrderElementsOf(expectedAddedTxs));
collectedAddNotifications.clear();
expectedAddNotifications.clear();
}
private void assertDropNotifications(final List<Transaction> expectedDroppedTxs) {
await()
.untilAsserted(
() ->
assertThat(collectedDropNotifications)
.describedAs("Dropped notifications")
.containsExactlyInAnyOrderElementsOf(expectedDroppedTxs));
collectedDropNotifications.clear();
expectedDropNotifications.clear();
}
}
@Test

@ -16,7 +16,7 @@ package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Loading…
Cancel
Save