[PAN-2305] Detect stalled world state downloads (#875)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by GitHub
parent 312f285a15
commit 1da0008666
  1. 1
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java
  2. 37
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
  3. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java
  4. 25
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java
  5. 20
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StalledDownloadException.java
  6. 25
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
  7. 7
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderException.java
  8. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java
  9. 167
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java

@ -103,6 +103,7 @@ class FastSynchronizer<C> {
stateQueue, stateQueue,
syncConfig.getWorldStateHashCountPerRequest(), syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(), syncConfig.getWorldStateRequestParallelism(),
syncConfig.getWorldStateRequestMaxRetries(),
ethTasksTimer, ethTasksTimer,
metricsSystem); metricsSystem);
final FastSyncDownloader<C> fastSyncDownloader = final FastSyncDownloader<C> fastSyncDownloader =

@ -23,12 +23,13 @@ import com.google.common.collect.Range;
public class SynchronizerConfiguration { public class SynchronizerConfiguration {
// TODO: Determine reasonable defaults here // TODO: Determine reasonable defaults here
public static final int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 50; private static final int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 50;
public static final float DEFAULT_FULL_VALIDATION_RATE = .1f; private static final float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5; private static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5); private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5);
private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384; private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384;
private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10; private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10;
private static final int DEFAULT_WORLD_STATE_REQUEST_MAX_RETRIES = 25;
// Fast sync config // Fast sync config
private final int fastSyncPivotDistance; private final int fastSyncPivotDistance;
@ -37,6 +38,7 @@ public class SynchronizerConfiguration {
private final Duration fastSyncMaximumPeerWaitTime; private final Duration fastSyncMaximumPeerWaitTime;
private final int worldStateHashCountPerRequest; private final int worldStateHashCountPerRequest;
private final int worldStateRequestParallelism; private final int worldStateRequestParallelism;
private final int worldStateRequestMaxRetries;
// Block propagation config // Block propagation config
private final Range<Long> blockPropagationRange; private final Range<Long> blockPropagationRange;
@ -64,6 +66,7 @@ public class SynchronizerConfiguration {
final Duration fastSyncMaximumPeerWaitTime, final Duration fastSyncMaximumPeerWaitTime,
final int worldStateHashCountPerRequest, final int worldStateHashCountPerRequest,
final int worldStateRequestParallelism, final int worldStateRequestParallelism,
final int worldStateRequestMaxRetries,
final Range<Long> blockPropagationRange, final Range<Long> blockPropagationRange,
final SyncMode syncMode, final SyncMode syncMode,
final long downloaderChangeTargetThresholdByHeight, final long downloaderChangeTargetThresholdByHeight,
@ -83,6 +86,7 @@ public class SynchronizerConfiguration {
this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime; this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime;
this.worldStateHashCountPerRequest = worldStateHashCountPerRequest; this.worldStateHashCountPerRequest = worldStateHashCountPerRequest;
this.worldStateRequestParallelism = worldStateRequestParallelism; this.worldStateRequestParallelism = worldStateRequestParallelism;
this.worldStateRequestMaxRetries = worldStateRequestMaxRetries;
this.blockPropagationRange = blockPropagationRange; this.blockPropagationRange = blockPropagationRange;
this.syncMode = syncMode; this.syncMode = syncMode;
this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight; this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight;
@ -207,6 +211,10 @@ public class SynchronizerConfiguration {
return worldStateRequestParallelism; return worldStateRequestParallelism;
} }
public int getWorldStateRequestMaxRetries() {
return worldStateRequestMaxRetries;
}
public static class Builder { public static class Builder {
private SyncMode syncMode = SyncMode.FULL; private SyncMode syncMode = SyncMode.FULL;
private Range<Long> blockPropagationRange = Range.closed(-10L, 30L); private Range<Long> blockPropagationRange = Range.closed(-10L, 30L);
@ -224,6 +232,9 @@ public class SynchronizerConfiguration {
private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD; private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD;
private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE; private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE;
private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS; private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS;
private int worldStateHashCountPerRequest = DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST;
private int worldStateRequestParallelism = DEFAULT_WORLD_STATE_REQUEST_PARALLELISM;
private int worldStateRequestMaxRetries = DEFAULT_WORLD_STATE_REQUEST_MAX_RETRIES;
private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME; private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME;
public Builder fastSyncPivotDistance(final int distance) { public Builder fastSyncPivotDistance(final int distance) {
@ -311,6 +322,21 @@ public class SynchronizerConfiguration {
return this; return this;
} }
public Builder worldStateHashCountPerRequest(final int worldStateHashCountPerRequest) {
this.worldStateHashCountPerRequest = worldStateHashCountPerRequest;
return this;
}
public Builder worldStateRequestParallelism(final int worldStateRequestParallelism) {
this.worldStateRequestParallelism = worldStateRequestParallelism;
return this;
}
public Builder worldStateRequestMaxRetries(final int worldStateRequestMaxRetries) {
this.worldStateRequestMaxRetries = worldStateRequestMaxRetries;
return this;
}
public Builder fastSyncMaximumPeerWaitTime(final Duration fastSyncMaximumPeerWaitTime) { public Builder fastSyncMaximumPeerWaitTime(final Duration fastSyncMaximumPeerWaitTime) {
this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime; this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime;
return this; return this;
@ -322,8 +348,9 @@ public class SynchronizerConfiguration {
fastSyncFullValidationRate, fastSyncFullValidationRate,
fastSyncMinimumPeerCount, fastSyncMinimumPeerCount,
fastSyncMaximumPeerWaitTime, fastSyncMaximumPeerWaitTime,
DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST, worldStateHashCountPerRequest,
DEFAULT_WORLD_STATE_REQUEST_PARALLELISM, worldStateRequestParallelism,
worldStateRequestMaxRetries,
blockPropagationRange, blockPropagationRange,
syncMode, syncMode,
downloaderChangeTargetThresholdByHeight, downloaderChangeTargetThresholdByHeight,

@ -15,8 +15,8 @@ package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally; import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;
import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose; import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.StalledDownloadException;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader; import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateUnavailableException;
import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.ExceptionUtils;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -51,7 +51,7 @@ public class FastSyncDownloader<C> {
} }
private CompletableFuture<FastSyncState> handleWorldStateUnavailable(final Throwable error) { private CompletableFuture<FastSyncState> handleWorldStateUnavailable(final Throwable error) {
if (ExceptionUtils.rootCause(error) instanceof WorldStateUnavailableException) { if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) {
LOG.warn( LOG.warn(
"Fast sync was unable to download the world state. Retrying with a new pivot block."); "Fast sync was unable to download the world state. Retrying with a new pivot block.");
return start(FastSyncState.EMPTY_SYNC_STATE); return start(FastSyncState.EMPTY_SYNC_STATE);

@ -20,6 +20,7 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream; import java.util.stream.Stream;
public abstract class NodeDataRequest { public abstract class NodeDataRequest {
@ -27,6 +28,7 @@ public abstract class NodeDataRequest {
private final RequestType requestType; private final RequestType requestType;
private final Hash hash; private final Hash hash;
private BytesValue data; private BytesValue data;
private final AtomicInteger failedRequestCount = new AtomicInteger(0);
protected NodeDataRequest(final RequestType requestType, final Hash hash) { protected NodeDataRequest(final RequestType requestType, final Hash hash) {
this.requestType = requestType; this.requestType = requestType;
@ -54,26 +56,35 @@ public abstract class NodeDataRequest {
in.enterList(); in.enterList();
RequestType requestType = RequestType.fromValue(in.readByte()); RequestType requestType = RequestType.fromValue(in.readByte());
Hash hash = Hash.wrap(in.readBytes32()); Hash hash = Hash.wrap(in.readBytes32());
int failureCount = in.readIntScalar();
in.leaveList(); in.leaveList();
NodeDataRequest deserialized;
switch (requestType) { switch (requestType) {
case ACCOUNT_TRIE_NODE: case ACCOUNT_TRIE_NODE:
return createAccountDataRequest(hash); deserialized = createAccountDataRequest(hash);
break;
case STORAGE_TRIE_NODE: case STORAGE_TRIE_NODE:
return createStorageDataRequest(hash); deserialized = createStorageDataRequest(hash);
break;
case CODE: case CODE:
return createCodeRequest(hash); deserialized = createCodeRequest(hash);
break;
default: default:
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Unable to deserialize provided data into a valid " "Unable to deserialize provided data into a valid "
+ NodeDataRequest.class.getSimpleName()); + NodeDataRequest.class.getSimpleName());
} }
deserialized.setFailureCount(failureCount);
return deserialized;
} }
private void writeTo(final RLPOutput out) { private void writeTo(final RLPOutput out) {
out.startList(); out.startList();
out.writeByte(requestType.getValue()); out.writeByte(requestType.getValue());
out.writeBytesValue(hash); out.writeBytesValue(hash);
out.writeIntScalar(failedRequestCount.get());
out.endList(); out.endList();
} }
@ -94,6 +105,14 @@ public abstract class NodeDataRequest {
return this; return this;
} }
public int trackFailure() {
return failedRequestCount.incrementAndGet();
}
private void setFailureCount(final int failures) {
failedRequestCount.set(failures);
}
public abstract void persist(final WorldStateStorage.Updater updater); public abstract void persist(final WorldStateStorage.Updater updater);
public abstract Stream<NodeDataRequest> getChildRequests(); public abstract Stream<NodeDataRequest> getChildRequests();

@ -0,0 +1,20 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
public class StalledDownloadException extends WorldStateDownloaderException {
public StalledDownloadException(final String message) {
super(message);
}
}

@ -64,6 +64,7 @@ public class WorldStateDownloader {
private final TaskQueue<NodeDataRequest> pendingRequests; private final TaskQueue<NodeDataRequest> pendingRequests;
private final int hashCountPerRequest; private final int hashCountPerRequest;
private final int maxOutstandingRequests; private final int maxOutstandingRequests;
private final int maxNodeRequestRetries;
private final Set<EthTask<?>> outstandingRequests = private final Set<EthTask<?>> outstandingRequests =
Collections.newSetFromMap(new ConcurrentHashMap<>()); Collections.newSetFromMap(new ConcurrentHashMap<>());
private final LabelledMetric<OperationTimer> ethTasksTimer; private final LabelledMetric<OperationTimer> ethTasksTimer;
@ -79,6 +80,7 @@ public class WorldStateDownloader {
final TaskQueue<NodeDataRequest> pendingRequests, final TaskQueue<NodeDataRequest> pendingRequests,
final int hashCountPerRequest, final int hashCountPerRequest,
final int maxOutstandingRequests, final int maxOutstandingRequests,
final int maxNodeRequestRetries,
final LabelledMetric<OperationTimer> ethTasksTimer, final LabelledMetric<OperationTimer> ethTasksTimer,
final MetricsSystem metricsSystem) { final MetricsSystem metricsSystem) {
this.ethContext = ethContext; this.ethContext = ethContext;
@ -86,6 +88,7 @@ public class WorldStateDownloader {
this.pendingRequests = pendingRequests; this.pendingRequests = pendingRequests;
this.hashCountPerRequest = hashCountPerRequest; this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests; this.maxOutstandingRequests = maxOutstandingRequests;
this.maxNodeRequestRetries = maxNodeRequestRetries;
this.ethTasksTimer = ethTasksTimer; this.ethTasksTimer = ethTasksTimer;
metricsSystem.createGauge( metricsSystem.createGauge(
MetricCategory.SYNCHRONIZER, MetricCategory.SYNCHRONIZER,
@ -236,6 +239,10 @@ public class WorldStateDownloader {
BytesValue matchingData = requestFailed ? null : data.get(request.getHash()); BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) { if (matchingData == null) {
retriedRequestsTotal.inc(); retriedRequestsTotal.inc();
int requestFailures = request.trackFailure();
if (requestFailures > maxNodeRequestRetries) {
handleStalledDownload();
}
task.markFailed(); task.markFailed();
} else { } else {
completedRequestsCounter.inc(); completedRequestsCounter.inc();
@ -275,14 +282,26 @@ public class WorldStateDownloader {
(res, err) -> { (res, err) -> {
// Handle cancellations // Handle cancellations
if (future.isCancelled()) { if (future.isCancelled()) {
handleCancellation(); LOG.info("World state download cancelled");
doCancelDownload();
} else if (err != null) {
LOG.info("World state download failed. ", err);
doCancelDownload();
} }
}); });
return future; return future;
} }
private synchronized void handleCancellation() { private synchronized void handleStalledDownload() {
LOG.info("World state download cancelled"); final String message =
"Download stalled due to too many failures to retrieve node data (>"
+ maxNodeRequestRetries
+ " failures)";
WorldStateDownloaderException e = new StalledDownloadException(message);
future.completeExceptionally(e);
}
private synchronized void doCancelDownload() {
status = Status.CANCELLED; status = Status.CANCELLED;
pendingRequests.clear(); pendingRequests.clear();
for (EthTask<?> outstandingRequest : outstandingRequests) { for (EthTask<?> outstandingRequest : outstandingRequests) {

@ -12,4 +12,9 @@
*/ */
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
public class WorldStateUnavailableException extends RuntimeException {} public class WorldStateDownloaderException extends RuntimeException {
public WorldStateDownloaderException(final String message) {
super(message);
}
}

@ -26,8 +26,8 @@ import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState.EMP
import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.StalledDownloadException;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader; import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateUnavailableException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -292,7 +292,7 @@ public class FastSyncDownloaderTest {
assertThat(result).isNotDone(); assertThat(result).isNotDone();
firstWorldStateFuture.completeExceptionally(new WorldStateUnavailableException()); firstWorldStateFuture.completeExceptionally(new StalledDownloadException("test"));
assertThat(result).isNotDone(); assertThat(result).isNotDone();
assertThat(chainFuture).isCancelled(); assertThat(chainFuture).isCancelled();

@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -30,12 +31,14 @@ import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.MutableWorldState; import tech.pegasys.pantheon.ethereum.core.MutableWorldState;
import tech.pegasys.pantheon.ethereum.core.WorldState; import tech.pegasys.pantheon.ethereum.core.WorldState;
import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy; import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage; import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.ethereum.rlp.RLP;
@ -127,14 +130,7 @@ public class WorldStateDownloaderTest {
WorldStateStorage localStorage = WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateDownloader downloader = WorldStateDownloader downloader =
new WorldStateDownloader( createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
ethProtocolManager.ethContext(),
localStorage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
new NoOpMetricsSystem());
CompletableFuture<Void> future = downloader.run(header); CompletableFuture<Void> future = downloader.run(header);
assertThat(future).isDone(); assertThat(future).isDone();
@ -172,14 +168,7 @@ public class WorldStateDownloaderTest {
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>(); TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateDownloader downloader = WorldStateDownloader downloader =
new WorldStateDownloader( createDownloader(ethProtocolManager.ethContext(), storage, queue);
ethProtocolManager.ethContext(),
storage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
new NoOpMetricsSystem());
CompletableFuture<Void> future = downloader.run(header); CompletableFuture<Void> future = downloader.run(header);
assertThat(future).isDone(); assertThat(future).isDone();
@ -220,14 +209,7 @@ public class WorldStateDownloaderTest {
WorldStateStorage localStorage = WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateDownloader downloader = WorldStateDownloader downloader =
new WorldStateDownloader( createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
ethProtocolManager.ethContext(),
localStorage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
new NoOpMetricsSystem());
CompletableFuture<Void> result = downloader.run(header); CompletableFuture<Void> result = downloader.run(header);
@ -289,14 +271,7 @@ public class WorldStateDownloaderTest {
localStorageUpdater.commit(); localStorageUpdater.commit();
WorldStateDownloader downloader = WorldStateDownloader downloader =
new WorldStateDownloader( createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
ethProtocolManager.ethContext(),
localStorage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
new NoOpMetricsSystem());
CompletableFuture<Void> result = downloader.run(header); CompletableFuture<Void> result = downloader.run(header);
@ -369,14 +344,7 @@ public class WorldStateDownloaderTest {
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateDownloader downloader = WorldStateDownloader downloader =
new WorldStateDownloader( createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
ethProtocolManager.ethContext(),
localStorage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
new NoOpMetricsSystem());
CompletableFuture<Void> result = downloader.run(header); CompletableFuture<Void> result = downloader.run(header);
@ -464,14 +432,7 @@ public class WorldStateDownloaderTest {
localStorageUpdater.commit(); localStorageUpdater.commit();
WorldStateDownloader downloader = WorldStateDownloader downloader =
new WorldStateDownloader( createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
ethProtocolManager.ethContext(),
localStorage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
new NoOpMetricsSystem());
CompletableFuture<Void> result = downloader.run(header); CompletableFuture<Void> result = downloader.run(header);
@ -570,14 +531,7 @@ public class WorldStateDownloaderTest {
localStorageUpdater.commit(); localStorageUpdater.commit();
WorldStateDownloader downloader = WorldStateDownloader downloader =
new WorldStateDownloader( createDownloader(ethProtocolManager.ethContext(), localStorage, queue);
ethProtocolManager.ethContext(),
localStorage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
new NoOpMetricsSystem());
CompletableFuture<Void> result = downloader.run(header); CompletableFuture<Void> result = downloader.run(header);
@ -616,6 +570,70 @@ public class WorldStateDownloaderTest {
assertAccountsMatch(localWorldState, accounts); assertAccountsMatch(localWorldState, accounts);
} }
@Test
public void stalledDownloader() {
simulateStalledDownload(10);
}
@Test
public void stalledDownloaderWithOneRetry() {
simulateStalledDownload(1);
}
@Test
public void stalledDownloaderWithNoRetries() {
simulateStalledDownload(0);
}
private void simulateStalledDownload(final int maxRetries) {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
BlockDataGenerator dataGen = new BlockDataGenerator(1);
// Setup "remote" state
final WorldStateStorage remoteStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage);
final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable();
// Generate accounts and save corresponding state root
dataGen.createRandomAccounts(remoteWorldState, 10);
final Hash stateRoot = remoteWorldState.rootHash();
assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check
final BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().worldStateRequestMaxRetries(maxRetries).build();
WorldStateDownloader downloader =
createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue);
// Create a peer that can respond
RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber());
// Start downloader
CompletableFuture<?> result = downloader.run(header);
// A second run should return an error without impacting the first result
CompletableFuture<?> secondResult = downloader.run(header);
assertThat(secondResult).isCompletedExceptionally();
assertThat(result).isNotCompletedExceptionally();
Responder emptyResponder = RespondingEthPeer.emptyResponder();
for (int i = 0; i < maxRetries; i++) {
peer.respond(emptyResponder);
}
// Downloader should not be done yet
assertThat(result).isNotDone();
// One more empty response should trigger a failure
peer.respond(emptyResponder);
assertThat(result).isCompletedExceptionally();
assertThatThrownBy(result::get).hasCauseInstanceOf(StalledDownloadException.class);
}
/** /**
* Walks through trie represented by the given rootHash and returns hash-node pairs that would * Walks through trie represented by the given rootHash and returns hash-node pairs that would
* need to be requested from the network in order to reconstruct this trie. * need to be requested from the network in order to reconstruct this trie.
@ -700,15 +718,13 @@ public class WorldStateDownloaderTest {
WorldStateStorage localStorage = WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
.worldStateHashCountPerRequest(hashesPerRequest)
.worldStateRequestParallelism(maxOutstandingRequests)
.build();
WorldStateDownloader downloader = WorldStateDownloader downloader =
new WorldStateDownloader( createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue);
ethProtocolManager.ethContext(),
localStorage,
queue,
hashesPerRequest,
maxOutstandingRequests,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
new NoOpMetricsSystem());
// Create some peers that can respond // Create some peers that can respond
List<RespondingEthPeer> usefulPeers = List<RespondingEthPeer> usefulPeers =
@ -828,6 +844,29 @@ public class WorldStateDownloaderTest {
} }
} }
private WorldStateDownloader createDownloader(
final EthContext context,
final WorldStateStorage storage,
final TaskQueue<NodeDataRequest> queue) {
return createDownloader(SynchronizerConfiguration.builder().build(), context, storage, queue);
}
private WorldStateDownloader createDownloader(
final SynchronizerConfiguration config,
final EthContext context,
final WorldStateStorage storage,
final TaskQueue<NodeDataRequest> queue) {
return new WorldStateDownloader(
context,
storage,
queue,
config.getWorldStateHashCountPerRequest(),
config.getWorldStateRequestParallelism(),
config.getWorldStateRequestMaxRetries(),
NoOpMetricsSystem.NO_OP_LABELLED_TIMER,
new NoOpMetricsSystem());
}
@FunctionalInterface @FunctionalInterface
private interface NetworkResponder { private interface NetworkResponder {
void respond( void respond(

Loading…
Cancel
Save