diff --git a/consensus/clique/build.gradle b/consensus/clique/build.gradle
index ade80f4f63..6fd9c2c209 100644
--- a/consensus/clique/build.gradle
+++ b/consensus/clique/build.gradle
@@ -47,6 +47,7 @@ dependencies {
testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
testImplementation project(path: ':consensus:common', configuration: 'testArtifacts')
testImplementation project(':testutil')
+ testImplementation project(':metrics')
testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'
diff --git a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java
index d2c38451ef..420cf37d36 100644
--- a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java
+++ b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java
@@ -45,6 +45,8 @@ import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -60,6 +62,7 @@ public class CliqueBlockCreatorTest {
private final Address proposerAddress = Util.publicKeyToAddress(proposerKeyPair.getPublicKey());
private final KeyPair otherKeyPair = KeyPair.generate();
private final List
validatorList = Lists.newArrayList();
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private ProtocolSchedule protocolSchedule;
private final WorldStateArchive stateArchive = createInMemoryWorldStateArchive();
@@ -113,7 +116,7 @@ public class CliqueBlockCreatorTest {
new CliqueBlockCreator(
coinbase,
parent -> extraData.encode(),
- new PendingTransactions(5, TestClock.fixed()),
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem),
protocolContext,
protocolSchedule,
gasLimit -> gasLimit,
@@ -140,7 +143,7 @@ public class CliqueBlockCreatorTest {
new CliqueBlockCreator(
coinbase,
parent -> extraData.encode(),
- new PendingTransactions(5, TestClock.fixed()),
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem),
protocolContext,
protocolSchedule,
gasLimit -> gasLimit,
@@ -166,7 +169,7 @@ public class CliqueBlockCreatorTest {
new CliqueBlockCreator(
coinbase,
parent -> extraData.encode(),
- new PendingTransactions(5, TestClock.fixed()),
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem),
protocolContext,
protocolSchedule,
gasLimit -> gasLimit,
@@ -195,7 +198,7 @@ public class CliqueBlockCreatorTest {
new CliqueBlockCreator(
coinbase,
parent -> extraData.encode(),
- new PendingTransactions(5, TestClock.fixed()),
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem),
protocolContext,
protocolSchedule,
gasLimit -> gasLimit,
diff --git a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java
index 666b8bda6a..6a09ada16c 100644
--- a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java
+++ b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java
@@ -37,6 +37,8 @@ import tech.pegasys.pantheon.ethereum.core.PendingTransactions;
import tech.pegasys.pantheon.ethereum.core.PrivacyParameters;
import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.core.Wei;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -58,6 +60,7 @@ public class CliqueMinerExecutorTest {
private final List validatorList = Lists.newArrayList();
private ProtocolContext cliqueProtocolContext;
private BlockHeaderTestFixture blockHeaderBuilder;
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Before
public void setup() {
@@ -89,7 +92,7 @@ public class CliqueMinerExecutorTest {
Executors.newSingleThreadExecutor(),
CliqueProtocolSchedule.create(
GENESIS_CONFIG_OPTIONS, proposerKeyPair, PrivacyParameters.noPrivacy()),
- new PendingTransactions(1, TestClock.fixed()),
+ new PendingTransactions(1, TestClock.fixed(), metricsSystem),
proposerKeyPair,
new MiningParameters(AddressHelpers.ofValue(1), Wei.ZERO, wrappedVanityData, false),
mock(CliqueBlockScheduler.class),
@@ -120,7 +123,7 @@ public class CliqueMinerExecutorTest {
Executors.newSingleThreadExecutor(),
CliqueProtocolSchedule.create(
GENESIS_CONFIG_OPTIONS, proposerKeyPair, PrivacyParameters.noPrivacy()),
- new PendingTransactions(1, TestClock.fixed()),
+ new PendingTransactions(1, TestClock.fixed(), metricsSystem),
proposerKeyPair,
new MiningParameters(AddressHelpers.ofValue(1), Wei.ZERO, wrappedVanityData, false),
mock(CliqueBlockScheduler.class),
diff --git a/consensus/ibft/build.gradle b/consensus/ibft/build.gradle
index e1fb27ca95..34c1e19027 100644
--- a/consensus/ibft/build.gradle
+++ b/consensus/ibft/build.gradle
@@ -47,7 +47,9 @@ dependencies {
testImplementation project(path: ':config:', configuration: 'testSupportArtifacts')
testImplementation project(path: ':consensus:common', configuration: 'testArtifacts')
testImplementation project(':testutil')
+ testImplementation project(':metrics')
+ integrationTestImplementation project(':metrics')
integrationTestImplementation 'junit:junit'
integrationTestImplementation 'org.assertj:assertj-core'
integrationTestImplementation 'org.mockito:mockito-core'
diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java
index b52316c661..c6edaf368c 100644
--- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java
+++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java
@@ -65,6 +65,8 @@ import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -84,6 +86,7 @@ import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
public class TestContextBuilder {
+ private static MetricsSystem metricsSystem = new NoOpMetricsSystem();
private static class ControllerAndState {
@@ -283,7 +286,7 @@ public class TestContextBuilder {
final IbftBlockCreatorFactory blockCreatorFactory =
new IbftBlockCreatorFactory(
(gasLimit) -> gasLimit,
- new PendingTransactions(1, clock), // changed from IbftPantheonController
+ new PendingTransactions(1, clock, metricsSystem), // changed from IbftPantheonController
protocolContext,
protocolSchedule,
miningParams,
diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java
index f9efa6e71f..889fa2dbdf 100644
--- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java
+++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java
@@ -38,6 +38,8 @@ import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -50,6 +52,7 @@ import com.google.common.collect.Lists;
import org.junit.Test;
public class IbftBlockCreatorTest {
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Test
public void createdBlockPassesValidationRulesAndHasAppropriateHashAndMixHash() {
@@ -91,7 +94,7 @@ public class IbftBlockCreatorTest {
0,
initialValidatorList)
.encode(),
- new PendingTransactions(1, TestClock.fixed()),
+ new PendingTransactions(1, TestClock.fixed(), metricsSystem),
protContext,
protocolSchedule,
parentGasLimit -> parentGasLimit,
diff --git a/consensus/ibftlegacy/build.gradle b/consensus/ibftlegacy/build.gradle
index 19e5bde444..ad7ac76e33 100644
--- a/consensus/ibftlegacy/build.gradle
+++ b/consensus/ibftlegacy/build.gradle
@@ -32,6 +32,7 @@ dependencies {
testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
testImplementation project(path: ':consensus:ibft', configuration: 'testSupportArtifacts')
testImplementation project(':testutil')
+ testImplementation project(':metrics')
testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'
diff --git a/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java b/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java
index a753a06845..3f48518a51 100644
--- a/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java
+++ b/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java
@@ -38,6 +38,8 @@ import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -50,6 +52,7 @@ import com.google.common.collect.Lists;
import org.junit.Test;
public class IbftBlockCreatorTest {
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Test
public void headerProducedPassesValidationRules() {
@@ -96,7 +99,7 @@ public class IbftBlockCreatorTest {
null,
initialValidatorList)
.encode(),
- new PendingTransactions(1, TestClock.fixed()),
+ new PendingTransactions(1, TestClock.fixed(), metricsSystem),
protContext,
protocolSchedule,
parentGasLimit -> parentGasLimit,
diff --git a/ethereum/blockcreation/build.gradle b/ethereum/blockcreation/build.gradle
index 63e682930e..3ea5ee1b89 100644
--- a/ethereum/blockcreation/build.gradle
+++ b/ethereum/blockcreation/build.gradle
@@ -27,7 +27,7 @@ dependencies {
testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
testImplementation project(path: ':ethereum:core', configuration: 'testArtifacts')
testImplementation project(':testutil')
-
+ testImplementation project(':metrics')
testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
diff --git a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java
index c52d9d28a5..1287283049 100644
--- a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java
+++ b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java
@@ -46,6 +46,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ValidationResult;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.vm.TestBlockchain;
import tech.pegasys.pantheon.ethereum.worldstate.DefaultMutableWorldState;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -61,6 +63,7 @@ import org.junit.Test;
public class BlockTransactionSelectorTest {
private static final KeyPair keyPair = KeyPair.generate();
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Test
public void emptyPendingTransactionsResultsInEmptyVettingResult() {
@@ -71,7 +74,9 @@ public class BlockTransactionSelectorTest {
final TransactionProcessor transactionProcessor =
protocolSchedule.getByBlockNumber(0).getTransactionProcessor();
final DefaultMutableWorldState worldState = inMemoryWorldState();
- final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
+
+ final PendingTransactions pendingTransactions =
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final Supplier isCancelled = () -> false;
final ProcessableBlockHeader blockHeader =
@@ -108,7 +113,8 @@ public class BlockTransactionSelectorTest {
@Test
public void failedTransactionsAreIncludedInTheBlock() {
- final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
+ final PendingTransactions pendingTransactions =
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final Transaction transaction = createTransaction(1);
pendingTransactions.addRemoteTransaction(transaction);
@@ -159,7 +165,8 @@ public class BlockTransactionSelectorTest {
@Test
public void invalidTransactionsTransactionProcessingAreSkippedButBlockStillFills() {
- final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
+ final PendingTransactions pendingTransactions =
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final List transactionsToInject = Lists.newArrayList();
for (int i = 0; i < 5; i++) {
@@ -221,7 +228,8 @@ public class BlockTransactionSelectorTest {
@Test
public void subsetOfPendingTransactionsIncludedWhenBlockGasLimitHit() {
- final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
+ final PendingTransactions pendingTransactions =
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final List transactionsToInject = Lists.newArrayList();
// Transactions are reported in reverse order.
@@ -286,7 +294,8 @@ public class BlockTransactionSelectorTest {
@Test
public void transactionOfferingGasPriceLessThanMinimumIsIdentifiedAndRemovedFromPending() {
- final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
+ final PendingTransactions pendingTransactions =
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final Blockchain blockchain = new TestBlockchain();
@@ -330,7 +339,8 @@ public class BlockTransactionSelectorTest {
@Test
public void transactionTooLargeForBlockDoesNotPreventMoreBeingAddedIfBlockOccupancyNotReached() {
- final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
+ final PendingTransactions pendingTransactions =
+ new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final Blockchain blockchain = new TestBlockchain();
final DefaultMutableWorldState worldState = inMemoryWorldState();
final Supplier isCancelled = () -> false;
@@ -400,7 +410,8 @@ public class BlockTransactionSelectorTest {
@Test
public void transactionSelectionStopsWhenSufficientBlockOccupancyIsReached() {
- final PendingTransactions pendingTransactions = new PendingTransactions(10, TestClock.fixed());
+ final PendingTransactions pendingTransactions =
+ new PendingTransactions(10, TestClock.fixed(), metricsSystem);
final Blockchain blockchain = new TestBlockchain();
final DefaultMutableWorldState worldState = inMemoryWorldState();
final Supplier isCancelled = () -> false;
@@ -481,7 +492,8 @@ public class BlockTransactionSelectorTest {
@Test
public void shouldDiscardTransactionsThatFailValidation() {
- final PendingTransactions pendingTransactions = new PendingTransactions(10, TestClock.fixed());
+ final PendingTransactions pendingTransactions =
+ new PendingTransactions(10, TestClock.fixed(), metricsSystem);
final TransactionProcessor transactionProcessor = mock(TransactionProcessor.class);
final Blockchain blockchain = new TestBlockchain();
final DefaultMutableWorldState worldState = inMemoryWorldState();
diff --git a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java
index 5bb405a96c..4a53a940c0 100644
--- a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java
+++ b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java
@@ -23,6 +23,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.EthHashSolver;
import tech.pegasys.pantheon.ethereum.mainnet.EthHasher.Light;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolScheduleBuilder;
import tech.pegasys.pantheon.ethereum.mainnet.ValidationTestUtils;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -44,6 +46,7 @@ public class EthHashBlockCreatorTest {
private static final BytesValue BLOCK_1_EXTRA_DATA =
BytesValue.fromHexString("0x476574682f76312e302e302f6c696e75782f676f312e342e32");
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final ExecutionContextTestFixture executionContextTestFixture =
ExecutionContextTestFixture.builder()
@@ -63,7 +66,7 @@ public class EthHashBlockCreatorTest {
new EthHashBlockCreator(
BLOCK_1_COINBASE,
parent -> BLOCK_1_EXTRA_DATA,
- new PendingTransactions(1, TestClock.fixed()),
+ new PendingTransactions(1, TestClock.fixed(), metricsSystem),
executionContextTestFixture.getProtocolContext(),
executionContextTestFixture.getProtocolSchedule(),
gasLimit -> gasLimit,
diff --git a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java
index fd88a672a4..7e011febe3 100644
--- a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java
+++ b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java
@@ -17,6 +17,8 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import tech.pegasys.pantheon.ethereum.core.MiningParameters;
import tech.pegasys.pantheon.ethereum.core.MiningParametersTestBuilder;
import tech.pegasys.pantheon.ethereum.core.PendingTransactions;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.Subscribers;
@@ -25,6 +27,7 @@ import java.util.concurrent.Executors;
import org.junit.Test;
public class EthHashMinerExecutorTest {
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Test
public void startingMiningWithoutCoinbaseThrowsException() {
@@ -36,7 +39,7 @@ public class EthHashMinerExecutorTest {
null,
Executors.newCachedThreadPool(),
null,
- new PendingTransactions(1, TestClock.fixed()),
+ new PendingTransactions(1, TestClock.fixed(), metricsSystem),
miningParameters,
new DefaultBlockScheduler(1, 10, TestClock.fixed()));
@@ -54,7 +57,7 @@ public class EthHashMinerExecutorTest {
null,
Executors.newCachedThreadPool(),
null,
- new PendingTransactions(1, TestClock.fixed()),
+ new PendingTransactions(1, TestClock.fixed(), metricsSystem),
miningParameters,
new DefaultBlockScheduler(1, 10, TestClock.fixed()));
diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionDroppedListener.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionDroppedListener.java
new file mode 100644
index 0000000000..7927461307
--- /dev/null
+++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionDroppedListener.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2018 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package tech.pegasys.pantheon.ethereum.core;
+
+@FunctionalInterface
+public interface PendingTransactionDroppedListener {
+
+ void onTransactionDropped(Transaction transaction);
+}
diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java
index 4541be7136..d3bc017708 100644
--- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java
+++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java
@@ -12,13 +12,17 @@
*/
package tech.pegasys.pantheon.ethereum.core;
-import static java.util.Collections.newSetFromMap;
import static java.util.Comparator.comparing;
+import tech.pegasys.pantheon.metrics.Counter;
+import tech.pegasys.pantheon.metrics.LabelledMetric;
+import tech.pegasys.pantheon.metrics.MetricCategory;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.util.Subscribers;
+
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,7 +33,6 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -51,28 +54,57 @@ public class PendingTransactions {
private final Map> transactionsBySender =
new HashMap<>();
- private final Collection listeners =
- newSetFromMap(new ConcurrentHashMap<>());
+ private final Subscribers listeners = new Subscribers<>();
+
+ private final Subscribers transactionDroppedListeners =
+ new Subscribers<>();
private final int maxPendingTransactions;
private final Clock clock;
- public PendingTransactions(final int maxPendingTransactions, final Clock clock) {
+ private final LabelledMetric transactionCounter;
+ private final Counter localTransactionAddedCounter;
+ private final Counter remoteTransactionAddedCounter;
+
+ public PendingTransactions(
+ final int maxPendingTransactions, final Clock clock, final MetricsSystem metricsSystem) {
this.maxPendingTransactions = maxPendingTransactions;
this.clock = clock;
+ transactionCounter =
+ metricsSystem.createLabelledCounter(
+ MetricCategory.TRANSACTION_POOL,
+ "transactions_total",
+ "Count of transactions changed in the transaction pool",
+ "source");
+ localTransactionAddedCounter = transactionCounter.labels("local", "added");
+ remoteTransactionAddedCounter = transactionCounter.labels("remote", "added");
}
public boolean addRemoteTransaction(final Transaction transaction) {
final TransactionInfo transactionInfo =
new TransactionInfo(transaction, false, clock.instant());
- return addTransaction(transactionInfo);
+ boolean addTransaction = addTransaction(transactionInfo);
+ remoteTransactionAddedCounter.inc();
+ return addTransaction;
}
boolean addLocalTransaction(final Transaction transaction) {
- return addTransaction(new TransactionInfo(transaction, true, clock.instant()));
+ boolean addTransaction =
+ addTransaction(new TransactionInfo(transaction, true, clock.instant()));
+ localTransactionAddedCounter.inc();
+ return addTransaction;
}
public void removeTransaction(final Transaction transaction) {
+ doRemoveTransaction(transaction, false);
+ notifyTransactionDropped(transaction);
+ }
+
+ public void transactionAddedToBlock(final Transaction transaction) {
+ doRemoveTransaction(transaction, true);
+ }
+
+ private void doRemoveTransaction(final Transaction transaction, final boolean addedToBlock) {
synchronized (pendingTransactions) {
final TransactionInfo removedTransactionInfo = pendingTransactions.remove(transaction.hash());
if (removedTransactionInfo != null) {
@@ -85,10 +117,19 @@ public class PendingTransactions {
transactionsBySender.remove(transaction.getSender());
}
});
+ incrementTransactionRemovedCounter(
+ removedTransactionInfo.isReceivedFromLocalSource(), addedToBlock);
}
}
}
+ private void incrementTransactionRemovedCounter(
+ final boolean receivedFromLocalSource, final boolean addedToBlock) {
+ final String location = receivedFromLocalSource ? "local" : "remote";
+ final String operation = addedToBlock ? "addedToBlock" : "dropped";
+ transactionCounter.labels(location, "removed", operation).inc();
+ }
+
/*
* The BlockTransaction selection process (part of block mining) requires synchronised access to
* all pendingTransactions - this allows it to iterate over the available transactions without
@@ -179,6 +220,10 @@ public class PendingTransactions {
listeners.forEach(listener -> listener.onTransactionAdded(transaction));
}
+ private void notifyTransactionDropped(final Transaction transaction) {
+ transactionDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction));
+ }
+
public int size() {
synchronized (pendingTransactions) {
return pendingTransactions.size();
@@ -199,7 +244,11 @@ public class PendingTransactions {
}
public void addTransactionListener(final PendingTransactionListener listener) {
- listeners.add(listener);
+ listeners.subscribe(listener);
+ }
+
+ public void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) {
+ transactionDroppedListeners.subscribe(listener);
}
public OptionalLong getNextNonceForSender(final Address sender) {
diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionPool.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionPool.java
index d6108df149..dfa3153e39 100644
--- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionPool.java
+++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionPool.java
@@ -110,9 +110,13 @@ public class TransactionPool implements BlockAddedObserver {
pendingTransactions.addTransactionListener(listener);
}
+ public void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) {
+ pendingTransactions.addTransactionDroppedListener(listener);
+ }
+
@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
- event.getAddedTransactions().forEach(pendingTransactions::removeTransaction);
+ event.getAddedTransactions().forEach(pendingTransactions::transactionAddedToBlock);
addRemoteTransactions(event.getRemovedTransactions());
}
diff --git a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionsTest.java b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionsTest.java
index 5eccd26a06..3168d22fe3 100644
--- a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionsTest.java
+++ b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionsTest.java
@@ -19,6 +19,8 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.core.PendingTransactions.TransactionSelectionResult;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import java.util.ArrayList;
@@ -33,12 +35,16 @@ public class PendingTransactionsTest {
private static final int MAX_TRANSACTIONS = 5;
private static final KeyPair KEYS1 = KeyPair.generate();
private static final KeyPair KEYS2 = KeyPair.generate();
+
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final PendingTransactions transactions =
- new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed());
+ new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem);
private final Transaction transaction1 = createTransaction(2);
private final Transaction transaction2 = createTransaction(1);
private final PendingTransactionListener listener = mock(PendingTransactionListener.class);
+ private final PendingTransactionDroppedListener droppedListener =
+ mock(PendingTransactionDroppedListener.class);
private static final Address SENDER1 = Util.publicKeyToAddress(KEYS1.getPublicKey());
private static final Address SENDER2 = Util.publicKeyToAddress(KEYS2.getPublicKey());
@@ -133,6 +139,39 @@ public class PendingTransactionsTest {
verify(listener).onTransactionAdded(transaction1);
}
+ @Test
+ public void shouldNotifyDroppedListenerWhenRemoteTransactionDropped() {
+ transactions.addRemoteTransaction(transaction1);
+
+ transactions.addTransactionDroppedListener(droppedListener);
+
+ transactions.removeTransaction(transaction1);
+
+ verify(droppedListener).onTransactionDropped(transaction1);
+ }
+
+ @Test
+ public void shouldNotifyDroppedListenerWhenLocalTransactionDropped() {
+ transactions.addLocalTransaction(transaction1);
+
+ transactions.addTransactionDroppedListener(droppedListener);
+
+ transactions.removeTransaction(transaction1);
+
+ verify(droppedListener).onTransactionDropped(transaction1);
+ }
+
+ @Test
+ public void shouldNotNotifyDroppedListenerWhenTransactionAddedToBlock() {
+ transactions.addRemoteTransaction(transaction1);
+
+ transactions.addTransactionDroppedListener(droppedListener);
+
+ transactions.transactionAddedToBlock(transaction1);
+
+ verifyZeroInteractions(droppedListener);
+ }
+
@Test
public void selectTransactionsUntilSelectorRequestsNoMore() {
transactions.addRemoteTransaction(transaction1);
diff --git a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/TransactionPoolTest.java b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/TransactionPoolTest.java
index e3b6915ebb..09bf466a66 100644
--- a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/TransactionPoolTest.java
+++ b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/TransactionPoolTest.java
@@ -42,6 +42,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ValidationResult;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.uint.UInt256;
@@ -58,6 +60,7 @@ public class TransactionPoolTest {
private final PendingTransactionListener listener = mock(PendingTransactionListener.class);
private final TransactionBatchAddedListener batchAddedListener =
mock(TransactionBatchAddedListener.class);
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@SuppressWarnings("unchecked")
private final ProtocolSchedule protocolSchedule = mock(ProtocolSchedule.class);
@@ -68,7 +71,7 @@ public class TransactionPoolTest {
private final TransactionValidator transactionValidator = mock(TransactionValidator.class);
private MutableBlockchain blockchain;
private final PendingTransactions transactions =
- new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed());
+ new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem);
private final Transaction transaction1 = createTransaction(1);
private final Transaction transaction2 = createTransaction(2);
private TransactionPool transactionPool;
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java
index 28243225fc..5a4b22dd83 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java
@@ -18,6 +18,7 @@ import tech.pegasys.pantheon.ethereum.core.TransactionPool;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.time.Clock;
@@ -28,9 +29,10 @@ public class TransactionPoolFactory {
final ProtocolContext> protocolContext,
final EthContext ethContext,
final Clock clock,
- final int maxPendingTransactions) {
+ final int maxPendingTransactions,
+ final MetricsSystem metricsSystem) {
final PendingTransactions pendingTransactions =
- new PendingTransactions(maxPendingTransactions, clock);
+ new PendingTransactions(maxPendingTransactions, clock, metricsSystem);
final PeerTransactionTracker transactionTracker = new PeerTransactionTracker();
final TransactionsMessageSender transactionsMessageSender =
diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java
index ad28a40881..07174154fb 100644
--- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java
+++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java
@@ -58,6 +58,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -91,6 +92,7 @@ public final class EthProtocolManagerTest {
private static ProtocolSchedule protocolSchedule;
private static BlockDataGenerator gen;
private static ProtocolContext protocolContext;
+ private static final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@BeforeClass
public static void setup() {
@@ -1020,7 +1022,8 @@ public final class EthProtocolManagerTest {
protocolContext,
ethManager.ethContext(),
TestClock.fixed(),
- PendingTransactions.MAX_PENDING_TRANSACTIONS);
+ PendingTransactions.MAX_PENDING_TRANSACTIONS,
+ metricsSystem);
// Send just a transaction message.
final PeerConnection peer = setupPeer(ethManager, (cap, msg, connection) -> {});
diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java
index 308df45d1a..d79d859ea5 100644
--- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java
+++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java
@@ -46,6 +46,7 @@ import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -66,6 +67,7 @@ import org.apache.logging.log4j.Logger;
public class TestNode implements Closeable {
private static final Logger LOG = LogManager.getLogger();
+ private static final MetricsSystem metricsSystem = new NoOpMetricsSystem();
protected final Integer port;
protected final SECP256K1.KeyPair kp;
@@ -138,7 +140,8 @@ public class TestNode implements Closeable {
protocolContext,
ethContext,
TestClock.fixed(),
- PendingTransactions.MAX_PENDING_TRANSACTIONS);
+ PendingTransactions.MAX_PENDING_TRANSACTIONS,
+ metricsSystem);
networkRunner.start();
selfPeer = new DefaultPeer(id(), endpoint());
diff --git a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java
index dbee2c7a16..9edd28b3a3 100644
--- a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java
+++ b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java
@@ -43,6 +43,8 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcError;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcErrorResponse;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcResponse;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcSuccessResponse;
+import tech.pegasys.pantheon.metrics.MetricsSystem;
+import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
@@ -64,8 +66,10 @@ public class EthGetFilterChangesIntegrationTest {
private final String ETH_METHOD = "eth_getFilterChanges";
private final String JSON_RPC_VERSION = "2.0";
private TransactionPool transactionPool;
+ private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
+
private final PendingTransactions transactions =
- new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed());
+ new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem);
private static final int MAX_TRANSACTIONS = 5;
private static final KeyPair keyPair = KeyPair.generate();
private final Transaction transaction = createTransaction(1);
diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java
new file mode 100644
index 0000000000..8743b43584
--- /dev/null
+++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending;
+
+import tech.pegasys.pantheon.ethereum.core.Hash;
+import tech.pegasys.pantheon.ethereum.core.PendingTransactionDroppedListener;
+import tech.pegasys.pantheon.ethereum.core.Transaction;
+import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription;
+import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
+import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
+
+import java.util.List;
+
+public class PendingTransactionDroppedSubscriptionService
+ implements PendingTransactionDroppedListener {
+
+ private final SubscriptionManager subscriptionManager;
+
+ public PendingTransactionDroppedSubscriptionService(
+ final SubscriptionManager subscriptionManager) {
+ this.subscriptionManager = subscriptionManager;
+ }
+
+ @Override
+ public void onTransactionDropped(final Transaction transaction) {
+ notifySubscribers(transaction.hash());
+ }
+
+ private void notifySubscribers(final Hash pendingTransaction) {
+ final List subscriptions = pendingDroppedTransactionSubscriptions();
+
+ final PendingTransactionResult msg = new PendingTransactionResult(pendingTransaction);
+ for (final Subscription subscription : subscriptions) {
+ subscriptionManager.sendMessage(subscription.getId(), msg);
+ }
+ }
+
+ private List pendingDroppedTransactionSubscriptions() {
+ return subscriptionManager.subscriptionsOfType(
+ SubscriptionType.DROPPED_PENDING_TRANSACTIONS, Subscription.class);
+ }
+}
diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java
index ca63f47584..9dbbfd7a87 100644
--- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java
+++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java
@@ -37,9 +37,9 @@ public class PendingTransactionSubscriptionService implements PendingTransaction
private void notifySubscribers(final Hash pendingTransaction) {
final List subscriptions = pendingTransactionSubscriptions();
+ final PendingTransactionResult msg = new PendingTransactionResult(pendingTransaction);
for (final Subscription subscription : subscriptions) {
- subscriptionManager.sendMessage(
- subscription.getId(), new PendingTransactionResult(pendingTransaction));
+ subscriptionManager.sendMessage(subscription.getId(), msg);
}
}
diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/request/SubscriptionType.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/request/SubscriptionType.java
index 0e12003f88..86ca07045c 100644
--- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/request/SubscriptionType.java
+++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/request/SubscriptionType.java
@@ -24,6 +24,9 @@ public enum SubscriptionType {
@JsonProperty("newPendingTransactions")
NEW_PENDING_TRANSACTIONS("newPendingTransactions"),
+ @JsonProperty("droppedPendingTransactions")
+ DROPPED_PENDING_TRANSACTIONS("droppedPendingTransactions"),
+
@JsonProperty("syncing")
SYNCING("syncing");
diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java
new file mode 100644
index 0000000000..46bd8d9356
--- /dev/null
+++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2018 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.refEq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import tech.pegasys.pantheon.ethereum.chain.Blockchain;
+import tech.pegasys.pantheon.ethereum.core.Block;
+import tech.pegasys.pantheon.ethereum.core.Hash;
+import tech.pegasys.pantheon.ethereum.core.Transaction;
+import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription;
+import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
+import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PendingTransactionDroppedSubscriptionServiceTest {
+
+ private static final Hash TX_ONE =
+ Hash.fromHexString("0x15876958423545c3c7b0fcf9be8ffb543305ee1b43db87ed380dcf0cd16589f7");
+
+ @Mock private SubscriptionManager subscriptionManager;
+ @Mock private Blockchain blockchain;
+ @Mock private Block block;
+
+ private PendingTransactionDroppedSubscriptionService service;
+
+ @Before
+ public void setUp() {
+ service = new PendingTransactionDroppedSubscriptionService(subscriptionManager);
+ }
+
+ @Test
+ public void onTransactionAddedMustSendMessage() {
+ final long[] subscriptionIds = new long[] {5, 56, 989};
+ setUpSubscriptions(subscriptionIds);
+ final Transaction pending = transaction(TX_ONE);
+
+ service.onTransactionDropped(pending);
+
+ verifyZeroInteractions(block);
+ verifyZeroInteractions(blockchain);
+ verifySubscriptionMangerInteractions(messages(TX_ONE, subscriptionIds));
+ }
+
+ private void verifySubscriptionMangerInteractions(final Map expected) {
+ verify(subscriptionManager)
+ .subscriptionsOfType(SubscriptionType.DROPPED_PENDING_TRANSACTIONS, Subscription.class);
+
+ for (final Map.Entry message : expected.entrySet()) {
+ verify(subscriptionManager)
+ .sendMessage(
+ eq(message.getKey()), refEq(new PendingTransactionResult(message.getValue())));
+ }
+
+ verifyNoMoreInteractions(subscriptionManager);
+ }
+
+ private Map messages(final Hash result, final long... subscriptionIds) {
+ final Map messages = new HashMap<>();
+
+ for (final long subscriptionId : subscriptionIds) {
+ messages.put(subscriptionId, result);
+ }
+
+ return messages;
+ }
+
+ private Transaction transaction(final Hash hash) {
+ final Transaction tx = mock(Transaction.class);
+ when(tx.hash()).thenReturn(hash);
+ return tx;
+ }
+
+ private void setUpSubscriptions(final long... subscriptionsIds) {
+ when(subscriptionManager.subscriptionsOfType(any(), any()))
+ .thenReturn(
+ Arrays.stream(subscriptionsIds)
+ .mapToObj(id -> new Subscription(id, SubscriptionType.DROPPED_PENDING_TRANSACTIONS))
+ .collect(Collectors.toList()));
+ }
+}
diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java
index 8f5ba98cdb..cf71690dac 100644
--- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java
+++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java
@@ -25,7 +25,8 @@ public enum MetricCategory {
PROCESS("process", false),
ROCKSDB("rocksdb"),
RPC("rpc"),
- SYNCHRONIZER("synchronizer");
+ SYNCHRONIZER("synchronizer"),
+ TRANSACTION_POOL("transaction_pool");
// Why not BIG_QUEUE and ROCKSDB? They hurt performance under load.
public static final Set DEFAULT_METRIC_CATEGORIES =
diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java
index baf6090a43..2a93acf1a5 100644
--- a/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java
+++ b/pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java
@@ -37,6 +37,7 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.methods.WebSocketMethods
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.blockheaders.NewBlockHeadersSubscriptionService;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.logs.LogsSubscriptionService;
+import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending.PendingTransactionDroppedSubscriptionService;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending.PendingTransactionSubscriptionService;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing.SyncingSubscriptionService;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
@@ -457,7 +458,10 @@ public class RunnerBuilder {
final SubscriptionManager subscriptionManager = new SubscriptionManager();
final PendingTransactionSubscriptionService pendingTransactions =
new PendingTransactionSubscriptionService(subscriptionManager);
+ final PendingTransactionDroppedSubscriptionService pendingTransactionsRemoved =
+ new PendingTransactionDroppedSubscriptionService(subscriptionManager);
transactionPool.addTransactionListener(pendingTransactions);
+ transactionPool.addTransactionDroppedListener(pendingTransactionsRemoved);
vertx.deployVerticle(subscriptionManager);
return subscriptionManager;
diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java
index 1b30ad4253..243c9d485a 100644
--- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java
+++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java
@@ -176,7 +176,8 @@ public class CliquePantheonController implements PantheonController {
final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(
- protocolSchedule, protocolContext, ethContext, clock, maxPendingTransactions);
+ protocolSchedule,
+ protocolContext,
+ ethContext,
+ clock,
+ maxPendingTransactions,
+ metricsSystem);
final IbftEventQueue ibftEventQueue = new IbftEventQueue(ibftConfig.getMessageQueueLimit());
diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java
index 240948a46d..46e0acecab 100644
--- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java
+++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java
@@ -153,7 +153,8 @@ public class MainnetPantheonController implements PantheonController {
protocolContext,
ethProtocolManager.ethContext(),
clock,
- maxPendingTransactions);
+ maxPendingTransactions,
+ metricsSystem);
final ExecutorService minerThreadPool = Executors.newCachedThreadPool();
final EthHashMinerExecutor executor =