From 768293385f100740625b9cc79165da33990aa2ff Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Wed, 20 Feb 2019 07:05:02 +1000 Subject: [PATCH] Process world state download data on a worker thread (#898) Signed-off-by: Adrian Sutton --- .../eth/sync/SynchronizerConfiguration.java | 2 +- .../sync/worldstate/WorldStateDownloader.java | 90 ++++++++++++------- 2 files changed, 57 insertions(+), 35 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index 4abb3f3da3..01aefba71c 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -226,7 +226,7 @@ public class SynchronizerConfiguration { private int downloaderChainSegmentSize = 200; private long trailingPeerBlocksBehindThreshold; private int maxTrailingPeers = Integer.MAX_VALUE; - private int downloaderParallelism = 2; + private int downloaderParallelism = 4; private int transactionsParallelism = 2; private int computationParallelism = Runtime.getRuntime().availableProcessors(); private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index 7f3f22408f..f4fef8b6ab 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -16,6 +16,7 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask; @@ -41,8 +42,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -148,8 +151,7 @@ public class WorldStateDownloader { pendingRequests.enqueue(NodeDataRequest.createAccountDataRequest(stateRoot)); } } - - requestNodeData(header); + ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header)); return future; } @@ -246,38 +248,58 @@ public class WorldStateDownloader { .run() .thenApply(PeerTaskResult::getResult) .thenApply(this::mapNodeDataByHash) - .handle( - (data, err) -> { - final boolean requestFailed = err != null; - final Updater storageUpdater = worldStateStorage.updater(); - for (final Task task : requestTasks) { - final NodeDataRequest request = task.getData(); - final BytesValue matchingData = requestFailed ? null : data.get(request.getHash()); - if (matchingData == null) { - retriedRequestsTotal.inc(); - final int requestFailures = request.trackFailure(); - updateHighestRetryCount(requestFailures); - if (requestFailures > maxNodeRequestRetries) { - handleStalledDownload(); - } - task.markFailed(); - } else { - completedRequestsCounter.inc(); - // Persist request data - request.setData(matchingData); - if (isRootState(blockHeader, request)) { - rootNode = request.getData(); - } else { - request.persist(storageUpdater); - } - - queueChildRequests(request); - task.markCompleted(); - } + .exceptionally( + error -> { + final Throwable rootCause = ExceptionUtils.rootCause(error); + if (!(rootCause instanceof TimeoutException + || rootCause instanceof InterruptedException + || rootCause instanceof CancellationException + || rootCause instanceof EthTaskException)) { + LOG.debug("GetNodeDataRequest failed", error); } - storageUpdater.commit(); - return ethTask; - }); + return Collections.emptyMap(); + }) + .thenCompose( + data -> + ethContext + .getScheduler() + .scheduleSyncWorkerTask( + () -> storeData(requestTasks, blockHeader, ethTask, data))); + } + + private CompletableFuture>> storeData( + final List> requestTasks, + final BlockHeader blockHeader, + final AbstractPeerTask> ethTask, + final Map data) { + final Updater storageUpdater = worldStateStorage.updater(); + for (final Task task : requestTasks) { + final NodeDataRequest request = task.getData(); + final BytesValue matchingData = data.get(request.getHash()); + if (matchingData == null) { + retriedRequestsTotal.inc(); + final int requestFailures = request.trackFailure(); + updateHighestRetryCount(requestFailures); + if (requestFailures > maxNodeRequestRetries) { + handleStalledDownload(); + } + task.markFailed(); + } else { + completedRequestsCounter.inc(); + // Persist request data + request.setData(matchingData); + if (isRootState(blockHeader, request)) { + rootNode = request.getData(); + } else { + request.persist(storageUpdater); + } + + queueChildRequests(request); + task.markCompleted(); + } + } + storageUpdater.commit(); + return CompletableFuture.completedFuture(ethTask); } private void updateHighestRetryCount(final int requestFailures) { @@ -351,7 +373,7 @@ public class WorldStateDownloader { private Map mapNodeDataByHash(final List data) { // Map data by hash final Map dataByHash = new HashMap<>(); - data.stream().forEach(d -> dataByHash.put(Hash.hash(d), d)); + data.forEach(d -> dataByHash.put(Hash.hash(d), d)); return dataByHash; } }