Skip seen transactions (#3626)

The currently way Besu has to spot if a transaction has already been process, is to look if the transaction is present in the transaction pool, that by default is 4K, while the amount of pending transactions on the mainnet, is much more, the order of hundred of thousands, so basically even if a transaction has been already processed, the chances that it gets reprocessed is very high, with the result of doing a lot of useless work, that affects Besu performance.

A trivial solution could be to just raise the transaction pool size, but that is not always advisable, because it is critical for block production to keep it fast, and incresing its size could negatively affect the perfomance of the strategy choosen to select transactions to include in the block.

A better option, implemented here, is to leverage data that we already have, and that keeps the history of the transactions exchanged with other peers. This data is just a collection of transaction hashes that we have received or seen, and in any case if a transaction is in that collection, it means that it has already been processed by Besu, so it is possible to directly skip it.

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/3636/head
Fabio Di Fabio 3 years ago committed by GitHub
parent fa0e9eb556
commit 754fa26a5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 98
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java
  3. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetPooledTransactionsFromPeerTask.java
  4. 42
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java
  5. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java
  6. 14
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java
  7. 48
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessor.java
  8. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java
  9. 62
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java
  10. 39
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java
  11. 22
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessorTest.java

@ -11,6 +11,7 @@
### Additions and Improvements
- Tune transaction synchronization parameter to adapt to mainnet traffic [#3610](https://github.com/hyperledger/besu/pull/3610)
- Improve eth/66 support [#3616](https://github.com/hyperledger/besu/pull/3616)
- Avoid reprocessing remote transactions already seen [#3626](https://github.com/hyperledger/besu/pull/3626)
## 22.1.2

@ -15,63 +15,117 @@
package org.hyperledger.besu.ethereum.eth.manager.task;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration.MAX_PENDING_TRANSACTIONS;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("UnstableApiUsage")
public class BufferedGetPooledTransactionsFromPeerFetcher {
private static final Logger LOG =
LoggerFactory.getLogger(BufferedGetPooledTransactionsFromPeerFetcher.class);
private static final int MAX_HASHES = 256;
private static final String HASHES = "hashes";
private final TransactionPool transactionPool;
private final PeerTransactionTracker transactionTracker;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final EthPeer peer;
private final NewPooledTransactionHashesMessageProcessor processor;
private final Queue<Hash> txAnnounces;
private final Counter alreadySeenTransactionsCounter;
public BufferedGetPooledTransactionsFromPeerFetcher(
final EthPeer peer, final NewPooledTransactionHashesMessageProcessor processor) {
final EthContext ethContext,
final EthPeer peer,
final TransactionPool transactionPool,
final PeerTransactionTracker transactionTracker,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.peer = peer;
this.processor = processor;
this.transactionPool = transactionPool;
this.transactionTracker = transactionTracker;
this.metricsSystem = metricsSystem;
this.txAnnounces = Queues.synchronizedQueue(EvictingQueue.create(MAX_PENDING_TRANSACTIONS));
this.alreadySeenTransactionsCounter =
metricsSystem
.createLabelledCounter(
BesuMetricCategory.TRANSACTION_POOL,
"remote_already_seen_total",
"Total number of received transactions already seen",
"source")
.labels(HASHES);
}
public void requestTransactions() {
for (List<Hash> txAnnounces = getTxAnnounces();
!txAnnounces.isEmpty();
txAnnounces = getTxAnnounces()) {
List<Hash> txHashesAnnounced;
while (!(txHashesAnnounced = getTxHashesAnnounced()).isEmpty()) {
final GetPooledTransactionsFromPeerTask task =
GetPooledTransactionsFromPeerTask.forHashes(
processor.getEthContext(), txAnnounces, processor.getMetricsSystem());
GetPooledTransactionsFromPeerTask.forHashes(ethContext, txHashesAnnounced, metricsSystem);
task.assignPeer(peer);
processor
.getEthContext()
ethContext
.getScheduler()
.scheduleSyncWorkerTask(task)
.thenAccept(
result -> processor.getTransactionPool().addRemoteTransactions(result.getResult()));
result -> {
List<Transaction> retrievedTransactions = result.getResult();
transactionTracker.markTransactionsAsSeen(peer, retrievedTransactions);
traceLambda(
LOG,
"Got {} transactions of {} hashes requested from peer {}",
retrievedTransactions::size,
task.getTransactionHashes()::size,
peer::toString);
transactionPool.addRemoteTransactions(retrievedTransactions);
});
}
}
public void addHash(final Hash hash) {
txAnnounces.add(hash);
public void addHashes(final Collection<Hash> hashes) {
txAnnounces.addAll(hashes);
}
private List<Hash> getTxAnnounces() {
List<Hash> retrieved = new ArrayList<>();
while (retrieved.size() < MAX_HASHES && !txAnnounces.isEmpty()) {
final Hash txAnnounce = txAnnounces.poll();
if (processor.getTransactionPool().getTransactionByHash(txAnnounce).isEmpty()) {
retrieved.add(txAnnounce);
private List<Hash> getTxHashesAnnounced() {
final List<Hash> toRetrieve = new ArrayList<>(MAX_HASHES);
int discarded = 0;
while (toRetrieve.size() < MAX_HASHES && !txAnnounces.isEmpty()) {
final Hash txHashAnnounced = txAnnounces.poll();
if (!transactionTracker.hasSeenTransaction(txHashAnnounced)) {
toRetrieve.add(txHashAnnounced);
} else {
discarded++;
}
}
return retrieved;
final int alreadySeenCount = discarded;
alreadySeenTransactionsCounter.inc(alreadySeenCount);
traceLambda(
LOG,
"Transaction hashes to request from peer {}, fresh count {}, already seen count {}",
peer::toString,
toRetrieve::size,
() -> alreadySeenCount);
return toRetrieve;
}
}

@ -43,7 +43,7 @@ public class GetPooledTransactionsFromPeerTask extends AbstractPeerRequestTask<L
private GetPooledTransactionsFromPeerTask(
final EthContext ethContext, final List<Hash> hashes, final MetricsSystem metricsSystem) {
super(ethContext, EthPV65.GET_POOLED_TRANSACTIONS, metricsSystem);
this.hashes = new ArrayList<>(hashes);
this.hashes = List.copyOf(hashes);
}
public static GetPooledTransactionsFromPeerTask forHashes(
@ -51,6 +51,10 @@ public class GetPooledTransactionsFromPeerTask extends AbstractPeerRequestTask<L
return new GetPooledTransactionsFromPeerTask(ethContext, hashes, metricsSystem);
}
public List<Hash> getTransactionHashes() {
return hashes;
}
@Override
protected PendingPeerRequest sendRequest() {
return sendRequestToPeer(

@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMess
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.RunnableCounter;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
@ -33,6 +34,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,7 +51,7 @@ public class NewPooledTransactionHashesMessageProcessor {
scheduledTasks;
private final PeerTransactionTracker transactionTracker;
private final Counter totalSkippedTransactionsMessageCounter;
private final Counter totalSkippedNewPooledTransactionHashesMessageCounter;
private final TransactionPool transactionPool;
private final TransactionPoolConfiguration transactionPoolConfiguration;
private final EthContext ethContext;
@ -60,7 +62,6 @@ public class NewPooledTransactionHashesMessageProcessor {
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionPoolConfiguration transactionPoolConfiguration,
final Counter metricsCounter,
final EthContext ethContext,
final MetricsSystem metricsSystem,
final SyncState syncState) {
@ -70,12 +71,15 @@ public class NewPooledTransactionHashesMessageProcessor {
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
this.syncState = syncState;
this.totalSkippedTransactionsMessageCounter =
this.totalSkippedNewPooledTransactionHashesMessageCounter =
new RunnableCounter(
metricsCounter,
metricsSystem.createCounter(
BesuMetricCategory.TRANSACTION_POOL,
"new_pooled_transaction_hashes_messages_skipped_total",
"Total number of new pooled transaction hashes messages skipped by the processor."),
() ->
LOG.warn(
"{} expired transaction messages have been skipped.",
"{} expired new pooled transaction hashes messages have been skipped.",
SKIPPED_MESSAGES_LOGGING_THRESHOLD),
SKIPPED_MESSAGES_LOGGING_THRESHOLD);
this.scheduledTasks = new ConcurrentHashMap<>();
@ -90,7 +94,7 @@ public class NewPooledTransactionHashesMessageProcessor {
if (startedAt.plus(keepAlive).isAfter(now())) {
this.processNewPooledTransactionHashesMessage(peer, transactionsMessage);
} else {
totalSkippedTransactionsMessageCounter.inc();
totalSkippedNewPooledTransactionHashesMessageCounter.inc();
}
}
@ -99,7 +103,6 @@ public class NewPooledTransactionHashesMessageProcessor {
final EthPeer peer, final NewPooledTransactionHashesMessage transactionsMessage) {
try {
final List<Hash> incomingTransactionHashes = transactionsMessage.pendingTransactions();
transactionTracker.markTransactionHashesAsSeen(peer, incomingTransactionHashes);
traceLambda(
LOG,
@ -118,14 +121,15 @@ public class NewPooledTransactionHashesMessageProcessor {
.scheduleFutureTask(
new FetcherCreatorTask(peer),
transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod());
return new BufferedGetPooledTransactionsFromPeerFetcher(peer, this);
return new BufferedGetPooledTransactionsFromPeerFetcher(
ethContext, peer, transactionPool, transactionTracker, metricsSystem);
});
for (final Hash hash : incomingTransactionHashes) {
if (transactionPool.getTransactionByHash(hash).isEmpty()) {
bufferedTask.addHash(hash);
}
}
bufferedTask.addHashes(
incomingTransactionHashes.stream()
.filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty())
.collect(Collectors.toList()));
}
} catch (final RLPException ex) {
if (peer != null) {
@ -136,18 +140,6 @@ public class NewPooledTransactionHashesMessageProcessor {
}
}
public TransactionPool getTransactionPool() {
return transactionPool;
}
public EthContext getEthContext() {
return ethContext;
}
public MetricsSystem getMetricsSystem() {
return metricsSystem;
}
public class FetcherCreatorTask implements Runnable {
final EthPeer peer;

@ -64,6 +64,10 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
}
}
public boolean hasSeenTransaction(final Hash txHash) {
return seenTransactions.values().stream().anyMatch(seen -> seen.contains(txHash));
}
private Set<Hash> getOrCreateSeenTransactionsForPeer(final EthPeer peer) {
return seenTransactions.computeIfAbsent(peer, key -> createTransactionsSet());
}

@ -26,7 +26,6 @@ import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTran
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.time.Clock;
@ -96,16 +95,11 @@ public class TransactionPoolFactory {
miningParameters,
metricsSystem,
transactionPoolConfiguration);
final TransactionsMessageHandler transactionsMessageHandler =
new TransactionsMessageHandler(
ethContext.getScheduler(),
new TransactionsMessageProcessor(
transactionTracker,
transactionPool,
metricsSystem.createCounter(
BesuMetricCategory.TRANSACTION_POOL,
"transactions_messages_skipped_total",
"Total number of transactions messages skipped by the processor.")),
new TransactionsMessageProcessor(transactionTracker, transactionPool, metricsSystem),
transactionPoolConfiguration.getTxMessageKeepAliveSeconds());
ethContext.getEthMessages().subscribe(EthPV62.TRANSACTIONS, transactionsMessageHandler);
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler =
@ -115,10 +109,6 @@ public class TransactionPoolFactory {
transactionTracker,
transactionPool,
transactionPoolConfiguration,
metricsSystem.createCounter(
BesuMetricCategory.TRANSACTION_POOL,
"pending_transactions_messages_skipped_total",
"Total number of pending transactions messages skipped by the processor."),
ethContext,
metricsSystem,
syncState),

@ -23,38 +23,56 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.RunnableCounter;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TransactionsMessageProcessor {
private static final int SKIPPED_MESSAGES_LOGGING_THRESHOLD = 1000;
private static final Logger LOG = LoggerFactory.getLogger(TransactionsMessageProcessor.class);
private static final int SKIPPED_MESSAGES_LOGGING_THRESHOLD = 1000;
private static final String TRANSACTIONS = "transactions";
private final PeerTransactionTracker transactionTracker;
private final TransactionPool transactionPool;
private final Counter totalSkippedTransactionsMessageCounter;
private final Counter alreadySeenTransactionsCounter;
public TransactionsMessageProcessor(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final Counter metricsCounter) {
final MetricsSystem metricsSystem) {
this.transactionTracker = transactionTracker;
this.transactionPool = transactionPool;
this.totalSkippedTransactionsMessageCounter =
new RunnableCounter(
metricsCounter,
metricsSystem.createCounter(
BesuMetricCategory.TRANSACTION_POOL,
"transactions_messages_skipped_total",
"Total number of transactions messages skipped by the processor."),
() ->
LOG.warn(
"{} expired transaction messages have been skipped.",
SKIPPED_MESSAGES_LOGGING_THRESHOLD),
SKIPPED_MESSAGES_LOGGING_THRESHOLD);
alreadySeenTransactionsCounter =
metricsSystem
.createLabelledCounter(
BesuMetricCategory.TRANSACTION_POOL,
"remote_already_seen_total",
"Total number of received transactions already seen",
"source")
.labels(TRANSACTIONS);
}
void processTransactionsMessage(
@ -62,7 +80,7 @@ class TransactionsMessageProcessor {
final TransactionsMessage transactionsMessage,
final Instant startedAt,
final Duration keepAlive) {
// Check if message not expired.
// Check if message is not expired.
if (startedAt.plus(keepAlive).isAfter(now())) {
this.processTransactionsMessage(peer, transactionsMessage);
} else {
@ -74,16 +92,24 @@ class TransactionsMessageProcessor {
final EthPeer peer, final TransactionsMessage transactionsMessage) {
try {
final List<Transaction> incomingTransactions = transactionsMessage.transactions();
final Collection<Transaction> freshTransactions = skipSeenTransactions(incomingTransactions);
transactionTracker.markTransactionsAsSeen(peer, incomingTransactions);
alreadySeenTransactionsCounter.inc(
(long) incomingTransactions.size() - freshTransactions.size());
traceLambda(
LOG,
"Received transactions message from {}, incoming transactions {}, incoming list {}",
"Received transactions message from {}, incoming transactions {}, incoming list {}"
+ ", fresh transactions {}, fresh list {}",
peer::toString,
incomingTransactions::size,
() -> toHashList(incomingTransactions));
() -> toHashList(incomingTransactions),
freshTransactions::size,
() -> toHashList(freshTransactions));
transactionPool.addRemoteTransactions(freshTransactions);
transactionPool.addRemoteTransactions(incomingTransactions);
} catch (final RLPException ex) {
if (peer != null) {
LOG.debug("Malformed transaction message received, disconnecting: {}", peer, ex);
@ -91,4 +117,10 @@ class TransactionsMessageProcessor {
}
}
}
private Collection<Transaction> skipSeenTransactions(final List<Transaction> inTransactions) {
return inTransactions.stream()
.filter(tx -> !transactionTracker.hasSeenTransaction(tx.getHash()))
.collect(Collectors.toUnmodifiableList());
}
}

@ -86,7 +86,6 @@ public abstract class AbstractPendingTransactionsSorter {
protected final LabelledMetric<Counter> transactionRemovedCounter;
protected final Counter localTransactionAddedCounter;
protected final Counter remoteTransactionAddedCounter;
protected final Counter localTransactionHashesAddedCounter;
protected final long maxPendingTransactions;
protected final TransactionPoolReplacementHandler transactionReplacementHandler;
@ -112,7 +111,6 @@ public abstract class AbstractPendingTransactionsSorter {
"source");
localTransactionAddedCounter = transactionAddedCounter.labels("local");
remoteTransactionAddedCounter = transactionAddedCounter.labels("remote");
localTransactionHashesAddedCounter = transactionAddedCounter.labels("pool");
transactionRemovedCounter =
metricsSystem.createLabelledCounter(

@ -14,8 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -29,21 +29,18 @@ import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.metrics.StubMetricsSystem;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@ -51,51 +48,57 @@ import org.mockito.junit.MockitoJUnitRunner;
public class BufferedGetPooledTransactionsFromPeerFetcherTest {
@Mock EthPeer ethPeer;
@Mock NewPooledTransactionHashesMessageProcessor processor;
@Mock TransactionPool transactionPool;
@Mock EthContext ethContext;
@Mock EthScheduler ethScheduler;
@InjectMocks BufferedGetPooledTransactionsFromPeerFetcher fetcher;
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final BlockDataGenerator generator = new BlockDataGenerator();
private BufferedGetPooledTransactionsFromPeerFetcher fetcher;
private StubMetricsSystem metricsSystem;
private PeerTransactionTracker transactionTracker;
@Before
public void setup() {
when(processor.getTransactionPool()).thenReturn(transactionPool);
when(processor.getMetricsSystem()).thenReturn(metricsSystem);
when(processor.getEthContext()).thenReturn(ethContext);
metricsSystem = new StubMetricsSystem();
transactionTracker = new PeerTransactionTracker();
when(ethContext.getScheduler()).thenReturn(ethScheduler);
fetcher =
new BufferedGetPooledTransactionsFromPeerFetcher(
ethContext, ethPeer, transactionPool, transactionTracker, metricsSystem);
}
@Test
public void requestTransactionShouldStartTaskWhenUnknownTransaction() {
final Hash hash = generator.transaction().getHash();
final List<Transaction> taskResult = Collections.singletonList(Transaction.builder().build());
final Transaction transaction = generator.transaction();
final Hash hash = transaction.getHash();
final List<Transaction> taskResult = List.of(transaction);
final AbstractPeerTask.PeerTaskResult<List<Transaction>> peerTaskResult =
new AbstractPeerTask.PeerTaskResult<>(ethPeer, taskResult);
when(ethScheduler.scheduleSyncWorkerTask(any(GetPooledTransactionsFromPeerTask.class)))
.thenReturn(CompletableFuture.completedFuture(peerTaskResult));
fetcher.addHash(hash);
fetcher.addHashes(List.of(hash));
fetcher.requestTransactions();
verify(ethScheduler).scheduleSyncWorkerTask(any(GetPooledTransactionsFromPeerTask.class));
verifyNoMoreInteractions(ethScheduler);
verify(transactionPool, times(1)).addRemoteTransactions(taskResult);
assertThat(transactionTracker.hasSeenTransaction(hash)).isTrue();
}
@Test
public void requestTransactionShouldSplitRequestIntoSeveralTasks() {
for (int i = 0; i < 257; i++) {
fetcher.addHash(generator.transaction().getHash());
}
fetcher.addHashes(
IntStream.range(0, 257)
.mapToObj(unused -> generator.transaction().getHash())
.collect(Collectors.toList()));
final AbstractPeerTask.PeerTaskResult<List<Transaction>> peerTaskResult =
new AbstractPeerTask.PeerTaskResult<>(ethPeer, new ArrayList<>());
new AbstractPeerTask.PeerTaskResult<>(ethPeer, List.of());
when(ethScheduler.scheduleSyncWorkerTask(any(GetPooledTransactionsFromPeerTask.class)))
.thenReturn(CompletableFuture.completedFuture(peerTaskResult));
@ -107,16 +110,17 @@ public class BufferedGetPooledTransactionsFromPeerFetcherTest {
}
@Test
public void requestTransactionShouldNotStartTaskWhenTransactionAlreadyInPool() {
public void requestTransactionShouldNotStartTaskWhenTransactionAlreadySeen() {
final Hash hash = generator.transaction().getHash();
when(transactionPool.getTransactionByHash(hash))
.thenReturn(Optional.of(Transaction.builder().build()));
final Transaction transaction = generator.transaction();
final Hash hash = transaction.getHash();
transactionTracker.markTransactionHashesAsSeen(ethPeer, List.of(hash));
fetcher.addHash(hash);
fetcher.addHashes(List.of(hash));
fetcher.requestTransactions();
verifyNoInteractions(ethScheduler);
verify(transactionPool, never()).addRemoteTransactions(anyList());
verify(transactionPool, never()).addRemoteTransactions(List.of(transaction));
assertThat(metricsSystem.getCounterValue("remote_already_seen_total", "hashes")).isEqualTo(1);
}
}

@ -18,6 +18,7 @@ import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
import static java.time.Instant.now;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
@ -36,11 +37,9 @@ import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor.FetcherCreatorTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.metrics.StubMetricsSystem;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
@ -56,49 +55,36 @@ public class NewPooledTransactionHashesMessageProcessorTest {
@Mock private TransactionPool transactionPool;
@Mock private TransactionPoolConfiguration transactionPoolConfiguration;
@Mock private PeerTransactionTracker transactionTracker;
@Mock private Counter totalSkippedTransactionsMessageCounter;
@Mock private EthPeer peer1;
@Mock private MetricsSystem metricsSystem;
@Mock private SyncState syncState;
@Mock private EthContext ethContext;
@Mock private EthScheduler ethScheduler;
private NewPooledTransactionHashesMessageProcessor messageHandler;
private final BlockDataGenerator generator = new BlockDataGenerator();
private final Hash hash1 = generator.transaction().getHash();
private final Hash hash2 = generator.transaction().getHash();
private final Hash hash3 = generator.transaction().getHash();
private NewPooledTransactionHashesMessageProcessor messageHandler;
private StubMetricsSystem metricsSystem;
@Before
public void setup() {
metricsSystem = new StubMetricsSystem();
when(transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod())
.thenReturn(Duration.ofMillis(500));
messageHandler =
new NewPooledTransactionHashesMessageProcessor(
transactionTracker,
transactionPool,
transactionPoolConfiguration,
totalSkippedTransactionsMessageCounter,
ethContext,
metricsSystem,
syncState);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
}
@Test
public void shouldMarkAllReceivedTransactionsAsSeen() {
messageHandler.processNewPooledTransactionHashesMessage(
peer1,
NewPooledTransactionHashesMessage.create(asList(hash1, hash2, hash3)),
now(),
ofMinutes(1));
verify(transactionTracker)
.markTransactionHashesAsSeen(peer1, Arrays.asList(hash1, hash2, hash3));
verifyNoMoreInteractions(transactionTracker);
}
@Test
public void shouldAddInitiatedRequestingTransactions() {
when(syncState.isInSync(anyLong())).thenReturn(true);
@ -130,7 +116,6 @@ public class NewPooledTransactionHashesMessageProcessorTest {
now(),
ofMinutes(1));
// verify(transactionPool).addTransactionHash(hash3);
verify(transactionPool).getTransactionByHash(hash1);
verify(transactionPool).getTransactionByHash(hash2);
verify(transactionPool).getTransactionByHash(hash3);
@ -157,8 +142,9 @@ public class NewPooledTransactionHashesMessageProcessorTest {
now().minus(ofMinutes(1)),
ofMillis(1));
verifyNoInteractions(transactionTracker);
verify(totalSkippedTransactionsMessageCounter).inc(1);
verifyNoMoreInteractions(totalSkippedTransactionsMessageCounter);
assertThat(
metricsSystem.getCounterValue("new_pooled_transaction_hashes_messages_skipped_total"))
.isEqualTo(1);
}
@Test
@ -169,8 +155,9 @@ public class NewPooledTransactionHashesMessageProcessorTest {
now().minus(ofMinutes(1)),
ofMillis(1));
verifyNoInteractions(transactionPool);
verify(totalSkippedTransactionsMessageCounter).inc(1);
verifyNoMoreInteractions(totalSkippedTransactionsMessageCounter);
assertThat(
metricsSystem.getCounterValue("new_pooled_transaction_hashes_messages_skipped_total"))
.isEqualTo(1);
}
@Test

@ -18,6 +18,7 @@ import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
import static java.time.Instant.now;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@ -25,11 +26,11 @@ import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.metrics.StubMetricsSystem;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@ -38,15 +39,24 @@ public class TransactionsMessageProcessorTest {
@Mock private TransactionPool transactionPool;
@Mock private PeerTransactionTracker transactionTracker;
@Mock private Counter totalSkippedTransactionsMessageCounter;
@Mock private EthPeer peer1;
@InjectMocks private TransactionsMessageProcessor messageHandler;
private final BlockDataGenerator generator = new BlockDataGenerator();
private final Transaction transaction1 = generator.transaction();
private final Transaction transaction2 = generator.transaction();
private final Transaction transaction3 = generator.transaction();
private TransactionsMessageProcessor messageHandler;
private StubMetricsSystem metricsSystem;
@Before
public void setup() {
metricsSystem = new StubMetricsSystem();
messageHandler =
new TransactionsMessageProcessor(transactionTracker, transactionPool, metricsSystem);
}
@Test
public void shouldMarkAllReceivedTransactionsAsSeen() {
messageHandler.processTransactionsMessage(
@ -77,7 +87,7 @@ public class TransactionsMessageProcessorTest {
now().minus(ofMinutes(1)),
ofMillis(1));
verifyNoInteractions(transactionTracker);
verify(totalSkippedTransactionsMessageCounter).inc(1);
assertThat(metricsSystem.getCounterValue("transactions_messages_skipped_total")).isEqualTo(1);
}
@Test
@ -88,6 +98,6 @@ public class TransactionsMessageProcessorTest {
now().minus(ofMinutes(1)),
ofMillis(1));
verifyNoInteractions(transactionPool);
verify(totalSkippedTransactionsMessageCounter).inc(1);
assertThat(metricsSystem.getCounterValue("transactions_messages_skipped_total")).isEqualTo(1);
}
}

Loading…
Cancel
Save