From 069b2393a9a23cdb61494ee1e0818a4579daa740 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Thu, 24 Jan 2019 07:53:25 +1000 Subject: [PATCH] [NC-1273] Start of fast sync downloader (#613) * Add support for initiating fast sync to DefaultSynchronizer, starting full sync once that completes. * Wait for a minimum number of peers to be available before starting fast sync. * Select pivot block. * Fetch the pivot block header. * Ensure that a majority of peers (which have the pivot block) agree on the block. * Pull isRetryingError and assignPeer up to AbstractRetryingPeerTask so it can be reused. Signed-off-by: Adrian Sutton --- .../eth/manager/AbstractRetryingPeerTask.java | 24 ++- .../eth/sync/DefaultSynchronizer.java | 52 ++++- ...ownloader.java => FullSyncDownloader.java} | 4 +- .../eth/sync/SynchronizerConfiguration.java | 21 ++ .../eth/sync/fastsync/FastSyncActions.java | 146 ++++++++++++++ .../eth/sync/fastsync/FastSyncDownloader.java | 42 ++++ .../eth/sync/fastsync/FastSyncError.java | 21 ++ .../eth/sync/fastsync/FastSyncException.java | 27 +++ .../eth/sync/fastsync/FastSyncState.java | 75 ++++++++ .../sync/fastsync/PivotBlockRetriever.java | 125 ++++++++++++ .../eth/sync/state/FastSyncState.java | 39 ---- .../eth/sync/tasks/CompleteBlocksTask.java | 27 +-- .../tasks/DownloadHeaderSequenceTask.java | 23 +-- .../eth/sync/tasks/ImportBlocksTask.java | 4 +- ...RetryingGetHeaderFromPeerByNumberTask.java | 77 ++++++++ .../manager/EthProtocolManagerTestUtil.java | 8 +- .../ethtaskutils/RetryingMessageTaskTest.java | 4 +- ...rTest.java => FullSyncDownloaderTest.java} | 34 ++-- .../sync/fastsync/FastSyncActionsTest.java | 156 +++++++++++++++ .../sync/fastsync/FastSyncDownloaderTest.java | 106 ++++++++++ .../fastsync/PivotBlockRetrieverTest.java | 181 ++++++++++++++++++ ...yingGetHeaderFromPeerByNumberTaskTest.java | 46 +++++ .../pegasys/pantheon/cli/PantheonCommand.java | 27 ++- .../tech/pegasys/pantheon/RunnerTest.java | 2 + .../src/test/resources/everything_config.toml | 2 +- 25 files changed, 1145 insertions(+), 128 deletions(-) rename ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/{Downloader.java => FullSyncDownloader.java} (99%) create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActions.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncError.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncException.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncState.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/PivotBlockRetriever.java delete mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/FastSyncState.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java rename ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/{DownloaderTest.java => FullSyncDownloaderTest.java} (95%) create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActionsTest.java create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTaskTest.java diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java index e5406f23d3..7499d287af 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java @@ -14,6 +14,8 @@ package tech.pegasys.pantheon.ethereum.eth.manager; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerBreachedProtocolException; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; @@ -21,7 +23,9 @@ import tech.pegasys.pantheon.util.ExceptionUtils; import java.time.Duration; import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -29,7 +33,7 @@ import org.apache.logging.log4j.Logger; /** * A task that will retry a fixed number of times before completing the associated CompletableFuture * exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link - * #executePeerTask()} is complete with a non-empty list the retry counter is reset. + * #executePeerTask(Optional)} is complete with a non-empty list the retry counter is reset. * * @param The type as a typed list that the peer task can get partial or full results in. */ @@ -40,6 +44,7 @@ public abstract class AbstractRetryingPeerTask> extends private final int maxRetries; private int retryCount = 0; private final LabelledMetric ethTasksTimer; + private Optional assignedPeer = Optional.empty(); /** * @param ethContext The context of the current Eth network we are attached to. @@ -56,6 +61,10 @@ public abstract class AbstractRetryingPeerTask> extends this.maxRetries = maxRetries; } + public void assignPeer(final EthPeer peer) { + assignedPeer = Optional.of(peer); + } + @Override protected void executeTask() { if (result.get().isDone()) { @@ -68,7 +77,7 @@ public abstract class AbstractRetryingPeerTask> extends } retryCount += 1; - executePeerTask() + executePeerTask(assignedPeer) .whenComplete( (peerResult, error) -> { if (error != null) { @@ -83,7 +92,7 @@ public abstract class AbstractRetryingPeerTask> extends }); } - protected abstract CompletableFuture executePeerTask(); + protected abstract CompletableFuture executePeerTask(Optional assignedPeer); private void handleTaskError(final Throwable error) { final Throwable cause = ExceptionUtils.rootCause(error); @@ -118,5 +127,12 @@ public abstract class AbstractRetryingPeerTask> extends .scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1))); } - protected abstract boolean isRetryableError(Throwable error); + private boolean isRetryableError(final Throwable error) { + final boolean isPeerError = + error instanceof PeerBreachedProtocolException + || error instanceof PeerDisconnectedException + || error instanceof NoAvailablePeersException; + + return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerError); + } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 118a68ee75..2d05bc8fdb 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -16,11 +16,16 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncActions; +import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncDownloader; +import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncException; +import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState; import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.util.ExceptionUtils; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,7 +40,8 @@ public class DefaultSynchronizer implements Synchronizer { private final SyncState syncState; private final AtomicBoolean started = new AtomicBoolean(false); private final BlockPropagationManager blockPropagationManager; - private final Downloader downloader; + private final FullSyncDownloader fullSyncDownloader; + private final Optional> fastSyncDownloader; public DefaultSynchronizer( final SynchronizerConfiguration syncConfig, @@ -54,28 +60,60 @@ public class DefaultSynchronizer implements Synchronizer { syncState, new PendingBlocks(), ethTasksTimer); - this.downloader = - new Downloader<>( + this.fullSyncDownloader = + new FullSyncDownloader<>( syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer); ChainHeadTracker.trackChainHeadForPeers( ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer); - if (syncConfig.syncMode().equals(SyncMode.FAST)) { + if (syncConfig.syncMode() == SyncMode.FAST) { LOG.info("Fast sync enabled."); + this.fastSyncDownloader = + Optional.of( + new FastSyncDownloader<>( + new FastSyncActions<>( + syncConfig, protocolSchedule, protocolContext, ethContext, ethTasksTimer))); + } else { + this.fastSyncDownloader = Optional.empty(); } } @Override public void start() { if (started.compareAndSet(false, true)) { - LOG.info("Starting synchronizer."); - blockPropagationManager.start(); - downloader.start(); + if (fastSyncDownloader.isPresent()) { + fastSyncDownloader.get().start().whenComplete(this::handleFastSyncResult); + } else { + startFullSync(); + } } else { throw new IllegalStateException("Attempt to start an already started synchronizer."); } } + private void handleFastSyncResult(final FastSyncState result, final Throwable error) { + + final Throwable rootCause = ExceptionUtils.rootCause(error); + if (rootCause instanceof FastSyncException) { + LOG.error( + "Fast sync failed ({}), switching to full sync.", + ((FastSyncException) rootCause).getError()); + } else if (error != null) { + LOG.error("Fast sync failed, switching to full sync.", error); + } else { + LOG.info( + "Fast sync completed successfully with pivot block {}", + result.getPivotBlockNumber().getAsLong()); + } + startFullSync(); + } + + private void startFullSync() { + LOG.info("Starting synchronizer."); + blockPropagationManager.start(); + fullSyncDownloader.start(); + } + @Override public Optional getSyncStatus() { if (!started.get()) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/Downloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloader.java similarity index 99% rename from ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/Downloader.java rename to ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloader.java index 754f0e7902..ac728d3b7e 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/Downloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloader.java @@ -54,7 +54,7 @@ import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class Downloader { +public class FullSyncDownloader { private static final Logger LOG = LogManager.getLogger(); private final SynchronizerConfiguration config; @@ -73,7 +73,7 @@ public class Downloader { private long syncTargetDisconnectListenerId; protected CompletableFuture currentTask; - Downloader( + FullSyncDownloader( final SynchronizerConfiguration config, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index 0a7fddf092..997f0d91e6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -18,6 +18,7 @@ import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.util.uint.UInt256; +import java.time.Duration; import java.util.Optional; import com.google.common.collect.Range; @@ -30,10 +31,14 @@ public class SynchronizerConfiguration { // TODO: Determine reasonable defaults here public static int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500; public static float DEFAULT_FULL_VALIDATION_RATE = .1f; + public static int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5; + private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofSeconds(3); // Fast sync config private final int fastSyncPivotDistance; private final float fastSyncFullValidationRate; + private final int fastSyncMinimumPeerCount; + private final Duration fastSyncMaximumPeerWaitTime; // Block propagation config private final Range blockPropagationRange; @@ -58,6 +63,8 @@ public class SynchronizerConfiguration { final SyncMode requestedSyncMode, final int fastSyncPivotDistance, final float fastSyncFullValidationRate, + final int fastSyncMinimumPeerCount, + final Duration fastSyncMaximumPeerWaitTime, final Range blockPropagationRange, final Optional syncMode, final long downloaderChangeTargetThresholdByHeight, @@ -73,6 +80,8 @@ public class SynchronizerConfiguration { this.requestedSyncMode = requestedSyncMode; this.fastSyncPivotDistance = fastSyncPivotDistance; this.fastSyncFullValidationRate = fastSyncFullValidationRate; + this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount; + this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime; this.blockPropagationRange = blockPropagationRange; this.syncMode = syncMode; this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight; @@ -115,6 +124,8 @@ public class SynchronizerConfiguration { requestedSyncMode, fastSyncPivotDistance, fastSyncFullValidationRate, + fastSyncMinimumPeerCount, + fastSyncMaximumPeerWaitTime, blockPropagationRange, Optional.of(actualSyncMode), downloaderChangeTargetThresholdByHeight, @@ -222,6 +233,14 @@ public class SynchronizerConfiguration { return fastSyncFullValidationRate; } + public int getFastSyncMinimumPeerCount() { + return fastSyncMinimumPeerCount; + } + + public Duration getFastSyncMaximumPeerWaitTime() { + return fastSyncMaximumPeerWaitTime; + } + public static class Builder { private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD; private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE; @@ -318,6 +337,8 @@ public class SynchronizerConfiguration { syncMode, fastSyncPivotDistance, fastSyncFullValidationRate, + DEFAULT_FAST_SYNC_MINIMUM_PEERS, + DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME, blockPropagationRange, Optional.empty(), downloaderChangeTargetThresholdByHeight, diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActions.java new file mode 100644 index 0000000000..5a895c4548 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -0,0 +1,146 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; + +import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT; +import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.NO_PEERS_AVAILABLE; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; +import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeersTask; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.util.ExceptionUtils; + +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class FastSyncActions { + + private static final Logger LOG = LogManager.getLogger(); + private final SynchronizerConfiguration syncConfig; + private final ProtocolSchedule protocolSchedule; + private final ProtocolContext protocolContext; + private final EthContext ethContext; + private final LabelledMetric ethTasksTimer; + + public FastSyncActions( + final SynchronizerConfiguration syncConfig, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final LabelledMetric ethTasksTimer) { + this.syncConfig = syncConfig; + this.protocolSchedule = protocolSchedule; + this.protocolContext = protocolContext; + this.ethContext = ethContext; + this.ethTasksTimer = ethTasksTimer; + } + + public CompletableFuture waitForSuitablePeers() { + final WaitForPeersTask waitForPeersTask = + WaitForPeersTask.create( + ethContext, syncConfig.getFastSyncMinimumPeerCount(), ethTasksTimer); + + final EthScheduler scheduler = ethContext.getScheduler(); + final CompletableFuture result = new CompletableFuture<>(); + scheduler + .timeout(waitForPeersTask, syncConfig.getFastSyncMaximumPeerWaitTime()) + .handle( + (waitResult, error) -> { + if (ExceptionUtils.rootCause(error) instanceof TimeoutException) { + if (ethContext.getEthPeers().availablePeerCount() > 0) { + LOG.warn( + "Fast sync timed out before minimum peer count was reached. Continuing with reduced peers."); + result.complete(null); + } else { + waitForAnyPeer() + .thenAccept(result::complete) + .exceptionally( + taskError -> { + result.completeExceptionally(error); + return null; + }); + } + } else if (error != null) { + LOG.error("Failed to find peers for fast sync", error); + result.completeExceptionally(error); + } else { + result.complete(null); + } + return null; + }); + + return result; + } + + private CompletableFuture waitForAnyPeer() { + LOG.warn( + "Maximum wait time for fast sync reached but no peers available. Continuing to wait for any available peer."); + final CompletableFuture result = new CompletableFuture<>(); + waitForAnyPeer(result); + return result; + } + + private void waitForAnyPeer(final CompletableFuture result) { + ethContext + .getScheduler() + .timeout(WaitForPeersTask.create(ethContext, 1, ethTasksTimer)) + .whenComplete( + (waitResult, throwable) -> { + if (ExceptionUtils.rootCause(throwable) instanceof TimeoutException) { + waitForAnyPeer(result); + } else if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(waitResult); + } + }); + } + + public FastSyncState selectPivotBlock() { + return ethContext + .getEthPeers() + .bestPeer() + .map( + peer -> { + final long pivotBlockNumber = + peer.chainState().getEstimatedHeight() - syncConfig.fastSyncPivotDistance(); + if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) { + throw new FastSyncException(CHAIN_TOO_SHORT); + } else { + LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber); + return new FastSyncState(OptionalLong.of(pivotBlockNumber)); + } + }) + .orElseThrow(() -> new FastSyncException(NO_PEERS_AVAILABLE)); + } + + public CompletableFuture downloadPivotBlockHeader( + final FastSyncState currentState) { + return new PivotBlockRetriever<>( + protocolSchedule, + ethContext, + ethTasksTimer, + currentState.getPivotBlockNumber().getAsLong()) + .downloadPivotBlockHeader(); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java new file mode 100644 index 0000000000..7d59f7d84f --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; + +import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.FAST_SYNC_UNAVAILABLE; + +import java.util.concurrent.CompletableFuture; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class FastSyncDownloader { + private static final Logger LOG = LogManager.getLogger(); + private final FastSyncActions fastSyncActions; + + public FastSyncDownloader(final FastSyncActions fastSyncActions) { + this.fastSyncActions = fastSyncActions; + } + + public CompletableFuture start() { + LOG.info("Fast sync enabled"); + return fastSyncActions + .waitForSuitablePeers() + .thenApply(state -> fastSyncActions.selectPivotBlock()) + .thenCompose(fastSyncActions::downloadPivotBlockHeader) + .thenCompose( + state -> { + LOG.info("Reached end of current fast sync implementation with state {}", state); + throw new FastSyncException(FAST_SYNC_UNAVAILABLE); + }); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncError.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncError.java new file mode 100644 index 0000000000..54640b9e3a --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncError.java @@ -0,0 +1,21 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; + +public enum FastSyncError { + FAST_SYNC_UNAVAILABLE, + NO_PEERS_AVAILABLE, + CHAIN_TOO_SHORT, + PIVOT_BLOCK_HEADER_MISMATCH, + UNEXPECTED_ERROR +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncException.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncException.java new file mode 100644 index 0000000000..6ef16c36cc --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; + +public class FastSyncException extends RuntimeException { + + private final FastSyncError error; + + public FastSyncException(final FastSyncError error) { + super("Fast sync failed: " + error); + this.error = error; + } + + public FastSyncError getError() { + return error; + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncState.java new file mode 100644 index 0000000000..69718124fb --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncState.java @@ -0,0 +1,75 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; + +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; + +import com.google.common.base.MoreObjects; + +public class FastSyncState { + + private final OptionalLong pivotBlockNumber; + private final Optional pivotBlockHeader; + + public FastSyncState() { + this(OptionalLong.empty(), Optional.empty()); + } + + public FastSyncState(final OptionalLong pivotBlockNumber) { + this(pivotBlockNumber, Optional.empty()); + } + + public FastSyncState( + final OptionalLong pivotBlockNumber, final Optional pivotBlockHeader) { + this.pivotBlockNumber = pivotBlockNumber; + this.pivotBlockHeader = pivotBlockHeader; + } + + public OptionalLong getPivotBlockNumber() { + return pivotBlockNumber; + } + + public Optional getPivotBlockHeader() { + return pivotBlockHeader; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final FastSyncState that = (FastSyncState) o; + return Objects.equals(pivotBlockNumber, that.pivotBlockNumber) + && Objects.equals(pivotBlockHeader, that.pivotBlockHeader); + } + + @Override + public int hashCode() { + return Objects.hash(pivotBlockNumber, pivotBlockHeader); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("pivotBlockNumber", pivotBlockNumber) + .add("pivotBlockHeader", pivotBlockHeader) + .toString(); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/PivotBlockRetriever.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/PivotBlockRetriever.java new file mode 100644 index 0000000000..0f9d81e129 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/PivotBlockRetriever.java @@ -0,0 +1,125 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByNumberTask; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class PivotBlockRetriever { + + private static final Logger LOG = LogManager.getLogger(); + private static final int MAX_PIVOT_BLOCK_RETRIES = 3; + private final long pivotBlockNumber; + private final EthContext ethContext; + private final LabelledMetric ethTasksTimer; + private final ProtocolSchedule protocolSchedule; + private final Map confirmationsByBlockNumber = + new ConcurrentHashMap<>(); + private final CompletableFuture result = new CompletableFuture<>(); + private final Collection getHeaderTasks = + new ConcurrentLinkedQueue<>(); + + public PivotBlockRetriever( + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final LabelledMetric ethTasksTimer, + final long pivotBlockNumber) { + this.pivotBlockNumber = pivotBlockNumber; + this.ethContext = ethContext; + this.ethTasksTimer = ethTasksTimer; + this.protocolSchedule = protocolSchedule; + } + + @SuppressWarnings("rawtypes") + public CompletableFuture downloadPivotBlockHeader() { + final CompletableFuture[] requestFutures = requestHeaderFromAllPeers(); + + CompletableFuture.allOf(requestFutures) + .thenRun( + () -> { + // All requests have completed but we still haven't reached agreement on a header. + result.completeExceptionally( + new FastSyncException(FastSyncError.PIVOT_BLOCK_HEADER_MISMATCH)); + }); + return result; + } + + @SuppressWarnings("rawtypes") + private CompletableFuture[] requestHeaderFromAllPeers() { + final List peersToQuery = + ethContext + .getEthPeers() + .availablePeers() + .filter(peer -> peer.chainState().getEstimatedHeight() >= pivotBlockNumber) + .collect(Collectors.toList()); + + final int confirmationsRequired = peersToQuery.size() / 2 + 1; + return peersToQuery + .stream() + .map( + peer -> { + final RetryingGetHeaderFromPeerByNumberTask getHeaderTask = createGetHeaderTask(peer); + getHeaderTasks.add(getHeaderTask); + return ethContext + .getScheduler() + .scheduleSyncWorkerTask(getHeaderTask::getHeader) + .thenAccept(header -> countHeader(header, confirmationsRequired)); + }) + .toArray(CompletableFuture[]::new); + } + + private RetryingGetHeaderFromPeerByNumberTask createGetHeaderTask(final EthPeer peer) { + final RetryingGetHeaderFromPeerByNumberTask task = + RetryingGetHeaderFromPeerByNumberTask.forPivotBlock( + protocolSchedule, ethContext, ethTasksTimer, pivotBlockNumber, MAX_PIVOT_BLOCK_RETRIES); + task.assignPeer(peer); + return task; + } + + private void countHeader(final BlockHeader header, final int confirmationsRequired) { + final int confirmations = + confirmationsByBlockNumber + .computeIfAbsent(header, key -> new AtomicInteger(0)) + .incrementAndGet(); + LOG.debug( + "Received header {} which now has {} confirmations out of {} required.", + header.getHash(), + confirmations, + confirmationsRequired); + if (confirmations >= confirmationsRequired) { + LOG.info( + "Confirmed pivot block hash {} with {} confirmations", header.getHash(), confirmations); + result.complete(new FastSyncState(OptionalLong.of(header.getNumber()), Optional.of(header))); + getHeaderTasks.forEach(RetryingGetHeaderFromPeerByNumberTask::cancel); + } + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/FastSyncState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/FastSyncState.java deleted file mode 100644 index 17c0b6fbfa..0000000000 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/FastSyncState.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2018 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.ethereum.eth.sync.state; - -import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; - -public final class FastSyncState { - private long fastSyncTargetBlockNumber = -1; - - private final SynchronizerConfiguration config; - - public FastSyncState(final SynchronizerConfiguration config) { - this.config = config; - } - - /** - * Registers the chain height that we're trying to sync to. - * - * @param blockNumber the height of the chain we are syncing to. - */ - public void setFastSyncChainTarget(final long blockNumber) { - fastSyncTargetBlockNumber = blockNumber; - } - - /** @return the block number at which we switch from fast sync to full sync */ - public long pivot() { - return Math.max(fastSyncTargetBlockNumber - config.fastSyncPivotDistance(), 0); - } -} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java index fe6b7443db..b087d6391b 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java @@ -20,9 +20,6 @@ import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResul import tech.pegasys.pantheon.ethereum.eth.manager.AbstractRetryingPeerTask; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; -import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerBreachedProtocolException; -import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; @@ -32,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; @@ -54,7 +50,6 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask> private final List headers; private final Map blocks; - private Optional assignedPeer = Optional.empty(); private CompleteBlocksTask( final ProtocolSchedule protocolSchedule, @@ -92,26 +87,12 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask> } @Override - protected CompletableFuture> executePeerTask() { - return requestBodies().thenCompose(this::processBodiesResult); + protected CompletableFuture> executePeerTask(final Optional assignedPeer) { + return requestBodies(assignedPeer).thenCompose(this::processBodiesResult); } - @Override - protected boolean isRetryableError(final Throwable error) { - final boolean isPeerError = - error instanceof PeerBreachedProtocolException - || error instanceof PeerDisconnectedException - || error instanceof NoAvailablePeersException; - - return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerError); - } - - public CompleteBlocksTask assignPeer(final EthPeer peer) { - assignedPeer = Optional.of(peer); - return this; - } - - private CompletableFuture>> requestBodies() { + private CompletableFuture>> requestBodies( + final Optional assignedPeer) { final List incompleteHeaders = incompleteHeaders(); LOG.debug( "Requesting bodies to complete {} blocks, starting with {}.", diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 856be8712e..1481fee42a 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -21,9 +21,7 @@ import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult; import tech.pegasys.pantheon.ethereum.eth.manager.AbstractRetryingPeerTask; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; -import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; -import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerBreachedProtocolException; -import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; @@ -34,8 +32,8 @@ import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; import com.google.common.primitives.Ints; import org.apache.logging.log4j.LogManager; @@ -120,11 +118,12 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask> executePeerTask() { + protected CompletableFuture> executePeerTask( + final Optional assignedPeer) { LOG.debug( "Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber() - 1); final CompletableFuture> task = - downloadHeaders().thenCompose(this::processHeaders); + downloadHeaders(assignedPeer).thenCompose(this::processHeaders); return task.whenComplete( (r, t) -> { // We're done if we've filled all requested headers @@ -138,15 +137,8 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask>> downloadHeaders() { + private CompletableFuture>> downloadHeaders( + final Optional assignedPeer) { // Figure out parameters for our headers request final boolean partiallyFilled = lastFilledHeaderIndex < segmentLength; final BlockHeader referenceHeaderForNextRequest = @@ -165,6 +157,7 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask extends AbstractPeerTask> { } final CompleteBlocksTask task = CompleteBlocksTask.forHeaders( - protocolSchedule, ethContext, headers.getResult(), ethTasksTimer) - .assignPeer(peer); + protocolSchedule, ethContext, headers.getResult(), ethTasksTimer); + task.assignPeer(peer); return executeSubTask(() -> ethContext.getScheduler().timeout(task)); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java new file mode 100644 index 0000000000..ea37b607e2 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractRetryingPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +public class RetryingGetHeaderFromPeerByNumberTask + extends AbstractRetryingPeerTask> { + private final ProtocolSchedule protocolSchedule; + private final EthContext ethContext; + private final LabelledMetric ethTasksTimer; + private final long pivotBlockNumber; + + private RetryingGetHeaderFromPeerByNumberTask( + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final LabelledMetric ethTasksTimer, + final long pivotBlockNumber, + final int maxRetries) { + super(ethContext, maxRetries, ethTasksTimer); + this.protocolSchedule = protocolSchedule; + this.ethContext = ethContext; + this.ethTasksTimer = ethTasksTimer; + this.pivotBlockNumber = pivotBlockNumber; + } + + public static RetryingGetHeaderFromPeerByNumberTask forPivotBlock( + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final LabelledMetric ethTasksTimer, + final long pivotBlockNumber, + final int maxRetries) { + return new RetryingGetHeaderFromPeerByNumberTask( + protocolSchedule, ethContext, ethTasksTimer, pivotBlockNumber, maxRetries); + } + + @Override + protected CompletableFuture> executePeerTask( + final Optional assignedPeer) { + final AbstractGetHeadersFromPeerTask getHeadersTask = + GetHeadersFromPeerByNumberTask.forSingleNumber( + protocolSchedule, ethContext, pivotBlockNumber, ethTasksTimer); + assignedPeer.ifPresent(getHeadersTask::assignPeer); + return executeSubTask(getHeadersTask::run) + .thenApply( + peerResult -> { + if (!peerResult.getResult().isEmpty()) { + result.get().complete(peerResult.getResult()); + } + return peerResult.getResult(); + }); + } + + public CompletableFuture getHeader() { + return run().thenApply(singletonList -> singletonList.get(0)); + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java index a2ae437c19..b6fb16e705 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -50,13 +50,17 @@ public class EthProtocolManagerTestUtil { return create(blockchain, worldStateArchive, () -> false); } - public static EthProtocolManager create() { + public static EthProtocolManager create(final TimeoutPolicy timeoutPolicy) { final ProtocolSchedule protocolSchedule = MainnetProtocolSchedule.create(); final GenesisConfigFile config = GenesisConfigFile.mainnet(); final GenesisState genesisState = GenesisState.fromConfig(config, protocolSchedule); final Blockchain blockchain = createInMemoryBlockchain(genesisState.getBlock()); final WorldStateArchive worldStateArchive = createInMemoryWorldStateArchive(); - return create(blockchain, worldStateArchive); + return create(blockchain, worldStateArchive, timeoutPolicy); + } + + public static EthProtocolManager create() { + return create(() -> false); } public static void broadcastMessage( diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java index 2c548d6cad..7cd23e1446 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java @@ -73,11 +73,11 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest // Respond max times with no data respondingPeer.respondTimes(emptyResponder, maxRetries); - assertThat(future.isDone()).isFalse(); + assertThat(future).isNotDone(); // Next retry should fail respondingPeer.respond(emptyResponder); - assertThat(future.isDone()).isTrue(); + assertThat(future).isDone(); assertThat(future.isCompletedExceptionally()).isTrue(); assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloaderTest.java similarity index 95% rename from ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloaderTest.java rename to ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloaderTest.java index 0d4e169531..c596626a92 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/FullSyncDownloaderTest.java @@ -55,7 +55,7 @@ import java.util.function.Function; import org.junit.Before; import org.junit.Test; -public class DownloaderTest { +public class FullSyncDownloaderTest { protected ProtocolSchedule protocolSchedule; protected EthProtocolManager ethProtocolManager; @@ -88,12 +88,12 @@ public class DownloaderTest { ethTashsTimer = NoOpMetricsSystem.NO_OP_LABELLED_TIMER; } - private Downloader downloader(final SynchronizerConfiguration syncConfig) { - return new Downloader<>( + private FullSyncDownloader downloader(final SynchronizerConfiguration syncConfig) { + return new FullSyncDownloader<>( syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTashsTimer); } - private Downloader downloader() { + private FullSyncDownloader downloader() { final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder().build().validated(localBlockchain); return downloader(syncConfig); @@ -115,7 +115,7 @@ public class DownloaderTest { .downloaderChainSegmentSize(10) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); downloader.start(); while (!syncState.syncTarget().isPresent()) { @@ -147,7 +147,7 @@ public class DownloaderTest { .downloaderChainSegmentSize(10) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); downloader.start(); while (!syncState.syncTarget().isPresent()) { @@ -179,7 +179,7 @@ public class DownloaderTest { .downloaderChainSegmentSize(4) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); downloader.start(); while (!syncState.syncTarget().isPresent()) { @@ -206,7 +206,7 @@ public class DownloaderTest { EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); final Responder responder = RespondingEthPeer.blockchainResponder(otherBlockchain); - final Downloader downloader = downloader(); + final FullSyncDownloader downloader = downloader(); downloader.start(); peer.respond(responder); @@ -246,7 +246,7 @@ public class DownloaderTest { .downloaderChainSegmentSize(10) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); downloader.start(); while (localBlockchain.getChainHeadBlockNumber() < targetBlock) { @@ -268,7 +268,7 @@ public class DownloaderTest { final RespondingEthPeer peerB = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, localTd.plus(200)); - final Downloader downloader = downloader(); + final FullSyncDownloader downloader = downloader(); downloader.start(); // Process until the sync target is selected @@ -291,7 +291,7 @@ public class DownloaderTest { EthProtocolManagerTestUtil.createPeer(ethProtocolManager, localTd.plus(200), 0); peerA.getEthPeer().chainState().update(gen.hash(), 50); - final Downloader downloader = downloader(); + final FullSyncDownloader downloader = downloader(); downloader.start(); // Process until the sync target is selected @@ -319,7 +319,7 @@ public class DownloaderTest { .downloaderChangeTargetThresholdByHeight(10) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); downloader.start(); // Process until the sync target is selected @@ -359,7 +359,7 @@ public class DownloaderTest { .downloaderChangeTargetThresholdByHeight(1000) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); downloader.start(); // Process until the sync target is selected @@ -399,7 +399,7 @@ public class DownloaderTest { .downloaderChangeTargetThresholdByTd(UInt256.of(10)) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); downloader.start(); // Process until the sync target is selected @@ -446,7 +446,7 @@ public class DownloaderTest { .downloaderChangeTargetThresholdByTd(UInt256.of(100_000_000L)) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); downloader.start(); // Process until the sync target is selected @@ -491,7 +491,7 @@ public class DownloaderTest { .downloaderHeadersRequestSize(3) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); final long bestPeerChainHead = otherBlockchain.getChainHeadBlockNumber(); final RespondingEthPeer bestPeer = @@ -565,7 +565,7 @@ public class DownloaderTest { .downloaderHeadersRequestSize(3) .build() .validated(localBlockchain); - final Downloader downloader = downloader(syncConfig); + final FullSyncDownloader downloader = downloader(syncConfig); // Setup the best peer we should use as our sync target final long bestPeerChainHead = otherBlockchain.getChainHeadBlockNumber(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActionsTest.java new file mode 100644 index 0000000000..eda59d844c --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT; +import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.NO_PEERS_AVAILABLE; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_TIMER; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; +import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; +import tech.pegasys.pantheon.ethereum.eth.sync.SyncMode; +import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.ThrowableAssert.ThrowingCallable; +import org.junit.Before; +import org.junit.Test; + +public class FastSyncActionsTest { + + private final SynchronizerConfiguration syncConfig = + new SynchronizerConfiguration.Builder() + .syncMode(SyncMode.FAST) + .fastSyncPivotDistance(1000) + .build(); + + private ProtocolSchedule protocolSchedule; + private ProtocolContext protocolContext; + + private final LabelledMetric ethTasksTimer = NO_OP_LABELLED_TIMER; + private final AtomicInteger timeoutCount = new AtomicInteger(0); + private FastSyncActions fastSyncActions; + private EthProtocolManager ethProtocolManager; + private MutableBlockchain blockchain; + + @Before + public void setUp() { + final BlockchainSetupUtil blockchainSetupUtil = BlockchainSetupUtil.forTesting(); + blockchainSetupUtil.importAllBlocks(); + blockchain = blockchainSetupUtil.getBlockchain(); + protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); + protocolContext = blockchainSetupUtil.getProtocolContext(); + ethProtocolManager = + EthProtocolManagerTestUtil.create( + blockchain, + blockchainSetupUtil.getWorldArchive(), + () -> timeoutCount.getAndDecrement() > 0); + fastSyncActions = + new FastSyncActions<>( + syncConfig, + protocolSchedule, + protocolContext, + ethProtocolManager.ethContext(), + ethTasksTimer); + } + + @Test + public void waitForPeersShouldSucceedIfEnoughPeersAreFound() { + for (int i = 0; i < syncConfig.getFastSyncMinimumPeerCount(); i++) { + EthProtocolManagerTestUtil.createPeer(ethProtocolManager); + } + final CompletableFuture result = fastSyncActions.waitForSuitablePeers(); + assertThat(result).isCompleted(); + } + + @Test + public void waitForPeersShouldReportSuccessWhenTimeLimitReachedAndAPeerIsAvailable() { + EthProtocolManagerTestUtil.createPeer(ethProtocolManager); + timeoutCount.set(Integer.MAX_VALUE); + assertThat(fastSyncActions.waitForSuitablePeers()).isCompleted(); + } + + @Test + public void waitForPeersShouldContinueWaitingUntilAtLeastOnePeerIsAvailable() { + timeoutCount.set(1); + final CompletableFuture result = fastSyncActions.waitForSuitablePeers(); + assertThat(result).isNotCompleted(); + + EthProtocolManagerTestUtil.createPeer(ethProtocolManager); + assertThat(result).isCompleted(); + } + + @Test + public void selectPivotBlockShouldSelectBlockPivotDistanceFromBestPeer() { + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000); + + final FastSyncState result = fastSyncActions.selectPivotBlock(); + final FastSyncState expected = new FastSyncState(OptionalLong.of(4000)); + assertThat(result).isEqualTo(expected); + } + + @Test + public void selectPivotBlockShouldFailIfNoPeersAreAvailable() { + assertThrowsFastSyncException(NO_PEERS_AVAILABLE, fastSyncActions::selectPivotBlock); + } + + @Test + public void selectPivotBlockShouldFailIfBestPeerChainIsShorterThanPivotDistance() { + EthProtocolManagerTestUtil.createPeer( + ethProtocolManager, syncConfig.fastSyncPivotDistance() - 1); + + assertThrowsFastSyncException(CHAIN_TOO_SHORT, fastSyncActions::selectPivotBlock); + } + + @Test + public void selectPivotBlockShouldFailIfBestPeerChainIsEqualToPivotDistance() { + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, syncConfig.fastSyncPivotDistance()); + + assertThrowsFastSyncException(CHAIN_TOO_SHORT, fastSyncActions::selectPivotBlock); + } + + @Test + public void downloadPivotBlockHeaderShouldRetrievePivotBlockHeader() { + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001); + final CompletableFuture result = + fastSyncActions.downloadPivotBlockHeader(new FastSyncState(OptionalLong.of(1))); + assertThat(result).isNotCompleted(); + + final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); + peer.respond(responder); + + assertThat(result) + .isCompletedWithValue(new FastSyncState(OptionalLong.of(1), blockchain.getBlockHeader(1))); + } + + private void assertThrowsFastSyncException( + final FastSyncError expectedError, final ThrowingCallable callable) { + assertThatThrownBy(callable) + .isInstanceOf(FastSyncException.class) + .extracting(exception -> ((FastSyncException) exception).getError()) + .isEqualTo(expectedError); + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java new file mode 100644 index 0000000000..b879aa48f9 --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT; +import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.FAST_SYNC_UNAVAILABLE; +import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.UNEXPECTED_ERROR; + +import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; + +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; + +public class FastSyncDownloaderTest { + + @SuppressWarnings("unchecked") + private final FastSyncActions fastSyncActions = mock(FastSyncActions.class); + + private final FastSyncDownloader downloader = new FastSyncDownloader<>(fastSyncActions); + + @Test + public void shouldCompleteFastSyncSuccessfully() { + final FastSyncState selectPivotBlockState = new FastSyncState(OptionalLong.of(50)); + final FastSyncState downloadPivotBlockHeaderState = + new FastSyncState( + OptionalLong.of(50), + Optional.of(new BlockHeaderTestFixture().number(50).buildHeader())); + when(fastSyncActions.waitForSuitablePeers()).thenReturn(completedFuture(null)); + when(fastSyncActions.selectPivotBlock()).thenReturn(selectPivotBlockState); + when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) + .thenReturn(completedFuture(downloadPivotBlockHeaderState)); + + final CompletableFuture result = downloader.start(); + + verify(fastSyncActions).waitForSuitablePeers(); + verify(fastSyncActions).selectPivotBlock(); + verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState); + verifyNoMoreInteractions(fastSyncActions); + assertCompletedExceptionally(result, FAST_SYNC_UNAVAILABLE); + } + + @Test + public void shouldAbortIfWaitForSuitablePeersFails() { + when(fastSyncActions.waitForSuitablePeers()) + .thenReturn(completedExceptionally(new FastSyncException(UNEXPECTED_ERROR))); + + final CompletableFuture result = downloader.start(); + + assertCompletedExceptionally(result, UNEXPECTED_ERROR); + + verify(fastSyncActions).waitForSuitablePeers(); + verifyNoMoreInteractions(fastSyncActions); + } + + @Test + public void shouldAbortIfSelectPivotBlockFails() { + when(fastSyncActions.waitForSuitablePeers()).thenReturn(completedFuture(null)); + when(fastSyncActions.selectPivotBlock()).thenThrow(new FastSyncException(CHAIN_TOO_SHORT)); + + final CompletableFuture result = downloader.start(); + + assertCompletedExceptionally(result, CHAIN_TOO_SHORT); + + verify(fastSyncActions).waitForSuitablePeers(); + verify(fastSyncActions).selectPivotBlock(); + verifyNoMoreInteractions(fastSyncActions); + } + + private CompletableFuture completedExceptionally(final Throwable error) { + final CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(error); + return result; + } + + private void assertCompletedExceptionally( + final CompletableFuture future, final FastSyncError expectedError) { + assertThat(future).isCompletedExceptionally(); + future.exceptionally( + actualError -> { + assertThat(actualError) + .isInstanceOf(FastSyncException.class) + .extracting(ex -> ((FastSyncException) ex).getError()) + .isEqualTo(expectedError); + return null; + }); + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java new file mode 100644 index 0000000000..feff85e000 --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java @@ -0,0 +1,181 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_TIMER; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.chain.Blockchain; +import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; +import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; +import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; + +public class PivotBlockRetrieverTest { + + private static final long PIVOT_BLOCK_NUMBER = 10; + + private ProtocolContext protocolContext; + + private final LabelledMetric ethTasksTimer = NO_OP_LABELLED_TIMER; + private final AtomicBoolean timeout = new AtomicBoolean(false); + private EthProtocolManager ethProtocolManager; + private MutableBlockchain blockchain; + private PivotBlockRetriever pivotBlockRetriever; + + @Before + public void setUp() { + final BlockchainSetupUtil blockchainSetupUtil = BlockchainSetupUtil.forTesting(); + blockchainSetupUtil.importAllBlocks(); + blockchain = blockchainSetupUtil.getBlockchain(); + final ProtocolSchedule protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); + protocolContext = blockchainSetupUtil.getProtocolContext(); + ethProtocolManager = + EthProtocolManagerTestUtil.create( + blockchain, blockchainSetupUtil.getWorldArchive(), timeout::get); + pivotBlockRetriever = + new PivotBlockRetriever<>( + protocolSchedule, ethProtocolManager.ethContext(), ethTasksTimer, PIVOT_BLOCK_NUMBER); + } + + @Test + public void shouldSucceedWhenAllPeersAgree() { + final Responder responder = + RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); + final RespondingEthPeer respondingPeerA = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final RespondingEthPeer respondingPeerB = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final RespondingEthPeer respondingPeerC = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + + final CompletableFuture future = pivotBlockRetriever.downloadPivotBlockHeader(); + while (!future.isDone()) { + respondingPeerA.respond(responder); + respondingPeerB.respond(responder); + respondingPeerC.respond(responder); + } + + assertThat(future) + .isCompletedWithValue( + new FastSyncState( + OptionalLong.of(PIVOT_BLOCK_NUMBER), + blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER))); + } + + @Test + public void shouldIgnorePeersThatDoNotHaveThePivotBlock() { + final Responder responder = + RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); + final RespondingEthPeer respondingPeerA = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); + + final CompletableFuture future = pivotBlockRetriever.downloadPivotBlockHeader(); + while (!future.isDone()) { + respondingPeerA.respondWhile(responder, () -> !future.isDone()); + } + + assertThat(future) + .isCompletedWithValue( + new FastSyncState( + OptionalLong.of(PIVOT_BLOCK_NUMBER), + blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER))); + } + + @Test + public void shouldSucceedWhenMajorityOfPeersAgree() { + final Responder responder = + RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); + final Responder fakeResponder = responderForFakeBlock(); + + final RespondingEthPeer respondingPeerA = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final RespondingEthPeer respondingPeerB = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final RespondingEthPeer respondingPeerC = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + + final CompletableFuture future = pivotBlockRetriever.downloadPivotBlockHeader(); + while (!future.isDone()) { + respondingPeerA.respond(responder); + respondingPeerB.respond(fakeResponder); + respondingPeerC.respond(responder); + } + + assertThat(future) + .isCompletedWithValue( + new FastSyncState( + OptionalLong.of(PIVOT_BLOCK_NUMBER), + blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER))); + } + + @Test + public void shouldFailWhenPeersReturnDifferentHeaders() { + final Responder responderA = + RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive()); + final RespondingEthPeer respondingPeerA = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + + final Responder responderB = responderForFakeBlock(); + final RespondingEthPeer respondingPeerB = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + + // Execute task and wait for response + final AtomicReference actualError = new AtomicReference<>(); + final CompletableFuture future = pivotBlockRetriever.downloadPivotBlockHeader(); + while (!future.isDone()) { + respondingPeerA.respond(responderA); + respondingPeerB.respond(responderB); + } + future.whenComplete((result, error) -> actualError.set(error)); + + assertThat(future).isCompletedExceptionally(); + assertThat(actualError.get()) + .isInstanceOf(FastSyncException.class) + .extracting(e -> ((FastSyncException) e).getError()) + .isEqualTo(FastSyncError.PIVOT_BLOCK_HEADER_MISMATCH); + } + + private Responder responderForFakeBlock() { + final Blockchain mockBlockchain = mock(Blockchain.class); + when(mockBlockchain.getBlockHeader(PIVOT_BLOCK_NUMBER)) + .thenReturn( + Optional.of( + new BlockHeaderTestFixture() + .number(PIVOT_BLOCK_NUMBER) + .extraData(BytesValue.of(1)) + .buildHeader())); + return RespondingEthPeer.blockchainResponder(mockBlockchain); + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTaskTest.java new file mode 100644 index 0000000000..9f39023b15 --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTaskTest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import static java.util.Collections.singletonList; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; +import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; + +import java.util.List; + +import org.junit.Ignore; +import org.junit.Test; + +public class RetryingGetHeaderFromPeerByNumberTaskTest + extends RetryingMessageTaskTest> { + + private static final long PIVOT_BLOCK_NUMBER = 10; + + @Override + protected List generateDataToBeRequested() { + return singletonList(blockchain.getBlockHeader(PIVOT_BLOCK_NUMBER).get()); + } + + @Override + protected EthTask> createTask(final List requestedData) { + return RetryingGetHeaderFromPeerByNumberTask.forPivotBlock( + protocolSchedule, ethContext, ethTasksTimer, PIVOT_BLOCK_NUMBER, maxRetries); + } + + @Test + @Override + @Ignore("It's not possible to return a partial result as we only ever request one header.") + public void failsWhenPeerReturnsPartialResultThenStops() {} +} diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java index a551371364..702ab389ce 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java @@ -215,14 +215,13 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { ) private final Collection bannedNodeIds = new ArrayList<>(); - // TODO: Re-enable as per NC-1057/NC-1681 - // @Option( - // names = {"--sync-mode"}, - // paramLabel = MANDATORY_MODE_FORMAT_HELP, - // description = - // "Synchronization mode (Value can be one of ${COMPLETION-CANDIDATES}, default: - // ${DEFAULT-VALUE})" - // ) + @Option( + hidden = true, + names = {"--sync-mode"}, + paramLabel = MANDATORY_MODE_FORMAT_HELP, + description = + "Synchronization mode (Value can be one of ${COMPLETION-CANDIDATES}, default: ${DEFAULT-VALUE})" + ) private final SyncMode syncMode = DEFAULT_SYNC_MODE; @Option( @@ -603,7 +602,7 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { } final EthNetworkConfig ethNetworkConfig = ethNetworkConfig(); - PermissioningConfiguration permissioningConfiguration = permissioningConfiguration(); + final PermissioningConfiguration permissioningConfiguration = permissioningConfiguration(); ensureAllBootnodesAreInWhitelist(ethNetworkConfig, permissioningConfiguration); synchronize( @@ -622,17 +621,17 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { private void ensureAllBootnodesAreInWhitelist( final EthNetworkConfig ethNetworkConfig, final PermissioningConfiguration permissioningConfiguration) { - List bootnodes = + final List bootnodes = DiscoveryConfiguration.getBootstrapPeersFromGenericCollection( ethNetworkConfig.getBootNodes()); if (permissioningConfiguration.isNodeWhitelistSet() && bootnodes != null) { - List whitelist = + final List whitelist = permissioningConfiguration .getNodeWhitelist() .stream() .map(DefaultPeer::fromURI) .collect(Collectors.toList()); - for (Peer bootnode : bootnodes) { + for (final Peer bootnode : bootnodes) { if (!whitelist.contains(bootnode)) { throw new ParameterException( new CommandLine(this), @@ -734,7 +733,7 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { checkNotNull(runnerBuilder); - Runner runner = + final Runner runner = runnerBuilder .vertx(Vertx.vertx()) .pantheonController(controller) @@ -816,7 +815,7 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { private String genesisConfig() { try { return Resources.toString(genesisFile().toURI().toURL(), UTF_8); - } catch (IOException e) { + } catch (final IOException e) { throw new ParameterException( new CommandLine(this), String.format("Unable to load genesis file %s.", genesisFile()), diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java b/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java index e81c4b4637..a8dad2c2bf 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java @@ -65,6 +65,7 @@ import okhttp3.RequestBody; import okhttp3.Response; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -80,6 +81,7 @@ public final class RunnerTest { } @Test + @Ignore("Fast sync implementation in progress.") public void fastSyncFromGenesis() throws Exception { syncFromGenesis(SyncMode.FAST); } diff --git a/pantheon/src/test/resources/everything_config.toml b/pantheon/src/test/resources/everything_config.toml index 7512a5b57a..c60a7ceabd 100644 --- a/pantheon/src/test/resources/everything_config.toml +++ b/pantheon/src/test/resources/everything_config.toml @@ -32,7 +32,7 @@ host-whitelist=["all"] # chain network="MAINNET" private-genesis-file="~/genesis.json" -#sync-mode="fast" +sync-mode="fast" ottoman=false ropsten=false goerli=false