EthScheduler additions (#767)

Add the services thread pool and a computation thread pool to the
EthScheduler.

* Services are long running, sequential, and infrequently start tasks
  such as Full Sync and Fast Sync.
* Computations are short and high CPU intensity tasks such as ECDSA
  signature extractions and POW validation.  The intent is that each
  runnable represents one such extraction and the extractions from a
  block are saturated across available processing power.  These
  computations should have zero dependencies outside their object and
  thread.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Danno Ferrin 6 years ago committed by GitHub
parent 2d8d38fa7c
commit 1135e539e8
  1. 12
      consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java
  2. 7
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java
  3. 61
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
  4. 27
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
  5. 7
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java
  6. 66
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java
  7. 138
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerShutdownTest.java
  8. 26
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockEthTask.java
  9. 2
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java
  10. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java
  11. 6
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java
  12. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java
  13. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java

@ -32,8 +32,16 @@ public class Istanbul64ProtocolManager extends EthProtocolManager {
final int networkId, final int networkId,
final boolean fastSyncEnabled, final boolean fastSyncEnabled,
final int syncWorkers, final int syncWorkers,
final int txWorkers) { final int txWorkers,
super(blockchain, worldStateArchive, networkId, fastSyncEnabled, syncWorkers, txWorkers); final int computationWorkers) {
super(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers);
} }
@Override @Override

@ -95,6 +95,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final boolean fastSyncEnabled, final boolean fastSyncEnabled,
final int syncWorkers, final int syncWorkers,
final int txWorkers, final int txWorkers,
final int computationWorkers,
final int requestLimit) { final int requestLimit) {
this( this(
blockchain, blockchain,
@ -102,7 +103,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
networkId, networkId,
fastSyncEnabled, fastSyncEnabled,
requestLimit, requestLimit,
new EthScheduler(syncWorkers, txWorkers)); new EthScheduler(syncWorkers, txWorkers, computationWorkers));
} }
public EthProtocolManager( public EthProtocolManager(
@ -111,7 +112,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final int networkId, final int networkId,
final boolean fastSyncEnabled, final boolean fastSyncEnabled,
final int syncWorkers, final int syncWorkers,
final int txWorkers) { final int txWorkers,
final int computationWorkers) {
this( this(
blockchain, blockchain,
worldStateArchive, worldStateArchive,
@ -119,6 +121,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
fastSyncEnabled, fastSyncEnabled,
syncWorkers, syncWorkers,
txWorkers, txWorkers,
computationWorkers,
DEFAULT_REQUEST_LIMIT); DEFAULT_REQUEST_LIMIT);
} }

@ -44,8 +44,11 @@ public class EthScheduler {
protected final ExecutorService syncWorkerExecutor; protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler; protected final ScheduledExecutorService scheduler;
protected final ExecutorService txWorkerExecutor; protected final ExecutorService txWorkerExecutor;
private final ExecutorService servicesExecutor;
private final ExecutorService computationExecutor;
EthScheduler(final int syncWorkerCount, final int txWorkerCount) { EthScheduler(
final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) {
this( this(
Executors.newFixedThreadPool( Executors.newFixedThreadPool(
syncWorkerCount, syncWorkerCount,
@ -62,16 +65,29 @@ public class EthScheduler {
txWorkerCount, txWorkerCount,
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions-%d") .setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions-%d")
.build()),
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Services-%d")
.build()),
Executors.newFixedThreadPool(
computationWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Computation-%d")
.build())); .build()));
} }
protected EthScheduler( protected EthScheduler(
final ExecutorService syncWorkerExecutor, final ExecutorService syncWorkerExecutor,
final ScheduledExecutorService scheduler, final ScheduledExecutorService scheduler,
final ExecutorService txWorkerExecutor) { final ExecutorService txWorkerExecutor,
final ExecutorService servicesExecutor,
final ExecutorService computationExecutor) {
this.syncWorkerExecutor = syncWorkerExecutor; this.syncWorkerExecutor = syncWorkerExecutor;
this.scheduler = scheduler; this.scheduler = scheduler;
this.txWorkerExecutor = txWorkerExecutor; this.txWorkerExecutor = txWorkerExecutor;
this.servicesExecutor = servicesExecutor;
this.computationExecutor = computationExecutor;
} }
public <T> CompletableFuture<T> scheduleSyncWorkerTask( public <T> CompletableFuture<T> scheduleSyncWorkerTask(
@ -101,12 +117,20 @@ public class EthScheduler {
return promise; return promise;
} }
public Future<?> scheduleSyncWorkerTask(final Runnable command) { public void scheduleSyncWorkerTask(final Runnable command) {
return syncWorkerExecutor.submit(command); syncWorkerExecutor.submit(command);
}
public void scheduleTxWorkerTask(final Runnable command) {
txWorkerExecutor.submit(command);
}
CompletableFuture<Void> scheduleServiceTask(final Runnable service) {
return CompletableFuture.runAsync(service, servicesExecutor);
} }
public Future<?> scheduleTxWorkerTask(final Runnable command) { <T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
return txWorkerExecutor.submit(command); return CompletableFuture.supplyAsync(computation, computationExecutor);
} }
public CompletableFuture<Void> scheduleFutureTask( public CompletableFuture<Void> scheduleFutureTask(
@ -194,25 +218,46 @@ public class EthScheduler {
if (stopped.compareAndSet(false, true)) { if (stopped.compareAndSet(false, true)) {
LOG.trace("Stopping " + getClass().getSimpleName()); LOG.trace("Stopping " + getClass().getSimpleName());
syncWorkerExecutor.shutdown(); syncWorkerExecutor.shutdown();
txWorkerExecutor.shutdown();
scheduler.shutdown(); scheduler.shutdown();
servicesExecutor.shutdown();
computationExecutor.shutdown();
shutdown.countDown(); shutdown.countDown();
} else { } else {
LOG.trace("Attempted to stop already stopped " + getClass().getSimpleName()); LOG.trace("Attempted to stop already stopped " + getClass().getSimpleName());
} }
} }
public void awaitStop() throws InterruptedException { void awaitStop() throws InterruptedException {
shutdown.await(); shutdown.await();
if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName()); LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName());
syncWorkerExecutor.shutdownNow(); syncWorkerExecutor.shutdownNow();
syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES); syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
} }
if (!txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error(
"{} transaction worker executor did not shutdown cleanly.",
this.getClass().getSimpleName());
txWorkerExecutor.shutdownNow();
txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) { if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName()); LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName());
scheduler.shutdownNow(); scheduler.shutdownNow();
scheduler.awaitTermination(2L, TimeUnit.MINUTES); scheduler.awaitTermination(2L, TimeUnit.MINUTES);
} }
if (!servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} services executor did not shutdown cleanly.", this.getClass().getSimpleName());
servicesExecutor.shutdownNow();
servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!computationExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error(
"{} computation executor did not shutdown cleanly.", this.getClass().getSimpleName());
computationExecutor.shutdownNow();
computationExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
LOG.trace("{} stopped.", this.getClass().getSimpleName()); LOG.trace("{} stopped.", this.getClass().getSimpleName());
} }
@ -222,7 +267,7 @@ public class EthScheduler {
return promise; return promise;
} }
public <T> void failAfterTimeout(final CompletableFuture<T> promise) { <T> void failAfterTimeout(final CompletableFuture<T> promise) {
failAfterTimeout(promise, defaultTimeout); failAfterTimeout(promise, defaultTimeout);
} }

@ -29,9 +29,9 @@ public class SynchronizerConfiguration {
private static final Logger LOG = LogManager.getLogger(); private static final Logger LOG = LogManager.getLogger();
// TODO: Determine reasonable defaults here // TODO: Determine reasonable defaults here
public static int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500; public static final int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500;
public static float DEFAULT_FULL_VALIDATION_RATE = .1f; public static final float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5; public static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5); private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5);
private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 200; private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 200;
private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10; private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10;
@ -62,6 +62,7 @@ public class SynchronizerConfiguration {
private final int maxTrailingPeers; private final int maxTrailingPeers;
private final int downloaderParallelism; private final int downloaderParallelism;
private final int transactionsParallelism; private final int transactionsParallelism;
private final int computationParallelism;
private SynchronizerConfiguration( private SynchronizerConfiguration(
final SyncMode requestedSyncMode, final SyncMode requestedSyncMode,
@ -82,7 +83,8 @@ public class SynchronizerConfiguration {
final long trailingPeerBlocksBehindThreshold, final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers, final int maxTrailingPeers,
final int downloaderParallelism, final int downloaderParallelism,
final int transactionsParallelism) { final int transactionsParallelism,
final int computationParallelism) {
this.requestedSyncMode = requestedSyncMode; this.requestedSyncMode = requestedSyncMode;
this.fastSyncPivotDistance = fastSyncPivotDistance; this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate; this.fastSyncFullValidationRate = fastSyncFullValidationRate;
@ -102,6 +104,7 @@ public class SynchronizerConfiguration {
this.maxTrailingPeers = maxTrailingPeers; this.maxTrailingPeers = maxTrailingPeers;
this.downloaderParallelism = downloaderParallelism; this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism; this.transactionsParallelism = transactionsParallelism;
this.computationParallelism = computationParallelism;
} }
/** /**
@ -147,7 +150,8 @@ public class SynchronizerConfiguration {
trailingPeerBlocksBehindThreshold, trailingPeerBlocksBehindThreshold,
maxTrailingPeers, maxTrailingPeers,
downloaderParallelism, downloaderParallelism,
transactionsParallelism); transactionsParallelism,
computationParallelism);
} }
public static Builder builder() { public static Builder builder() {
@ -232,6 +236,10 @@ public class SynchronizerConfiguration {
return transactionsParallelism; return transactionsParallelism;
} }
public int computationParallelism() {
return computationParallelism;
}
/** /**
* The rate at which blocks should be fully validated during fast sync. At a rate of 1f, all * The rate at which blocks should be fully validated during fast sync. At a rate of 1f, all
* blocks are fully validated. At rates less than 1f, a subset of blocks will undergo light-weight * blocks are fully validated. At rates less than 1f, a subset of blocks will undergo light-weight
@ -274,6 +282,7 @@ public class SynchronizerConfiguration {
private int maxTrailingPeers = Integer.MAX_VALUE; private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2; private int downloaderParallelism = 2;
private int transactionsParallelism = 2; private int transactionsParallelism = 2;
private int computationParallelism = Runtime.getRuntime().availableProcessors();
public Builder fastSyncPivotDistance(final int distance) { public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance; fastSyncPivotDistance = distance;
@ -350,6 +359,11 @@ public class SynchronizerConfiguration {
return this; return this;
} }
public Builder computationParallelism(final int computationParallelism) {
this.computationParallelism = computationParallelism;
return this;
}
public SynchronizerConfiguration build() { public SynchronizerConfiguration build() {
return new SynchronizerConfiguration( return new SynchronizerConfiguration(
syncMode, syncMode,
@ -370,7 +384,8 @@ public class SynchronizerConfiguration {
trailingPeerBlocksBehindThreshold, trailingPeerBlocksBehindThreshold,
maxTrailingPeers, maxTrailingPeers,
downloaderParallelism, downloaderParallelism,
transactionsParallelism); transactionsParallelism,
computationParallelism);
} }
} }
} }

@ -28,7 +28,12 @@ public class DeterministicEthScheduler extends EthScheduler {
} }
public DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) { public DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) {
super(new MockExecutorService(), new MockScheduledExecutor(), new MockExecutorService()); super(
new MockExecutorService(),
new MockScheduledExecutor(),
new MockExecutorService(),
new MockExecutorService(),
new MockExecutorService());
this.timeoutPolicy = timeoutPolicy; this.timeoutPolicy = timeoutPolicy;
} }

@ -80,6 +80,8 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
// NullPointerExceptions on optional.get() will result in test failures anyway
@SuppressWarnings("OptionalGetWithoutIsPresent")
public final class EthProtocolManagerTest { public final class EthProtocolManagerTest {
private static Blockchain blockchain; private static Blockchain blockchain;
@ -101,7 +103,8 @@ public final class EthProtocolManagerTest {
@Test @Test
public void disconnectOnUnsolicitedMessage() { public void disconnectOnUnsolicitedMessage() {
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final MessageData messageData = final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {}); final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {});
@ -113,7 +116,8 @@ public final class EthProtocolManagerTest {
@Test @Test
public void disconnectOnFailureToSendStatusMessage() { public void disconnectOnFailureToSendStatusMessage() {
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final MessageData messageData = final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = final MockPeerConnection peer =
@ -126,7 +130,8 @@ public final class EthProtocolManagerTest {
@Test @Test
public void disconnectOnWrongChainId() { public void disconnectOnWrongChainId() {
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final MessageData messageData = final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = final MockPeerConnection peer =
@ -150,7 +155,8 @@ public final class EthProtocolManagerTest {
@Test @Test
public void disconnectOnWrongGenesisHash() { public void disconnectOnWrongGenesisHash() {
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final MessageData messageData = final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = final MockPeerConnection peer =
@ -174,7 +180,8 @@ public final class EthProtocolManagerTest {
@Test(expected = ConditionTimeoutException.class) @Test(expected = ConditionTimeoutException.class)
public void doNotDisconnectOnValidMessage() { public void doNotDisconnectOnValidMessage() {
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final MessageData messageData = final MessageData messageData =
GetBlockBodiesMessage.create(Collections.singletonList(gen.hash())); GetBlockBodiesMessage.create(Collections.singletonList(gen.hash()));
final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {}); final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {});
@ -190,7 +197,8 @@ public final class EthProtocolManagerTest {
public void respondToGetHeaders() throws ExecutionException, InterruptedException { public void respondToGetHeaders() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final long startBlock = 5L; final long startBlock = 5L;
final int blockCount = 5; final int blockCount = 5;
final MessageData messageData = final MessageData messageData =
@ -223,7 +231,7 @@ public final class EthProtocolManagerTest {
final int limit = 5; final int limit = 5;
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager( new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, limit)) { blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1, limit)) {
final long startBlock = 5L; final long startBlock = 5L;
final int blockCount = 10; final int blockCount = 10;
final MessageData messageData = final MessageData messageData =
@ -254,7 +262,8 @@ public final class EthProtocolManagerTest {
public void respondToGetHeadersReversed() throws ExecutionException, InterruptedException { public void respondToGetHeadersReversed() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final long endBlock = 10L; final long endBlock = 10L;
final int blockCount = 5; final int blockCount = 5;
final MessageData messageData = GetBlockHeadersMessage.create(endBlock, blockCount, 0, true); final MessageData messageData = GetBlockHeadersMessage.create(endBlock, blockCount, 0, true);
@ -284,7 +293,8 @@ public final class EthProtocolManagerTest {
public void respondToGetHeadersWithSkip() throws ExecutionException, InterruptedException { public void respondToGetHeadersWithSkip() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final long startBlock = 5L; final long startBlock = 5L;
final int blockCount = 5; final int blockCount = 5;
final int skip = 1; final int skip = 1;
@ -317,7 +327,8 @@ public final class EthProtocolManagerTest {
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final long endBlock = 10L; final long endBlock = 10L;
final int blockCount = 5; final int blockCount = 5;
final int skip = 1; final int skip = 1;
@ -371,7 +382,8 @@ public final class EthProtocolManagerTest {
public void respondToGetHeadersPartial() throws ExecutionException, InterruptedException { public void respondToGetHeadersPartial() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final long startBlock = blockchain.getChainHeadBlockNumber() - 1L; final long startBlock = blockchain.getChainHeadBlockNumber() - 1L;
final int blockCount = 5; final int blockCount = 5;
final MessageData messageData = final MessageData messageData =
@ -402,7 +414,8 @@ public final class EthProtocolManagerTest {
public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedException { public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final long startBlock = blockchain.getChainHeadBlockNumber() + 1; final long startBlock = blockchain.getChainHeadBlockNumber() + 1;
final int blockCount = 5; final int blockCount = 5;
final MessageData messageData = final MessageData messageData =
@ -430,7 +443,8 @@ public final class EthProtocolManagerTest {
public void respondToGetBodies() throws ExecutionException, InterruptedException { public void respondToGetBodies() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
// Setup blocks query // Setup blocks query
final long startBlock = blockchain.getChainHeadBlockNumber() - 5; final long startBlock = blockchain.getChainHeadBlockNumber() - 5;
final int blockCount = 2; final int blockCount = 2;
@ -475,7 +489,7 @@ public final class EthProtocolManagerTest {
final int limit = 5; final int limit = 5;
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager( new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, limit)) { blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1, limit)) {
// Setup blocks query // Setup blocks query
final int blockCount = 10; final int blockCount = 10;
final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount; final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount;
@ -518,7 +532,8 @@ public final class EthProtocolManagerTest {
public void respondToGetBodiesPartial() throws ExecutionException, InterruptedException { public void respondToGetBodiesPartial() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
// Setup blocks query // Setup blocks query
final long expectedBlockNumber = blockchain.getChainHeadBlockNumber() - 1; final long expectedBlockNumber = blockchain.getChainHeadBlockNumber() - 1;
final BlockHeader header = blockchain.getBlockHeader(expectedBlockNumber).get(); final BlockHeader header = blockchain.getBlockHeader(expectedBlockNumber).get();
@ -555,7 +570,8 @@ public final class EthProtocolManagerTest {
public void respondToGetReceipts() throws ExecutionException, InterruptedException { public void respondToGetReceipts() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
// Setup blocks query // Setup blocks query
final long startBlock = blockchain.getChainHeadBlockNumber() - 5; final long startBlock = blockchain.getChainHeadBlockNumber() - 5;
final int blockCount = 2; final int blockCount = 2;
@ -599,7 +615,7 @@ public final class EthProtocolManagerTest {
final int limit = 5; final int limit = 5;
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager( new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, limit)) { blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1, limit)) {
// Setup blocks query // Setup blocks query
final int blockCount = 10; final int blockCount = 10;
final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount; final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount;
@ -641,7 +657,8 @@ public final class EthProtocolManagerTest {
public void respondToGetReceiptsPartial() throws ExecutionException, InterruptedException { public void respondToGetReceiptsPartial() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
// Setup blocks query // Setup blocks query
final long blockNumber = blockchain.getChainHeadBlockNumber() - 5; final long blockNumber = blockchain.getChainHeadBlockNumber() - 5;
final BlockHeader header = blockchain.getBlockHeader(blockNumber).get(); final BlockHeader header = blockchain.getBlockHeader(blockNumber).get();
@ -680,7 +697,7 @@ public final class EthProtocolManagerTest {
final WorldStateArchive worldStateArchive = protocolContext.getWorldStateArchive(); final WorldStateArchive worldStateArchive = protocolContext.getWorldStateArchive();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, worldStateArchive, 1, true, 1, 1)) { new EthProtocolManager(blockchain, worldStateArchive, 1, true, 1, 1, 1)) {
// Setup node data query // Setup node data query
final List<BytesValue> expectedResults = new ArrayList<>(); final List<BytesValue> expectedResults = new ArrayList<>();
@ -722,7 +739,8 @@ public final class EthProtocolManagerTest {
@Test @Test
public void newBlockMinedSendsNewBlockMessageToAllPeers() { public void newBlockMinedSendsNewBlockMessageToAllPeers() {
final EthProtocolManager ethManager = final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1); new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1);
// Define handler to validate response // Define handler to validate response
final PeerSendHandler onSend = mock(PeerSendHandler.class); final PeerSendHandler onSend = mock(PeerSendHandler.class);
@ -785,7 +803,8 @@ public final class EthProtocolManagerTest {
final CompletableFuture<Void> done = new CompletableFuture<>(); final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) { new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) {
final long startBlock = 1L; final long startBlock = 1L;
final int requestedBlockCount = 13; final int requestedBlockCount = 13;
final int receivedBlockCount = 2; final int receivedBlockCount = 2;
@ -831,7 +850,10 @@ public final class EthProtocolManagerTest {
final ExecutorService worker = mock(ExecutorService.class); final ExecutorService worker = mock(ExecutorService.class);
final ScheduledExecutorService scheduled = mock(ScheduledExecutorService.class); final ScheduledExecutorService scheduled = mock(ScheduledExecutorService.class);
final ExecutorService transactions = mock(ExecutorService.class); final ExecutorService transactions = mock(ExecutorService.class);
final EthScheduler ethScheduler = new EthScheduler(worker, scheduled, transactions); final ExecutorService services = mock(ExecutorService.class);
final ExecutorService computations = mock(ExecutorService.class);
final EthScheduler ethScheduler =
new EthScheduler(worker, scheduled, transactions, services, computations);
// Create the fake TransactionMessage to feed to the EthManager. // Create the fake TransactionMessage to feed to the EthManager.
final BlockDataGenerator gen = new BlockDataGenerator(1); final BlockDataGenerator gen = new BlockDataGenerator(1);

@ -0,0 +1,138 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.manager;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Test;
public class EthSchedulerShutdownTest {
private EthScheduler ethScheduler;
private ExecutorService syncWorkerExecutor;
private ScheduledExecutorService scheduledExecutor;
private ExecutorService txWorkerExecutor;
private ExecutorService servicesExecutor;
private ExecutorService computationExecutor;
@Before
public void setup() {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
syncWorkerExecutor = Executors.newSingleThreadExecutor();
txWorkerExecutor = Executors.newSingleThreadExecutor();
servicesExecutor = Executors.newSingleThreadExecutor();
computationExecutor = Executors.newSingleThreadExecutor();
ethScheduler =
new EthScheduler(
syncWorkerExecutor,
scheduledExecutor,
txWorkerExecutor,
servicesExecutor,
computationExecutor);
}
@Test
public void shutdown_syncWorkerShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1);
ethScheduler.scheduleSyncWorkerTask(task::executeTask);
ethScheduler.stop();
assertThat(syncWorkerExecutor.isShutdown()).isTrue();
assertThat(syncWorkerExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop();
assertThat(syncWorkerExecutor.isShutdown()).isTrue();
assertThat(syncWorkerExecutor.isTerminated()).isTrue();
}
@Test
public void shutdown_scheduledWorkerShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1);
ethScheduler.scheduleFutureTask(task::executeTask, Duration.ofMillis(1));
ethScheduler.stop();
assertThat(scheduledExecutor.isShutdown()).isTrue();
assertThat(scheduledExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop();
assertThat(scheduledExecutor.isShutdown()).isTrue();
assertThat(scheduledExecutor.isTerminated()).isTrue();
}
@Test
public void shutdown_txWorkerShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1);
ethScheduler.scheduleTxWorkerTask(task::executeTask);
ethScheduler.stop();
assertThat(txWorkerExecutor.isShutdown()).isTrue();
assertThat(txWorkerExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop();
assertThat(txWorkerExecutor.isShutdown()).isTrue();
assertThat(txWorkerExecutor.isTerminated()).isTrue();
}
@Test
public void shutdown_servicesShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1);
ethScheduler.scheduleServiceTask(task::executeTask);
ethScheduler.stop();
assertThat(servicesExecutor.isShutdown()).isTrue();
assertThat(servicesExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop();
assertThat(servicesExecutor.isShutdown()).isTrue();
assertThat(servicesExecutor.isTerminated()).isTrue();
}
@Test
public void shutdown_computationShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1);
ethScheduler.scheduleComputationTask(
() -> {
task.executeTask();
return Integer.MAX_VALUE;
});
ethScheduler.stop();
assertThat(computationExecutor.isShutdown()).isTrue();
assertThat(computationExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop();
assertThat(computationExecutor.isShutdown()).isTrue();
assertThat(computationExecutor.isTerminated()).isTrue();
}
}

@ -14,28 +14,46 @@ package tech.pegasys.pantheon.ethereum.eth.manager;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.concurrent.CountDownLatch;
public class MockEthTask extends AbstractEthTask<Object> { public class MockEthTask extends AbstractEthTask<Object> {
private boolean executed = false; private boolean executed = false;
private CountDownLatch countdown;
protected MockEthTask() { MockEthTask(final int count) {
super(NoOpMetricsSystem.NO_OP_LABELLED_TIMER); super(NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
countdown = new CountDownLatch(count);
}
MockEthTask() {
this(0);
} }
@Override @Override
protected void executeTask() { protected void executeTask() {
try {
countdown.await();
} catch (final InterruptedException ignore) {
}
executed = true; executed = true;
} }
public boolean hasBeenStarted() { void countDown() {
countdown.countDown();
}
boolean hasBeenStarted() {
return executed; return executed;
} }
public void complete() { void complete() {
if (executed) {
result.get().complete(null); result.get().complete(null);
} }
}
public void fail() { void fail() {
result.get().completeExceptionally(new RuntimeException("Failure forced for testing")); result.get().completeExceptionally(new RuntimeException("Failure forced for testing"));
} }
} }

@ -104,7 +104,7 @@ public class TestNode implements Closeable {
final ProtocolContext<Void> protocolContext = final ProtocolContext<Void> protocolContext =
new ProtocolContext<>(blockchain, worldStateArchive, null); new ProtocolContext<>(blockchain, worldStateArchive, null);
final EthProtocolManager ethProtocolManager = final EthProtocolManager ethProtocolManager =
new EthProtocolManager(blockchain, worldStateArchive, 1, false, 1, 1); new EthProtocolManager(blockchain, worldStateArchive, 1, false, 1, 1, 1);
final NetworkRunner networkRunner = final NetworkRunner networkRunner =
NetworkRunner.builder() NetworkRunner.builder()

@ -151,7 +151,8 @@ public class CliquePantheonController implements PantheonController<CliqueContex
networkId, networkId,
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism(), syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism()); syncConfig.transactionsParallelism(),
syncConfig.computationParallelism());
final SyncState syncState = final SyncState syncState =
new SyncState( new SyncState(
protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers()); protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers());

@ -149,7 +149,8 @@ public class IbftLegacyPantheonController implements PantheonController<IbftCont
networkId, networkId,
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism(), syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism()); syncConfig.transactionsParallelism(),
syncConfig.computationParallelism());
} else { } else {
ethSubProtocol = EthProtocol.get(); ethSubProtocol = EthProtocol.get();
ethProtocolManager = ethProtocolManager =
@ -159,7 +160,8 @@ public class IbftLegacyPantheonController implements PantheonController<IbftCont
networkId, networkId,
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism(), syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism()); syncConfig.transactionsParallelism(),
syncConfig.computationParallelism());
} }
final SyncState syncState = final SyncState syncState =

@ -165,7 +165,8 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
networkId, networkId,
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism(), syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism()); syncConfig.transactionsParallelism(),
syncConfig.computationParallelism());
final SubProtocol ethSubProtocol = EthProtocol.get(); final SubProtocol ethSubProtocol = EthProtocol.get();
final SyncState syncState = final SyncState syncState =

@ -126,7 +126,8 @@ public class MainnetPantheonController implements PantheonController<Void> {
.orElse(MainnetProtocolSchedule.DEFAULT_CHAIN_ID), .orElse(MainnetProtocolSchedule.DEFAULT_CHAIN_ID),
fastSyncEnabled, fastSyncEnabled,
syncConfig.downloaderParallelism(), syncConfig.downloaderParallelism(),
syncConfig.transactionsParallelism()); syncConfig.transactionsParallelism(),
syncConfig.computationParallelism());
final SyncState syncState = final SyncState syncState =
new SyncState( new SyncState(
protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers()); protocolContext.getBlockchain(), ethProtocolManager.ethContext().getEthPeers());

Loading…
Cancel
Save