Process world state download data on a worker thread (#898)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent d00ebf4521
commit 768293385f
  1. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
  2. 90
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java

@ -226,7 +226,7 @@ public class SynchronizerConfiguration {
private int downloaderChainSegmentSize = 200;
private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2;
private int downloaderParallelism = 4;
private int transactionsParallelism = 2;
private int computationParallelism = Runtime.getRuntime().availableProcessors();
private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD;

@ -16,6 +16,7 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
@ -41,8 +42,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -148,8 +151,7 @@ public class WorldStateDownloader {
pendingRequests.enqueue(NodeDataRequest.createAccountDataRequest(stateRoot));
}
}
requestNodeData(header);
ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header));
return future;
}
@ -246,38 +248,58 @@ public class WorldStateDownloader {
.run()
.thenApply(PeerTaskResult::getResult)
.thenApply(this::mapNodeDataByHash)
.handle(
(data, err) -> {
final boolean requestFailed = err != null;
final Updater storageUpdater = worldStateStorage.updater();
for (final Task<NodeDataRequest> task : requestTasks) {
final NodeDataRequest request = task.getData();
final BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) {
retriedRequestsTotal.inc();
final int requestFailures = request.trackFailure();
updateHighestRetryCount(requestFailures);
if (requestFailures > maxNodeRequestRetries) {
handleStalledDownload();
}
task.markFailed();
} else {
completedRequestsCounter.inc();
// Persist request data
request.setData(matchingData);
if (isRootState(blockHeader, request)) {
rootNode = request.getData();
} else {
request.persist(storageUpdater);
}
queueChildRequests(request);
task.markCompleted();
}
.exceptionally(
error -> {
final Throwable rootCause = ExceptionUtils.rootCause(error);
if (!(rootCause instanceof TimeoutException
|| rootCause instanceof InterruptedException
|| rootCause instanceof CancellationException
|| rootCause instanceof EthTaskException)) {
LOG.debug("GetNodeDataRequest failed", error);
}
storageUpdater.commit();
return ethTask;
});
return Collections.emptyMap();
})
.thenCompose(
data ->
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() -> storeData(requestTasks, blockHeader, ethTask, data)));
}
private CompletableFuture<AbstractPeerTask<List<BytesValue>>> storeData(
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader,
final AbstractPeerTask<List<BytesValue>> ethTask,
final Map<Hash, BytesValue> data) {
final Updater storageUpdater = worldStateStorage.updater();
for (final Task<NodeDataRequest> task : requestTasks) {
final NodeDataRequest request = task.getData();
final BytesValue matchingData = data.get(request.getHash());
if (matchingData == null) {
retriedRequestsTotal.inc();
final int requestFailures = request.trackFailure();
updateHighestRetryCount(requestFailures);
if (requestFailures > maxNodeRequestRetries) {
handleStalledDownload();
}
task.markFailed();
} else {
completedRequestsCounter.inc();
// Persist request data
request.setData(matchingData);
if (isRootState(blockHeader, request)) {
rootNode = request.getData();
} else {
request.persist(storageUpdater);
}
queueChildRequests(request);
task.markCompleted();
}
}
storageUpdater.commit();
return CompletableFuture.completedFuture(ethTask);
}
private void updateHighestRetryCount(final int requestFailures) {
@ -351,7 +373,7 @@ public class WorldStateDownloader {
private Map<Hash, BytesValue> mapNodeDataByHash(final List<BytesValue> data) {
// Map data by hash
final Map<Hash, BytesValue> dataByHash = new HashMap<>();
data.stream().forEach(d -> dataByHash.put(Hash.hash(d), d));
data.forEach(d -> dataByHash.put(Hash.hash(d), d));
return dataByHash;
}
}

Loading…
Cancel
Save