Move starting world state download process inside WorldDownloadState (#1104)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent c2e6594453
commit 58fdc669d4
  1. 20
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java
  2. 16
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java

@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask; import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
@ -200,4 +201,23 @@ class WorldDownloadState {
public synchronized void notifyTaskAvailable() { public synchronized void notifyTaskAvailable() {
notifyAll(); notifyAll();
} }
public CompletableFuture<Void> startDownload(
final WorldStateDownloadProcess worldStateDownloadProcess, final EthScheduler ethScheduler) {
this.worldStateDownloadProcess = worldStateDownloadProcess;
final CompletableFuture<Void> processFuture = worldStateDownloadProcess.start(ethScheduler);
processFuture.whenComplete(
(result, error) -> {
if (error != null
&& !(ExceptionUtils.rootCause(error) instanceof CancellationException)) {
// The pipeline is only ever cancelled by us or shutdown closing the EthScheduler
// In either case we don't want to consider the download failed as we either already
// dealing with it or it's just a normal shutdown. Hence, don't propagate
// CancellationException
internalFuture.completeExceptionally(error);
}
});
return downloadFuture;
}
} }

@ -19,9 +19,7 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
@ -130,19 +128,7 @@ public class WorldStateDownloader {
newDownloadState.setWorldStateDownloadProcess(downloadProcess); newDownloadState.setWorldStateDownloadProcess(downloadProcess);
final CompletableFuture<Void> downloadProcessFuture = return newDownloadState.startDownload(downloadProcess, ethContext.getScheduler());
downloadProcess.start(ethContext.getScheduler());
downloadProcessFuture.whenComplete(
(result, error) -> {
if (error != null
&& !(ExceptionUtils.rootCause(error) instanceof CancellationException)) {
// If the pipeline was cancelled it's because the download state cancelled it
// so don't propagate the cancellation or we'll interfere with the clean up it's
// doing.
newDownloadState.getDownloadFuture().completeExceptionally(error);
}
});
return newDownloadState.getDownloadFuture();
} }
} }

Loading…
Cancel
Save