shutdown improvements (#841)

* register sub tasks in ParalellImportChainSegmentTask
* rememger and shutdown eth services in EthScheduler

Fixes #768
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Danno Ferrin 6 years ago committed by GitHub
parent 026d157680
commit a18f1663c6
  1. 25
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java
  2. 10
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
  3. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java

@ -117,6 +117,31 @@ public abstract class AbstractEthTask<T> implements EthTask<T> {
} }
} }
/**
* Utility for registring completable futures for cleanup if this EthTask is cancelled.
*
* @param subTaskFuture the future to be reigstered.
* @param <S> the type of data returned from the CompletableFuture
* @return The completableFuture that was executed
*/
protected final <S> CompletableFuture<S> registerSubTask(
final CompletableFuture<S> subTaskFuture) {
synchronized (result) {
if (!isCancelled()) {
subTaskFutures.add(subTaskFuture);
subTaskFuture.whenComplete(
(r, t) -> {
subTaskFutures.remove(subTaskFuture);
});
return subTaskFuture;
} else {
final CompletableFuture<S> future = new CompletableFuture<>();
future.completeExceptionally(new CancellationException());
return future;
}
}
}
/** /**
* Helper method for sending subTask to worker that will clean up if this EthTask is cancelled. * Helper method for sending subTask to worker that will clean up if this EthTask is cancelled.
* *

@ -15,8 +15,10 @@ package tech.pegasys.pantheon.ethereum.eth.manager;
import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.ExceptionUtils;
import java.time.Duration; import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -47,6 +49,8 @@ public class EthScheduler {
private final ExecutorService servicesExecutor; private final ExecutorService servicesExecutor;
private final ExecutorService computationExecutor; private final ExecutorService computationExecutor;
private Collection<CompletableFuture<?>> serviceFutures = new ConcurrentLinkedDeque<>();
public EthScheduler( public EthScheduler(
final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) { final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) {
this( this(
@ -126,7 +130,10 @@ public class EthScheduler {
} }
public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) { public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
return task.runAsync(servicesExecutor); final CompletableFuture<T> serviceFuture = task.runAsync(servicesExecutor);
serviceFutures.add(serviceFuture);
serviceFuture.whenComplete((r, t) -> serviceFutures.remove(serviceFuture));
return serviceFuture;
} }
public <T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) { public <T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
@ -230,6 +237,7 @@ public class EthScheduler {
void awaitStop() throws InterruptedException { void awaitStop() throws InterruptedException {
shutdown.await(); shutdown.await();
serviceFutures.forEach(future -> future.cancel(true));
if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName()); LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName());
syncWorkerExecutor.shutdownNow(); syncWorkerExecutor.shutdownNow();

@ -141,12 +141,16 @@ public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<B
final EthScheduler scheduler = ethContext.getScheduler(); final EthScheduler scheduler = ethContext.getScheduler();
final CompletableFuture<?> downloadHeaderFuture = final CompletableFuture<?> downloadHeaderFuture =
scheduler.scheduleServiceTask(downloadHeadersTask); scheduler.scheduleServiceTask(downloadHeadersTask);
registerSubTask(downloadHeaderFuture);
final CompletableFuture<?> validateHeaderFuture = final CompletableFuture<?> validateHeaderFuture =
scheduler.scheduleServiceTask(validateHeadersTask); scheduler.scheduleServiceTask(validateHeadersTask);
registerSubTask(validateHeaderFuture);
final CompletableFuture<?> downloadBodiesFuture = final CompletableFuture<?> downloadBodiesFuture =
scheduler.scheduleServiceTask(downloadBodiesTask); scheduler.scheduleServiceTask(downloadBodiesTask);
registerSubTask(downloadBodiesFuture);
final CompletableFuture<AbstractPeerTask.PeerTaskResult<List<List<B>>>> validateBodiesFuture = final CompletableFuture<AbstractPeerTask.PeerTaskResult<List<List<B>>>> validateBodiesFuture =
scheduler.scheduleServiceTask(validateAndImportBodiesTask); scheduler.scheduleServiceTask(validateAndImportBodiesTask);
registerSubTask(validateBodiesFuture);
// Hook in pipeline completion signaling. // Hook in pipeline completion signaling.
downloadHeadersTask.shutdown(); downloadHeadersTask.shutdown();

Loading…
Cancel
Save