diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java index bd5c0ca6bc..ad2dfc0003 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java @@ -117,6 +117,31 @@ public abstract class AbstractEthTask implements EthTask { } } + /** + * Utility for registring completable futures for cleanup if this EthTask is cancelled. + * + * @param subTaskFuture the future to be reigstered. + * @param the type of data returned from the CompletableFuture + * @return The completableFuture that was executed + */ + protected final CompletableFuture registerSubTask( + final CompletableFuture subTaskFuture) { + synchronized (result) { + if (!isCancelled()) { + subTaskFutures.add(subTaskFuture); + subTaskFuture.whenComplete( + (r, t) -> { + subTaskFutures.remove(subTaskFuture); + }); + return subTaskFuture; + } else { + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new CancellationException()); + return future; + } + } + } + /** * Helper method for sending subTask to worker that will clean up if this EthTask is cancelled. * diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java index 95317a11ca..12765e532a 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java @@ -15,8 +15,10 @@ package tech.pegasys.pantheon.ethereum.eth.manager; import tech.pegasys.pantheon.util.ExceptionUtils; import java.time.Duration; +import java.util.Collection; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -47,6 +49,8 @@ public class EthScheduler { private final ExecutorService servicesExecutor; private final ExecutorService computationExecutor; + private Collection> serviceFutures = new ConcurrentLinkedDeque<>(); + public EthScheduler( final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) { this( @@ -126,7 +130,10 @@ public class EthScheduler { } public CompletableFuture scheduleServiceTask(final EthTask task) { - return task.runAsync(servicesExecutor); + final CompletableFuture serviceFuture = task.runAsync(servicesExecutor); + serviceFutures.add(serviceFuture); + serviceFuture.whenComplete((r, t) -> serviceFutures.remove(serviceFuture)); + return serviceFuture; } public CompletableFuture scheduleComputationTask(final Supplier computation) { @@ -230,6 +237,7 @@ public class EthScheduler { void awaitStop() throws InterruptedException { shutdown.await(); + serviceFutures.forEach(future -> future.cancel(true)); if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName()); syncWorkerExecutor.shutdownNow(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java index f9f5ff0377..d1a67fe887 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java @@ -141,12 +141,16 @@ public class ParallelImportChainSegmentTask extends AbstractEthTask downloadHeaderFuture = scheduler.scheduleServiceTask(downloadHeadersTask); + registerSubTask(downloadHeaderFuture); final CompletableFuture validateHeaderFuture = scheduler.scheduleServiceTask(validateHeadersTask); + registerSubTask(validateHeaderFuture); final CompletableFuture downloadBodiesFuture = scheduler.scheduleServiceTask(downloadBodiesTask); + registerSubTask(downloadBodiesFuture); final CompletableFuture>>> validateBodiesFuture = scheduler.scheduleServiceTask(validateAndImportBodiesTask); + registerSubTask(validateBodiesFuture); // Hook in pipeline completion signaling. downloadHeadersTask.shutdown();