diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java index e802ad5864..99a7df59ef 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java @@ -12,13 +12,19 @@ */ package tech.pegasys.pantheon.ethereum.eth.transactions; +import static java.time.Instant.now; + import tech.pegasys.pantheon.ethereum.eth.manager.EthMessage; import tech.pegasys.pantheon.ethereum.eth.manager.EthMessages.MessageCallback; import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage; +import java.time.Duration; +import java.time.Instant; + class TransactionsMessageHandler implements MessageCallback { + private static final Duration TX_KEEP_ALIVE = Duration.ofMinutes(1); private final TransactionsMessageProcessor transactionsMessageProcessor; private final EthScheduler scheduler; @@ -32,9 +38,10 @@ class TransactionsMessageHandler implements MessageCallback { @Override public void exec(final EthMessage message) { final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData()); + final Instant startedAt = now(); scheduler.scheduleTxWorkerTask( () -> transactionsMessageProcessor.processTransactionsMessage( - message.getPeer(), transactionsMessage)); + message.getPeer(), transactionsMessage, startedAt, TX_KEEP_ALIVE)); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java index 7829804676..5a50edcaa3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java @@ -12,6 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.eth.transactions; +import static java.time.Instant.now; import static org.apache.logging.log4j.LogManager.getLogger; import tech.pegasys.pantheon.ethereum.core.Transaction; @@ -20,6 +21,8 @@ import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage; import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.rlp.RLPException; +import java.time.Duration; +import java.time.Instant; import java.util.Iterator; import java.util.Set; @@ -39,6 +42,17 @@ class TransactionsMessageProcessor { } void processTransactionsMessage( + final EthPeer peer, + final TransactionsMessage transactionsMessage, + final Instant startedAt, + final Duration keepAlive) { + // Check if message not expired. + if (startedAt.plus(keepAlive).isAfter(now())) { + this.processTransactionsMessage(peer, transactionsMessage); + } + } + + private void processTransactionsMessage( final EthPeer peer, final TransactionsMessage transactionsMessage) { try { LOG.trace("Received transactions message from {}", peer); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java index 9c29bfffeb..2fcc38c4d9 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java @@ -12,9 +12,13 @@ */ package tech.pegasys.pantheon.ethereum.eth.transactions; +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofMinutes; +import static java.time.Instant.now; import static java.util.Arrays.asList; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.core.Transaction; @@ -41,7 +45,10 @@ public class TransactionsMessageProcessorTest { @Test public void shouldMarkAllReceivedTransactionsAsSeen() { messageHandler.processTransactionsMessage( - peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3))); + peer1, + TransactionsMessage.create(asList(transaction1, transaction2, transaction3)), + now(), + ofMinutes(1)); verify(transactionTracker) .markTransactionsAsSeen(peer1, ImmutableSet.of(transaction1, transaction2, transaction3)); @@ -50,9 +57,31 @@ public class TransactionsMessageProcessorTest { @Test public void shouldAddReceivedTransactionsToTransactionPool() { messageHandler.processTransactionsMessage( - peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3))); - + peer1, + TransactionsMessage.create(asList(transaction1, transaction2, transaction3)), + now(), + ofMinutes(1)); verify(transactionPool) .addRemoteTransactions(ImmutableSet.of(transaction1, transaction2, transaction3)); } + + @Test + public void shouldNotMarkReceivedExpiredTransactionsAsSeen() { + messageHandler.processTransactionsMessage( + peer1, + TransactionsMessage.create(asList(transaction1, transaction2, transaction3)), + now().minus(ofMinutes(1)), + ofMillis(1)); + verifyZeroInteractions(transactionTracker); + } + + @Test + public void shouldNotAddReceivedTransactionsToTransactionPoolIfExpired() { + messageHandler.processTransactionsMessage( + peer1, + TransactionsMessage.create(asList(transaction1, transaction2, transaction3)), + now().minus(ofMinutes(1)), + ofMillis(1)); + verifyZeroInteractions(transactionPool); + } }