[PIE-1707] Implement a timeout in TransactionMessageProcessor (#1604)

* [PIE-1707] Implement a timeout in TransactionMessageProcessor

- `processTransactionsMessage` now takes a `keepAlive` parameter
- don't process the message if expired
- add unit tests
- use a default timeout for transactions (1 minute)

* Update ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java

Co-Authored-By: Nicolas MASSART <NicolasMassart@users.noreply.github.com>

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Abdelhamid Bakhta 6 years ago committed by GitHub
parent 8e104b697c
commit 810e326d95
  1. 9
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java
  2. 14
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java
  3. 35
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java

@ -12,13 +12,19 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;
import static java.time.Instant.now;
import tech.pegasys.pantheon.ethereum.eth.manager.EthMessage;
import tech.pegasys.pantheon.ethereum.eth.manager.EthMessages.MessageCallback;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import java.time.Duration;
import java.time.Instant;
class TransactionsMessageHandler implements MessageCallback {
private static final Duration TX_KEEP_ALIVE = Duration.ofMinutes(1);
private final TransactionsMessageProcessor transactionsMessageProcessor;
private final EthScheduler scheduler;
@ -32,9 +38,10 @@ class TransactionsMessageHandler implements MessageCallback {
@Override
public void exec(final EthMessage message) {
final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData());
final Instant startedAt = now();
scheduler.scheduleTxWorkerTask(
() ->
transactionsMessageProcessor.processTransactionsMessage(
message.getPeer(), transactionsMessage));
message.getPeer(), transactionsMessage, startedAt, TX_KEEP_ALIVE));
}
}

@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;
import static java.time.Instant.now;
import static org.apache.logging.log4j.LogManager.getLogger;
import tech.pegasys.pantheon.ethereum.core.Transaction;
@ -20,6 +21,8 @@ import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Set;
@ -39,6 +42,17 @@ class TransactionsMessageProcessor {
}
void processTransactionsMessage(
final EthPeer peer,
final TransactionsMessage transactionsMessage,
final Instant startedAt,
final Duration keepAlive) {
// Check if message not expired.
if (startedAt.plus(keepAlive).isAfter(now())) {
this.processTransactionsMessage(peer, transactionsMessage);
}
}
private void processTransactionsMessage(
final EthPeer peer, final TransactionsMessage transactionsMessage) {
try {
LOG.trace("Received transactions message from {}", peer);

@ -12,9 +12,13 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
import static java.time.Instant.now;
import static java.util.Arrays.asList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.Transaction;
@ -41,7 +45,10 @@ public class TransactionsMessageProcessorTest {
@Test
public void shouldMarkAllReceivedTransactionsAsSeen() {
messageHandler.processTransactionsMessage(
peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3)));
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now(),
ofMinutes(1));
verify(transactionTracker)
.markTransactionsAsSeen(peer1, ImmutableSet.of(transaction1, transaction2, transaction3));
@ -50,9 +57,31 @@ public class TransactionsMessageProcessorTest {
@Test
public void shouldAddReceivedTransactionsToTransactionPool() {
messageHandler.processTransactionsMessage(
peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3)));
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now(),
ofMinutes(1));
verify(transactionPool)
.addRemoteTransactions(ImmutableSet.of(transaction1, transaction2, transaction3));
}
@Test
public void shouldNotMarkReceivedExpiredTransactionsAsSeen() {
messageHandler.processTransactionsMessage(
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now().minus(ofMinutes(1)),
ofMillis(1));
verifyZeroInteractions(transactionTracker);
}
@Test
public void shouldNotAddReceivedTransactionsToTransactionPoolIfExpired() {
messageHandler.processTransactionsMessage(
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now().minus(ofMinutes(1)),
ofMillis(1));
verifyZeroInteractions(transactionPool);
}
}

Loading…
Cancel
Save