diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java index 5c16ddc791..470e5f10c7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java @@ -103,6 +103,7 @@ class FastSynchronizer { stateQueue, syncConfig.getWorldStateHashCountPerRequest(), syncConfig.getWorldStateRequestParallelism(), + syncConfig.getWorldStateRequestMaxRetries(), ethTasksTimer, metricsSystem); final FastSyncDownloader fastSyncDownloader = diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index 4d74ac2459..4abb3f3da3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -23,12 +23,13 @@ import com.google.common.collect.Range; public class SynchronizerConfiguration { // TODO: Determine reasonable defaults here - public static final int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 50; - public static final float DEFAULT_FULL_VALIDATION_RATE = .1f; - public static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5; + private static final int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 50; + private static final float DEFAULT_FULL_VALIDATION_RATE = .1f; + private static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5; private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5); private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384; private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10; + private static final int DEFAULT_WORLD_STATE_REQUEST_MAX_RETRIES = 25; // Fast sync config private final int fastSyncPivotDistance; @@ -37,6 +38,7 @@ public class SynchronizerConfiguration { private final Duration fastSyncMaximumPeerWaitTime; private final int worldStateHashCountPerRequest; private final int worldStateRequestParallelism; + private final int worldStateRequestMaxRetries; // Block propagation config private final Range blockPropagationRange; @@ -64,6 +66,7 @@ public class SynchronizerConfiguration { final Duration fastSyncMaximumPeerWaitTime, final int worldStateHashCountPerRequest, final int worldStateRequestParallelism, + final int worldStateRequestMaxRetries, final Range blockPropagationRange, final SyncMode syncMode, final long downloaderChangeTargetThresholdByHeight, @@ -83,6 +86,7 @@ public class SynchronizerConfiguration { this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime; this.worldStateHashCountPerRequest = worldStateHashCountPerRequest; this.worldStateRequestParallelism = worldStateRequestParallelism; + this.worldStateRequestMaxRetries = worldStateRequestMaxRetries; this.blockPropagationRange = blockPropagationRange; this.syncMode = syncMode; this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight; @@ -207,6 +211,10 @@ public class SynchronizerConfiguration { return worldStateRequestParallelism; } + public int getWorldStateRequestMaxRetries() { + return worldStateRequestMaxRetries; + } + public static class Builder { private SyncMode syncMode = SyncMode.FULL; private Range blockPropagationRange = Range.closed(-10L, 30L); @@ -224,6 +232,9 @@ public class SynchronizerConfiguration { private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD; private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE; private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS; + private int worldStateHashCountPerRequest = DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST; + private int worldStateRequestParallelism = DEFAULT_WORLD_STATE_REQUEST_PARALLELISM; + private int worldStateRequestMaxRetries = DEFAULT_WORLD_STATE_REQUEST_MAX_RETRIES; private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME; public Builder fastSyncPivotDistance(final int distance) { @@ -311,6 +322,21 @@ public class SynchronizerConfiguration { return this; } + public Builder worldStateHashCountPerRequest(final int worldStateHashCountPerRequest) { + this.worldStateHashCountPerRequest = worldStateHashCountPerRequest; + return this; + } + + public Builder worldStateRequestParallelism(final int worldStateRequestParallelism) { + this.worldStateRequestParallelism = worldStateRequestParallelism; + return this; + } + + public Builder worldStateRequestMaxRetries(final int worldStateRequestMaxRetries) { + this.worldStateRequestMaxRetries = worldStateRequestMaxRetries; + return this; + } + public Builder fastSyncMaximumPeerWaitTime(final Duration fastSyncMaximumPeerWaitTime) { this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime; return this; @@ -322,8 +348,9 @@ public class SynchronizerConfiguration { fastSyncFullValidationRate, fastSyncMinimumPeerCount, fastSyncMaximumPeerWaitTime, - DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST, - DEFAULT_WORLD_STATE_REQUEST_PARALLELISM, + worldStateHashCountPerRequest, + worldStateRequestParallelism, + worldStateRequestMaxRetries, blockPropagationRange, syncMode, downloaderChangeTargetThresholdByHeight, diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java index e2d45cc90f..a23dd06fc7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java @@ -15,8 +15,8 @@ package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally; import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose; +import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.StalledDownloadException; import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader; -import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateUnavailableException; import tech.pegasys.pantheon.util.ExceptionUtils; import java.util.concurrent.CompletableFuture; @@ -51,7 +51,7 @@ public class FastSyncDownloader { } private CompletableFuture handleWorldStateUnavailable(final Throwable error) { - if (ExceptionUtils.rootCause(error) instanceof WorldStateUnavailableException) { + if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) { LOG.warn( "Fast sync was unable to download the world state. Retrying with a new pivot block."); return start(FastSyncState.EMPTY_SYNC_STATE); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java index 29f4b68d57..776665bc2b 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java @@ -20,6 +20,7 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; public abstract class NodeDataRequest { @@ -27,6 +28,7 @@ public abstract class NodeDataRequest { private final RequestType requestType; private final Hash hash; private BytesValue data; + private final AtomicInteger failedRequestCount = new AtomicInteger(0); protected NodeDataRequest(final RequestType requestType, final Hash hash) { this.requestType = requestType; @@ -54,26 +56,35 @@ public abstract class NodeDataRequest { in.enterList(); RequestType requestType = RequestType.fromValue(in.readByte()); Hash hash = Hash.wrap(in.readBytes32()); + int failureCount = in.readIntScalar(); in.leaveList(); + NodeDataRequest deserialized; switch (requestType) { case ACCOUNT_TRIE_NODE: - return createAccountDataRequest(hash); + deserialized = createAccountDataRequest(hash); + break; case STORAGE_TRIE_NODE: - return createStorageDataRequest(hash); + deserialized = createStorageDataRequest(hash); + break; case CODE: - return createCodeRequest(hash); + deserialized = createCodeRequest(hash); + break; default: throw new IllegalArgumentException( "Unable to deserialize provided data into a valid " + NodeDataRequest.class.getSimpleName()); } + + deserialized.setFailureCount(failureCount); + return deserialized; } private void writeTo(final RLPOutput out) { out.startList(); out.writeByte(requestType.getValue()); out.writeBytesValue(hash); + out.writeIntScalar(failedRequestCount.get()); out.endList(); } @@ -94,6 +105,14 @@ public abstract class NodeDataRequest { return this; } + public int trackFailure() { + return failedRequestCount.incrementAndGet(); + } + + private void setFailureCount(final int failures) { + failedRequestCount.set(failures); + } + public abstract void persist(final WorldStateStorage.Updater updater); public abstract Stream getChildRequests(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StalledDownloadException.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StalledDownloadException.java new file mode 100644 index 0000000000..20a15564cc --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StalledDownloadException.java @@ -0,0 +1,20 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; + +public class StalledDownloadException extends WorldStateDownloaderException { + + public StalledDownloadException(final String message) { + super(message); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index f55442e130..308ad1daaa 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -64,6 +64,7 @@ public class WorldStateDownloader { private final TaskQueue pendingRequests; private final int hashCountPerRequest; private final int maxOutstandingRequests; + private final int maxNodeRequestRetries; private final Set> outstandingRequests = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final LabelledMetric ethTasksTimer; @@ -79,6 +80,7 @@ public class WorldStateDownloader { final TaskQueue pendingRequests, final int hashCountPerRequest, final int maxOutstandingRequests, + final int maxNodeRequestRetries, final LabelledMetric ethTasksTimer, final MetricsSystem metricsSystem) { this.ethContext = ethContext; @@ -86,6 +88,7 @@ public class WorldStateDownloader { this.pendingRequests = pendingRequests; this.hashCountPerRequest = hashCountPerRequest; this.maxOutstandingRequests = maxOutstandingRequests; + this.maxNodeRequestRetries = maxNodeRequestRetries; this.ethTasksTimer = ethTasksTimer; metricsSystem.createGauge( MetricCategory.SYNCHRONIZER, @@ -236,6 +239,10 @@ public class WorldStateDownloader { BytesValue matchingData = requestFailed ? null : data.get(request.getHash()); if (matchingData == null) { retriedRequestsTotal.inc(); + int requestFailures = request.trackFailure(); + if (requestFailures > maxNodeRequestRetries) { + handleStalledDownload(); + } task.markFailed(); } else { completedRequestsCounter.inc(); @@ -275,14 +282,26 @@ public class WorldStateDownloader { (res, err) -> { // Handle cancellations if (future.isCancelled()) { - handleCancellation(); + LOG.info("World state download cancelled"); + doCancelDownload(); + } else if (err != null) { + LOG.info("World state download failed. ", err); + doCancelDownload(); } }); return future; } - private synchronized void handleCancellation() { - LOG.info("World state download cancelled"); + private synchronized void handleStalledDownload() { + final String message = + "Download stalled due to too many failures to retrieve node data (>" + + maxNodeRequestRetries + + " failures)"; + WorldStateDownloaderException e = new StalledDownloadException(message); + future.completeExceptionally(e); + } + + private synchronized void doCancelDownload() { status = Status.CANCELLED; pendingRequests.clear(); for (EthTask outstandingRequest : outstandingRequests) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateUnavailableException.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderException.java similarity index 80% rename from ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateUnavailableException.java rename to ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderException.java index b98d94685c..97fa35ae10 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateUnavailableException.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderException.java @@ -12,4 +12,9 @@ */ package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; -public class WorldStateUnavailableException extends RuntimeException {} +public class WorldStateDownloaderException extends RuntimeException { + + public WorldStateDownloaderException(final String message) { + super(message); + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java index b674e5093f..2bf0a31850 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java @@ -26,8 +26,8 @@ import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState.EMP import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; +import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.StalledDownloadException; import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader; -import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateUnavailableException; import java.util.concurrent.CompletableFuture; @@ -292,7 +292,7 @@ public class FastSyncDownloaderTest { assertThat(result).isNotDone(); - firstWorldStateFuture.completeExceptionally(new WorldStateUnavailableException()); + firstWorldStateFuture.completeExceptionally(new StalledDownloadException("test")); assertThat(result).isNotDone(); assertThat(chainFuture).isCancelled(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 9a3a4be6da..102f955f56 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -30,12 +31,14 @@ import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.core.MutableWorldState; import tech.pegasys.pantheon.ethereum.core.WorldState; import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage; +import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.rlp.RLP; @@ -127,14 +130,7 @@ public class WorldStateDownloaderTest { WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); WorldStateDownloader downloader = - new WorldStateDownloader( - ethProtocolManager.ethContext(), - localStorage, - queue, - 10, - 10, - NoOpMetricsSystem.NO_OP_LABELLED_TIMER, - new NoOpMetricsSystem()); + createDownloader(ethProtocolManager.ethContext(), localStorage, queue); CompletableFuture future = downloader.run(header); assertThat(future).isDone(); @@ -172,14 +168,7 @@ public class WorldStateDownloaderTest { TaskQueue queue = new InMemoryTaskQueue<>(); WorldStateDownloader downloader = - new WorldStateDownloader( - ethProtocolManager.ethContext(), - storage, - queue, - 10, - 10, - NoOpMetricsSystem.NO_OP_LABELLED_TIMER, - new NoOpMetricsSystem()); + createDownloader(ethProtocolManager.ethContext(), storage, queue); CompletableFuture future = downloader.run(header); assertThat(future).isDone(); @@ -220,14 +209,7 @@ public class WorldStateDownloaderTest { WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); WorldStateDownloader downloader = - new WorldStateDownloader( - ethProtocolManager.ethContext(), - localStorage, - queue, - 10, - 10, - NoOpMetricsSystem.NO_OP_LABELLED_TIMER, - new NoOpMetricsSystem()); + createDownloader(ethProtocolManager.ethContext(), localStorage, queue); CompletableFuture result = downloader.run(header); @@ -289,14 +271,7 @@ public class WorldStateDownloaderTest { localStorageUpdater.commit(); WorldStateDownloader downloader = - new WorldStateDownloader( - ethProtocolManager.ethContext(), - localStorage, - queue, - 10, - 10, - NoOpMetricsSystem.NO_OP_LABELLED_TIMER, - new NoOpMetricsSystem()); + createDownloader(ethProtocolManager.ethContext(), localStorage, queue); CompletableFuture result = downloader.run(header); @@ -369,14 +344,7 @@ public class WorldStateDownloaderTest { new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); WorldStateDownloader downloader = - new WorldStateDownloader( - ethProtocolManager.ethContext(), - localStorage, - queue, - 10, - 10, - NoOpMetricsSystem.NO_OP_LABELLED_TIMER, - new NoOpMetricsSystem()); + createDownloader(ethProtocolManager.ethContext(), localStorage, queue); CompletableFuture result = downloader.run(header); @@ -464,14 +432,7 @@ public class WorldStateDownloaderTest { localStorageUpdater.commit(); WorldStateDownloader downloader = - new WorldStateDownloader( - ethProtocolManager.ethContext(), - localStorage, - queue, - 10, - 10, - NoOpMetricsSystem.NO_OP_LABELLED_TIMER, - new NoOpMetricsSystem()); + createDownloader(ethProtocolManager.ethContext(), localStorage, queue); CompletableFuture result = downloader.run(header); @@ -570,14 +531,7 @@ public class WorldStateDownloaderTest { localStorageUpdater.commit(); WorldStateDownloader downloader = - new WorldStateDownloader( - ethProtocolManager.ethContext(), - localStorage, - queue, - 10, - 10, - NoOpMetricsSystem.NO_OP_LABELLED_TIMER, - new NoOpMetricsSystem()); + createDownloader(ethProtocolManager.ethContext(), localStorage, queue); CompletableFuture result = downloader.run(header); @@ -616,6 +570,70 @@ public class WorldStateDownloaderTest { assertAccountsMatch(localWorldState, accounts); } + @Test + public void stalledDownloader() { + simulateStalledDownload(10); + } + + @Test + public void stalledDownloaderWithOneRetry() { + simulateStalledDownload(1); + } + + @Test + public void stalledDownloaderWithNoRetries() { + simulateStalledDownload(0); + } + + private void simulateStalledDownload(final int maxRetries) { + final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + BlockDataGenerator dataGen = new BlockDataGenerator(1); + + // Setup "remote" state + final WorldStateStorage remoteStorage = + new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); + final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage); + final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable(); + + // Generate accounts and save corresponding state root + dataGen.createRandomAccounts(remoteWorldState, 10); + final Hash stateRoot = remoteWorldState.rootHash(); + assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check + final BlockHeader header = + dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); + + TaskQueue queue = new InMemoryTaskQueue<>(); + WorldStateStorage localStorage = + new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); + SynchronizerConfiguration syncConfig = + SynchronizerConfiguration.builder().worldStateRequestMaxRetries(maxRetries).build(); + WorldStateDownloader downloader = + createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue); + + // Create a peer that can respond + RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber()); + + // Start downloader + CompletableFuture result = downloader.run(header); + // A second run should return an error without impacting the first result + CompletableFuture secondResult = downloader.run(header); + assertThat(secondResult).isCompletedExceptionally(); + assertThat(result).isNotCompletedExceptionally(); + + Responder emptyResponder = RespondingEthPeer.emptyResponder(); + for (int i = 0; i < maxRetries; i++) { + peer.respond(emptyResponder); + } + // Downloader should not be done yet + assertThat(result).isNotDone(); + + // One more empty response should trigger a failure + peer.respond(emptyResponder); + assertThat(result).isCompletedExceptionally(); + assertThatThrownBy(result::get).hasCauseInstanceOf(StalledDownloadException.class); + } + /** * Walks through trie represented by the given rootHash and returns hash-node pairs that would * need to be requested from the network in order to reconstruct this trie. @@ -700,15 +718,13 @@ public class WorldStateDownloaderTest { WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); + SynchronizerConfiguration syncConfig = + SynchronizerConfiguration.builder() + .worldStateHashCountPerRequest(hashesPerRequest) + .worldStateRequestParallelism(maxOutstandingRequests) + .build(); WorldStateDownloader downloader = - new WorldStateDownloader( - ethProtocolManager.ethContext(), - localStorage, - queue, - hashesPerRequest, - maxOutstandingRequests, - NoOpMetricsSystem.NO_OP_LABELLED_TIMER, - new NoOpMetricsSystem()); + createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue); // Create some peers that can respond List usefulPeers = @@ -828,6 +844,29 @@ public class WorldStateDownloaderTest { } } + private WorldStateDownloader createDownloader( + final EthContext context, + final WorldStateStorage storage, + final TaskQueue queue) { + return createDownloader(SynchronizerConfiguration.builder().build(), context, storage, queue); + } + + private WorldStateDownloader createDownloader( + final SynchronizerConfiguration config, + final EthContext context, + final WorldStateStorage storage, + final TaskQueue queue) { + return new WorldStateDownloader( + context, + storage, + queue, + config.getWorldStateHashCountPerRequest(), + config.getWorldStateRequestParallelism(), + config.getWorldStateRequestMaxRetries(), + NoOpMetricsSystem.NO_OP_LABELLED_TIMER, + new NoOpMetricsSystem()); + } + @FunctionalInterface private interface NetworkResponder { void respond(