Reduce lock contention on transaction pool when building a block (#7180)

* Avoid keeping txpool lock during block creation

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Update CHANGELOG

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Remove unneeded synchronized

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

---------

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
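
At a high level, the change narrows the transaction pool critical section during block creation: the pool lock is taken only to snapshot the candidate transactions grouped by sender, the potentially long selection loop then runs outside the lock, and removal of invalidated transactions is deferred to a background task that re-acquires the lock. A self-contained toy of the pattern (placeholder names, plain strings standing in for transactions; the real flow, using LayeredPendingTransactions, getBySender and EthScheduler.scheduleTxWorkerTask, is in the diff below):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class LockNarrowingExample {
  private final List<String> pool = new ArrayList<>(List.of("tx1", "tx2", "tx3"));

  void selectTransactions() {
    final List<String> candidates;
    synchronized (this) {
      // lock held only long enough to snapshot the candidates
      candidates = List.copyOf(pool);
    }
    // the potentially long selection runs without the lock
    final List<String> invalid = new ArrayList<>();
    for (final String tx : candidates) {
      if (tx.equals("tx2")) { // pretend tx2 fails evaluation
        invalid.add(tx);
      }
    }
    // removals are deferred and re-acquire the lock, off the selection path
    CompletableFuture.runAsync(
        () ->
            invalid.forEach(
                tx -> {
                  synchronized (this) {
                    pool.remove(tx);
                  }
                }));
  }

  public static void main(final String[] args) throws Exception {
    final var example = new LockNarrowingExample();
    example.selectTransactions();
    Thread.sleep(100); // crude wait for the async cleanup, toy only
    synchronized (example) {
      System.out.println(example.pool); // [tx1, tx3]
    }
  }
}
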
15 changed files:

  1. CHANGELOG.md (1 changed line)
  2. ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java (2 changed lines)
  3. ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/AbstractIsolationTests.java (3 changed lines)
  4. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java (3 changed lines)
  5. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java (22 changed lines)
  6. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java (26 changed lines)
  7. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java (129 changed lines)
  8. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java (18 changed lines)
  9. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SenderPendingTransactions.java (43 changed lines, new file)
  10. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java (31 changed lines)
  11. ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java (8 changed lines)
  12. ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractLayeredTransactionPoolTest.java (3 changed lines)
  13. ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java (8 changed lines)
  14. ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java (54 changed lines)
  15. ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java (2 changed lines)

CHANGELOG.md
@@ -27,6 +27,7 @@
- Enable continuous profiling with default setting [#7006](https://github.com/hyperledger/besu/pull/7006)
- A full and up to date implementation of EOF for Prague [#7169](https://github.com/hyperledger/besu/pull/7169)
- Add Subnet-Based Peer Permissions. [#7168](https://github.com/hyperledger/besu/pull/7168)
- Reduce lock contention on transaction pool when building a block [#7180](https://github.com/hyperledger/besu/pull/7180)
### Bug fixes
- Make `eth_gasPrice` aware of the base fee market [#7102](https://github.com/hyperledger/besu/pull/7102)

ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java
@@ -164,7 +164,7 @@ public class BlockTransactionSelector {
public TransactionSelectionResults buildTransactionListForBlock() {
LOG.atDebug()
.setMessage("Transaction pool stats {}")
.addArgument(blockSelectionContext.transactionPool().logStats())
.addArgument(blockSelectionContext.transactionPool()::logStats)
.log();
timeLimitedSelection();
LOG.atTrace()

ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/AbstractIsolationTests.java
@@ -136,7 +136,8 @@ public abstract class AbstractIsolationTests {
txPoolMetrics,
transactionReplacementTester,
new BlobCache(),
MiningParameters.newDefault()));
MiningParameters.newDefault()),
ethScheduler);
protected final List<GenesisAccount> accounts =
GenesisConfigFile.fromResource("/dev.json")

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java
@@ -357,6 +357,7 @@ public class TransactionPoolFactory {
miningParameters);
}
return new LayeredPendingTransactions(transactionPoolConfiguration, pendingTransactionsSorter);
return new LayeredPendingTransactions(
transactionPoolConfiguration, pendingTransactionsSorter, ethScheduler);
}
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java
@@ -24,13 +24,13 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;
/**
* Holds the current set of executable pending transactions, that are candidate for inclusion on
@@ -167,9 +167,25 @@ public abstract class AbstractPrioritizedTransactions extends AbstractSequential
return remainingPromotionsPerType;
}
/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender, the pending txs are ordered by nonce asc.
*
* <p>Sender list order: the sender of the most profitable tx comes first.
*
* @return a list of sender pending txs
*/
@Override
public Stream<PendingTransaction> stream() {
return orderByFee.descendingSet().stream();
public List<SenderPendingTransactions> getBySender() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return orderByFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.filter(sendersToAdd::remove)
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}
@Override

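The filter(sendersToAdd::remove) step above is a compact way to keep only the first occurrence of each sender while preserving the fee-descending encounter order: Set.remove returns true only the first time a value is seen. A self-contained sketch of the idiom (hypothetical names, plain strings standing in for senders):

import java.util.HashSet;
import java.util.List;

class DistinctInOrderExample {
  // Set.remove returns true only for the first occurrence of a value, so
  // using it as a stream filter deduplicates while keeping encounter order.
  static List<String> firstOccurrenceOf(final List<String> senders) {
    final var remaining = new HashSet<>(senders);
    return senders.stream().filter(remaining::remove).toList();
  }

  public static void main(final String[] args) {
    // prints [s1, s2, s3]: the later duplicates of s1 and s2 are dropped
    System.out.println(firstOccurrenceOf(List.of("s1", "s2", "s1", "s3", "s2")));
  }
}
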
ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java
@@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static java.util.Collections.unmodifiableList;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
@@ -54,7 +55,6 @@ import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,6 +138,14 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
|| nextLayer.contains(transaction);
}
/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender, the pending txs are ordered by nonce asc.
*
* @return a list of sender pending txs
*/
public abstract List<SenderPendingTransactions> getBySender();
@Override
public List<PendingTransaction> getAll() {
final List<PendingTransaction> allNextLayers = nextLayer.getAll();
@@ -548,17 +556,17 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
return priorityTxs;
}
Stream<PendingTransaction> stream(final Address sender) {
return txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values().stream();
}
@Override
public List<PendingTransaction> getAllFor(final Address sender) {
return Stream.concat(stream(sender), nextLayer.getAllFor(sender).stream()).toList();
public synchronized List<PendingTransaction> getAllFor(final Address sender) {
final var fromNextLayers = nextLayer.getAllFor(sender);
final var fromThisLayer = txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values();
final var concatLayers =
new ArrayList<PendingTransaction>(fromThisLayer.size() + fromNextLayers.size());
concatLayers.addAll(fromThisLayer);
concatLayers.addAll(fromNextLayers);
return unmodifiableList(concatLayers);
}
abstract Stream<PendingTransaction> stream();
@Override
public int count() {
return pendingTransactions.size() + nextLayer.count();

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java
@@ -27,6 +27,7 @@ import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
@@ -41,13 +42,10 @@ import org.hyperledger.besu.plugin.data.TransactionSelectionResult;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collector;
import java.util.stream.Collectors;
@@ -63,12 +61,15 @@ public class LayeredPendingTransactions implements PendingTransactions {
private static final Marker INVALID_TX_REMOVED = MarkerFactory.getMarker("INVALID_TX_REMOVED");
private final TransactionPoolConfiguration poolConfig;
private final AbstractPrioritizedTransactions prioritizedTransactions;
private final EthScheduler ethScheduler;
public LayeredPendingTransactions(
final TransactionPoolConfiguration poolConfig,
final AbstractPrioritizedTransactions prioritizedTransactions) {
final AbstractPrioritizedTransactions prioritizedTransactions,
final EthScheduler ethScheduler) {
this.poolConfig = poolConfig;
this.prioritizedTransactions = prioritizedTransactions;
this.ethScheduler = ethScheduler;
}
@Override
@@ -311,79 +312,57 @@
}
@Override
// There's a small edge case here we could encounter.
// When we pass an upgrade block that has a new transaction type, we start allowing transactions
// of that new type into our pool.
// If we then reorg to a block lower than the upgrade block height _and_ we create a block, that
// block could end up with transactions of the new type.
// This seems like it would be very rare but worth it to document that we don't handle that case
// right now.
public synchronized void selectTransactions(
final PendingTransactions.TransactionSelector selector) {
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final Set<Hash> alreadyChecked = new HashSet<>();
final Set<Address> skipSenders = new HashSet<>();
final AtomicBoolean completed = new AtomicBoolean(false);
prioritizedTransactions.stream()
.takeWhile(unused -> !completed.get())
.filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender()))
.peek(this::logSenderTxs)
.forEach(
highPrioPendingTx ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.takeWhile(
candidatePendingTx ->
!skipSenders.contains(candidatePendingTx.getSender())
&& !completed.get())
.filter(
candidatePendingTx ->
!alreadyChecked.contains(candidatePendingTx.getHash())
&& Long.compareUnsigned(
candidatePendingTx.getNonce(), highPrioPendingTx.getNonce())
<= 0)
.forEach(
candidatePendingTx -> {
alreadyChecked.add(candidatePendingTx.getHash());
final var res = selector.evaluateTransaction(candidatePendingTx);
LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(res)
.addArgument(candidatePendingTx::toTraceLog)
.log();
if (res.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, res);
}
if (res.stop()) {
completed.set(true);
}
if (!res.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
skipSenders.add(candidatePendingTx.getSender());
LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender());
}
}));
invalidTransactions.forEach(
invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED));
}
private void logSenderTxs(final PendingTransaction highPrioPendingTx) {
LOG.atTrace()
.setMessage("highPrioPendingTx {}, senderTxs {}")
.addArgument(highPrioPendingTx::toTraceLog)
.addArgument(
() ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(", ")))
.log();
final List<SenderPendingTransactions> candidateTxsBySender;
synchronized (this) {
// since selecting transactions for block creation is a potentially long operation,
// we avoid holding the lock for the whole process, and only take it to get
// the candidate transactions
candidateTxsBySender = prioritizedTransactions.getBySender();
}
selection:
for (final var senderTxs : candidateTxsBySender) {
LOG.trace("highPrioSenderTxs {}", senderTxs);
for (final var candidatePendingTx : senderTxs.pendingTransactions()) {
final var selectionResult = selector.evaluateTransaction(candidatePendingTx);
LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(selectionResult)
.addArgument(candidatePendingTx::toTraceLog)
.log();
if (selectionResult.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, selectionResult);
}
if (selectionResult.stop()) {
LOG.trace("Stopping selection");
break selection;
}
if (!selectionResult.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
break;
}
}
}
ethScheduler.scheduleTxWorkerTask(
() ->
invalidTransactions.forEach(
invalidTx -> {
synchronized (this) {
prioritizedTransactions.remove(invalidTx, INVALIDATED);
}
}));
}
@Override

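Two details of the rewrite above are worth calling out. First, the previous stream pipeline, with its takeWhile and AtomicBoolean plumbing, is replaced by plain nested loops, where the labeled break selection exits both loops at once when the selector asks to stop. A minimal standalone illustration of the labeled-break construct:

class LabeledBreakExample {
  public static void main(final String[] args) {
    selection:
    for (int sender = 0; sender < 3; sender++) {
      for (int nonce = 0; nonce < 3; nonce++) {
        if (sender == 1 && nonce == 1) {
          break selection; // exits both loops, like the stop() case above
        }
        System.out.println(sender + ":" + nonce);
      }
    }
    // prints 0:0, 0:1, 0:2, 1:0 and nothing else
  }
}

Second, removal of invalidated transactions is pushed onto the tx worker thread via ethScheduler.scheduleTxWorkerTask, so the block-building thread never re-enters the synchronized pool mutation path; each removal briefly re-acquires the lock on its own.
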
ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java
@@ -38,7 +38,6 @@ import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ReadyTransactions extends AbstractSequentialTransactionsLayer {
@@ -137,11 +136,24 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer {
return true;
}
/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender, the pending txs are ordered by nonce asc.
*
* <p>Sender list order: the sender of the tx with the highest max gas price comes first.
*
* @return a list of sender pending txs
*/
@Override
public Stream<PendingTransaction> stream() {
public List<SenderPendingTransactions> getBySender() {
return orderByMaxFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.flatMap(sender -> txsBySender.get(sender).values().stream());
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}
@Override

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SenderPendingTransactions.java (new file)
@@ -0,0 +1,43 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import java.util.List;
import java.util.stream.Collectors;
/**
* A list of pending transactions of a specific sender, ordered by nonce asc
*
* @param sender the sender
* @param pendingTransactions the list of pending transactions ordered by nonce asc
*/
public record SenderPendingTransactions(
Address sender, List<PendingTransaction> pendingTransactions) {
@Override
public String toString() {
return "Sender "
+ sender
+ " has "
+ pendingTransactions.size()
+ " pending transactions "
+ pendingTransactions.stream()
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(",", "[", "]"));
}
}

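A hedged usage sketch of the record: the layers above construct it with List.copyOf(txsBySender.get(sender).values()), so the snapshot is immutable and nonce-ascending and can be read safely after the pool lock is released. A self-contained toy (a placeholder Tx record stands in for PendingTransaction):

import java.util.List;
import java.util.TreeMap;

class SnapshotExample {
  record Tx(long nonce) {}
  record SenderTxs(String sender, List<Tx> txs) {}

  public static void main(final String[] args) {
    // TreeMap values iterate in key order, like the nonce-keyed txsBySender maps
    final var txsByNonce = new TreeMap<Long, Tx>();
    txsByNonce.put(2L, new Tx(2));
    txsByNonce.put(1L, new Tx(1));
    // List.copyOf yields an immutable snapshot, safe to read after unlocking
    final var snapshot = new SenderTxs("s1", List.copyOf(txsByNonce.values()));
    System.out.println(snapshot.txs()); // [Tx[nonce=1], Tx[nonce=2]]
  }
}
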
ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java
@@ -44,15 +44,19 @@ import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.collect.Iterables;
public class SparseTransactions extends AbstractTransactionsLayer {
/**
* Order sparse txs by priority flag and sequence asc, so that eviction first picks txs that
* have no priority and the lowest sequence number (oldest).
*/
private final NavigableSet<PendingTransaction> sparseEvictionOrder =
new TreeSet<>(
Comparator.comparing(PendingTransaction::hasPriority)
.thenComparing(PendingTransaction::getSequence));
private final Map<Address, Integer> gapBySender = new HashMap<>();
private final List<SendersByPriority> orderByGap;
@@ -220,7 +224,8 @@ public class SparseTransactions extends AbstractTransactionsLayer {
}
@Override
public void remove(final PendingTransaction invalidatedTx, final RemovalReason reason) {
public synchronized void remove(
final PendingTransaction invalidatedTx, final RemovalReason reason) {
final var senderTxs = txsBySender.get(invalidatedTx.getSender());
if (senderTxs != null && senderTxs.containsKey(invalidatedTx.getNonce())) {
@@ -312,9 +317,27 @@ public class SparseTransactions extends AbstractTransactionsLayer {
return false;
}
/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender, the pending txs are ordered by nonce asc.
*
* <p>Sender list order: the sender of the tx that will be evicted last comes first. For
* example, if the same sender has both the first and the last txs in the eviction order, it is
* first in the returned list, since precedence goes to txs that will be evicted later.
*
* @return a list of sender pending txs
*/
@Override
public Stream<PendingTransaction> stream() {
return sparseEvictionOrder.descendingSet().stream();
public List<SenderPendingTransactions> getBySender() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return sparseEvictionOrder.descendingSet().stream()
.map(PendingTransaction::getSender)
.filter(sendersToAdd::remove)
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}
@Override

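The ordering described in the javadoc above can be checked with a small standalone model (plain records stand in for PendingTransaction): walking the eviction order in descending direction and keeping only the first occurrence of each sender puts the sender whose tx would be evicted last at the head of the returned list.

import java.util.Comparator;
import java.util.HashSet;
import java.util.TreeSet;

class SparseOrderExample {
  record Tx(String sender, long sequence) {}

  public static void main(final String[] args) {
    // eviction order: lowest sequence (oldest) is evicted first
    final var evictionOrder = new TreeSet<Tx>(Comparator.comparingLong(Tx::sequence));
    evictionOrder.add(new Tx("S1", 1)); // S1 owns the oldest tx...
    evictionOrder.add(new Tx("S2", 2));
    evictionOrder.add(new Tx("S1", 3)); // ...and also the tx evicted last
    final var seen = new HashSet<String>();
    final var senders =
        evictionOrder.descendingSet().stream()
            .map(Tx::sender)
            .filter(seen::add) // keep first occurrence, like sendersToAdd::remove above
            .toList();
    System.out.println(senders); // [S1, S2]: S1 leads despite owning the oldest tx
  }
}
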
ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java
@@ -41,8 +41,6 @@ public interface TransactionsLayer {
boolean contains(Transaction transaction);
List<PendingTransaction> getAll();
TransactionAddedResult add(PendingTransaction pendingTransaction, int gap);
void remove(PendingTransaction pendingTransaction, RemovalReason reason);
@@ -52,6 +50,10 @@
BlockHeader blockHeader,
final Map<Address, Long> maxConfirmedNonceBySender);
List<PendingTransaction> getAll();
List<PendingTransaction> getAllFor(Address sender);
List<Transaction> getAllLocal();
List<Transaction> getAllPriority();
@@ -93,8 +95,6 @@ public interface TransactionsLayer {
String logSender(Address sender);
List<PendingTransaction> getAllFor(Address sender);
enum RemovalReason {
CONFIRMED,
CROSS_LAYER_REPLACED,

ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractLayeredTransactionPoolTest.java
@@ -58,7 +58,8 @@ public abstract class AbstractLayeredTransactionPoolTest extends AbstractTransac
return new LayeredPendingTransactions(
poolConfig,
createPrioritizedTransactions(
poolConfig, readyLayer, txPoolMetrics, transactionReplacementTester));
poolConfig, readyLayer, txPoolMetrics, transactionReplacementTester),
ethScheduler);
}
protected abstract AbstractPrioritizedTransactions createPrioritizedTransactions(

ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java
@@ -170,14 +170,16 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {
senderLimitedLayers = createLayers(senderLimitedConfig);
smallLayers = createLayers(smallPoolConfig);
pendingTransactions = new LayeredPendingTransactions(poolConf, layers.prioritizedTransactions);
pendingTransactions =
new LayeredPendingTransactions(poolConf, layers.prioritizedTransactions, ethScheduler);
senderLimitedTransactions =
new LayeredPendingTransactions(
senderLimitedConfig, senderLimitedLayers.prioritizedTransactions);
senderLimitedConfig, senderLimitedLayers.prioritizedTransactions, ethScheduler);
smallPendingTransactions =
new LayeredPendingTransactions(smallPoolConfig, smallLayers.prioritizedTransactions);
new LayeredPendingTransactions(
smallPoolConfig, smallLayers.prioritizedTransactions, ethScheduler);
}
@Test

ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java
@@ -199,7 +199,7 @@ public class LayersTest extends BaseTransactionPoolTest {
MiningParameters.newDefault().setMinTransactionGasPrice(MIN_GAS_PRICE));
final LayeredPendingTransactions pendingTransactions =
new LayeredPendingTransactions(poolConfig, prioritizedTransactions);
new LayeredPendingTransactions(poolConfig, prioritizedTransactions, ethScheduler);
scenario.execute(
pendingTransactions,
@@ -306,7 +306,7 @@
Arguments.of(
new Scenario("fill sparse 2")
.addForSender(S1, 5, 3, 2)
.expectedSparseForSender(S1, 5, 3, 2)),
.expectedSparseForSender(S1, 2, 3, 5)),
Arguments.of(
new Scenario("overflow sparse 1")
.addForSender(S1, 1, 2, 3, 4)
@@ -315,13 +315,13 @@
Arguments.of(
new Scenario("overflow sparse 2")
.addForSender(S1, 4, 2, 3, 1)
.expectedSparseForSender(S1, 2, 3, 1)
.expectedSparseForSender(S1, 1, 2, 3)
.expectedDroppedForSender(S1, 4)),
Arguments.of(
new Scenario("overflow sparse 3")
.addForSender(S1, 0, 4, 2, 3, 5)
.expectedPrioritizedForSender(S1, 0)
.expectedSparseForSender(S1, 4, 2, 3)
.expectedSparseForSender(S1, 2, 3, 4)
.expectedDroppedForSender(S1, 5)));
}
@@ -334,7 +334,7 @@
Arguments.of(
new Scenario("add first sparse")
.addForSenders(S1, 1, S2, 2)
.expectedSparseForSenders(S1, 1, S2, 2)),
.expectedSparseForSenders(S2, 2, S1, 1)),
Arguments.of(
new Scenario("fill prioritized 1")
.addForSender(S1, 0, 1, 2)
@@ -357,11 +357,11 @@
.addForSenders(S1, 2, S2, 1)
.expectedPrioritizedForSenders()
.expectedReadyForSenders()
.expectedSparseForSenders(S1, 2, S2, 1)
.expectedSparseForSenders(S2, 1, S1, 2)
.addForSenders(S2, 2, S1, 0)
.expectedPrioritizedForSender(S1, 0)
.expectedReadyForSenders()
.expectedSparseForSenders(S1, 2, S2, 1, S2, 2)
.expectedSparseForSenders(S2, 1, S2, 2, S1, 2)
.addForSenders(S1, 1)
.expectedPrioritizedForSenders(S1, 0, S1, 1, S1, 2)
.expectedReadyForSenders()
@@ -431,15 +431,15 @@
.addForSenders(S2, 0, S3, 2, S1, 1)
.expectedPrioritizedForSender(S2, 0)
.expectedReadyForSenders()
.expectedSparseForSenders(S3, 2, S1, 1)
.expectedSparseForSenders(S1, 1, S3, 2)
.addForSenders(S2, 1)
.expectedPrioritizedForSenders(S2, 0, S2, 1)
.expectedReadyForSenders()
.expectedSparseForSenders(S3, 2, S1, 1)
.expectedSparseForSenders(S1, 1, S3, 2)
.addForSenders(S3, 0)
.expectedPrioritizedForSenders(S3, 0, S2, 0, S2, 1)
.expectedReadyForSenders()
.expectedSparseForSenders(S3, 2, S1, 1)
.expectedSparseForSenders(S1, 1, S3, 2)
.addForSenders(S1, 0)
.expectedPrioritizedForSenders(S3, 0, S2, 0, S2, 1)
.expectedReadyForSenders(S1, 0, S1, 1)
@@ -452,7 +452,7 @@
.addForSenders(S4, 0, S4, 1, S3, 3)
.expectedPrioritizedForSenders(S4, 0, S4, 1, S3, 0)
.expectedReadyForSenders(S3, 1, S2, 0, S2, 1)
.expectedSparseForSenders(S3, 2, S1, 1, S1, 0)
.expectedSparseForSenders(S1, 0, S1, 1, S3, 2)
// ToDo: non optimal discard, worth to improve?
.expectedDroppedForSender(S3, 3)),
Arguments.of(
@@ -813,7 +813,7 @@
Arguments.of(
new Scenario("out of order sequence with gap 1")
.addForSender(S1, 2, 1)
.expectedSparseForSender(S1, 2, 1)
.expectedSparseForSender(S1, 1, 2)
.expectedNextNonceForSenders(S1, null)),
Arguments.of(
new Scenario("out of order sequence with gap 2")
@@ -969,7 +969,7 @@
Arguments.of(
new Scenario("out of order sequence with gap 1")
.addForSender(S1, 2, 1)
.expectedSparseForSender(S1, 2, 1)
.expectedSparseForSender(S1, 1, 2)
.expectedSelectedTransactions()),
Arguments.of(
new Scenario("out of order sequence with gap 2")
@@ -1073,8 +1073,7 @@
.setAccountNonce(S1, 5)
.addForSender(S1, 7)
.expectedPrioritizedForSenders()
// remember that sparse are checked by oldest first
.expectedSparseForSender(S1, 8, 9, 7)));
.expectedSparseForSender(S1, 7, 8, 9)));
}
static Stream<Arguments> providerPrioritySenders() {
@@ -1195,7 +1194,7 @@
.addForSender(S3, 0)
.expectedSparseForSender(S3, 0)
.addForSender(SP1, 0)
.expectedSparseForSenders(S3, 0, SP1, 0)
.expectedSparseForSenders(SP1, 0, S3, 0)
.confirmedForSenders(SP2, 0)
.expectedPrioritizedForSender(SP2, 1, 2, 3)
.expectedReadyForSenders(SP2, 4, SP2, 5, SP1, 0)
@@ -1510,23 +1509,26 @@
private void assertExpectedPrioritized(
final AbstractPrioritizedTransactions prioLayer, final List<PendingTransaction> expected) {
assertThat(prioLayer.stream()).describedAs("Prioritized").containsExactlyElementsOf(expected);
assertThat(prioLayer.getBySender())
.describedAs("Prioritized")
.flatExtracting(SenderPendingTransactions::pendingTransactions)
.containsExactlyElementsOf(expected);
}
private void assertExpectedReady(
final ReadyTransactions readyLayer, final List<PendingTransaction> expected) {
assertThat(readyLayer.stream()).describedAs("Ready").containsExactlyElementsOf(expected);
assertThat(readyLayer.getBySender())
.describedAs("Ready")
.flatExtracting(SenderPendingTransactions::pendingTransactions)
.containsExactlyElementsOf(expected);
}
private void assertExpectedSparse(
final SparseTransactions sparseLayer, final List<PendingTransaction> expected) {
// sparse txs are returned from the most recent to the oldest, so reverse it to make writing
// scenarios easier
final var sortedExpected = new ArrayList<>(expected);
Collections.reverse(sortedExpected);
assertThat(sparseLayer.stream())
assertThat(sparseLayer.getBySender())
.describedAs("Sparse")
.containsExactlyElementsOf(sortedExpected);
.flatExtracting(SenderPendingTransactions::pendingTransactions)
.containsExactlyElementsOf(expected);
}
private void assertExpectedDropped(
@@ -1587,7 +1589,9 @@
}
actions.add(
(pending, prio, ready, sparse, dropped) ->
assertThat(prio.stream()).containsExactlyElementsOf(expectedSelected));
assertThat(prio.getBySender())
.flatExtracting(SenderPendingTransactions::pendingTransactions)
.containsExactlyElementsOf(expectedSelected));
return this;
}

ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java
@@ -129,7 +129,7 @@ public class ReplayTest {
final AbstractPrioritizedTransactions prioritizedTransactions =
createLayers(poolConfig, txPoolMetrics, baseFeeMarket);
final LayeredPendingTransactions pendingTransactions =
new LayeredPendingTransactions(poolConfig, prioritizedTransactions);
new LayeredPendingTransactions(poolConfig, prioritizedTransactions, ethScheduler);
br.lines()
.forEach(
line -> {
