[PAN-2327] Notify of dropped messages (#1156)

* Added ability to subscribe to dropped transactions from the transaction pending pool.
Implemented subscription webservice to support this.
* Added metrics to the pending transactions, tracking the number of local and remote transactions in the pool.
* Converted listener management in pending transactions to use the Subscribers util object.

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Rob Dawson 6 years ago committed by GitHub
parent 324222752a
commit cdefb330be
  1. 1
      consensus/clique/build.gradle
  2. 11
      consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java
  3. 7
      consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java
  4. 2
      consensus/ibft/build.gradle
  5. 5
      consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java
  6. 5
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java
  7. 1
      consensus/ibftlegacy/build.gradle
  8. 5
      consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java
  9. 2
      ethereum/blockcreation/build.gradle
  10. 28
      ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java
  11. 5
      ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java
  12. 7
      ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java
  13. 19
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionDroppedListener.java
  14. 67
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java
  15. 6
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/TransactionPool.java
  16. 41
      ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/PendingTransactionsTest.java
  17. 5
      ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/core/TransactionPoolTest.java
  18. 6
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java
  19. 5
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java
  20. 5
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java
  21. 6
      ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java
  22. 52
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java
  23. 4
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java
  24. 3
      ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/request/SubscriptionType.java
  25. 109
      ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java
  26. 3
      metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java
  27. 4
      pantheon/src/main/java/tech/pegasys/pantheon/RunnerBuilder.java
  28. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java
  29. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java
  30. 7
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java
  31. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java

@ -47,6 +47,7 @@ dependencies {
testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
testImplementation project(path: ':consensus:common', configuration: 'testArtifacts')
testImplementation project(':testutil')
testImplementation project(':metrics')
testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'

@ -45,6 +45,8 @@ import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -60,6 +62,7 @@ public class CliqueBlockCreatorTest {
private final Address proposerAddress = Util.publicKeyToAddress(proposerKeyPair.getPublicKey());
private final KeyPair otherKeyPair = KeyPair.generate();
private final List<Address> validatorList = Lists.newArrayList();
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private ProtocolSchedule<CliqueContext> protocolSchedule;
private final WorldStateArchive stateArchive = createInMemoryWorldStateArchive();
@ -113,7 +116,7 @@ public class CliqueBlockCreatorTest {
new CliqueBlockCreator(
coinbase,
parent -> extraData.encode(),
new PendingTransactions(5, TestClock.fixed()),
new PendingTransactions(5, TestClock.fixed(), metricsSystem),
protocolContext,
protocolSchedule,
gasLimit -> gasLimit,
@ -140,7 +143,7 @@ public class CliqueBlockCreatorTest {
new CliqueBlockCreator(
coinbase,
parent -> extraData.encode(),
new PendingTransactions(5, TestClock.fixed()),
new PendingTransactions(5, TestClock.fixed(), metricsSystem),
protocolContext,
protocolSchedule,
gasLimit -> gasLimit,
@ -166,7 +169,7 @@ public class CliqueBlockCreatorTest {
new CliqueBlockCreator(
coinbase,
parent -> extraData.encode(),
new PendingTransactions(5, TestClock.fixed()),
new PendingTransactions(5, TestClock.fixed(), metricsSystem),
protocolContext,
protocolSchedule,
gasLimit -> gasLimit,
@ -195,7 +198,7 @@ public class CliqueBlockCreatorTest {
new CliqueBlockCreator(
coinbase,
parent -> extraData.encode(),
new PendingTransactions(5, TestClock.fixed()),
new PendingTransactions(5, TestClock.fixed(), metricsSystem),
protocolContext,
protocolSchedule,
gasLimit -> gasLimit,

@ -37,6 +37,8 @@ import tech.pegasys.pantheon.ethereum.core.PendingTransactions;
import tech.pegasys.pantheon.ethereum.core.PrivacyParameters;
import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -58,6 +60,7 @@ public class CliqueMinerExecutorTest {
private final List<Address> validatorList = Lists.newArrayList();
private ProtocolContext<CliqueContext> cliqueProtocolContext;
private BlockHeaderTestFixture blockHeaderBuilder;
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Before
public void setup() {
@ -89,7 +92,7 @@ public class CliqueMinerExecutorTest {
Executors.newSingleThreadExecutor(),
CliqueProtocolSchedule.create(
GENESIS_CONFIG_OPTIONS, proposerKeyPair, PrivacyParameters.noPrivacy()),
new PendingTransactions(1, TestClock.fixed()),
new PendingTransactions(1, TestClock.fixed(), metricsSystem),
proposerKeyPair,
new MiningParameters(AddressHelpers.ofValue(1), Wei.ZERO, wrappedVanityData, false),
mock(CliqueBlockScheduler.class),
@ -120,7 +123,7 @@ public class CliqueMinerExecutorTest {
Executors.newSingleThreadExecutor(),
CliqueProtocolSchedule.create(
GENESIS_CONFIG_OPTIONS, proposerKeyPair, PrivacyParameters.noPrivacy()),
new PendingTransactions(1, TestClock.fixed()),
new PendingTransactions(1, TestClock.fixed(), metricsSystem),
proposerKeyPair,
new MiningParameters(AddressHelpers.ofValue(1), Wei.ZERO, wrappedVanityData, false),
mock(CliqueBlockScheduler.class),

@ -47,7 +47,9 @@ dependencies {
testImplementation project(path: ':config:', configuration: 'testSupportArtifacts')
testImplementation project(path: ':consensus:common', configuration: 'testArtifacts')
testImplementation project(':testutil')
testImplementation project(':metrics')
integrationTestImplementation project(':metrics')
integrationTestImplementation 'junit:junit'
integrationTestImplementation 'org.assertj:assertj-core'
integrationTestImplementation 'org.mockito:mockito-core'

@ -65,6 +65,8 @@ import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -84,6 +86,7 @@ import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
public class TestContextBuilder {
private static MetricsSystem metricsSystem = new NoOpMetricsSystem();
private static class ControllerAndState {
@ -283,7 +286,7 @@ public class TestContextBuilder {
final IbftBlockCreatorFactory blockCreatorFactory =
new IbftBlockCreatorFactory(
(gasLimit) -> gasLimit,
new PendingTransactions(1, clock), // changed from IbftPantheonController
new PendingTransactions(1, clock, metricsSystem), // changed from IbftPantheonController
protocolContext,
protocolSchedule,
miningParams,

@ -38,6 +38,8 @@ import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -50,6 +52,7 @@ import com.google.common.collect.Lists;
import org.junit.Test;
public class IbftBlockCreatorTest {
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Test
public void createdBlockPassesValidationRulesAndHasAppropriateHashAndMixHash() {
@ -91,7 +94,7 @@ public class IbftBlockCreatorTest {
0,
initialValidatorList)
.encode(),
new PendingTransactions(1, TestClock.fixed()),
new PendingTransactions(1, TestClock.fixed(), metricsSystem),
protContext,
protocolSchedule,
parentGasLimit -> parentGasLimit,

@ -32,6 +32,7 @@ dependencies {
testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
testImplementation project(path: ':consensus:ibft', configuration: 'testSupportArtifacts')
testImplementation project(':testutil')
testImplementation project(':metrics')
testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'

@ -38,6 +38,8 @@ import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -50,6 +52,7 @@ import com.google.common.collect.Lists;
import org.junit.Test;
public class IbftBlockCreatorTest {
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Test
public void headerProducedPassesValidationRules() {
@ -96,7 +99,7 @@ public class IbftBlockCreatorTest {
null,
initialValidatorList)
.encode(),
new PendingTransactions(1, TestClock.fixed()),
new PendingTransactions(1, TestClock.fixed(), metricsSystem),
protContext,
protocolSchedule,
parentGasLimit -> parentGasLimit,

@ -27,7 +27,7 @@ dependencies {
testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
testImplementation project(path: ':ethereum:core', configuration: 'testArtifacts')
testImplementation project(':testutil')
testImplementation project(':metrics')
testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'

@ -46,6 +46,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ValidationResult;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.vm.TestBlockchain;
import tech.pegasys.pantheon.ethereum.worldstate.DefaultMutableWorldState;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -61,6 +63,7 @@ import org.junit.Test;
public class BlockTransactionSelectorTest {
private static final KeyPair keyPair = KeyPair.generate();
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Test
public void emptyPendingTransactionsResultsInEmptyVettingResult() {
@ -71,7 +74,9 @@ public class BlockTransactionSelectorTest {
final TransactionProcessor transactionProcessor =
protocolSchedule.getByBlockNumber(0).getTransactionProcessor();
final DefaultMutableWorldState worldState = inMemoryWorldState();
final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
final PendingTransactions pendingTransactions =
new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final Supplier<Boolean> isCancelled = () -> false;
final ProcessableBlockHeader blockHeader =
@ -108,7 +113,8 @@ public class BlockTransactionSelectorTest {
@Test
public void failedTransactionsAreIncludedInTheBlock() {
final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
final PendingTransactions pendingTransactions =
new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final Transaction transaction = createTransaction(1);
pendingTransactions.addRemoteTransaction(transaction);
@ -159,7 +165,8 @@ public class BlockTransactionSelectorTest {
@Test
public void invalidTransactionsTransactionProcessingAreSkippedButBlockStillFills() {
final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
final PendingTransactions pendingTransactions =
new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final List<Transaction> transactionsToInject = Lists.newArrayList();
for (int i = 0; i < 5; i++) {
@ -221,7 +228,8 @@ public class BlockTransactionSelectorTest {
@Test
public void subsetOfPendingTransactionsIncludedWhenBlockGasLimitHit() {
final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
final PendingTransactions pendingTransactions =
new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final List<Transaction> transactionsToInject = Lists.newArrayList();
// Transactions are reported in reverse order.
@ -286,7 +294,8 @@ public class BlockTransactionSelectorTest {
@Test
public void transactionOfferingGasPriceLessThanMinimumIsIdentifiedAndRemovedFromPending() {
final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
final PendingTransactions pendingTransactions =
new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final Blockchain blockchain = new TestBlockchain();
@ -330,7 +339,8 @@ public class BlockTransactionSelectorTest {
@Test
public void transactionTooLargeForBlockDoesNotPreventMoreBeingAddedIfBlockOccupancyNotReached() {
final PendingTransactions pendingTransactions = new PendingTransactions(5, TestClock.fixed());
final PendingTransactions pendingTransactions =
new PendingTransactions(5, TestClock.fixed(), metricsSystem);
final Blockchain blockchain = new TestBlockchain();
final DefaultMutableWorldState worldState = inMemoryWorldState();
final Supplier<Boolean> isCancelled = () -> false;
@ -400,7 +410,8 @@ public class BlockTransactionSelectorTest {
@Test
public void transactionSelectionStopsWhenSufficientBlockOccupancyIsReached() {
final PendingTransactions pendingTransactions = new PendingTransactions(10, TestClock.fixed());
final PendingTransactions pendingTransactions =
new PendingTransactions(10, TestClock.fixed(), metricsSystem);
final Blockchain blockchain = new TestBlockchain();
final DefaultMutableWorldState worldState = inMemoryWorldState();
final Supplier<Boolean> isCancelled = () -> false;
@ -481,7 +492,8 @@ public class BlockTransactionSelectorTest {
@Test
public void shouldDiscardTransactionsThatFailValidation() {
final PendingTransactions pendingTransactions = new PendingTransactions(10, TestClock.fixed());
final PendingTransactions pendingTransactions =
new PendingTransactions(10, TestClock.fixed(), metricsSystem);
final TransactionProcessor transactionProcessor = mock(TransactionProcessor.class);
final Blockchain blockchain = new TestBlockchain();
final DefaultMutableWorldState worldState = inMemoryWorldState();

@ -23,6 +23,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.EthHashSolver;
import tech.pegasys.pantheon.ethereum.mainnet.EthHasher.Light;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolScheduleBuilder;
import tech.pegasys.pantheon.ethereum.mainnet.ValidationTestUtils;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -44,6 +46,7 @@ public class EthHashBlockCreatorTest {
private static final BytesValue BLOCK_1_EXTRA_DATA =
BytesValue.fromHexString("0x476574682f76312e302e302f6c696e75782f676f312e342e32");
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final ExecutionContextTestFixture executionContextTestFixture =
ExecutionContextTestFixture.builder()
@ -63,7 +66,7 @@ public class EthHashBlockCreatorTest {
new EthHashBlockCreator(
BLOCK_1_COINBASE,
parent -> BLOCK_1_EXTRA_DATA,
new PendingTransactions(1, TestClock.fixed()),
new PendingTransactions(1, TestClock.fixed(), metricsSystem),
executionContextTestFixture.getProtocolContext(),
executionContextTestFixture.getProtocolSchedule(),
gasLimit -> gasLimit,

@ -17,6 +17,8 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import tech.pegasys.pantheon.ethereum.core.MiningParameters;
import tech.pegasys.pantheon.ethereum.core.MiningParametersTestBuilder;
import tech.pegasys.pantheon.ethereum.core.PendingTransactions;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.Subscribers;
@ -25,6 +27,7 @@ import java.util.concurrent.Executors;
import org.junit.Test;
public class EthHashMinerExecutorTest {
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Test
public void startingMiningWithoutCoinbaseThrowsException() {
@ -36,7 +39,7 @@ public class EthHashMinerExecutorTest {
null,
Executors.newCachedThreadPool(),
null,
new PendingTransactions(1, TestClock.fixed()),
new PendingTransactions(1, TestClock.fixed(), metricsSystem),
miningParameters,
new DefaultBlockScheduler(1, 10, TestClock.fixed()));
@ -54,7 +57,7 @@ public class EthHashMinerExecutorTest {
null,
Executors.newCachedThreadPool(),
null,
new PendingTransactions(1, TestClock.fixed()),
new PendingTransactions(1, TestClock.fixed(), metricsSystem),
miningParameters,
new DefaultBlockScheduler(1, 10, TestClock.fixed()));

@ -0,0 +1,19 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.core;
@FunctionalInterface
public interface PendingTransactionDroppedListener {
void onTransactionDropped(Transaction transaction);
}

@ -12,13 +12,17 @@
*/
package tech.pegasys.pantheon.ethereum.core;
import static java.util.Collections.newSetFromMap;
import static java.util.Comparator.comparing;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.Subscribers;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -29,7 +33,6 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@ -51,28 +54,57 @@ public class PendingTransactions {
private final Map<Address, SortedMap<Long, TransactionInfo>> transactionsBySender =
new HashMap<>();
private final Collection<PendingTransactionListener> listeners =
newSetFromMap(new ConcurrentHashMap<>());
private final Subscribers<PendingTransactionListener> listeners = new Subscribers<>();
private final Subscribers<PendingTransactionDroppedListener> transactionDroppedListeners =
new Subscribers<>();
private final int maxPendingTransactions;
private final Clock clock;
public PendingTransactions(final int maxPendingTransactions, final Clock clock) {
private final LabelledMetric<Counter> transactionCounter;
private final Counter localTransactionAddedCounter;
private final Counter remoteTransactionAddedCounter;
public PendingTransactions(
final int maxPendingTransactions, final Clock clock, final MetricsSystem metricsSystem) {
this.maxPendingTransactions = maxPendingTransactions;
this.clock = clock;
transactionCounter =
metricsSystem.createLabelledCounter(
MetricCategory.TRANSACTION_POOL,
"transactions_total",
"Count of transactions changed in the transaction pool",
"source");
localTransactionAddedCounter = transactionCounter.labels("local", "added");
remoteTransactionAddedCounter = transactionCounter.labels("remote", "added");
}
public boolean addRemoteTransaction(final Transaction transaction) {
final TransactionInfo transactionInfo =
new TransactionInfo(transaction, false, clock.instant());
return addTransaction(transactionInfo);
boolean addTransaction = addTransaction(transactionInfo);
remoteTransactionAddedCounter.inc();
return addTransaction;
}
boolean addLocalTransaction(final Transaction transaction) {
return addTransaction(new TransactionInfo(transaction, true, clock.instant()));
boolean addTransaction =
addTransaction(new TransactionInfo(transaction, true, clock.instant()));
localTransactionAddedCounter.inc();
return addTransaction;
}
public void removeTransaction(final Transaction transaction) {
doRemoveTransaction(transaction, false);
notifyTransactionDropped(transaction);
}
public void transactionAddedToBlock(final Transaction transaction) {
doRemoveTransaction(transaction, true);
}
private void doRemoveTransaction(final Transaction transaction, final boolean addedToBlock) {
synchronized (pendingTransactions) {
final TransactionInfo removedTransactionInfo = pendingTransactions.remove(transaction.hash());
if (removedTransactionInfo != null) {
@ -85,10 +117,19 @@ public class PendingTransactions {
transactionsBySender.remove(transaction.getSender());
}
});
incrementTransactionRemovedCounter(
removedTransactionInfo.isReceivedFromLocalSource(), addedToBlock);
}
}
}
private void incrementTransactionRemovedCounter(
final boolean receivedFromLocalSource, final boolean addedToBlock) {
final String location = receivedFromLocalSource ? "local" : "remote";
final String operation = addedToBlock ? "addedToBlock" : "dropped";
transactionCounter.labels(location, "removed", operation).inc();
}
/*
* The BlockTransaction selection process (part of block mining) requires synchronised access to
* all pendingTransactions - this allows it to iterate over the available transactions without
@ -179,6 +220,10 @@ public class PendingTransactions {
listeners.forEach(listener -> listener.onTransactionAdded(transaction));
}
private void notifyTransactionDropped(final Transaction transaction) {
transactionDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction));
}
public int size() {
synchronized (pendingTransactions) {
return pendingTransactions.size();
@ -199,7 +244,11 @@ public class PendingTransactions {
}
public void addTransactionListener(final PendingTransactionListener listener) {
listeners.add(listener);
listeners.subscribe(listener);
}
public void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) {
transactionDroppedListeners.subscribe(listener);
}
public OptionalLong getNextNonceForSender(final Address sender) {

@ -110,9 +110,13 @@ public class TransactionPool implements BlockAddedObserver {
pendingTransactions.addTransactionListener(listener);
}
public void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) {
pendingTransactions.addTransactionDroppedListener(listener);
}
@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
event.getAddedTransactions().forEach(pendingTransactions::removeTransaction);
event.getAddedTransactions().forEach(pendingTransactions::transactionAddedToBlock);
addRemoteTransactions(event.getRemovedTransactions());
}

@ -19,6 +19,8 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.core.PendingTransactions.TransactionSelectionResult;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import java.util.ArrayList;
@ -33,12 +35,16 @@ public class PendingTransactionsTest {
private static final int MAX_TRANSACTIONS = 5;
private static final KeyPair KEYS1 = KeyPair.generate();
private static final KeyPair KEYS2 = KeyPair.generate();
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final PendingTransactions transactions =
new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed());
new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem);
private final Transaction transaction1 = createTransaction(2);
private final Transaction transaction2 = createTransaction(1);
private final PendingTransactionListener listener = mock(PendingTransactionListener.class);
private final PendingTransactionDroppedListener droppedListener =
mock(PendingTransactionDroppedListener.class);
private static final Address SENDER1 = Util.publicKeyToAddress(KEYS1.getPublicKey());
private static final Address SENDER2 = Util.publicKeyToAddress(KEYS2.getPublicKey());
@ -133,6 +139,39 @@ public class PendingTransactionsTest {
verify(listener).onTransactionAdded(transaction1);
}
@Test
public void shouldNotifyDroppedListenerWhenRemoteTransactionDropped() {
transactions.addRemoteTransaction(transaction1);
transactions.addTransactionDroppedListener(droppedListener);
transactions.removeTransaction(transaction1);
verify(droppedListener).onTransactionDropped(transaction1);
}
@Test
public void shouldNotifyDroppedListenerWhenLocalTransactionDropped() {
transactions.addLocalTransaction(transaction1);
transactions.addTransactionDroppedListener(droppedListener);
transactions.removeTransaction(transaction1);
verify(droppedListener).onTransactionDropped(transaction1);
}
@Test
public void shouldNotNotifyDroppedListenerWhenTransactionAddedToBlock() {
transactions.addRemoteTransaction(transaction1);
transactions.addTransactionDroppedListener(droppedListener);
transactions.transactionAddedToBlock(transaction1);
verifyZeroInteractions(droppedListener);
}
@Test
public void selectTransactionsUntilSelectorRequestsNoMore() {
transactions.addRemoteTransaction(transaction1);

@ -42,6 +42,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ValidationResult;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.uint.UInt256;
@ -58,6 +60,7 @@ public class TransactionPoolTest {
private final PendingTransactionListener listener = mock(PendingTransactionListener.class);
private final TransactionBatchAddedListener batchAddedListener =
mock(TransactionBatchAddedListener.class);
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@SuppressWarnings("unchecked")
private final ProtocolSchedule<Void> protocolSchedule = mock(ProtocolSchedule.class);
@ -68,7 +71,7 @@ public class TransactionPoolTest {
private final TransactionValidator transactionValidator = mock(TransactionValidator.class);
private MutableBlockchain blockchain;
private final PendingTransactions transactions =
new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed());
new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem);
private final Transaction transaction1 = createTransaction(1);
private final Transaction transaction2 = createTransaction(2);
private TransactionPool transactionPool;

@ -18,6 +18,7 @@ import tech.pegasys.pantheon.ethereum.core.TransactionPool;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.time.Clock;
@ -28,9 +29,10 @@ public class TransactionPoolFactory {
final ProtocolContext<?> protocolContext,
final EthContext ethContext,
final Clock clock,
final int maxPendingTransactions) {
final int maxPendingTransactions,
final MetricsSystem metricsSystem) {
final PendingTransactions pendingTransactions =
new PendingTransactions(maxPendingTransactions, clock);
new PendingTransactions(maxPendingTransactions, clock, metricsSystem);
final PeerTransactionTracker transactionTracker = new PeerTransactionTracker();
final TransactionsMessageSender transactionsMessageSender =

@ -58,6 +58,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -91,6 +92,7 @@ public final class EthProtocolManagerTest {
private static ProtocolSchedule<Void> protocolSchedule;
private static BlockDataGenerator gen;
private static ProtocolContext<Void> protocolContext;
private static final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@BeforeClass
public static void setup() {
@ -1020,7 +1022,8 @@ public final class EthProtocolManagerTest {
protocolContext,
ethManager.ethContext(),
TestClock.fixed(),
PendingTransactions.MAX_PENDING_TRANSACTIONS);
PendingTransactions.MAX_PENDING_TRANSACTIONS,
metricsSystem);
// Send just a transaction message.
final PeerConnection peer = setupPeer(ethManager, (cap, msg, connection) -> {});

@ -46,6 +46,7 @@ import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -66,6 +67,7 @@ import org.apache.logging.log4j.Logger;
public class TestNode implements Closeable {
private static final Logger LOG = LogManager.getLogger();
private static final MetricsSystem metricsSystem = new NoOpMetricsSystem();
protected final Integer port;
protected final SECP256K1.KeyPair kp;
@ -138,7 +140,8 @@ public class TestNode implements Closeable {
protocolContext,
ethContext,
TestClock.fixed(),
PendingTransactions.MAX_PENDING_TRANSACTIONS);
PendingTransactions.MAX_PENDING_TRANSACTIONS,
metricsSystem);
networkRunner.start();
selfPeer = new DefaultPeer(id(), endpoint());

@ -43,6 +43,8 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcError;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcErrorResponse;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcResponse;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcSuccessResponse;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
@ -64,8 +66,10 @@ public class EthGetFilterChangesIntegrationTest {
private final String ETH_METHOD = "eth_getFilterChanges";
private final String JSON_RPC_VERSION = "2.0";
private TransactionPool transactionPool;
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final PendingTransactions transactions =
new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed());
new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem);
private static final int MAX_TRANSACTIONS = 5;
private static final KeyPair keyPair = KeyPair.generate();
private final Transaction transaction = createTransaction(1);

@ -0,0 +1,52 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.PendingTransactionDroppedListener;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import java.util.List;
public class PendingTransactionDroppedSubscriptionService
implements PendingTransactionDroppedListener {
private final SubscriptionManager subscriptionManager;
public PendingTransactionDroppedSubscriptionService(
final SubscriptionManager subscriptionManager) {
this.subscriptionManager = subscriptionManager;
}
@Override
public void onTransactionDropped(final Transaction transaction) {
notifySubscribers(transaction.hash());
}
private void notifySubscribers(final Hash pendingTransaction) {
final List<Subscription> subscriptions = pendingDroppedTransactionSubscriptions();
final PendingTransactionResult msg = new PendingTransactionResult(pendingTransaction);
for (final Subscription subscription : subscriptions) {
subscriptionManager.sendMessage(subscription.getId(), msg);
}
}
private List<Subscription> pendingDroppedTransactionSubscriptions() {
return subscriptionManager.subscriptionsOfType(
SubscriptionType.DROPPED_PENDING_TRANSACTIONS, Subscription.class);
}
}

@ -37,9 +37,9 @@ public class PendingTransactionSubscriptionService implements PendingTransaction
private void notifySubscribers(final Hash pendingTransaction) {
final List<Subscription> subscriptions = pendingTransactionSubscriptions();
final PendingTransactionResult msg = new PendingTransactionResult(pendingTransaction);
for (final Subscription subscription : subscriptions) {
subscriptionManager.sendMessage(
subscription.getId(), new PendingTransactionResult(pendingTransaction));
subscriptionManager.sendMessage(subscription.getId(), msg);
}
}

@ -24,6 +24,9 @@ public enum SubscriptionType {
@JsonProperty("newPendingTransactions")
NEW_PENDING_TRANSACTIONS("newPendingTransactions"),
@JsonProperty("droppedPendingTransactions")
DROPPED_PENDING_TRANSACTIONS("droppedPendingTransactions"),
@JsonProperty("syncing")
SYNCING("syncing");

@ -0,0 +1,109 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class PendingTransactionDroppedSubscriptionServiceTest {
private static final Hash TX_ONE =
Hash.fromHexString("0x15876958423545c3c7b0fcf9be8ffb543305ee1b43db87ed380dcf0cd16589f7");
@Mock private SubscriptionManager subscriptionManager;
@Mock private Blockchain blockchain;
@Mock private Block block;
private PendingTransactionDroppedSubscriptionService service;
@Before
public void setUp() {
service = new PendingTransactionDroppedSubscriptionService(subscriptionManager);
}
@Test
public void onTransactionAddedMustSendMessage() {
final long[] subscriptionIds = new long[] {5, 56, 989};
setUpSubscriptions(subscriptionIds);
final Transaction pending = transaction(TX_ONE);
service.onTransactionDropped(pending);
verifyZeroInteractions(block);
verifyZeroInteractions(blockchain);
verifySubscriptionMangerInteractions(messages(TX_ONE, subscriptionIds));
}
private void verifySubscriptionMangerInteractions(final Map<Long, Hash> expected) {
verify(subscriptionManager)
.subscriptionsOfType(SubscriptionType.DROPPED_PENDING_TRANSACTIONS, Subscription.class);
for (final Map.Entry<Long, Hash> message : expected.entrySet()) {
verify(subscriptionManager)
.sendMessage(
eq(message.getKey()), refEq(new PendingTransactionResult(message.getValue())));
}
verifyNoMoreInteractions(subscriptionManager);
}
private Map<Long, Hash> messages(final Hash result, final long... subscriptionIds) {
final Map<Long, Hash> messages = new HashMap<>();
for (final long subscriptionId : subscriptionIds) {
messages.put(subscriptionId, result);
}
return messages;
}
private Transaction transaction(final Hash hash) {
final Transaction tx = mock(Transaction.class);
when(tx.hash()).thenReturn(hash);
return tx;
}
private void setUpSubscriptions(final long... subscriptionsIds) {
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(
Arrays.stream(subscriptionsIds)
.mapToObj(id -> new Subscription(id, SubscriptionType.DROPPED_PENDING_TRANSACTIONS))
.collect(Collectors.toList()));
}
}

@ -25,7 +25,8 @@ public enum MetricCategory {
PROCESS("process", false),
ROCKSDB("rocksdb"),
RPC("rpc"),
SYNCHRONIZER("synchronizer");
SYNCHRONIZER("synchronizer"),
TRANSACTION_POOL("transaction_pool");
// Why not BIG_QUEUE and ROCKSDB? They hurt performance under load.
public static final Set<MetricCategory> DEFAULT_METRIC_CATEGORIES =

@ -37,6 +37,7 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.methods.WebSocketMethods
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.blockheaders.NewBlockHeadersSubscriptionService;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.logs.LogsSubscriptionService;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending.PendingTransactionDroppedSubscriptionService;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.pending.PendingTransactionSubscriptionService;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing.SyncingSubscriptionService;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
@ -457,7 +458,10 @@ public class RunnerBuilder {
final SubscriptionManager subscriptionManager = new SubscriptionManager();
final PendingTransactionSubscriptionService pendingTransactions =
new PendingTransactionSubscriptionService(subscriptionManager);
final PendingTransactionDroppedSubscriptionService pendingTransactionsRemoved =
new PendingTransactionDroppedSubscriptionService(subscriptionManager);
transactionPool.addTransactionListener(pendingTransactions);
transactionPool.addTransactionDroppedListener(pendingTransactionsRemoved);
vertx.deployVerticle(subscriptionManager);
return subscriptionManager;

@ -176,7 +176,8 @@ public class CliquePantheonController implements PantheonController<CliqueContex
protocolContext,
ethProtocolManager.ethContext(),
clock,
maxPendingTransactions);
maxPendingTransactions,
metricsSystem);
final ExecutorService minerThreadPool = Executors.newCachedThreadPool();
final CliqueMinerExecutor miningExecutor =

@ -180,7 +180,8 @@ public class IbftLegacyPantheonController implements PantheonController<IbftCont
protocolContext,
istanbul64ProtocolManager.ethContext(),
clock,
maxPendingTransactions);
maxPendingTransactions,
metricsSystem);
return new IbftLegacyPantheonController(
protocolSchedule,

@ -201,7 +201,12 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(
protocolSchedule, protocolContext, ethContext, clock, maxPendingTransactions);
protocolSchedule,
protocolContext,
ethContext,
clock,
maxPendingTransactions,
metricsSystem);
final IbftEventQueue ibftEventQueue = new IbftEventQueue(ibftConfig.getMessageQueueLimit());

@ -153,7 +153,8 @@ public class MainnetPantheonController implements PantheonController<Void> {
protocolContext,
ethProtocolManager.ethContext(),
clock,
maxPendingTransactions);
maxPendingTransactions,
metricsSystem);
final ExecutorService minerThreadPool = Executors.newCachedThreadPool();
final EthHashMinerExecutor executor =

Loading…
Cancel
Save