Speed up shutdown time (#838)

Modify EthScheduler to use shutdownNow immediately instead of trying shutdown first.

EthScheduler executes a lot of tasks that wait for responses from the network, and it may have a significant number of tasks queued. Using shutdown would wait for all network responses and all queued tasks to complete before exiting, which almost always reaches the 2 minute timeout allowed before switching to shutdownNow. All tasks have to cope with being unexpectedly terminated (as would happen with a kill -9), so there's no reason to have this extra delay.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 2091cd0dc3
commit 0f2d7d7c65
  1. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPipelinedPeerTask.java
  2. 29
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
  3. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainDownloader.java
  4. 44
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthSchedulerShutdownTest.java
  5. 12
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockEthTask.java
  6. 5
      pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java

@ -95,7 +95,9 @@ public abstract class AbstractPipelinedPeerTask<I, O> extends AbstractPeerTask<L
} }
protected void failExceptionally(final Throwable t) { protected void failExceptionally(final Throwable t) {
if (!(t instanceof InterruptedException)) {
LOG.error("Task Failure", t); LOG.error("Task Failure", t);
}
processingException.compareAndSet(null, t); processingException.compareAndSet(null, t);
result.get().completeExceptionally(t); result.get().completeExceptionally(t);
cancel(); cancel();

@ -224,11 +224,11 @@ public class EthScheduler {
public void stop() { public void stop() {
if (stopped.compareAndSet(false, true)) { if (stopped.compareAndSet(false, true)) {
LOG.trace("Stopping " + getClass().getSimpleName()); LOG.trace("Stopping " + getClass().getSimpleName());
syncWorkerExecutor.shutdown(); syncWorkerExecutor.shutdownNow();
txWorkerExecutor.shutdown(); txWorkerExecutor.shutdownNow();
scheduler.shutdown(); scheduler.shutdownNow();
servicesExecutor.shutdown(); servicesExecutor.shutdownNow();
computationExecutor.shutdown(); computationExecutor.shutdownNow();
shutdown.countDown(); shutdown.countDown();
} else { } else {
LOG.trace("Attempted to stop already stopped " + getClass().getSimpleName()); LOG.trace("Attempted to stop already stopped " + getClass().getSimpleName());
@ -238,33 +238,24 @@ public class EthScheduler {
void awaitStop() throws InterruptedException { void awaitStop() throws InterruptedException {
shutdown.await(); shutdown.await();
serviceFutures.forEach(future -> future.cancel(true)); serviceFutures.forEach(future -> future.cancel(true));
if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { if (!syncWorkerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName()); LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName());
syncWorkerExecutor.shutdownNow();
syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
} }
if (!txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { if (!txWorkerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.error( LOG.error(
"{} transaction worker executor did not shutdown cleanly.", "{} transaction worker executor did not shutdown cleanly.",
this.getClass().getSimpleName()); this.getClass().getSimpleName());
txWorkerExecutor.shutdownNow();
txWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
} }
if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) { if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName()); LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName());
scheduler.shutdownNow(); scheduler.shutdownNow();
scheduler.awaitTermination(2L, TimeUnit.MINUTES);
} }
if (!servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { if (!servicesExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.error("{} services executor did not shutdown cleanly.", this.getClass().getSimpleName()); LOG.error("{} services executor did not shutdown cleanly.", this.getClass().getSimpleName());
servicesExecutor.shutdownNow();
servicesExecutor.awaitTermination(2L, TimeUnit.MINUTES);
} }
if (!computationExecutor.awaitTermination(2L, TimeUnit.MINUTES)) { if (!computationExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.error( LOG.error(
"{} computation executor did not shutdown cleanly.", this.getClass().getSimpleName()); "{} computation executor did not shutdown cleanly.", this.getClass().getSimpleName());
computationExecutor.shutdownNow();
computationExecutor.awaitTermination(2L, TimeUnit.MINUTES);
} }
LOG.trace("{} stopped.", this.getClass().getSimpleName()); LOG.trace("{} stopped.", this.getClass().getSimpleName());
} }

@ -108,6 +108,8 @@ public class ChainDownloader<C> {
LOG.debug("Invalid block downloaded", t); LOG.debug("Invalid block downloaded", t);
} else if (rootCause instanceof EthTaskException) { } else if (rootCause instanceof EthTaskException) {
LOG.debug(rootCause.toString()); LOG.debug(rootCause.toString());
} else if (rootCause instanceof InterruptedException) {
LOG.trace("Interrupted while downloading chain", rootCause);
} else { } else {
LOG.error("Error encountered while downloading", t); LOG.error("Error encountered while downloading", t);
} }
@ -214,6 +216,8 @@ public class ChainDownloader<C> {
final Throwable rootCause = ExceptionUtils.rootCause(t); final Throwable rootCause = ExceptionUtils.rootCause(t);
if (rootCause instanceof EthTaskException) { if (rootCause instanceof EthTaskException) {
LOG.debug(rootCause.toString()); LOG.debug(rootCause.toString());
} else if (rootCause instanceof InterruptedException) {
LOG.trace("Interrupted while importing blocks", rootCause);
} else { } else {
LOG.error("Encountered error importing blocks", t); LOG.error("Encountered error importing blocks", t);
} }

@ -49,32 +49,31 @@ public class EthSchedulerShutdownTest {
@Test @Test
public void shutdown_syncWorkerShutsDown() throws InterruptedException { public void shutdown_syncWorkerShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1); final MockEthTask task1 = new MockEthTask(1);
final MockEthTask task2 = new MockEthTask();
ethScheduler.scheduleSyncWorkerTask(task::executeTask); ethScheduler.scheduleSyncWorkerTask(task1::executeTask);
ethScheduler.scheduleSyncWorkerTask(task2::executeTask);
ethScheduler.stop(); ethScheduler.stop();
assertThat(syncWorkerExecutor.isShutdown()).isTrue(); assertThat(syncWorkerExecutor.isShutdown()).isTrue();
assertThat(syncWorkerExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop(); ethScheduler.awaitStop();
assertThat(syncWorkerExecutor.isShutdown()).isTrue(); assertThat(syncWorkerExecutor.isShutdown()).isTrue();
assertThat(syncWorkerExecutor.isTerminated()).isTrue(); assertThat(syncWorkerExecutor.isTerminated()).isTrue();
assertThat(task2.hasBeenStarted()).isFalse();
} }
@Test @Test
public void shutdown_scheduledWorkerShutsDown() throws InterruptedException { public void shutdown_scheduledWorkerShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1); final MockEthTask task = new MockEthTask(1);
ethScheduler.scheduleFutureTask(task::executeTask, Duration.ofMillis(1)); ethScheduler.scheduleFutureTask(task::executeTask, Duration.ofMillis(0));
ethScheduler.stop(); ethScheduler.stop();
assertThat(scheduledExecutor.isShutdown()).isTrue(); assertThat(scheduledExecutor.isShutdown()).isTrue();
assertThat(scheduledExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop(); ethScheduler.awaitStop();
assertThat(scheduledExecutor.isShutdown()).isTrue(); assertThat(scheduledExecutor.isShutdown()).isTrue();
@ -83,56 +82,63 @@ public class EthSchedulerShutdownTest {
@Test @Test
public void shutdown_txWorkerShutsDown() throws InterruptedException { public void shutdown_txWorkerShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1); final MockEthTask task1 = new MockEthTask(1);
final MockEthTask task2 = new MockEthTask();
ethScheduler.scheduleTxWorkerTask(task::executeTask); ethScheduler.scheduleTxWorkerTask(task1::executeTask);
ethScheduler.scheduleTxWorkerTask(task2::executeTask);
ethScheduler.stop(); ethScheduler.stop();
assertThat(txWorkerExecutor.isShutdown()).isTrue(); assertThat(txWorkerExecutor.isShutdown()).isTrue();
assertThat(txWorkerExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop(); ethScheduler.awaitStop();
assertThat(txWorkerExecutor.isShutdown()).isTrue(); assertThat(txWorkerExecutor.isShutdown()).isTrue();
assertThat(txWorkerExecutor.isTerminated()).isTrue(); assertThat(txWorkerExecutor.isTerminated()).isTrue();
assertThat(task2.hasBeenStarted()).isFalse();
} }
@Test @Test
public void shutdown_servicesShutsDown() throws InterruptedException { public void shutdown_servicesShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1); final MockEthTask task1 = new MockEthTask(1);
final MockEthTask task2 = new MockEthTask();
ethScheduler.scheduleServiceTask(task); ethScheduler.scheduleServiceTask(task1);
ethScheduler.scheduleServiceTask(task2);
ethScheduler.stop(); ethScheduler.stop();
assertThat(servicesExecutor.isShutdown()).isTrue(); assertThat(servicesExecutor.isShutdown()).isTrue();
assertThat(servicesExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop(); ethScheduler.awaitStop();
assertThat(servicesExecutor.isShutdown()).isTrue(); assertThat(servicesExecutor.isShutdown()).isTrue();
assertThat(servicesExecutor.isTerminated()).isTrue(); assertThat(servicesExecutor.isTerminated()).isTrue();
assertThat(task2.hasBeenStarted()).isFalse();
} }
@Test @Test
public void shutdown_computationShutsDown() throws InterruptedException { public void shutdown_computationShutsDown() throws InterruptedException {
final MockEthTask task = new MockEthTask(1); final MockEthTask task1 = new MockEthTask(1);
final MockEthTask task2 = new MockEthTask();
ethScheduler.scheduleComputationTask( ethScheduler.scheduleComputationTask(
() -> { () -> {
task.executeTask(); task1.executeTask();
return Integer.MAX_VALUE;
});
ethScheduler.scheduleComputationTask(
() -> {
task2.executeTask();
return Integer.MAX_VALUE; return Integer.MAX_VALUE;
}); });
ethScheduler.stop(); ethScheduler.stop();
assertThat(computationExecutor.isShutdown()).isTrue(); assertThat(computationExecutor.isShutdown()).isTrue();
assertThat(computationExecutor.isTerminated()).isFalse();
task.countDown();
ethScheduler.awaitStop(); ethScheduler.awaitStop();
assertThat(computationExecutor.isShutdown()).isTrue(); assertThat(computationExecutor.isShutdown()).isTrue();
assertThat(computationExecutor.isTerminated()).isTrue(); assertThat(computationExecutor.isTerminated()).isTrue();
assertThat(task2.hasBeenStarted()).isFalse();
} }
} }

@ -18,7 +18,7 @@ import java.util.concurrent.CountDownLatch;
public class MockEthTask extends AbstractEthTask<Object> { public class MockEthTask extends AbstractEthTask<Object> {
private boolean executed = false; private CountDownLatch startedLatch = new CountDownLatch(1);
private CountDownLatch countdown; private CountDownLatch countdown;
MockEthTask(final int count) { MockEthTask(final int count) {
@ -32,23 +32,19 @@ public class MockEthTask extends AbstractEthTask<Object> {
@Override @Override
protected void executeTask() { protected void executeTask() {
startedLatch.countDown();
try { try {
countdown.await(); countdown.await();
} catch (final InterruptedException ignore) { } catch (final InterruptedException ignore) {
} }
executed = true;
}
void countDown() {
countdown.countDown();
} }
boolean hasBeenStarted() { boolean hasBeenStarted() {
return executed; return startedLatch.getCount() == 0;
} }
void complete() { void complete() {
if (executed) { if (hasBeenStarted() && countdown.getCount() == 0) {
result.get().complete(null); result.get().complete(null);
} }
} }

@ -74,6 +74,7 @@ import com.google.common.io.Resources;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.json.DecodeException; import io.vertx.core.json.DecodeException;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator; import org.apache.logging.log4j.core.config.Configurator;
import picocli.CommandLine; import picocli.CommandLine;
@ -843,8 +844,10 @@ public class PantheonCommand implements DefaultCommandValues, Runnable {
() -> { () -> {
try { try {
runner.close(); runner.close();
LogManager.shutdown();
} catch (final Exception e) { } catch (final Exception e) {
throw new RuntimeException(e); logger.error("Failed to stop Pantheon");
} }
})); }));
} }

Loading…
Cancel
Save