diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java index 9ee51cb575..22b862ffb9 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java @@ -55,6 +55,8 @@ import org.apache.logging.log4j.Logger; public class TransactionPool implements BlockAddedObserver { private static final Logger LOG = getLogger(); + public static final int DEFAULT_TX_MSG_KEEP_ALIVE = 60; + private static final long SYNC_TOLERANCE = 100L; private static final String REMOTE = "remote"; private static final String LOCAL = "local"; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java index 5e19095fd0..c088237b9b 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java @@ -34,7 +34,8 @@ public class TransactionPoolFactory { final MetricsSystem metricsSystem, final SyncState syncState, final int maxTransactionRetentionHours, - final Wei minTransactionGasPrice) { + final Wei minTransactionGasPrice, + final int txMessageKeepAliveSeconds) { final PendingTransactions pendingTransactions = new PendingTransactions( @@ -65,7 +66,8 @@ public class TransactionPoolFactory { metricsSystem.createCounter( PantheonMetricCategory.TRANSACTION_POOL, "transactions_messages_skipped_total", - "Total number of transactions messages skipped by the processor."))); + "Total number of transactions messages skipped by the processor.")), + txMessageKeepAliveSeconds); ethContext.getEthMessages().subscribe(EthPV62.TRANSACTIONS, transactionsMessageHandler); protocolContext.getBlockchain().observeBlockAdded(transactionPool); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java index 99a7df59ef..255039eab2 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java @@ -24,15 +24,17 @@ import java.time.Instant; class TransactionsMessageHandler implements MessageCallback { - private static final Duration TX_KEEP_ALIVE = Duration.ofMinutes(1); private final TransactionsMessageProcessor transactionsMessageProcessor; private final EthScheduler scheduler; + private final Duration txMsgKeepAlive; public TransactionsMessageHandler( final EthScheduler scheduler, - final TransactionsMessageProcessor transactionsMessageProcessor) { + final TransactionsMessageProcessor transactionsMessageProcessor, + final int txMsgKeepAliveSeconds) { this.scheduler = scheduler; this.transactionsMessageProcessor = transactionsMessageProcessor; + this.txMsgKeepAlive = Duration.ofSeconds(txMsgKeepAliveSeconds); } @Override @@ -42,6 +44,6 @@ class TransactionsMessageHandler implements MessageCallback { scheduler.scheduleTxWorkerTask( () -> transactionsMessageProcessor.processTransactionsMessage( - message.getPeer(), transactionsMessage, startedAt, TX_KEEP_ALIVE)); + message.getPeer(), transactionsMessage, startedAt, txMsgKeepAlive)); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java index 6d61ea8f04..a43d016234 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java @@ -52,6 +52,7 @@ import tech.pegasys.pantheon.ethereum.eth.messages.StatusMessage; import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions; +import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPoolFactory; import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; @@ -1082,7 +1083,8 @@ public final class EthProtocolManagerTest { metricsSystem, mock(SyncState.class), PendingTransactions.DEFAULT_TX_RETENTION_HOURS, - Wei.ZERO); + Wei.ZERO, + TransactionPool.DEFAULT_TX_MSG_KEEP_ALIVE); // Send just a transaction message. final PeerConnection peer = setupPeer(ethManager, (cap, msg, connection) -> {}); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java index 6069078953..a13e2f32fd 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java @@ -152,7 +152,8 @@ public class TestNode implements Closeable { metricsSystem, syncState, PendingTransactions.DEFAULT_TX_RETENTION_HOURS, - Wei.ZERO); + Wei.ZERO, + TransactionPool.DEFAULT_TX_MSG_KEEP_ALIVE); networkRunner.start(); selfPeer = DefaultPeer.fromEnodeURL(network.getLocalEnode().get()); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java index 5ed6432c44..57dab4c131 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java @@ -51,6 +51,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.SyncMode; import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; import tech.pegasys.pantheon.ethereum.eth.sync.TrailingPeerRequirements; import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions; +import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool; import tech.pegasys.pantheon.ethereum.graphql.GraphQLConfiguration; import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration; import tech.pegasys.pantheon.ethereum.jsonrpc.RpcApi; @@ -566,6 +567,13 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { arity = "1") private final Integer pendingTxRetentionPeriod = PendingTransactions.DEFAULT_TX_RETENTION_HOURS; + @Option( + names = {"--tx-pool-keep-alive-seconds"}, + paramLabel = MANDATORY_INTEGER_FORMAT_HELP, + description = "Keep alive of transactions in seconds (default: ${DEFAULT-VALUE})", + arity = "1") + private final Integer txMessageKeepAliveSeconds = TransactionPool.DEFAULT_TX_MSG_KEEP_ALIVE; + // Inner class so we can get to loggingLevel. public class PantheonExceptionHandler extends CommandLine.AbstractHandler, PantheonExceptionHandler> @@ -817,6 +825,7 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { new MiningParameters(coinbase, minTransactionGasPrice, extraData, isMiningEnabled)) .maxPendingTransactions(txPoolMaxSize) .pendingTransactionRetentionPeriod(pendingTxRetentionPeriod) + .txMessageKeepAliveSeconds(txMessageKeepAliveSeconds) .nodePrivateKeyFile(nodePrivateKeyFile()) .metricsSystem(metricsSystem.get()) .privacyParameters(privacyParameters()) diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java index b39bc88962..d56e312110 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java @@ -74,6 +74,7 @@ public abstract class PantheonControllerBuilder { protected Clock clock; protected Integer maxPendingTransactions; protected Integer pendingTransactionRetentionPeriod; + protected Integer txMessageKeepAliveSeconds = TransactionPool.DEFAULT_TX_MSG_KEEP_ALIVE; protected KeyPair nodeKeys; private StorageProvider storageProvider; private final List shutdownActions = new ArrayList<>(); @@ -159,6 +160,12 @@ public abstract class PantheonControllerBuilder { return this; } + public PantheonControllerBuilder txMessageKeepAliveSeconds( + final int txMessageKeepAliveSeconds) { + this.txMessageKeepAliveSeconds = txMessageKeepAliveSeconds; + return this; + } + public PantheonController build() throws IOException { checkNotNull(genesisConfig, "Missing genesis config"); checkNotNull(syncConfig, "Missing sync config"); @@ -235,7 +242,8 @@ public abstract class PantheonControllerBuilder { metricsSystem, syncState, pendingTransactionRetentionPeriod, - miningParameters.getMinTransactionGasPrice()); + miningParameters.getMinTransactionGasPrice(), + txMessageKeepAliveSeconds); final MiningCoordinator miningCoordinator = createMiningCoordinator( diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/cli/CommandTestAbstract.java b/pantheon/src/test/java/tech/pegasys/pantheon/cli/CommandTestAbstract.java index 896021b72e..ecc142a263 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/cli/CommandTestAbstract.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/cli/CommandTestAbstract.java @@ -134,6 +134,8 @@ public abstract class CommandTestAbstract { when(mockControllerBuilder.maxPendingTransactions(anyInt())).thenReturn(mockControllerBuilder); when(mockControllerBuilder.pendingTransactionRetentionPeriod(anyInt())) .thenReturn(mockControllerBuilder); + when(mockControllerBuilder.txMessageKeepAliveSeconds(anyInt())) + .thenReturn(mockControllerBuilder); when(mockControllerBuilder.nodePrivateKeyFile(any())).thenReturn(mockControllerBuilder); when(mockControllerBuilder.metricsSystem(any())).thenReturn(mockControllerBuilder); when(mockControllerBuilder.privacyParameters(any())).thenReturn(mockControllerBuilder); diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java b/pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java index f09a98f8cb..440e6d98d5 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/cli/PantheonCommandTest.java @@ -2621,4 +2621,27 @@ public class PantheonCommandTest extends CommandTestAbstract { assertThat(commandOutput.toString()).isEmpty(); assertThat(commandErrorOutput.toString()).isEmpty(); } + + @Test + public void txMessageKeepAliveSeconds() { + final int txMessageKeepAliveSeconds = 999; + parseCommand("--tx-pool-keep-alive-seconds", String.valueOf(txMessageKeepAliveSeconds)); + + verify(mockControllerBuilder).txMessageKeepAliveSeconds(intArgumentCaptor.capture()); + verify(mockControllerBuilder).txMessageKeepAliveSeconds(eq(txMessageKeepAliveSeconds)); + + assertThat(commandOutput.toString()).isEmpty(); + assertThat(commandErrorOutput.toString()).isEmpty(); + } + + @Test + public void txMessageKeepAliveSecondsWithInvalidInputShouldFail() { + parseCommand("--tx-pool-keep-alive-seconds", "acbd"); + + verifyZeroInteractions(mockRunnerBuilder); + + assertThat(commandOutput.toString()).isEmpty(); + assertThat(commandErrorOutput.toString()) + .contains("Invalid value for option '--tx-pool-keep-alive-seconds': 'acbd' is not an int"); + } } diff --git a/pantheon/src/test/resources/everything_config.toml b/pantheon/src/test/resources/everything_config.toml index ec13a72937..7fbb1d441e 100644 --- a/pantheon/src/test/resources/everything_config.toml +++ b/pantheon/src/test/resources/everything_config.toml @@ -97,3 +97,4 @@ privacy-precompiled-address=9 tx-pool-retention-hours=999 tx-pool-max-size=1234 +tx-pool-keep-alive-seconds=60 \ No newline at end of file