From cdefb330be1ad25119e312618803479a12f28de7 Mon Sep 17 00:00:00 2001 From: Rob Dawson Date: Thu, 28 Mar 2019 20:25:15 +1000 Subject: [PATCH] [PAN-2327] Notify of dropped messages (#1156) * Added ability to subscribe to dropped transactions from the transaction pending pool. Implemented subscription webservice to support this. * Added metrics to the pending transactions, tracking the number of local and remote transactions in the pool. * Converted listener management in pending transactions to use the Subscribers util object. Signed-off-by: Adrian Sutton --- consensus/clique/build.gradle | 1 + .../blockcreation/CliqueBlockCreatorTest.java | 11 +- .../CliqueMinerExecutorTest.java | 7 +- consensus/ibft/build.gradle | 2 + .../ibft/support/TestContextBuilder.java | 5 +- .../blockcreation/IbftBlockCreatorTest.java | 5 +- consensus/ibftlegacy/build.gradle | 1 + .../blockcreation/IbftBlockCreatorTest.java | 5 +- ethereum/blockcreation/build.gradle | 2 +- .../BlockTransactionSelectorTest.java | 28 +++-- .../EthHashBlockCreatorTest.java | 5 +- .../EthHashMinerExecutorTest.java | 7 +- .../PendingTransactionDroppedListener.java | 19 +++ .../ethereum/core/PendingTransactions.java | 67 +++++++++-- .../ethereum/core/TransactionPool.java | 6 +- .../core/PendingTransactionsTest.java | 41 ++++++- .../ethereum/core/TransactionPoolTest.java | 5 +- .../transactions/TransactionPoolFactory.java | 6 +- .../eth/manager/EthProtocolManagerTest.java | 5 +- .../ethereum/eth/transactions/TestNode.java | 5 +- .../EthGetFilterChangesIntegrationTest.java | 6 +- ...TransactionDroppedSubscriptionService.java | 52 +++++++++ ...PendingTransactionSubscriptionService.java | 4 +- .../request/SubscriptionType.java | 3 + ...sactionDroppedSubscriptionServiceTest.java | 109 ++++++++++++++++++ .../pantheon/metrics/MetricCategory.java | 3 +- .../tech/pegasys/pantheon/RunnerBuilder.java | 4 + .../controller/CliquePantheonController.java | 3 +- .../IbftLegacyPantheonController.java | 3 +- .../controller/IbftPantheonController.java | 7 +- .../controller/MainnetPantheonController.java | 3 +- 31 files changed, 385 insertions(+), 45 deletions(-) create mode 100644 ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionDroppedListener.java create mode 100644 ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java create mode 100644 ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java diff --git a/consensus/clique/build.gradle b/consensus/clique/build.gradle index ade80f4f63..6fd9c2c209 100644 --- a/consensus/clique/build.gradle +++ b/consensus/clique/build.gradle @@ -47,6 +47,7 @@ dependencies { testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts') testImplementation project(path: ':consensus:common', configuration: 'testArtifacts') testImplementation project(':testutil') + testImplementation project(':metrics') testImplementation 'junit:junit' testImplementation 'org.assertj:assertj-core' diff --git a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java index d2c38451ef..420cf37d36 100644 --- a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java +++ b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java @@ -45,6 +45,8 @@ import tech.pegasys.pantheon.ethereum.core.Util; import tech.pegasys.pantheon.ethereum.core.Wei; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -60,6 +62,7 @@ public class CliqueBlockCreatorTest { private final Address proposerAddress = Util.publicKeyToAddress(proposerKeyPair.getPublicKey()); private final KeyPair otherKeyPair = KeyPair.generate(); private final List
validatorList = Lists.newArrayList(); + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private ProtocolSchedule protocolSchedule; private final WorldStateArchive stateArchive = createInMemoryWorldStateArchive(); @@ -113,7 +116,7 @@ public class CliqueBlockCreatorTest { new CliqueBlockCreator( coinbase, parent -> extraData.encode(), - new PendingTransactions(5, TestClock.fixed()), + new PendingTransactions(5, TestClock.fixed(), metricsSystem), protocolContext, protocolSchedule, gasLimit -> gasLimit, @@ -140,7 +143,7 @@ public class CliqueBlockCreatorTest { new CliqueBlockCreator( coinbase, parent -> extraData.encode(), - new PendingTransactions(5, TestClock.fixed()), + new PendingTransactions(5, TestClock.fixed(), metricsSystem), protocolContext, protocolSchedule, gasLimit -> gasLimit, @@ -166,7 +169,7 @@ public class CliqueBlockCreatorTest { new CliqueBlockCreator( coinbase, parent -> extraData.encode(), - new PendingTransactions(5, TestClock.fixed()), + new PendingTransactions(5, TestClock.fixed(), metricsSystem), protocolContext, protocolSchedule, gasLimit -> gasLimit, @@ -195,7 +198,7 @@ public class CliqueBlockCreatorTest { new CliqueBlockCreator( coinbase, parent -> extraData.encode(), - new PendingTransactions(5, TestClock.fixed()), + new PendingTransactions(5, TestClock.fixed(), metricsSystem), protocolContext, protocolSchedule, gasLimit -> gasLimit, diff --git a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java index 666b8bda6a..6a09ada16c 100644 --- a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java +++ b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java @@ -37,6 +37,8 @@ import tech.pegasys.pantheon.ethereum.core.PendingTransactions; import tech.pegasys.pantheon.ethereum.core.PrivacyParameters; import tech.pegasys.pantheon.ethereum.core.Util; import tech.pegasys.pantheon.ethereum.core.Wei; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -58,6 +60,7 @@ public class CliqueMinerExecutorTest { private final List
validatorList = Lists.newArrayList(); private ProtocolContext cliqueProtocolContext; private BlockHeaderTestFixture blockHeaderBuilder; + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @Before public void setup() { @@ -89,7 +92,7 @@ public class CliqueMinerExecutorTest { Executors.newSingleThreadExecutor(), CliqueProtocolSchedule.create( GENESIS_CONFIG_OPTIONS, proposerKeyPair, PrivacyParameters.noPrivacy()), - new PendingTransactions(1, TestClock.fixed()), + new PendingTransactions(1, TestClock.fixed(), metricsSystem), proposerKeyPair, new MiningParameters(AddressHelpers.ofValue(1), Wei.ZERO, wrappedVanityData, false), mock(CliqueBlockScheduler.class), @@ -120,7 +123,7 @@ public class CliqueMinerExecutorTest { Executors.newSingleThreadExecutor(), CliqueProtocolSchedule.create( GENESIS_CONFIG_OPTIONS, proposerKeyPair, PrivacyParameters.noPrivacy()), - new PendingTransactions(1, TestClock.fixed()), + new PendingTransactions(1, TestClock.fixed(), metricsSystem), proposerKeyPair, new MiningParameters(AddressHelpers.ofValue(1), Wei.ZERO, wrappedVanityData, false), mock(CliqueBlockScheduler.class), diff --git a/consensus/ibft/build.gradle b/consensus/ibft/build.gradle index e1fb27ca95..34c1e19027 100644 --- a/consensus/ibft/build.gradle +++ b/consensus/ibft/build.gradle @@ -47,7 +47,9 @@ dependencies { testImplementation project(path: ':config:', configuration: 'testSupportArtifacts') testImplementation project(path: ':consensus:common', configuration: 'testArtifacts') testImplementation project(':testutil') + testImplementation project(':metrics') + integrationTestImplementation project(':metrics') integrationTestImplementation 'junit:junit' integrationTestImplementation 'org.assertj:assertj-core' integrationTestImplementation 'org.mockito:mockito-core' diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java index b52316c661..c6edaf368c 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java @@ -65,6 +65,8 @@ import tech.pegasys.pantheon.ethereum.core.Util; import tech.pegasys.pantheon.ethereum.core.Wei; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.Subscribers; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -84,6 +86,7 @@ import java.util.stream.Collectors; import com.google.common.collect.Iterables; public class TestContextBuilder { + private static MetricsSystem metricsSystem = new NoOpMetricsSystem(); private static class ControllerAndState { @@ -283,7 +286,7 @@ public class TestContextBuilder { final IbftBlockCreatorFactory blockCreatorFactory = new IbftBlockCreatorFactory( (gasLimit) -> gasLimit, - new PendingTransactions(1, clock), // changed from IbftPantheonController + new PendingTransactions(1, clock, metricsSystem), // changed from IbftPantheonController protocolContext, protocolSchedule, miningParams, diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java index f9efa6e71f..889fa2dbdf 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java @@ -38,6 +38,8 @@ import tech.pegasys.pantheon.ethereum.core.Wei; import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -50,6 +52,7 @@ import com.google.common.collect.Lists; import org.junit.Test; public class IbftBlockCreatorTest { + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @Test public void createdBlockPassesValidationRulesAndHasAppropriateHashAndMixHash() { @@ -91,7 +94,7 @@ public class IbftBlockCreatorTest { 0, initialValidatorList) .encode(), - new PendingTransactions(1, TestClock.fixed()), + new PendingTransactions(1, TestClock.fixed(), metricsSystem), protContext, protocolSchedule, parentGasLimit -> parentGasLimit, diff --git a/consensus/ibftlegacy/build.gradle b/consensus/ibftlegacy/build.gradle index 19e5bde444..ad7ac76e33 100644 --- a/consensus/ibftlegacy/build.gradle +++ b/consensus/ibftlegacy/build.gradle @@ -32,6 +32,7 @@ dependencies { testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts') testImplementation project(path: ':consensus:ibft', configuration: 'testSupportArtifacts') testImplementation project(':testutil') + testImplementation project(':metrics') testImplementation 'junit:junit' testImplementation 'org.assertj:assertj-core' diff --git a/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java b/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java index a753a06845..3f48518a51 100644 --- a/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java +++ b/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java @@ -38,6 +38,8 @@ import tech.pegasys.pantheon.ethereum.core.Wei; import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -50,6 +52,7 @@ import com.google.common.collect.Lists; import org.junit.Test; public class IbftBlockCreatorTest { + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @Test public void headerProducedPassesValidationRules() { @@ -96,7 +99,7 @@ public class IbftBlockCreatorTest { null, initialValidatorList) .encode(), - new PendingTransactions(1, TestClock.fixed()), + new PendingTransactions(1, TestClock.fixed(), metricsSystem), protContext, protocolSchedule, parentGasLimit -> parentGasLimit, diff --git a/ethereum/blockcreation/build.gradle b/ethereum/blockcreation/build.gradle index 63e682930e..3ea5ee1b89 100644 --- a/ethereum/blockcreation/build.gradle +++ b/ethereum/blockcreation/build.gradle @@ -27,7 +27,7 @@ dependencies { testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts') testImplementation project(path: ':ethereum:core', configuration: 'testArtifacts') testImplementation project(':testutil') - + testImplementation project(':metrics') testImplementation 'junit:junit' testImplementation 'org.assertj:assertj-core' testImplementation 'org.awaitility:awaitility' diff --git a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java index c52d9d28a5..1287283049 100644 --- a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java +++ b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java @@ -46,6 +46,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ValidationResult; import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage; import tech.pegasys.pantheon.ethereum.vm.TestBlockchain; import tech.pegasys.pantheon.ethereum.worldstate.DefaultMutableWorldState; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -61,6 +63,7 @@ import org.junit.Test; public class BlockTransactionSelectorTest { private static final KeyPair keyPair = KeyPair.generate(); + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @Test public void emptyPendingTransactionsResultsInEmptyVettingResult() { @@ -71,7 +74,9 @@ public class BlockTransactionSelectorTest { final TransactionProcessor transactionProcessor = protocolSchedule.getByBlockNumber(0).getTransactionProcessor(); final DefaultMutableWorldState worldState = inMemoryWorldState(); - final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed()); + + final PendingTransactions pendingTransactions = + new PendingTransactions(5, TestClock.fixed(), metricsSystem); final Supplier isCancelled = () -> false; final ProcessableBlockHeader blockHeader = @@ -108,7 +113,8 @@ public class BlockTransactionSelectorTest { @Test public void failedTransactionsAreIncludedInTheBlock() { - final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed()); + final PendingTransactions pendingTransactions = + new PendingTransactions(5, TestClock.fixed(), metricsSystem); final Transaction transaction = createTransaction(1); pendingTransactions.addRemoteTransaction(transaction); @@ -159,7 +165,8 @@ public class BlockTransactionSelectorTest { @Test public void invalidTransactionsTransactionProcessingAreSkippedButBlockStillFills() { - final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed()); + final PendingTransactions pendingTransactions = + new PendingTransactions(5, TestClock.fixed(), metricsSystem); final List transactionsToInject = Lists.newArrayList(); for (int i = 0; i < 5; i++) { @@ -221,7 +228,8 @@ public class BlockTransactionSelectorTest { @Test public void subsetOfPendingTransactionsIncludedWhenBlockGasLimitHit() { - final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed()); + final PendingTransactions pendingTransactions = + new PendingTransactions(5, TestClock.fixed(), metricsSystem); final List transactionsToInject = Lists.newArrayList(); // Transactions are reported in reverse order. @@ -286,7 +294,8 @@ public class BlockTransactionSelectorTest { @Test public void transactionOfferingGasPriceLessThanMinimumIsIdentifiedAndRemovedFromPending() { - final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed()); + final PendingTransactions pendingTransactions = + new PendingTransactions(5, TestClock.fixed(), metricsSystem); final Blockchain blockchain = new TestBlockchain(); @@ -330,7 +339,8 @@ public class BlockTransactionSelectorTest { @Test public void transactionTooLargeForBlockDoesNotPreventMoreBeingAddedIfBlockOccupancyNotReached() { - final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed()); + final PendingTransactions pendingTransactions = + new PendingTransactions(5, TestClock.fixed(), metricsSystem); final Blockchain blockchain = new TestBlockchain(); final DefaultMutableWorldState worldState = inMemoryWorldState(); final Supplier isCancelled = () -> false; @@ -400,7 +410,8 @@ public class BlockTransactionSelectorTest { @Test public void transactionSelectionStopsWhenSufficientBlockOccupancyIsReached() { - final PendingTransactions pendingTransactions = new PendingTransactions(10, TestClock.fixed()); + final PendingTransactions pendingTransactions = + new PendingTransactions(10, TestClock.fixed(), metricsSystem); final Blockchain blockchain = new TestBlockchain(); final DefaultMutableWorldState worldState = inMemoryWorldState(); final Supplier isCancelled = () -> false; @@ -481,7 +492,8 @@ public class BlockTransactionSelectorTest { @Test public void shouldDiscardTransactionsThatFailValidation() { - final PendingTransactions pendingTransactions = new PendingTransactions(10, TestClock.fixed()); + final PendingTransactions pendingTransactions = + new PendingTransactions(10, TestClock.fixed(), metricsSystem); final TransactionProcessor transactionProcessor = mock(TransactionProcessor.class); final Blockchain blockchain = new TestBlockchain(); final DefaultMutableWorldState worldState = inMemoryWorldState(); diff --git a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java index 5bb405a96c..4a53a940c0 100644 --- a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java +++ b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java @@ -23,6 +23,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.EthHashSolver; import tech.pegasys.pantheon.ethereum.mainnet.EthHasher.Light; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolScheduleBuilder; import tech.pegasys.pantheon.ethereum.mainnet.ValidationTestUtils; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -44,6 +46,7 @@ public class EthHashBlockCreatorTest { private static final BytesValue BLOCK_1_EXTRA_DATA = BytesValue.fromHexString("0x476574682f76312e302e302f6c696e75782f676f312e342e32"); + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private final ExecutionContextTestFixture executionContextTestFixture = ExecutionContextTestFixture.builder() @@ -63,7 +66,7 @@ public class EthHashBlockCreatorTest { new EthHashBlockCreator( BLOCK_1_COINBASE, parent -> BLOCK_1_EXTRA_DATA, - new PendingTransactions(1, TestClock.fixed()), + new PendingTransactions(1, TestClock.fixed(), metricsSystem), executionContextTestFixture.getProtocolContext(), executionContextTestFixture.getProtocolSchedule(), gasLimit -> gasLimit, diff --git a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java index fd88a672a4..7e011febe3 100644 --- a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java +++ b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java @@ -17,6 +17,8 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import tech.pegasys.pantheon.ethereum.core.MiningParameters; import tech.pegasys.pantheon.ethereum.core.MiningParametersTestBuilder; import tech.pegasys.pantheon.ethereum.core.PendingTransactions; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.Subscribers; @@ -25,6 +27,7 @@ import java.util.concurrent.Executors; import org.junit.Test; public class EthHashMinerExecutorTest { + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @Test public void startingMiningWithoutCoinbaseThrowsException() { @@ -36,7 +39,7 @@ public class EthHashMinerExecutorTest { null, Executors.newCachedThreadPool(), null, - new PendingTransactions(1, TestClock.fixed()), + new PendingTransactions(1, TestClock.fixed(), metricsSystem), miningParameters, new DefaultBlockScheduler(1, 10, TestClock.fixed())); @@ -54,7 +57,7 @@ public class EthHashMinerExecutorTest { null, Executors.newCachedThreadPool(), null, - new PendingTransactions(1, TestClock.fixed()), + new PendingTransactions(1, TestClock.fixed(), metricsSystem), miningParameters, new DefaultBlockScheduler(1, 10, TestClock.fixed())); diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionDroppedListener.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionDroppedListener.java new file mode 100644 index 0000000000..7927461307 --- /dev/null +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionDroppedListener.java @@ -0,0 +1,19 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.core; + +@FunctionalInterface +public interface PendingTransactionDroppedListener { + + void onTransactionDropped(Transaction transaction); +} diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java index 4541be7136..d3bc017708 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java @@ -12,13 +12,17 @@ */ package tech.pegasys.pantheon.ethereum.core; -import static java.util.Collections.newSetFromMap; import static java.util.Comparator.comparing; +import tech.pegasys.pantheon.metrics.Counter; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.MetricCategory; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.util.Subscribers; + import java.time.Clock; import java.time.Instant; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,7 +33,6 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -51,28 +54,57 @@ public class PendingTransactions { private final Map> transactionsBySender = new HashMap<>(); - private final Collection listeners = - newSetFromMap(new ConcurrentHashMap<>()); + private final Subscribers listeners = new Subscribers<>(); + + private final Subscribers transactionDroppedListeners = + new Subscribers<>(); private final int maxPendingTransactions; private final Clock clock; - public PendingTransactions(final int maxPendingTransactions, final Clock clock) { + private final LabelledMetric transactionCounter; + private final Counter localTransactionAddedCounter; + private final Counter remoteTransactionAddedCounter; + + public PendingTransactions( + final int maxPendingTransactions, final Clock clock, final MetricsSystem metricsSystem) { this.maxPendingTransactions = maxPendingTransactions; this.clock = clock; + transactionCounter = + metricsSystem.createLabelledCounter( + MetricCategory.TRANSACTION_POOL, + "transactions_total", + "Count of transactions changed in the transaction pool", + "source"); + localTransactionAddedCounter = transactionCounter.labels("local", "added"); + remoteTransactionAddedCounter = transactionCounter.labels("remote", "added"); } public boolean addRemoteTransaction(final Transaction transaction) { final TransactionInfo transactionInfo = new TransactionInfo(transaction, false, clock.instant()); - return addTransaction(transactionInfo); + boolean addTransaction = addTransaction(transactionInfo); + remoteTransactionAddedCounter.inc(); + return addTransaction; } boolean addLocalTransaction(final Transaction transaction) { - return addTransaction(new TransactionInfo(transaction, true, clock.instant())); + boolean addTransaction = + addTransaction(new TransactionInfo(transaction, true, clock.instant())); + localTransactionAddedCounter.inc(); + return addTransaction; } public void removeTransaction(final Transaction transaction) { + doRemoveTransaction(transaction, false); + notifyTransactionDropped(transaction); + } + + public void transactionAddedToBlock(final Transaction transaction) { + doRemoveTransaction(transaction, true); + } + + private void doRemoveTransaction(final Transaction transaction, final boolean addedToBlock) { synchronized (pendingTransactions) { final TransactionInfo removedTransactionInfo = pendingTransactions.remove(transaction.hash()); if (removedTransactionInfo != null) { @@ -85,10 +117,19 @@ public class PendingTransactions { transactionsBySender.remove(transaction.getSender()); } }); + incrementTransactionRemovedCounter( + removedTransactionInfo.isReceivedFromLocalSource(), addedToBlock); } } } + private void incrementTransactionRemovedCounter( + final boolean receivedFromLocalSource, final boolean addedToBlock) { + final String location = receivedFromLocalSource ? "local" : "remote"; + final String operation = addedToBlock ? "addedToBlock" : "dropped"; + transactionCounter.labels(location, "removed", operation).inc(); + } + /* * The BlockTransaction selection process (part of block mining) requires synchronised access to * all pendingTransactions - this allows it to iterate over the available transactions without @@ -179,6 +220,10 @@ public class PendingTransactions { listeners.forEach(listener -> listener.onTransactionAdded(transaction)); } + private void notifyTransactionDropped(final Transaction transaction) { + transactionDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction)); + } + public int size() { synchronized (pendingTransactions) { return pendingTransactions.size(); @@ -199,7 +244,11 @@ public class PendingTransactions { } public void addTransactionListener(final PendingTransactionListener listener) { - listeners.add(listener); + listeners.subscribe(listener); + } + + public void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) { + transactionDroppedListeners.subscribe(listener); } public OptionalLong getNextNonceForSender(final Address sender) { diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionPool.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionPool.java index d6108df149..dfa3153e39 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionPool.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionPool.java @@ -110,9 +110,13 @@ public class TransactionPool implements BlockAddedObserver { pendingTransactions.addTransactionListener(listener); } + public void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) { + pendingTransactions.addTransactionDroppedListener(listener); + } + @Override public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) { - event.getAddedTransactions().forEach(pendingTransactions::removeTransaction); + event.getAddedTransactions().forEach(pendingTransactions::transactionAddedToBlock); addRemoteTransactions(event.getRemovedTransactions()); } diff --git a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionsTest.java b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionsTest.java index 5eccd26a06..3168d22fe3 100644 --- a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionsTest.java +++ b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionsTest.java @@ -19,6 +19,8 @@ import static org.mockito.Mockito.verifyZeroInteractions; import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; import tech.pegasys.pantheon.ethereum.core.PendingTransactions.TransactionSelectionResult; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import java.util.ArrayList; @@ -33,12 +35,16 @@ public class PendingTransactionsTest { private static final int MAX_TRANSACTIONS = 5; private static final KeyPair KEYS1 = KeyPair.generate(); private static final KeyPair KEYS2 = KeyPair.generate(); + + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private final PendingTransactions transactions = - new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed()); + new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); private final Transaction transaction1 = createTransaction(2); private final Transaction transaction2 = createTransaction(1); private final PendingTransactionListener listener = mock(PendingTransactionListener.class); + private final PendingTransactionDroppedListener droppedListener = + mock(PendingTransactionDroppedListener.class); private static final Address SENDER1 = Util.publicKeyToAddress(KEYS1.getPublicKey()); private static final Address SENDER2 = Util.publicKeyToAddress(KEYS2.getPublicKey()); @@ -133,6 +139,39 @@ public class PendingTransactionsTest { verify(listener).onTransactionAdded(transaction1); } + @Test + public void shouldNotifyDroppedListenerWhenRemoteTransactionDropped() { + transactions.addRemoteTransaction(transaction1); + + transactions.addTransactionDroppedListener(droppedListener); + + transactions.removeTransaction(transaction1); + + verify(droppedListener).onTransactionDropped(transaction1); + } + + @Test + public void shouldNotifyDroppedListenerWhenLocalTransactionDropped() { + transactions.addLocalTransaction(transaction1); + + transactions.addTransactionDroppedListener(droppedListener); + + transactions.removeTransaction(transaction1); + + verify(droppedListener).onTransactionDropped(transaction1); + } + + @Test + public void shouldNotNotifyDroppedListenerWhenTransactionAddedToBlock() { + transactions.addRemoteTransaction(transaction1); + + transactions.addTransactionDroppedListener(droppedListener); + + transactions.transactionAddedToBlock(transaction1); + + verifyZeroInteractions(droppedListener); + } + @Test public void selectTransactionsUntilSelectorRequestsNoMore() { transactions.addRemoteTransaction(transaction1); diff --git a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/TransactionPoolTest.java b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/TransactionPoolTest.java index e3b6915ebb..09bf466a66 100644 --- a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/TransactionPoolTest.java +++ b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/TransactionPoolTest.java @@ -42,6 +42,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec; import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator; import tech.pegasys.pantheon.ethereum.mainnet.ValidationResult; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.uint.UInt256; @@ -58,6 +60,7 @@ public class TransactionPoolTest { private final PendingTransactionListener listener = mock(PendingTransactionListener.class); private final TransactionBatchAddedListener batchAddedListener = mock(TransactionBatchAddedListener.class); + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @SuppressWarnings("unchecked") private final ProtocolSchedule protocolSchedule = mock(ProtocolSchedule.class); @@ -68,7 +71,7 @@ public class TransactionPoolTest { private final TransactionValidator transactionValidator = mock(TransactionValidator.class); private MutableBlockchain blockchain; private final PendingTransactions transactions = - new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed()); + new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); private final Transaction transaction1 = createTransaction(1); private final Transaction transaction2 = createTransaction(2); private TransactionPool transactionPool; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java index 28243225fc..5a4b22dd83 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java @@ -18,6 +18,7 @@ import tech.pegasys.pantheon.ethereum.core.TransactionPool; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.MetricsSystem; import java.time.Clock; @@ -28,9 +29,10 @@ public class TransactionPoolFactory { final ProtocolContext protocolContext, final EthContext ethContext, final Clock clock, - final int maxPendingTransactions) { + final int maxPendingTransactions, + final MetricsSystem metricsSystem) { final PendingTransactions pendingTransactions = - new PendingTransactions(maxPendingTransactions, clock); + new PendingTransactions(maxPendingTransactions, clock, metricsSystem); final PeerTransactionTracker transactionTracker = new PeerTransactionTracker(); final TransactionsMessageSender transactionsMessageSender = diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java index ad28a40881..07174154fb 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java @@ -58,6 +58,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -91,6 +92,7 @@ public final class EthProtocolManagerTest { private static ProtocolSchedule protocolSchedule; private static BlockDataGenerator gen; private static ProtocolContext protocolContext; + private static final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @BeforeClass public static void setup() { @@ -1020,7 +1022,8 @@ public final class EthProtocolManagerTest { protocolContext, ethManager.ethContext(), TestClock.fixed(), - PendingTransactions.MAX_PENDING_TRANSACTIONS); + PendingTransactions.MAX_PENDING_TRANSACTIONS, + metricsSystem); // Send just a transaction message. final PeerConnection peer = setupPeer(ethManager, (cap, msg, connection) -> {}); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java index 308df45d1a..d79d859ea5 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java @@ -46,6 +46,7 @@ import tech.pegasys.pantheon.ethereum.p2p.peers.Peer; import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -66,6 +67,7 @@ import org.apache.logging.log4j.Logger; public class TestNode implements Closeable { private static final Logger LOG = LogManager.getLogger(); + private static final MetricsSystem metricsSystem = new NoOpMetricsSystem(); protected final Integer port; protected final SECP256K1.KeyPair kp; @@ -138,7 +140,8 @@ public class TestNode implements Closeable { protocolContext, ethContext, TestClock.fixed(), - PendingTransactions.MAX_PENDING_TRANSACTIONS); + PendingTransactions.MAX_PENDING_TRANSACTIONS, + metricsSystem); networkRunner.start(); selfPeer = new DefaultPeer(id(), endpoint()); diff --git a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java index dbee2c7a16..9edd28b3a3 100644 --- a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java @@ -43,6 +43,8 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcError; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcErrorResponse; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcResponse; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcSuccessResponse; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; @@ -64,8 +66,10 @@ public class EthGetFilterChangesIntegrationTest { private final String ETH_METHOD = "eth_getFilterChanges"; private final String JSON_RPC_VERSION = "2.0"; private TransactionPool transactionPool; + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); + private final PendingTransactions transactions = - new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed()); + new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); private static final int MAX_TRANSACTIONS = 5; private static final KeyPair keyPair = KeyPair.generate(); private final Transaction transaction = createTransaction(1); diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java new file mode 100644 index 0000000000..8743b43584 --- /dev/null +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java @@ -0,0 +1,52 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.core.PendingTransactionDroppedListener; +import tech.pegasys.pantheon.ethereum.core.Transaction; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; + +import java.util.List; + +public class PendingTransactionDroppedSubscriptionService + implements PendingTransactionDroppedListener { + + private final SubscriptionManager subscriptionManager; + + public PendingTransactionDroppedSubscriptionService( + final SubscriptionManager subscriptionManager) { + this.subscriptionManager = subscriptionManager; + } + + @Override + public void onTransactionDropped(final Transaction transaction) { + notifySubscribers(transaction.hash()); + } + + private void notifySubscribers(final Hash pendingTransaction) { + final List subscriptions = pendingDroppedTransactionSubscriptions(); + + final PendingTransactionResult msg = new PendingTransactionResult(pendingTransaction); + for (final Subscription subscription : subscriptions) { + subscriptionManager.sendMessage(subscription.getId(), msg); + } + } + + private List pendingDroppedTransactionSubscriptions() { + return subscriptionManager.subscriptionsOfType( + SubscriptionType.DROPPED_PENDING_TRANSACTIONS, Subscription.class); + } +} diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java index ca63f47584..9dbbfd7a87 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java @@ -37,9 +37,9 @@ public class PendingTransactionSubscriptionService implements PendingTransaction private void notifySubscribers(final Hash pendingTransaction) { final List subscriptions = pendingTransactionSubscriptions(); + final PendingTransactionResult msg = new PendingTransactionResult(pendingTransaction); for (final Subscription subscription : subscriptions) { - subscriptionManager.sendMessage( - subscription.getId(), new PendingTransactionResult(pendingTransaction)); + subscriptionManager.sendMessage(subscription.getId(), msg); } } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/request/SubscriptionType.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/request/SubscriptionType.java index 0e12003f88..86ca07045c 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/request/SubscriptionType.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/request/SubscriptionType.java @@ -24,6 +24,9 @@ public enum SubscriptionType { @JsonProperty("newPendingTransactions") NEW_PENDING_TRANSACTIONS("newPendingTransactions"), + @JsonProperty("droppedPendingTransactions") + DROPPED_PENDING_TRANSACTIONS("droppedPendingTransactions"), + @JsonProperty("syncing") SYNCING("syncing"); diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java new file mode 100644 index 0000000000..46bd8d9356 --- /dev/null +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.refEq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import tech.pegasys.pantheon.ethereum.chain.Blockchain; +import tech.pegasys.pantheon.ethereum.core.Block; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.core.Transaction; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class PendingTransactionDroppedSubscriptionServiceTest { + + private static final Hash TX_ONE = + Hash.fromHexString("0x15876958423545c3c7b0fcf9be8ffb543305ee1b43db87ed380dcf0cd16589f7"); + + @Mock private SubscriptionManager subscriptionManager; + @Mock private Blockchain blockchain; + @Mock private Block block; + + private PendingTransactionDroppedSubscriptionService service; + + @Before + public void setUp() { + service = new PendingTransactionDroppedSubscriptionService(subscriptionManager); + } + + @Test + public void onTransactionAddedMustSendMessage() { + final long[] subscriptionIds = new long[] {5, 56, 989}; + setUpSubscriptions(subscriptionIds); + final Transaction pending = transaction(TX_ONE); + + service.onTransactionDropped(pending); + + verifyZeroInteractions(block); + verifyZeroInteractions(blockchain); + verifySubscriptionMangerInteractions(messages(TX_ONE, subscriptionIds)); + } + + private void verifySubscriptionMangerInteractions(final Map expected) { + verify(subscriptionManager) + .subscriptionsOfType(SubscriptionType.DROPPED_PENDING_TRANSACTIONS, Subscription.class); + + for (final Map.Entry message : expected.entrySet()) { + verify(subscriptionManager) + .sendMessage( + eq(message.getKey()), refEq(new PendingTransactionResult(message.getValue()))); + } + + verifyNoMoreInteractions(subscriptionManager); + } + + private Map messages(final Hash result, final long... subscriptionIds) { + final Map messages = new HashMap<>(); + + for (final long subscriptionId : subscriptionIds) { + messages.put(subscriptionId, result); + } + + return messages; + } + + private Transaction transaction(final Hash hash) { + final Transaction tx = mock(Transaction.class); + when(tx.hash()).thenReturn(hash); + return tx; + } + + private void setUpSubscriptions(final long... subscriptionsIds) { + when(subscriptionManager.subscriptionsOfType(any(), any())) + .thenReturn( + Arrays.stream(subscriptionsIds) + .mapToObj(id -> new Subscription(id, SubscriptionType.DROPPED_PENDING_TRANSACTIONS)) + .collect(Collectors.toList())); + } +} diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java index 8f5ba98cdb..cf71690dac 100644 --- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java +++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java @@ -25,7 +25,8 @@ public enum MetricCategory { PROCESS("process", false), ROCKSDB("rocksdb"), RPC("rpc"), - SYNCHRONIZER("synchronizer"); + SYNCHRONIZER("synchronizer"), + TRANSACTION_POOL("transaction_pool"); // Why not BIG_QUEUE and ROCKSDB? They hurt performance under load. public static final Set DEFAULT_METRIC_CATEGORIES = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java index baf6090a43..2a93acf1a5 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java @@ -37,6 +37,7 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.methods.WebSocketMethods import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.blockheaders.NewBlockHeadersSubscriptionService; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.logs.LogsSubscriptionService; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending.PendingTransactionDroppedSubscriptionService; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending.PendingTransactionSubscriptionService; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing.SyncingSubscriptionService; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; @@ -457,7 +458,10 @@ public class RunnerBuilder { final SubscriptionManager subscriptionManager = new SubscriptionManager(); final PendingTransactionSubscriptionService pendingTransactions = new PendingTransactionSubscriptionService(subscriptionManager); + final PendingTransactionDroppedSubscriptionService pendingTransactionsRemoved = + new PendingTransactionDroppedSubscriptionService(subscriptionManager); transactionPool.addTransactionListener(pendingTransactions); + transactionPool.addTransactionDroppedListener(pendingTransactionsRemoved); vertx.deployVerticle(subscriptionManager); return subscriptionManager; diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java index 1b30ad4253..243c9d485a 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java @@ -176,7 +176,8 @@ public class CliquePantheonController implements PantheonController { final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool( - protocolSchedule, protocolContext, ethContext, clock, maxPendingTransactions); + protocolSchedule, + protocolContext, + ethContext, + clock, + maxPendingTransactions, + metricsSystem); final IbftEventQueue ibftEventQueue = new IbftEventQueue(ibftConfig.getMessageQueueLimit()); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java index 240948a46d..46e0acecab 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java @@ -153,7 +153,8 @@ public class MainnetPantheonController implements PantheonController { protocolContext, ethProtocolManager.ethContext(), clock, - maxPendingTransactions); + maxPendingTransactions, + metricsSystem); final ExecutorService minerThreadPool = Executors.newCachedThreadPool(); final EthHashMinerExecutor executor =