Stop evaluating txs for sender after the first skipped one (#5498)

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/5522/head
Fabio Di Fabio 2 years ago committed by GitHub
parent cb3e13450b
commit 5fe53cf95c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java
  2. 91
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java

@ -321,31 +321,24 @@ public class LayeredPendingTransactions implements PendingTransactions {
final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final Set<Hash> alreadyChecked = new HashSet<>();
final Set<Address> skipSenders = new HashSet<>();
final AtomicBoolean completed = new AtomicBoolean(false);
prioritizedTransactions.stream()
.takeWhile(unused -> !completed.get())
.peek(
highPrioPendingTx ->
LOG.atTrace()
.setMessage("highPrioPendingTx {}, senderTxs {}")
.addArgument(highPrioPendingTx::toTraceLog)
.addArgument(
() ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(", ")))
.log())
.filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender()))
.peek(this::logSenderTxs)
.forEach(
highPrioPendingTx ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.takeWhile(unused -> !completed.get())
.filter(
.takeWhile(
candidatePendingTx ->
!alreadyChecked.contains(candidatePendingTx.getHash()))
!skipSenders.contains(candidatePendingTx.getSender())
&& !completed.get())
.filter(
candidatePendingTx ->
candidatePendingTx.getNonce() <= highPrioPendingTx.getNonce())
!alreadyChecked.contains(candidatePendingTx.getHash())
&& candidatePendingTx.getNonce() <= highPrioPendingTx.getNonce())
.forEach(
candidatePendingTx -> {
alreadyChecked.add(candidatePendingTx.getHash());
@ -366,12 +359,31 @@ public class LayeredPendingTransactions implements PendingTransactions {
if (res.stop()) {
completed.set(true);
}
if (!res.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
skipSenders.add(candidatePendingTx.getSender());
LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender());
}
}));
invalidTransactions.forEach(
invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED));
}
private void logSenderTxs(final PendingTransaction highPrioPendingTx) {
LOG.atTrace()
.setMessage("highPrioPendingTx {}, senderTxs {}")
.addArgument(highPrioPendingTx::toTraceLog)
.addArgument(
() ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(", ")))
.log();
}
@Override
public long maxSize() {
return -1;

@ -19,10 +19,16 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedRes
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.BLOCK_FULL;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.BLOCK_OCCUPANCY_ABOVE_THRESHOLD;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.CURRENT_TX_PRICE_BELOW_MIN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.DATA_PRICE_BELOW_CURRENT_MIN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult.TX_TOO_LARGE_FOR_REMAINING_GAS;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@ -43,7 +49,6 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolReplacementHandler;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionSelectionResult;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.evm.account.Account;
import java.util.ArrayList;
@ -51,9 +56,12 @@ import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {
@ -284,8 +292,10 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {
verifyNoMoreInteractions(listener);
}
@Test
public void selectTransactionsUntilSelectorRequestsNoMore() {
@ParameterizedTest
@MethodSource
public void selectTransactionsUntilSelectorRequestsNoMore(
final TransactionSelectionResult selectionResult) {
pendingTransactions.addRemoteTransaction(transaction0, Optional.empty());
pendingTransactions.addRemoteTransaction(transaction1, Optional.empty());
@ -293,13 +303,17 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {
pendingTransactions.selectTransactions(
transaction -> {
parsedTransactions.add(transaction);
return BLOCK_OCCUPANCY_ABOVE_THRESHOLD;
return selectionResult;
});
assertThat(parsedTransactions.size()).isEqualTo(1);
assertThat(parsedTransactions.get(0)).isEqualTo(transaction0);
}
static Stream<TransactionSelectionResult> selectTransactionsUntilSelectorRequestsNoMore() {
return Stream.of(BLOCK_OCCUPANCY_ABOVE_THRESHOLD, BLOCK_FULL);
}
@Test
public void selectTransactionsUntilPendingIsEmpty() {
pendingTransactions.addRemoteTransaction(transaction0, Optional.empty());
@ -356,6 +370,42 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {
assertThat(iterationOrder).containsExactly(transaction0, transaction1, transaction2);
}
@ParameterizedTest
@MethodSource
public void ignoreSenderTransactionsAfterASkippedOne(
final TransactionSelectionResult skipSelectionResult) {
final Transaction transaction0a = createTransaction(0, Wei.of(20), KEYS1);
final Transaction transaction1a = createTransaction(1, Wei.of(20), KEYS1);
final Transaction transaction2a = createTransaction(2, Wei.of(20), KEYS1);
final Transaction transaction0b = createTransaction(0, Wei.of(10), KEYS2);
pendingTransactions.addLocalTransaction(transaction0a, Optional.empty());
pendingTransactions.addLocalTransaction(transaction1a, Optional.empty());
pendingTransactions.addLocalTransaction(transaction2a, Optional.empty());
pendingTransactions.addLocalTransaction(transaction0b, Optional.empty());
final List<Transaction> iterationOrder = new ArrayList<>(3);
pendingTransactions.selectTransactions(
transaction -> {
iterationOrder.add(transaction);
// pretending that the 2nd tx of the 1st sender is not selected
return transaction.getNonce() == 1 ? skipSelectionResult : SELECTED;
});
// the 3rd tx of the 1st must not be processed, since the 2nd is skipped
// but the 2nd sender must not be affected
assertThat(iterationOrder).containsExactly(transaction0a, transaction1a, transaction0b);
}
static Stream<TransactionSelectionResult> ignoreSenderTransactionsAfterASkippedOne() {
return Stream.of(
CURRENT_TX_PRICE_BELOW_MIN,
DATA_PRICE_BELOW_CURRENT_MIN,
TX_TOO_LARGE_FOR_REMAINING_GAS,
TransactionSelectionResult.invalidTransient(GAS_PRICE_BELOW_CURRENT_BASE_FEE.name()),
TransactionSelectionResult.invalid(UPFRONT_COST_EXCEEDS_BALANCE.name()));
}
@Test
public void notForceNonceOrderWhenSendersDiffer() {
final Account sender2 = mock(Account.class);
@ -382,19 +432,38 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {
pendingTransactions.addRemoteTransaction(transaction0, Optional.empty());
pendingTransactions.addRemoteTransaction(transaction1, Optional.empty());
final List<Transaction> parsedTransactions = new ArrayList<>(2);
final List<Transaction> parsedTransactions = new ArrayList<>(1);
pendingTransactions.selectTransactions(
transaction -> {
parsedTransactions.add(transaction);
return TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name());
return TransactionSelectionResult.invalid(UPFRONT_COST_EXCEEDS_BALANCE.name());
});
assertThat(parsedTransactions.size()).isEqualTo(2);
assertThat(parsedTransactions.get(0)).isEqualTo(transaction0);
assertThat(parsedTransactions.get(1)).isEqualTo(transaction1);
// only the first is processed since not being selected will automatically skip the processing
// all the other txs from the same sender
assertThat(parsedTransactions).containsExactly(transaction0);
assertThat(pendingTransactions.getPendingTransactions())
.map(PendingTransaction::getTransaction)
.containsExactly(transaction1);
}
assertThat(pendingTransactions.size()).isZero();
@Test
public void temporarilyInvalidTransactionIsKeptInPendingTransactions() {
pendingTransactions.addRemoteTransaction(transaction0, Optional.empty());
final List<Transaction> parsedTransactions = new ArrayList<>(1);
pendingTransactions.selectTransactions(
transaction -> {
parsedTransactions.add(transaction);
return TransactionSelectionResult.invalidTransient(
GAS_PRICE_BELOW_CURRENT_BASE_FEE.name());
});
assertThat(parsedTransactions).containsExactly(transaction0);
assertThat(pendingTransactions.getPendingTransactions())
.map(PendingTransaction::getTransaction)
.containsExactly(transaction0);
}
@Test

Loading…
Cancel
Save