NC-1880 High TX volume swamps block processing (#337)

* NC-1880 High TX volume swamps block processing

Move transaction processing into its own thread(s).

Size of txWorkerExecutor thread pool can be independently configured.
Danno Ferrin 6 years ago committed by GitHub
parent b74f88afb2
commit abaaef7907
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java
  2. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java
  3. 15
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java
  4. 40
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
  5. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java
  6. 21
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
  7. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionSender.java
  8. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java
  9. 10
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java
  10. 89
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java
  11. 16
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerTest.java
  12. 3
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java
  13. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java
  14. 6
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java
  15. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java
  16. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java

@ -29,8 +29,9 @@ public class Istanbul64ProtocolManager extends EthProtocolManager {
final Blockchain blockchain, final Blockchain blockchain,
final int networkId, final int networkId,
final boolean fastSyncEnabled, final boolean fastSyncEnabled,
final int workers) { final int syncWorkers,
super(blockchain, networkId, fastSyncEnabled, workers); final int txWorkers) {
super(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers);
} }
@Override @Override

@ -100,7 +100,7 @@ public abstract class AbstractEthTask<T> implements EthTask<T> {
*/ */
protected final <S> CompletableFuture<S> executeWorkerSubTask( protected final <S> CompletableFuture<S> executeWorkerSubTask(
final EthScheduler scheduler, final Supplier<CompletableFuture<S>> subTask) { final EthScheduler scheduler, final Supplier<CompletableFuture<S>> subTask) {
return executeSubTask(() -> scheduler.scheduleWorkerTask(subTask)); return executeSubTask(() -> scheduler.scheduleSyncWorkerTask(subTask));
} }
public final T result() { public final T result() {

@ -90,17 +90,24 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final Blockchain blockchain, final Blockchain blockchain,
final int networkId, final int networkId,
final boolean fastSyncEnabled, final boolean fastSyncEnabled,
final int workers, final int syncWorkers,
final int txWorkers,
final int requestLimit) { final int requestLimit) {
this(blockchain, networkId, fastSyncEnabled, requestLimit, new EthScheduler(workers)); this(
blockchain,
networkId,
fastSyncEnabled,
requestLimit,
new EthScheduler(syncWorkers, txWorkers));
} }
public EthProtocolManager( public EthProtocolManager(
final Blockchain blockchain, final Blockchain blockchain,
final int networkId, final int networkId,
final boolean fastSyncEnabled, final boolean fastSyncEnabled,
final int workers) { final int syncWorkers,
this(blockchain, networkId, fastSyncEnabled, workers, DEFAULT_REQUEST_LIMIT); final int txWorkers) {
this(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers, DEFAULT_REQUEST_LIMIT);
} }
public EthContext ethContext() { public EthContext ethContext() {

@ -41,13 +41,14 @@ public class EthScheduler {
private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false);
private final CountDownLatch shutdown = new CountDownLatch(1); private final CountDownLatch shutdown = new CountDownLatch(1);
protected final ExecutorService workerExecutor; protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler; protected final ScheduledExecutorService scheduler;
protected final ExecutorService txWorkerExecutor;
EthScheduler(final int workerCount) { EthScheduler(final int syncWorkerCount, final int txWorkerCount) {
this( this(
Executors.newFixedThreadPool( Executors.newFixedThreadPool(
workerCount, syncWorkerCount,
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Workers") .setNameFormat(EthScheduler.class.getSimpleName() + "-Workers")
.build()), .build()),
@ -56,19 +57,28 @@ public class EthScheduler {
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setDaemon(true) .setDaemon(true)
.setNameFormat(EthScheduler.class.getSimpleName() + "Timer") .setNameFormat(EthScheduler.class.getSimpleName() + "Timer")
.build()),
Executors.newFixedThreadPool(
txWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions")
.build())); .build()));
} }
protected EthScheduler( protected EthScheduler(
final ExecutorService workerExecutor, final ScheduledExecutorService scheduler) { final ExecutorService syncWorkerExecutor,
this.workerExecutor = workerExecutor; final ScheduledExecutorService scheduler,
final ExecutorService txWorkerExecutor) {
this.syncWorkerExecutor = syncWorkerExecutor;
this.scheduler = scheduler; this.scheduler = scheduler;
this.txWorkerExecutor = txWorkerExecutor;
} }
public <T> CompletableFuture<T> scheduleWorkerTask(final Supplier<CompletableFuture<T>> future) { public <T> CompletableFuture<T> scheduleSyncWorkerTask(
final Supplier<CompletableFuture<T>> future) {
final CompletableFuture<T> promise = new CompletableFuture<>(); final CompletableFuture<T> promise = new CompletableFuture<>();
final Future<?> workerFuture = final Future<?> workerFuture =
workerExecutor.submit( syncWorkerExecutor.submit(
() -> { () -> {
future future
.get() .get()
@ -91,8 +101,12 @@ public class EthScheduler {
return promise; return promise;
} }
public Future<?> scheduleWorkerTask(final Runnable command) { public Future<?> scheduleSyncWorkerTask(final Runnable command) {
return workerExecutor.submit(command); return syncWorkerExecutor.submit(command);
}
public Future<?> scheduleTxWorkerTask(final Runnable command) {
return txWorkerExecutor.submit(command);
} }
public CompletableFuture<Void> scheduleFutureTask( public CompletableFuture<Void> scheduleFutureTask(
@ -179,7 +193,7 @@ public class EthScheduler {
public void stop() { public void stop() {
if (stopped.compareAndSet(false, true)) { if (stopped.compareAndSet(false, true)) {
LOG.trace("Stopping " + getClass().getSimpleName()); LOG.trace("Stopping " + getClass().getSimpleName());
workerExecutor.shutdown(); syncWorkerExecutor.shutdown();
scheduler.shutdown(); scheduler.shutdown();
shutdown.countDown(); shutdown.countDown();
} else { } else {
@ -189,10 +203,10 @@ public class EthScheduler {
public void awaitStop() throws InterruptedException { public void awaitStop() throws InterruptedException {
shutdown.await(); shutdown.await();
if (!workerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName()); LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName());
workerExecutor.shutdownNow(); syncWorkerExecutor.shutdownNow();
workerExecutor.awaitTermination(2L, TimeUnit.MINUTES); syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
} }
if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) { if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName()); LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName());

@ -128,7 +128,7 @@ public class BlockPropagationManager<C> {
protocolSchedule, protocolContext, readyForImport, HeaderValidationMode.FULL); protocolSchedule, protocolContext, readyForImport, HeaderValidationMode.FULL);
ethContext ethContext
.getScheduler() .getScheduler()
.scheduleWorkerTask(importBlocksTask) .scheduleSyncWorkerTask(importBlocksTask)
.whenComplete( .whenComplete(
(r, t) -> { (r, t) -> {
if (r != null) { if (r != null) {
@ -255,7 +255,7 @@ public class BlockPropagationManager<C> {
final OperationTimer.TimingContext blockTimer = announcedBlockIngestTimer.startTimer(); final OperationTimer.TimingContext blockTimer = announcedBlockIngestTimer.startTimer();
return ethContext return ethContext
.getScheduler() .getScheduler()
.scheduleWorkerTask(importTask::run) .scheduleSyncWorkerTask(importTask::run)
.whenComplete( .whenComplete(
(r, t) -> { (r, t) -> {
if (t != null) { if (t != null) {

@ -52,6 +52,7 @@ public class SynchronizerConfiguration {
private final long trailingPeerBlocksBehindThreshold; private final long trailingPeerBlocksBehindThreshold;
private final int maxTrailingPeers; private final int maxTrailingPeers;
private final int downloaderParallelism; private final int downloaderParallelism;
private final int transactionsParallelism;
private SynchronizerConfiguration( private SynchronizerConfiguration(
final SyncMode requestedSyncMode, final SyncMode requestedSyncMode,
@ -67,7 +68,8 @@ public class SynchronizerConfiguration {
final int downloaderChainSegmentSize, final int downloaderChainSegmentSize,
final long trailingPeerBlocksBehindThreshold, final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers, final int maxTrailingPeers,
final int downloaderParallelism) { final int downloaderParallelism,
final int transactionsParallelism) {
this.requestedSyncMode = requestedSyncMode; this.requestedSyncMode = requestedSyncMode;
this.fastSyncPivotDistance = fastSyncPivotDistance; this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate; this.fastSyncFullValidationRate = fastSyncFullValidationRate;
@ -82,6 +84,7 @@ public class SynchronizerConfiguration {
this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold; this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold;
this.maxTrailingPeers = maxTrailingPeers; this.maxTrailingPeers = maxTrailingPeers;
this.downloaderParallelism = downloaderParallelism; this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism;
} }
/** /**
@ -122,7 +125,8 @@ public class SynchronizerConfiguration {
downloaderChainSegmentSize, downloaderChainSegmentSize,
trailingPeerBlocksBehindThreshold, trailingPeerBlocksBehindThreshold,
maxTrailingPeers, maxTrailingPeers,
downloaderParallelism); downloaderParallelism,
transactionsParallelism);
} }
public static Builder builder() { public static Builder builder() {
@ -203,6 +207,10 @@ public class SynchronizerConfiguration {
return downloaderParallelism; return downloaderParallelism;
} }
public int transactionsParallelism() {
return transactionsParallelism;
}
/** /**
* The rate at which blocks should be fully validated during fast sync. At a rate of 1f, all * The rate at which blocks should be fully validated during fast sync. At a rate of 1f, all
* blocks are fully validated. At rates less than 1f, a subset of blocks will undergo light-weight * blocks are fully validated. At rates less than 1f, a subset of blocks will undergo light-weight
@ -228,6 +236,7 @@ public class SynchronizerConfiguration {
private long trailingPeerBlocksBehindThreshold; private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE; private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2; private int downloaderParallelism = 2;
private int transactionsParallelism = 2;
public Builder fastSyncPivotDistance(final int distance) { public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance; fastSyncPivotDistance = distance;
@ -299,6 +308,11 @@ public class SynchronizerConfiguration {
return this; return this;
} }
public Builder transactionsParallelism(final int transactionsParallelism) {
this.transactionsParallelism = transactionsParallelism;
return this;
}
public SynchronizerConfiguration build() { public SynchronizerConfiguration build() {
return new SynchronizerConfiguration( return new SynchronizerConfiguration(
syncMode, syncMode,
@ -314,7 +328,8 @@ public class SynchronizerConfiguration {
downloaderChainSegmentSize, downloaderChainSegmentSize,
trailingPeerBlocksBehindThreshold, trailingPeerBlocksBehindThreshold,
maxTrailingPeers, maxTrailingPeers,
downloaderParallelism); downloaderParallelism,
transactionsParallelism);
} }
} }
} }

@ -42,6 +42,6 @@ class TransactionSender implements TransactionBatchAddedListener {
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction))); transaction -> transactionTracker.addToPeerSendQueue(peer, transaction)));
ethContext ethContext
.getScheduler() .getScheduler()
.scheduleWorkerTask(transactionsMessageSender::sendTransactionsToPeers); .scheduleSyncWorkerTask(transactionsMessageSender::sendTransactionsToPeers);
} }
} }

@ -32,7 +32,7 @@ class TransactionsMessageHandler implements MessageCallback {
@Override @Override
public void exec(final EthMessage message) { public void exec(final EthMessage message) {
final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData()); final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData());
scheduler.scheduleWorkerTask( scheduler.scheduleTxWorkerTask(
() -> () ->
transactionsMessageProcessor.processTransactionsMessage( transactionsMessageProcessor.processTransactionsMessage(
message.getPeer(), transactionsMessage)); message.getPeer(), transactionsMessage));

@ -27,18 +27,22 @@ public class DeterministicEthScheduler extends EthScheduler {
} }
DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) { DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) {
super(new MockExecutorService(), new MockScheduledExecutor()); super(new MockExecutorService(), new MockScheduledExecutor(), new MockExecutorService());
this.timeoutPolicy = timeoutPolicy; this.timeoutPolicy = timeoutPolicy;
} }
MockExecutorService mockWorkerExecutor() { MockExecutorService mockSyncWorkerExecutor() {
return (MockExecutorService) workerExecutor; return (MockExecutorService) syncWorkerExecutor;
} }
MockScheduledExecutor mockScheduledExecutor() { MockScheduledExecutor mockScheduledExecutor() {
return (MockScheduledExecutor) scheduler; return (MockScheduledExecutor) scheduler;
} }
MockScheduledExecutor mockTransactionsExecutor() {
return (MockScheduledExecutor) txWorkerExecutor;
}
@Override @Override
public <T> void failAfterTimeout(final CompletableFuture<T> promise, final Duration timeout) { public <T> void failAfterTimeout(final CompletableFuture<T> promise, final Duration timeout) {
if (timeoutPolicy.shouldTimeout()) { if (timeoutPolicy.shouldTimeout()) {

@ -13,18 +13,22 @@
package tech.pegasys.pantheon.ethereum.eth.manager; package tech.pegasys.pantheon.ethereum.eth.manager;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockBody; import tech.pegasys.pantheon.ethereum.core.BlockBody;
import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.EthProtocol; import tech.pegasys.pantheon.ethereum.eth.EthProtocol;
import tech.pegasys.pantheon.ethereum.eth.EthProtocol.EthVersion; import tech.pegasys.pantheon.ethereum.eth.EthProtocol.EthVersion;
@ -40,12 +44,15 @@ import tech.pegasys.pantheon.ethereum.eth.messages.GetReceiptsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage; import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage; import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.StatusMessage; import tech.pegasys.pantheon.ethereum.eth.messages.StatusMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPoolFactory;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import tech.pegasys.pantheon.util.uint.UInt256; import tech.pegasys.pantheon.util.uint.UInt256;
@ -57,6 +64,8 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -72,6 +81,7 @@ public final class EthProtocolManagerTest {
private static Blockchain blockchain; private static Blockchain blockchain;
private static ProtocolSchedule<Void> protocolSchedule; private static ProtocolSchedule<Void> protocolSchedule;
private static BlockDataGenerator gen; private static BlockDataGenerator gen;
private static ProtocolContext<Void> protocolContext;
@BeforeClass @BeforeClass
public static void setup() { public static void setup() {
@ -80,12 +90,13 @@ public final class EthProtocolManagerTest {
blockchainSetupUtil.importAllBlocks(); blockchainSetupUtil.importAllBlocks();
blockchain = blockchainSetupUtil.getBlockchain(); blockchain = blockchainSetupUtil.getBlockchain();
protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); protocolSchedule = blockchainSetupUtil.getProtocolSchedule();
protocolContext = blockchainSetupUtil.getProtocolContext();
assert (blockchainSetupUtil.getMaxBlockNumber() >= 20L); assert (blockchainSetupUtil.getMaxBlockNumber() >= 20L);
} }
@Test @Test
public void disconnectOnUnsolicitedMessage() { public void disconnectOnUnsolicitedMessage() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final MessageData messageData = final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {}); final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {});
@ -96,7 +107,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void disconnectOnFailureToSendStatusMessage() { public void disconnectOnFailureToSendStatusMessage() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final MessageData messageData = final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = final MockPeerConnection peer =
@ -108,7 +119,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void disconnectOnWrongChainId() { public void disconnectOnWrongChainId() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final MessageData messageData = final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = final MockPeerConnection peer =
@ -131,7 +142,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void disconnectOnWrongGenesisHash() { public void disconnectOnWrongGenesisHash() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final MessageData messageData = final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = final MockPeerConnection peer =
@ -154,7 +165,7 @@ public final class EthProtocolManagerTest {
@Test(expected = ConditionTimeoutException.class) @Test(expected = ConditionTimeoutException.class)
public void doNotDisconnectOnValidMessage() { public void doNotDisconnectOnValidMessage() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final MessageData messageData = final MessageData messageData =
GetBlockBodiesMessage.create(Collections.singletonList(gen.hash())); GetBlockBodiesMessage.create(Collections.singletonList(gen.hash()));
final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {}); final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {});
@ -169,7 +180,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void respondToGetHeaders() throws ExecutionException, InterruptedException { public void respondToGetHeaders() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final long startBlock = 5L; final long startBlock = 5L;
final int blockCount = 5; final int blockCount = 5;
final MessageData messageData = final MessageData messageData =
@ -201,7 +212,7 @@ public final class EthProtocolManagerTest {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
final int limit = 5; final int limit = 5;
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, 1, true, 1, limit)) { new EthProtocolManager(blockchain, 1, true, 1, 1, limit)) {
final long startBlock = 5L; final long startBlock = 5L;
final int blockCount = 10; final int blockCount = 10;
final MessageData messageData = final MessageData messageData =
@ -231,7 +242,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void respondToGetHeadersReversed() throws ExecutionException, InterruptedException { public void respondToGetHeadersReversed() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final long endBlock = 10L; final long endBlock = 10L;
final int blockCount = 5; final int blockCount = 5;
final MessageData messageData = GetBlockHeadersMessage.create(endBlock, blockCount, 0, true); final MessageData messageData = GetBlockHeadersMessage.create(endBlock, blockCount, 0, true);
@ -260,7 +271,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void respondToGetHeadersWithSkip() throws ExecutionException, InterruptedException { public void respondToGetHeadersWithSkip() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final long startBlock = 5L; final long startBlock = 5L;
final int blockCount = 5; final int blockCount = 5;
final int skip = 1; final int skip = 1;
@ -292,7 +303,7 @@ public final class EthProtocolManagerTest {
public void respondToGetHeadersReversedWithSkip() public void respondToGetHeadersReversedWithSkip()
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final long endBlock = 10L; final long endBlock = 10L;
final int blockCount = 5; final int blockCount = 5;
final int skip = 1; final int skip = 1;
@ -336,7 +347,7 @@ public final class EthProtocolManagerTest {
private MockPeerConnection setupPeerWithoutStatusExchange( private MockPeerConnection setupPeerWithoutStatusExchange(
final EthProtocolManager ethManager, final PeerSendHandler onSend) { final EthProtocolManager ethManager, final PeerSendHandler onSend) {
final Set<Capability> caps = new HashSet<>(Arrays.asList(EthProtocol.ETH63)); final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63));
final MockPeerConnection peer = new MockPeerConnection(caps, onSend); final MockPeerConnection peer = new MockPeerConnection(caps, onSend);
ethManager.handleNewConnection(peer); ethManager.handleNewConnection(peer);
return peer; return peer;
@ -345,7 +356,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void respondToGetHeadersPartial() throws ExecutionException, InterruptedException { public void respondToGetHeadersPartial() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final long startBlock = blockchain.getChainHeadBlockNumber() - 1L; final long startBlock = blockchain.getChainHeadBlockNumber() - 1L;
final int blockCount = 5; final int blockCount = 5;
final MessageData messageData = final MessageData messageData =
@ -375,7 +386,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedException { public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final long startBlock = blockchain.getChainHeadBlockNumber() + 1; final long startBlock = blockchain.getChainHeadBlockNumber() + 1;
final int blockCount = 5; final int blockCount = 5;
final MessageData messageData = final MessageData messageData =
@ -402,7 +413,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void respondToGetBodies() throws ExecutionException, InterruptedException { public void respondToGetBodies() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
// Setup blocks query // Setup blocks query
final long startBlock = blockchain.getChainHeadBlockNumber() - 5; final long startBlock = blockchain.getChainHeadBlockNumber() - 5;
final int blockCount = 2; final int blockCount = 2;
@ -446,7 +457,7 @@ public final class EthProtocolManagerTest {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
final int limit = 5; final int limit = 5;
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, 1, true, 1, limit)) { new EthProtocolManager(blockchain, 1, true, 1, 1, limit)) {
// Setup blocks query // Setup blocks query
final int blockCount = 10; final int blockCount = 10;
final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount; final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount;
@ -488,7 +499,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void respondToGetBodiesPartial() throws ExecutionException, InterruptedException { public void respondToGetBodiesPartial() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
// Setup blocks query // Setup blocks query
final long expectedBlockNumber = blockchain.getChainHeadBlockNumber() - 1; final long expectedBlockNumber = blockchain.getChainHeadBlockNumber() - 1;
final BlockHeader header = blockchain.getBlockHeader(expectedBlockNumber).get(); final BlockHeader header = blockchain.getBlockHeader(expectedBlockNumber).get();
@ -524,7 +535,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void respondToGetReceipts() throws ExecutionException, InterruptedException { public void respondToGetReceipts() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
// Setup blocks query // Setup blocks query
final long startBlock = blockchain.getChainHeadBlockNumber() - 5; final long startBlock = blockchain.getChainHeadBlockNumber() - 5;
final int blockCount = 2; final int blockCount = 2;
@ -567,7 +578,7 @@ public final class EthProtocolManagerTest {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
final int limit = 5; final int limit = 5;
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, 1, true, 1, limit)) { new EthProtocolManager(blockchain, 1, true, 1, 1, limit)) {
// Setup blocks query // Setup blocks query
final int blockCount = 10; final int blockCount = 10;
final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount; final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount;
@ -608,7 +619,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void respondToGetReceiptsPartial() throws ExecutionException, InterruptedException { public void respondToGetReceiptsPartial() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
// Setup blocks query // Setup blocks query
final long blockNumber = blockchain.getChainHeadBlockNumber() - 5; final long blockNumber = blockchain.getChainHeadBlockNumber() - 5;
final BlockHeader header = blockchain.getBlockHeader(blockNumber).get(); final BlockHeader header = blockchain.getBlockHeader(blockNumber).get();
@ -643,7 +654,7 @@ public final class EthProtocolManagerTest {
@Test @Test
public void newBlockMinedSendsNewBlockMessageToAllPeers() { public void newBlockMinedSendsNewBlockMessageToAllPeers() {
final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1); final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1);
// Define handler to validate response // Define handler to validate response
final PeerSendHandler onSend = mock(PeerSendHandler.class); final PeerSendHandler onSend = mock(PeerSendHandler.class);
@ -705,7 +716,7 @@ public final class EthProtocolManagerTest {
blockchain.appendBlock(block, receipts); blockchain.appendBlock(block, receipts);
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
final long startBlock = 1L; final long startBlock = 1L;
final int requestedBlockCount = 13; final int requestedBlockCount = 13;
final int receivedBlockCount = 2; final int receivedBlockCount = 2;
@ -728,7 +739,7 @@ public final class EthProtocolManagerTest {
done.complete(null); done.complete(null);
}; };
final Set<Capability> caps = new HashSet<>(Arrays.asList(EthProtocol.ETH63)); final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63));
final MockPeerConnection peer = new MockPeerConnection(caps, onSend); final MockPeerConnection peer = new MockPeerConnection(caps, onSend);
ethManager.handleNewConnection(peer); ethManager.handleNewConnection(peer);
final StatusMessage statusMessage = final StatusMessage statusMessage =
@ -744,4 +755,38 @@ public final class EthProtocolManagerTest {
done.get(); done.get();
} }
} }
@Test
public void transactionMessagesGoToTheCorrectExecutor() {
// Create a mock ethScheduler to hold our mock executors.
final ExecutorService worker = mock(ExecutorService.class);
final ScheduledExecutorService scheduled = mock(ScheduledExecutorService.class);
final ExecutorService transactions = mock(ExecutorService.class);
final EthScheduler ethScheduler = new EthScheduler(worker, scheduled, transactions);
// Create the fake TransactionMessage to feed to the EthManager.
final BlockDataGenerator gen = new BlockDataGenerator(1);
final List<Transaction> txes = Collections.singletonList(gen.transaction());
final MessageData initialMessage = TransactionsMessage.create(txes);
final MessageData raw = new RawMessage(EthPV62.TRANSACTIONS, initialMessage.getData());
final TransactionsMessage transactionMessage = TransactionsMessage.readFrom(raw);
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, 1, true, 1, ethScheduler)) {
// Create a transaction pool. This has a side effect of registring a listener for the
// transactions message.
TransactionPoolFactory.createTransactionPool(
protocolSchedule, protocolContext, ethManager.ethContext());
// Send just a transaction message.
final PeerConnection peer = setupPeer(ethManager, (cap, msg, connection) -> {});
ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(peer, transactionMessage));
// Verify the regular message executor and scheduled executor got nothing to execute.
verifyZeroInteractions(worker, scheduled);
// Verify our transactions executor got something to execute.
verify(transactions).submit((Runnable) any());
}
}
} }

@ -31,7 +31,7 @@ import org.junit.Test;
public class EthSchedulerTest { public class EthSchedulerTest {
private DeterministicEthScheduler ethScheduler; private DeterministicEthScheduler ethScheduler;
private MockExecutorService workerExecutor; private MockExecutorService syncWorkerExecutor;
private MockScheduledExecutor scheduledExecutor; private MockScheduledExecutor scheduledExecutor;
private AtomicBoolean shouldTimeout; private AtomicBoolean shouldTimeout;
@ -39,14 +39,14 @@ public class EthSchedulerTest {
public void setup() { public void setup() {
shouldTimeout = new AtomicBoolean(false); shouldTimeout = new AtomicBoolean(false);
ethScheduler = new DeterministicEthScheduler(shouldTimeout::get); ethScheduler = new DeterministicEthScheduler(shouldTimeout::get);
workerExecutor = ethScheduler.mockWorkerExecutor(); syncWorkerExecutor = ethScheduler.mockSyncWorkerExecutor();
scheduledExecutor = ethScheduler.mockScheduledExecutor(); scheduledExecutor = ethScheduler.mockScheduledExecutor();
} }
@Test @Test
public void scheduleWorkerTask_completesWhenScheduledTaskCompletes() { public void scheduleWorkerTask_completesWhenScheduledTaskCompletes() {
final CompletableFuture<Object> future = new CompletableFuture<>(); final CompletableFuture<Object> future = new CompletableFuture<>();
final CompletableFuture<Object> result = ethScheduler.scheduleWorkerTask(() -> future); final CompletableFuture<Object> result = ethScheduler.scheduleSyncWorkerTask(() -> future);
assertThat(result.isDone()).isFalse(); assertThat(result.isDone()).isFalse();
future.complete("bla"); future.complete("bla");
@ -58,7 +58,7 @@ public class EthSchedulerTest {
@Test @Test
public void scheduleWorkerTask_completesWhenScheduledTaskFails() { public void scheduleWorkerTask_completesWhenScheduledTaskFails() {
final CompletableFuture<Object> future = new CompletableFuture<>(); final CompletableFuture<Object> future = new CompletableFuture<>();
final CompletableFuture<Object> result = ethScheduler.scheduleWorkerTask(() -> future); final CompletableFuture<Object> result = ethScheduler.scheduleSyncWorkerTask(() -> future);
assertThat(result.isDone()).isFalse(); assertThat(result.isDone()).isFalse();
future.completeExceptionally(new RuntimeException("whoops")); future.completeExceptionally(new RuntimeException("whoops"));
@ -70,7 +70,7 @@ public class EthSchedulerTest {
@Test @Test
public void scheduleWorkerTask_completesWhenScheduledTaskIsCancelled() { public void scheduleWorkerTask_completesWhenScheduledTaskIsCancelled() {
final CompletableFuture<Object> future = new CompletableFuture<>(); final CompletableFuture<Object> future = new CompletableFuture<>();
final CompletableFuture<Object> result = ethScheduler.scheduleWorkerTask(() -> future); final CompletableFuture<Object> result = ethScheduler.scheduleSyncWorkerTask(() -> future);
assertThat(result.isDone()).isFalse(); assertThat(result.isDone()).isFalse();
future.cancel(false); future.cancel(false);
@ -82,10 +82,10 @@ public class EthSchedulerTest {
@Test @Test
public void scheduleWorkerTask_cancelsScheduledFutureWhenResultIsCancelled() { public void scheduleWorkerTask_cancelsScheduledFutureWhenResultIsCancelled() {
final CompletableFuture<Object> result = final CompletableFuture<Object> result =
ethScheduler.scheduleWorkerTask(() -> new CompletableFuture<>()); ethScheduler.scheduleSyncWorkerTask(() -> new CompletableFuture<>());
assertThat(workerExecutor.getScheduledFutures().size()).isEqualTo(1); assertThat(syncWorkerExecutor.getScheduledFutures().size()).isEqualTo(1);
final Future<?> future = workerExecutor.getScheduledFutures().get(0); final Future<?> future = syncWorkerExecutor.getScheduledFutures().get(0);
verify(future, times(0)).cancel(anyBoolean()); verify(future, times(0)).cancel(anyBoolean());
result.cancel(true); result.cancel(true);

@ -99,7 +99,8 @@ public class TestNode implements Closeable {
genesisState.writeStateTo(worldStateArchive.getMutable()); genesisState.writeStateTo(worldStateArchive.getMutable());
final ProtocolContext<Void> protocolContext = final ProtocolContext<Void> protocolContext =
new ProtocolContext<>(blockchain, worldStateArchive, null); new ProtocolContext<>(blockchain, worldStateArchive, null);
final EthProtocolManager ethProtocolManager = new EthProtocolManager(blockchain, 1, false, 1); final EthProtocolManager ethProtocolManager =
new EthProtocolManager(blockchain, 1, false, 1, 1);
final NetworkRunner networkRunner = final NetworkRunner networkRunner =
NetworkRunner.builder() NetworkRunner.builder()

@ -144,7 +144,8 @@ public class CliquePantheonController implements PantheonController<CliqueContex
protocolContext.getBlockchain(), protocolContext.getBlockchain(),
networkId, networkId,
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism()); syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism());
final SyncState syncState = final SyncState syncState =
new SyncState( new SyncState(
protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers()); protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers());

@ -144,7 +144,8 @@ public class IbftLegacyPantheonController implements PantheonController<IbftCont
protocolContext.getBlockchain(), protocolContext.getBlockchain(),
networkId, networkId,
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism()); syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism());
} else { } else {
ethSubProtocol = EthProtocol.get(); ethSubProtocol = EthProtocol.get();
ethProtocolManager = ethProtocolManager =
@ -152,7 +153,8 @@ public class IbftLegacyPantheonController implements PantheonController<IbftCont
protocolContext.getBlockchain(), protocolContext.getBlockchain(),
networkId, networkId,
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism()); syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism());
} }
final SyncState syncState = final SyncState syncState =

@ -150,7 +150,8 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
protocolContext.getBlockchain(), protocolContext.getBlockchain(),
networkId, networkId,
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism()); syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism());
final SubProtocol ethSubProtocol = EthProtocol.get(); final SubProtocol ethSubProtocol = EthProtocol.get();
final SyncState syncState = final SyncState syncState =

@ -117,7 +117,8 @@ public class MainnetPantheonController implements PantheonController<Void> {
.getChainId() .getChainId()
.orElse(MainnetProtocolSchedule.DEFAULT_CHAIN_ID), .orElse(MainnetProtocolSchedule.DEFAULT_CHAIN_ID),
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism()); syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism());
final SyncState syncState = final SyncState syncState =
new SyncState( new SyncState(
protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers()); protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers());

Loading…
Cancel
Save