Don't mark world state as stalled until a minimum time without progress is reached (#1179)

Also increases the number of requests without progress before considering the download stalled.

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent f897761951
commit 9a5d8d6fa2
  1. 5
      ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java
  2. 5
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java
  3. 6
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java
  4. 14
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
  5. 15
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java
  6. 10
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
  7. 60
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java
  8. 3
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java
  9. 1
      pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java
  10. 1
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java
  11. 1
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java
  12. 1
      pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java
  13. 24
      testutil/src/main/java/tech/pegasys/pantheon/testutil/TestClock.java

@ -37,6 +37,9 @@ import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.nio.file.Path;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -104,6 +107,8 @@ public class WorldStateDownloaderBenchmark {
syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(),
syncConfig.getWorldStateMaxRequestsWithoutProgress(),
syncConfig.getWorldStateMinMillisBeforeStalling(),
Clock.fixed(Instant.ofEpochSecond(1000), ZoneOffset.UTC),
metricsSystem);
}

@ -30,6 +30,7 @@ import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.Subscribers;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@ -55,6 +56,7 @@ public class DefaultSynchronizer<C> implements Synchronizer {
final EthContext ethContext,
final SyncState syncState,
final Path dataDirectory,
final Clock clock,
final MetricsSystem metricsSystem) {
this.syncState = syncState;
@ -89,7 +91,8 @@ public class DefaultSynchronizer<C> implements Synchronizer {
metricsSystem,
ethContext,
worldStateStorage,
syncState);
syncState,
clock);
}
private TrailingPeerRequirements calculateTrailingPeerRequirements() {

@ -33,6 +33,7 @@ import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -71,7 +72,8 @@ class FastSynchronizer<C> {
final MetricsSystem metricsSystem,
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final SyncState syncState) {
final SyncState syncState,
final Clock clock) {
if (syncConfig.syncMode() != SyncMode.FAST) {
return Optional.empty();
}
@ -100,6 +102,8 @@ class FastSynchronizer<C> {
syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(),
syncConfig.getWorldStateMaxRequestsWithoutProgress(),
syncConfig.getWorldStateMinMillisBeforeStalling(),
clock,
metricsSystem);
final FastSyncDownloader<C> fastSyncDownloader =
new FastSyncDownloader<>(

@ -17,6 +17,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Range;
@ -29,7 +30,9 @@ public class SynchronizerConfiguration {
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5);
private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384;
private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10;
private static final int DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS = 100;
private static final int DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS = 1000;
private static final long DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING =
TimeUnit.MINUTES.toMillis(5);
// Fast sync config
private final int fastSyncPivotDistance;
@ -57,6 +60,7 @@ public class SynchronizerConfiguration {
private final int transactionsParallelism;
private final int computationParallelism;
private final int maxTrailingPeers;
private final long worldStateMinMillisBeforeStalling;
private SynchronizerConfiguration(
final int fastSyncPivotDistance,
@ -66,6 +70,7 @@ public class SynchronizerConfiguration {
final int worldStateHashCountPerRequest,
final int worldStateRequestParallelism,
final int worldStateMaxRequestsWithoutProgress,
final long worldStateMinMillisBeforeStalling,
final Range<Long> blockPropagationRange,
final SyncMode syncMode,
final long downloaderChangeTargetThresholdByHeight,
@ -85,6 +90,7 @@ public class SynchronizerConfiguration {
this.worldStateHashCountPerRequest = worldStateHashCountPerRequest;
this.worldStateRequestParallelism = worldStateRequestParallelism;
this.worldStateMaxRequestsWithoutProgress = worldStateMaxRequestsWithoutProgress;
this.worldStateMinMillisBeforeStalling = worldStateMinMillisBeforeStalling;
this.blockPropagationRange = blockPropagationRange;
this.syncMode = syncMode;
this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight;
@ -199,6 +205,10 @@ public class SynchronizerConfiguration {
return worldStateMaxRequestsWithoutProgress;
}
public long getWorldStateMinMillisBeforeStalling() {
return worldStateMinMillisBeforeStalling;
}
public int getMaxTrailingPeers() {
return maxTrailingPeers;
}
@ -222,6 +232,7 @@ public class SynchronizerConfiguration {
private int worldStateRequestParallelism = DEFAULT_WORLD_STATE_REQUEST_PARALLELISM;
private int worldStateMaxRequestsWithoutProgress =
DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS;
private long worldStateMinMillisBeforeStalling = DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING;
private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME;
private int maxTrailingPeers = Integer.MAX_VALUE;
@ -335,6 +346,7 @@ public class SynchronizerConfiguration {
worldStateHashCountPerRequest,
worldStateRequestParallelism,
worldStateMaxRequestsWithoutProgress,
worldStateMinMillisBeforeStalling,
blockPropagationRange,
syncMode,
downloaderChangeTargetThresholdByHeight,

@ -22,6 +22,7 @@ import tech.pegasys.pantheon.services.tasks.Task;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
@ -38,21 +39,29 @@ class WorldDownloadState {
private final boolean downloadWasResumed;
private final CachingTaskCollection<NodeDataRequest> pendingRequests;
private final int maxRequestsWithoutProgress;
private final Clock clock;
private final Set<EthTask<?>> outstandingRequests =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final CompletableFuture<Void> internalFuture;
private final CompletableFuture<Void> downloadFuture;
// Volatile so monitoring can access it without having to synchronize.
private volatile int requestsSinceLastProgress = 0;
private final long minMillisBeforeStalling;
private volatile long timestampOfLastProgress;
private BytesValue rootNodeData;
private WorldStateDownloadProcess worldStateDownloadProcess;
public WorldDownloadState(
final CachingTaskCollection<NodeDataRequest> pendingRequests,
final int maxRequestsWithoutProgress) {
final int maxRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock) {
this.minMillisBeforeStalling = minMillisBeforeStalling;
this.timestampOfLastProgress = clock.millis();
this.downloadWasResumed = !pendingRequests.isEmpty();
this.pendingRequests = pendingRequests;
this.maxRequestsWithoutProgress = maxRequestsWithoutProgress;
this.clock = clock;
this.internalFuture = new CompletableFuture<>();
this.downloadFuture = new CompletableFuture<>();
this.internalFuture.whenComplete(this::cleanup);
@ -147,9 +156,11 @@ class WorldDownloadState {
public synchronized void requestComplete(final boolean madeProgress) {
if (madeProgress) {
requestsSinceLastProgress = 0;
timestampOfLastProgress = clock.millis();
} else {
requestsSinceLastProgress++;
if (requestsSinceLastProgress >= maxRequestsWithoutProgress) {
if (requestsSinceLastProgress >= maxRequestsWithoutProgress
&& timestampOfLastProgress + minMillisBeforeStalling < clock.millis()) {
markAsStalled(maxRequestsWithoutProgress);
}
}

@ -20,6 +20,7 @@ import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import java.time.Clock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@ -31,6 +32,8 @@ import org.apache.logging.log4j.Logger;
public class WorldStateDownloader {
private static final Logger LOG = LogManager.getLogger();
private final long minMillisBeforeStalling;
private final Clock clock;
private final MetricsSystem metricsSystem;
private final EthContext ethContext;
@ -49,6 +52,8 @@ public class WorldStateDownloader {
final int hashCountPerRequest,
final int maxOutstandingRequests,
final int maxNodeRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.worldStateStorage = worldStateStorage;
@ -56,6 +61,8 @@ public class WorldStateDownloader {
this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests;
this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress;
this.minMillisBeforeStalling = minMillisBeforeStalling;
this.clock = clock;
this.metricsSystem = metricsSystem;
metricsSystem.createIntegerGauge(
@ -105,7 +112,8 @@ public class WorldStateDownloader {
stateRoot);
final WorldDownloadState newDownloadState =
new WorldDownloadState(taskCollection, maxNodeRequestsWithoutProgress);
new WorldDownloadState(
taskCollection, maxNodeRequestsWithoutProgress, minMillisBeforeStalling, clock);
this.downloadState.set(newDownloadState);
if (!newDownloadState.downloadWasResumed()) {

@ -14,6 +14,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static tech.pegasys.pantheon.ethereum.eth.sync.worldstate.NodeDataRequest.createAccountDataRequest;
@ -27,9 +28,11 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Test;
@ -39,6 +42,7 @@ public class WorldDownloadStateTest {
private static final BytesValue ROOT_NODE_DATA = BytesValue.of(1, 2, 3, 4);
private static final Hash ROOT_NODE_HASH = Hash.hash(ROOT_NODE_DATA);
private static final int MAX_REQUESTS_WITHOUT_PROGRESS = 10;
private static final long MIN_MILLIS_BEFORE_STALLING = 50_000;
private final WorldStateStorage worldStateStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
@ -50,8 +54,10 @@ public class WorldDownloadStateTest {
private final WorldStateDownloadProcess worldStateDownloadProcess =
mock(WorldStateDownloadProcess.class);
private final TestClock clock = new TestClock();
private final WorldDownloadState downloadState =
new WorldDownloadState(pendingRequests, MAX_REQUESTS_WITHOUT_PROGRESS);
new WorldDownloadState(
pendingRequests, MAX_REQUESTS_WITHOUT_PROGRESS, MIN_MILLIS_BEFORE_STALLING, clock);
private final CompletableFuture<Void> future = downloadState.getDownloadFuture();
@ -121,6 +127,7 @@ public class WorldDownloadStateTest {
downloadState.requestComplete(false);
downloadState.requestComplete(true);
clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1);
for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS - 1; i++) {
downloadState.requestComplete(false);
@ -128,11 +135,50 @@ public class WorldDownloadStateTest {
}
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isCompletedExceptionally();
assertWorldStateStalled(downloadState);
}
@Test
public void shouldNotAddRequestsAfterDownloadIsStalled() {
public void shouldNotBeStalledWhenMaxRequestsReachedUntilMinimumTimeAlsoReached() {
for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS; i++) {
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();
}
// Exceeding the requests without progress limit doesn't trigger stalled state
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();
// Until the minimum time has elapsed, then the next request with no progress marks as stalled
clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1);
downloadState.requestComplete(false);
assertWorldStateStalled(downloadState);
}
@Test
public void shouldNotBeStalledIfMinimumTimeIsReachedButMaximumRequestsIsNot() {
clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1);
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();
}
@Test
public void shouldResetTimeSinceProgressWhenProgressIsMade() {
// Enough time has progressed but the next request makes progress so we are not stalled.
clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1);
downloadState.requestComplete(true);
assertThat(downloadState.getDownloadFuture()).isNotDone();
// We then reach the max number of requests without progress but the timer should have reset
for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS; i++) {
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();
}
assertThat(downloadState.getDownloadFuture()).isNotDone();
}
@Test
public void shouldNotAddRequestsAfterDownloadIsCompleted() {
downloadState.checkCompletion(worldStateStorage, header);
downloadState.enqueueRequests(singletonList(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)));
@ -140,4 +186,12 @@ public class WorldDownloadStateTest {
assertThat(pendingRequests.isEmpty()).isTrue();
}
private void assertWorldStateStalled(final WorldDownloadState state) {
final CompletableFuture<Void> future = state.getDownloadFuture();
assertThat(future).isCompletedExceptionally();
assertThatThrownBy(future::get)
.isInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(StalledDownloadException.class);
}
}

@ -60,6 +60,7 @@ import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
@ -974,6 +975,8 @@ public class WorldStateDownloaderTest {
config.getWorldStateHashCountPerRequest(),
config.getWorldStateRequestParallelism(),
config.getWorldStateMaxRequestsWithoutProgress(),
config.getWorldStateMinMillisBeforeStalling(),
TestClock.fixed(),
new NoOpMetricsSystem());
}

@ -168,6 +168,7 @@ public class CliquePantheonController implements PantheonController<CliqueContex
ethProtocolManager.ethContext(),
syncState,
dataDirectory,
clock,
metricsSystem);
final TransactionPool transactionPool =

@ -160,6 +160,7 @@ public class IbftLegacyPantheonController implements PantheonController<IbftCont
istanbul64ProtocolManager.ethContext(),
syncState,
dataDirectory,
clock,
metricsSystem);
final Runnable closer =

@ -197,6 +197,7 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
ethProtocolManager.ethContext(),
syncState,
dataDirectory,
clock,
metricsSystem);
final TransactionPool transactionPool =

@ -135,6 +135,7 @@ public class MainnetPantheonController implements PantheonController<Void> {
ethProtocolManager.ethContext(),
syncState,
dataDirectory,
clock,
metricsSystem);
final OptionalLong daoBlock = genesisConfig.getConfigOptions().getDaoForkBlock();

@ -15,9 +15,31 @@ package tech.pegasys.pantheon.testutil;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
public class TestClock {
public class TestClock extends Clock {
public static Clock fixed() {
return Clock.fixed(Instant.ofEpochSecond(10_000_000), ZoneId.systemDefault());
}
private Instant now = Instant.ofEpochSecond(24982948294L);
@Override
public ZoneId getZone() {
return ZoneOffset.UTC;
}
@Override
public Clock withZone(final ZoneId zone) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public Instant instant() {
return now;
}
public void stepMillis(final long millis) {
now = now.plusMillis(millis);
}
}

Loading…
Cancel
Save