diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java index 14e5d54a1d..843056878a 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java @@ -25,6 +25,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.metrics.MetricsSystem; +import java.util.function.Supplier; + import org.apache.logging.log4j.Logger; public class ChainHeadTracker implements ConnectCallback { @@ -51,14 +53,10 @@ public class ChainHeadTracker implements ConnectCallback { final EthContext ethContext, final ProtocolSchedule protocolSchedule, final Blockchain blockchain, - final SynchronizerConfiguration syncConfiguration, + final Supplier trailingPeerRequirementsCalculator, final MetricsSystem metricsSystem) { final TrailingPeerLimiter trailingPeerLimiter = - new TrailingPeerLimiter( - ethContext.getEthPeers(), - blockchain, - syncConfiguration.trailingPeerBlocksBehindThreshold(), - syncConfiguration.maxTrailingPeers()); + new TrailingPeerLimiter(ethContext.getEthPeers(), trailingPeerRequirementsCalculator); final ChainHeadTracker tracker = new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter, metricsSystem); ethContext.getEthPeers().subscribeConnect(tracker); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 47a70262b4..5b8711f407 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -74,7 +74,11 @@ public class DefaultSynchronizer implements Synchronizer { new BlockBroadcaster(ethContext)); ChainHeadTracker.trackChainHeadForPeers( - ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, metricsSystem); + ethContext, + protocolSchedule, + protocolContext.getBlockchain(), + this::calculateTrailingPeerRequirements, + metricsSystem); this.fullSyncDownloader = new FullSyncDownloader<>( @@ -92,6 +96,12 @@ public class DefaultSynchronizer implements Synchronizer { syncState); } + private TrailingPeerRequirements calculateTrailingPeerRequirements() { + return fastSynchronizer + .flatMap(FastSynchronizer::calculateTrailingPeerRequirements) + .orElse(TrailingPeerRequirements.UNRESTRICTED); + } + @Override public void start() { if (started.compareAndSet(false, true)) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java index 1815484e4f..2b06cb6242 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java @@ -182,4 +182,8 @@ class FastSynchronizer { return taskCollection; } + + public Optional calculateTrailingPeerRequirements() { + return fastSyncDownloader.getTrailingPeerRequirements(); + } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index a045c6c271..68325efb74 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -53,8 +53,6 @@ public class SynchronizerConfiguration { private final int downloaderCheckpointTimeoutsPermitted; private final int downloaderChainSegmentTimeoutsPermitted; private final int downloaderChainSegmentSize; - private final long trailingPeerBlocksBehindThreshold; - private final int maxTrailingPeers; private final int downloaderParallelism; private final int transactionsParallelism; private final int computationParallelism; @@ -75,8 +73,6 @@ public class SynchronizerConfiguration { final int downloaderCheckpointTimeoutsPermitted, final int downloaderChainSegmentTimeoutsPermitted, final int downloaderChainSegmentSize, - final long trailingPeerBlocksBehindThreshold, - final int maxTrailingPeers, final int downloaderParallelism, final int transactionsParallelism, final int computationParallelism) { @@ -95,8 +91,6 @@ public class SynchronizerConfiguration { this.downloaderCheckpointTimeoutsPermitted = downloaderCheckpointTimeoutsPermitted; this.downloaderChainSegmentTimeoutsPermitted = downloaderChainSegmentTimeoutsPermitted; this.downloaderChainSegmentSize = downloaderChainSegmentSize; - this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold; - this.maxTrailingPeers = maxTrailingPeers; this.downloaderParallelism = downloaderParallelism; this.transactionsParallelism = transactionsParallelism; this.computationParallelism = computationParallelism; @@ -159,19 +153,6 @@ public class SynchronizerConfiguration { return downloaderChainSegmentSize; } - /** - * The number of blocks behind we allow a peer to be before considering them a trailing peer. - * - * @return the maximum number of blocks behind a peer can be while being considered current. - */ - public long trailingPeerBlocksBehindThreshold() { - return trailingPeerBlocksBehindThreshold; - } - - public int maxTrailingPeers() { - return maxTrailingPeers; - } - public int downloaderParallelism() { return downloaderParallelism; } @@ -224,8 +205,6 @@ public class SynchronizerConfiguration { private int downloaderCheckpointTimeoutsPermitted = 5; private int downloaderChainSegmentTimeoutsPermitted = 5; private int downloaderChainSegmentSize = 200; - private long trailingPeerBlocksBehindThreshold; - private int maxTrailingPeers = Integer.MAX_VALUE; private int downloaderParallelism = 4; private int transactionsParallelism = 2; private int computationParallelism = Runtime.getRuntime().availableProcessors(); @@ -293,16 +272,6 @@ public class SynchronizerConfiguration { return this; } - public Builder trailingPeerBlocksBehindThreshold(final long trailingPeerBlocksBehindThreshold) { - this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold; - return this; - } - - public Builder maxTrailingPeers(final int maxTrailingPeers) { - this.maxTrailingPeers = maxTrailingPeers; - return this; - } - public Builder downloaderParallelisim(final int downloaderParallelism) { this.downloaderParallelism = downloaderParallelism; return this; @@ -361,8 +330,6 @@ public class SynchronizerConfiguration { downloaderCheckpointTimeoutsPermitted, downloaderChainSegmentTimeoutsPermitted, downloaderChainSegmentSize, - trailingPeerBlocksBehindThreshold, - maxTrailingPeers, downloaderParallelism, transactionsParallelism, computationParallelism); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiter.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiter.java index 01a07f4463..69a017666a 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiter.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiter.java @@ -23,6 +23,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.Discon import java.util.Comparator; import java.util.List; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; @@ -37,30 +38,27 @@ public class TrailingPeerLimiter implements BlockAddedObserver { // how often we rerun the check. private static final int RECHECK_PEERS_WHEN_BLOCK_NUMBER_MULTIPLE_OF = 100; private final EthPeers ethPeers; - private final Blockchain blockchain; - private final long trailingPeerBlocksBehindThreshold; - private final int maxTrailingPeers; + private final Supplier trailingPeerRequirementsCalculator; public TrailingPeerLimiter( final EthPeers ethPeers, - final Blockchain blockchain, - final long trailingPeerBlocksBehindThreshold, - final int maxTrailingPeers) { + final Supplier trailingPeerRequirementsCalculator) { this.ethPeers = ethPeers; - this.blockchain = blockchain; - this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold; - this.maxTrailingPeers = maxTrailingPeers; + this.trailingPeerRequirementsCalculator = trailingPeerRequirementsCalculator; } public void enforceTrailingPeerLimit() { + final TrailingPeerRequirements requirements = trailingPeerRequirementsCalculator.get(); + if (requirements.getMaxTrailingPeers() == Long.MAX_VALUE) { + return; + } + final long minimumHeightToBeUpToDate = requirements.getMinimumHeightToBeUpToDate(); + final long maxTrailingPeers = requirements.getMaxTrailingPeers(); final List trailingPeers = ethPeers .availablePeers() .filter(peer -> peer.chainState().hasEstimatedHeight()) - .filter( - peer -> - peer.chainState().getEstimatedHeight() + trailingPeerBlocksBehindThreshold - < blockchain.getChainHeadBlockNumber()) + .filter(peer -> peer.chainState().getEstimatedHeight() < minimumHeightToBeUpToDate) .sorted(BY_CHAIN_HEIGHT) .collect(Collectors.toList()); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerRequirements.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerRequirements.java new file mode 100644 index 0000000000..96a76ff6c4 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerRequirements.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; + +import java.util.Objects; + +import com.google.common.base.MoreObjects; + +public class TrailingPeerRequirements { + public static TrailingPeerRequirements UNRESTRICTED = + new TrailingPeerRequirements(BlockHeader.GENESIS_BLOCK_NUMBER, Long.MAX_VALUE); + private final long minimumHeightToBeUpToDate; + private final long maxTrailingPeers; + + public TrailingPeerRequirements( + final long minimumHeightToBeUpToDate, final long maxTrailingPeers) { + this.minimumHeightToBeUpToDate = minimumHeightToBeUpToDate; + this.maxTrailingPeers = maxTrailingPeers; + } + + public long getMinimumHeightToBeUpToDate() { + return minimumHeightToBeUpToDate; + } + + public long getMaxTrailingPeers() { + return maxTrailingPeers; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TrailingPeerRequirements that = (TrailingPeerRequirements) o; + return minimumHeightToBeUpToDate == that.minimumHeightToBeUpToDate + && maxTrailingPeers == that.maxTrailingPeers; + } + + @Override + public int hashCode() { + return Objects.hash(minimumHeightToBeUpToDate, maxTrailingPeers); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("minimumHeightToBeUpToDate", minimumHeightToBeUpToDate) + .add("maxTrailingPeers", maxTrailingPeers) + .toString(); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java index a23dd06fc7..cc7cbd3370 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java @@ -15,10 +15,12 @@ package tech.pegasys.pantheon.ethereum.eth.sync.fastsync; import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally; import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose; +import tech.pegasys.pantheon.ethereum.eth.sync.TrailingPeerRequirements; import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.StalledDownloadException; import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader; import tech.pegasys.pantheon.util.ExceptionUtils; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.logging.log4j.LogManager; @@ -29,6 +31,7 @@ public class FastSyncDownloader { private final FastSyncActions fastSyncActions; private final WorldStateDownloader worldStateDownloader; private final FastSyncStateStorage fastSyncStateStorage; + private volatile Optional trailingPeerRequirements = Optional.empty(); public FastSyncDownloader( final FastSyncActions fastSyncActions, @@ -45,12 +48,14 @@ public class FastSyncDownloader { .waitForSuitablePeers(fastSyncState) .thenCompose(fastSyncActions::selectPivotBlock) .thenCompose(fastSyncActions::downloadPivotBlockHeader) + .thenApply(this::updateMaxTrailingPeers) .thenApply(this::storeState) .thenCompose(this::downloadChainAndWorldState), this::handleWorldStateUnavailable); } private CompletableFuture handleWorldStateUnavailable(final Throwable error) { + trailingPeerRequirements = Optional.empty(); if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) { LOG.warn( "Fast sync was unable to download the world state. Retrying with a new pivot block."); @@ -60,6 +65,16 @@ public class FastSyncDownloader { } } + private FastSyncState updateMaxTrailingPeers(final FastSyncState state) { + if (state.getPivotBlockNumber().isPresent()) { + trailingPeerRequirements = + Optional.of(new TrailingPeerRequirements(state.getPivotBlockNumber().getAsLong(), 0)); + } else { + trailingPeerRequirements = Optional.empty(); + } + return state; + } + private FastSyncState storeState(final FastSyncState state) { fastSyncStateStorage.storeState(state); return state; @@ -85,6 +100,14 @@ public class FastSyncDownloader { }); return CompletableFuture.allOf(worldStateFuture, chainFuture) - .thenApply(complete -> currentState); + .thenApply( + complete -> { + trailingPeerRequirements = Optional.empty(); + return currentState; + }); + } + + public Optional getTrailingPeerRequirements() { + return trailingPeerRequirements; } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiterTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiterTest.java index 45cbba99cb..e94dd8ba06 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiterTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiterTest.java @@ -48,12 +48,14 @@ public class TrailingPeerLimiterTest { private final List peers = new ArrayList<>(); private final TrailingPeerLimiter trailingPeerLimiter = new TrailingPeerLimiter( - ethPeers, blockchain, TRAILING_PEER_BLOCKS_BEHIND_THRESHOLD, MAX_TRAILING_PEERS); + ethPeers, + () -> + new TrailingPeerRequirements( + CHAIN_HEAD - TRAILING_PEER_BLOCKS_BEHIND_THRESHOLD, MAX_TRAILING_PEERS)); @Before public void setUp() { when(ethPeers.availablePeers()).then(invocation -> peers.stream()); - when(blockchain.getChainHeadBlockNumber()).thenReturn(CHAIN_HEAD); } @Test diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java index 2bf0a31850..140e48e202 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java @@ -26,6 +26,7 @@ import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState.EMP import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; +import tech.pegasys.pantheon.ethereum.eth.sync.TrailingPeerRequirements; import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.StalledDownloadException; import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader; @@ -309,6 +310,55 @@ public class FastSyncDownloaderTest { assertThat(result).isCompletedWithValue(secondDownloadPivotBlockHeaderState); } + @Test + public void shouldNotHaveTrailingPeerRequirementsBeforePivotBlockSelected() { + when(fastSyncActions.waitForSuitablePeers(EMPTY_SYNC_STATE)) + .thenReturn(new CompletableFuture<>()); + + downloader.start(EMPTY_SYNC_STATE); + + verify(fastSyncActions).waitForSuitablePeers(EMPTY_SYNC_STATE); + assertThat(downloader.getTrailingPeerRequirements()).isEmpty(); + } + + @Test + public void shouldNotAllowPeersBeforePivotBlockOnceSelected() { + final FastSyncState selectPivotBlockState = new FastSyncState(50); + final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader(); + final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader); + when(fastSyncActions.waitForSuitablePeers(EMPTY_SYNC_STATE)).thenReturn(COMPLETE); + when(fastSyncActions.selectPivotBlock(EMPTY_SYNC_STATE)) + .thenReturn(completedFuture(selectPivotBlockState)); + when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) + .thenReturn(completedFuture(downloadPivotBlockHeaderState)); + when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)) + .thenReturn(new CompletableFuture<>()); + when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(new CompletableFuture<>()); + + downloader.start(EMPTY_SYNC_STATE); + assertThat(downloader.getTrailingPeerRequirements()) + .contains(new TrailingPeerRequirements(50, 0)); + } + + @Test + public void shouldNotHaveTrailingPeerRequirementsAfterDownloadCompletes() { + final FastSyncState selectPivotBlockState = new FastSyncState(50); + final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader(); + final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader); + when(fastSyncActions.waitForSuitablePeers(EMPTY_SYNC_STATE)).thenReturn(COMPLETE); + when(fastSyncActions.selectPivotBlock(EMPTY_SYNC_STATE)) + .thenReturn(completedFuture(selectPivotBlockState)); + when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState)) + .thenReturn(completedFuture(downloadPivotBlockHeaderState)); + when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(COMPLETE); + when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(completedFuture(null)); + + final CompletableFuture result = downloader.start(EMPTY_SYNC_STATE); + assertThat(result).isDone(); + + assertThat(downloader.getTrailingPeerRequirements()).isEmpty(); + } + private CompletableFuture completedExceptionally(final Throwable error) { final CompletableFuture result = new CompletableFuture<>(); result.completeExceptionally(error); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/cli/DefaultCommandValues.java b/pantheon/src/main/java/tech/pegasys/pantheon/cli/DefaultCommandValues.java index e5a206008b..0403b01533 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/cli/DefaultCommandValues.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/cli/DefaultCommandValues.java @@ -56,7 +56,6 @@ public interface DefaultCommandValues { // but we use FULL for the moment as Fast is still in progress SyncMode DEFAULT_SYNC_MODE = SyncMode.FULL; int DEFAULT_MAX_PEERS = 25; - int MAX_TRAILING_PEERS = Integer.MAX_VALUE; static Path getDefaultPantheonDataPath(final Object command) { // this property is retrieved from Gradle tasks or Pantheon running shell script. diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java index 3379cc3ee1..e71c01e33e 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java @@ -902,7 +902,6 @@ public class PantheonCommand implements DefaultCommandValues, Runnable { private SynchronizerConfiguration buildSyncConfig() { synchronizerConfigurationBuilder.syncMode(syncMode); - synchronizerConfigurationBuilder.maxTrailingPeers(MAX_TRAILING_PEERS); return synchronizerConfigurationBuilder.build(); }