Disconnect peers before the pivot block while fast syncing (#1139)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 695f5fa2e1
commit e7c7d72a04
  1. 10
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java
  2. 12
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java
  3. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java
  4. 33
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
  5. 24
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiter.java
  6. 66
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerRequirements.java
  7. 25
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloader.java
  8. 6
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/TrailingPeerLimiterTest.java
  9. 50
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java
  10. 1
      pantheon/src/main/java/tech/pegasys/pantheon/cli/DefaultCommandValues.java
  11. 1
      pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java

@ -25,6 +25,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
public class ChainHeadTracker implements ConnectCallback {
@ -51,14 +53,10 @@ public class ChainHeadTracker implements ConnectCallback {
final EthContext ethContext,
final ProtocolSchedule<?> protocolSchedule,
final Blockchain blockchain,
final SynchronizerConfiguration syncConfiguration,
final Supplier<TrailingPeerRequirements> trailingPeerRequirementsCalculator,
final MetricsSystem metricsSystem) {
final TrailingPeerLimiter trailingPeerLimiter =
new TrailingPeerLimiter(
ethContext.getEthPeers(),
blockchain,
syncConfiguration.trailingPeerBlocksBehindThreshold(),
syncConfiguration.maxTrailingPeers());
new TrailingPeerLimiter(ethContext.getEthPeers(), trailingPeerRequirementsCalculator);
final ChainHeadTracker tracker =
new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter, metricsSystem);
ethContext.getEthPeers().subscribeConnect(tracker);

@ -74,7 +74,11 @@ public class DefaultSynchronizer<C> implements Synchronizer {
new BlockBroadcaster(ethContext));
ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, metricsSystem);
ethContext,
protocolSchedule,
protocolContext.getBlockchain(),
this::calculateTrailingPeerRequirements,
metricsSystem);
this.fullSyncDownloader =
new FullSyncDownloader<>(
@ -92,6 +96,12 @@ public class DefaultSynchronizer<C> implements Synchronizer {
syncState);
}
private TrailingPeerRequirements calculateTrailingPeerRequirements() {
return fastSynchronizer
.flatMap(FastSynchronizer::calculateTrailingPeerRequirements)
.orElse(TrailingPeerRequirements.UNRESTRICTED);
}
@Override
public void start() {
if (started.compareAndSet(false, true)) {

@ -182,4 +182,8 @@ class FastSynchronizer<C> {
return taskCollection;
}
public Optional<TrailingPeerRequirements> calculateTrailingPeerRequirements() {
return fastSyncDownloader.getTrailingPeerRequirements();
}
}

@ -53,8 +53,6 @@ public class SynchronizerConfiguration {
private final int downloaderCheckpointTimeoutsPermitted;
private final int downloaderChainSegmentTimeoutsPermitted;
private final int downloaderChainSegmentSize;
private final long trailingPeerBlocksBehindThreshold;
private final int maxTrailingPeers;
private final int downloaderParallelism;
private final int transactionsParallelism;
private final int computationParallelism;
@ -75,8 +73,6 @@ public class SynchronizerConfiguration {
final int downloaderCheckpointTimeoutsPermitted,
final int downloaderChainSegmentTimeoutsPermitted,
final int downloaderChainSegmentSize,
final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers,
final int downloaderParallelism,
final int transactionsParallelism,
final int computationParallelism) {
@ -95,8 +91,6 @@ public class SynchronizerConfiguration {
this.downloaderCheckpointTimeoutsPermitted = downloaderCheckpointTimeoutsPermitted;
this.downloaderChainSegmentTimeoutsPermitted = downloaderChainSegmentTimeoutsPermitted;
this.downloaderChainSegmentSize = downloaderChainSegmentSize;
this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold;
this.maxTrailingPeers = maxTrailingPeers;
this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism;
this.computationParallelism = computationParallelism;
@ -159,19 +153,6 @@ public class SynchronizerConfiguration {
return downloaderChainSegmentSize;
}
/**
* The number of blocks behind we allow a peer to be before considering them a trailing peer.
*
* @return the maximum number of blocks behind a peer can be while being considered current.
*/
public long trailingPeerBlocksBehindThreshold() {
return trailingPeerBlocksBehindThreshold;
}
public int maxTrailingPeers() {
return maxTrailingPeers;
}
public int downloaderParallelism() {
return downloaderParallelism;
}
@ -224,8 +205,6 @@ public class SynchronizerConfiguration {
private int downloaderCheckpointTimeoutsPermitted = 5;
private int downloaderChainSegmentTimeoutsPermitted = 5;
private int downloaderChainSegmentSize = 200;
private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 4;
private int transactionsParallelism = 2;
private int computationParallelism = Runtime.getRuntime().availableProcessors();
@ -293,16 +272,6 @@ public class SynchronizerConfiguration {
return this;
}
public Builder trailingPeerBlocksBehindThreshold(final long trailingPeerBlocksBehindThreshold) {
this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold;
return this;
}
public Builder maxTrailingPeers(final int maxTrailingPeers) {
this.maxTrailingPeers = maxTrailingPeers;
return this;
}
public Builder downloaderParallelisim(final int downloaderParallelism) {
this.downloaderParallelism = downloaderParallelism;
return this;
@ -361,8 +330,6 @@ public class SynchronizerConfiguration {
downloaderCheckpointTimeoutsPermitted,
downloaderChainSegmentTimeoutsPermitted,
downloaderChainSegmentSize,
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism,
transactionsParallelism,
computationParallelism);

@ -23,6 +23,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.Discon
import java.util.Comparator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
@ -37,30 +38,27 @@ public class TrailingPeerLimiter implements BlockAddedObserver {
// how often we rerun the check.
private static final int RECHECK_PEERS_WHEN_BLOCK_NUMBER_MULTIPLE_OF = 100;
private final EthPeers ethPeers;
private final Blockchain blockchain;
private final long trailingPeerBlocksBehindThreshold;
private final int maxTrailingPeers;
private final Supplier<TrailingPeerRequirements> trailingPeerRequirementsCalculator;
public TrailingPeerLimiter(
final EthPeers ethPeers,
final Blockchain blockchain,
final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers) {
final Supplier<TrailingPeerRequirements> trailingPeerRequirementsCalculator) {
this.ethPeers = ethPeers;
this.blockchain = blockchain;
this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold;
this.maxTrailingPeers = maxTrailingPeers;
this.trailingPeerRequirementsCalculator = trailingPeerRequirementsCalculator;
}
public void enforceTrailingPeerLimit() {
final TrailingPeerRequirements requirements = trailingPeerRequirementsCalculator.get();
if (requirements.getMaxTrailingPeers() == Long.MAX_VALUE) {
return;
}
final long minimumHeightToBeUpToDate = requirements.getMinimumHeightToBeUpToDate();
final long maxTrailingPeers = requirements.getMaxTrailingPeers();
final List<EthPeer> trailingPeers =
ethPeers
.availablePeers()
.filter(peer -> peer.chainState().hasEstimatedHeight())
.filter(
peer ->
peer.chainState().getEstimatedHeight() + trailingPeerBlocksBehindThreshold
< blockchain.getChainHeadBlockNumber())
.filter(peer -> peer.chainState().getEstimatedHeight() < minimumHeightToBeUpToDate)
.sorted(BY_CHAIN_HEIGHT)
.collect(Collectors.toList());

@ -0,0 +1,66 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import java.util.Objects;
import com.google.common.base.MoreObjects;
public class TrailingPeerRequirements {
public static TrailingPeerRequirements UNRESTRICTED =
new TrailingPeerRequirements(BlockHeader.GENESIS_BLOCK_NUMBER, Long.MAX_VALUE);
private final long minimumHeightToBeUpToDate;
private final long maxTrailingPeers;
public TrailingPeerRequirements(
final long minimumHeightToBeUpToDate, final long maxTrailingPeers) {
this.minimumHeightToBeUpToDate = minimumHeightToBeUpToDate;
this.maxTrailingPeers = maxTrailingPeers;
}
public long getMinimumHeightToBeUpToDate() {
return minimumHeightToBeUpToDate;
}
public long getMaxTrailingPeers() {
return maxTrailingPeers;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TrailingPeerRequirements that = (TrailingPeerRequirements) o;
return minimumHeightToBeUpToDate == that.minimumHeightToBeUpToDate
&& maxTrailingPeers == that.maxTrailingPeers;
}
@Override
public int hashCode() {
return Objects.hash(minimumHeightToBeUpToDate, maxTrailingPeers);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("minimumHeightToBeUpToDate", minimumHeightToBeUpToDate)
.add("maxTrailingPeers", maxTrailingPeers)
.toString();
}
}

@ -15,10 +15,12 @@ package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;
import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose;
import tech.pegasys.pantheon.ethereum.eth.sync.TrailingPeerRequirements;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.StalledDownloadException;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
@ -29,6 +31,7 @@ public class FastSyncDownloader<C> {
private final FastSyncActions<C> fastSyncActions;
private final WorldStateDownloader worldStateDownloader;
private final FastSyncStateStorage fastSyncStateStorage;
private volatile Optional<TrailingPeerRequirements> trailingPeerRequirements = Optional.empty();
public FastSyncDownloader(
final FastSyncActions<C> fastSyncActions,
@ -45,12 +48,14 @@ public class FastSyncDownloader<C> {
.waitForSuitablePeers(fastSyncState)
.thenCompose(fastSyncActions::selectPivotBlock)
.thenCompose(fastSyncActions::downloadPivotBlockHeader)
.thenApply(this::updateMaxTrailingPeers)
.thenApply(this::storeState)
.thenCompose(this::downloadChainAndWorldState),
this::handleWorldStateUnavailable);
}
private CompletableFuture<FastSyncState> handleWorldStateUnavailable(final Throwable error) {
trailingPeerRequirements = Optional.empty();
if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) {
LOG.warn(
"Fast sync was unable to download the world state. Retrying with a new pivot block.");
@ -60,6 +65,16 @@ public class FastSyncDownloader<C> {
}
}
private FastSyncState updateMaxTrailingPeers(final FastSyncState state) {
if (state.getPivotBlockNumber().isPresent()) {
trailingPeerRequirements =
Optional.of(new TrailingPeerRequirements(state.getPivotBlockNumber().getAsLong(), 0));
} else {
trailingPeerRequirements = Optional.empty();
}
return state;
}
private FastSyncState storeState(final FastSyncState state) {
fastSyncStateStorage.storeState(state);
return state;
@ -85,6 +100,14 @@ public class FastSyncDownloader<C> {
});
return CompletableFuture.allOf(worldStateFuture, chainFuture)
.thenApply(complete -> currentState);
.thenApply(
complete -> {
trailingPeerRequirements = Optional.empty();
return currentState;
});
}
public Optional<TrailingPeerRequirements> getTrailingPeerRequirements() {
return trailingPeerRequirements;
}
}

@ -48,12 +48,14 @@ public class TrailingPeerLimiterTest {
private final List<EthPeer> peers = new ArrayList<>();
private final TrailingPeerLimiter trailingPeerLimiter =
new TrailingPeerLimiter(
ethPeers, blockchain, TRAILING_PEER_BLOCKS_BEHIND_THRESHOLD, MAX_TRAILING_PEERS);
ethPeers,
() ->
new TrailingPeerRequirements(
CHAIN_HEAD - TRAILING_PEER_BLOCKS_BEHIND_THRESHOLD, MAX_TRAILING_PEERS));
@Before
public void setUp() {
when(ethPeers.availablePeers()).then(invocation -> peers.stream());
when(blockchain.getChainHeadBlockNumber()).thenReturn(CHAIN_HEAD);
}
@Test

@ -26,6 +26,7 @@ import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState.EMP
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.eth.sync.TrailingPeerRequirements;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.StalledDownloadException;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;
@ -309,6 +310,55 @@ public class FastSyncDownloaderTest {
assertThat(result).isCompletedWithValue(secondDownloadPivotBlockHeaderState);
}
@Test
public void shouldNotHaveTrailingPeerRequirementsBeforePivotBlockSelected() {
when(fastSyncActions.waitForSuitablePeers(EMPTY_SYNC_STATE))
.thenReturn(new CompletableFuture<>());
downloader.start(EMPTY_SYNC_STATE);
verify(fastSyncActions).waitForSuitablePeers(EMPTY_SYNC_STATE);
assertThat(downloader.getTrailingPeerRequirements()).isEmpty();
}
@Test
public void shouldNotAllowPeersBeforePivotBlockOnceSelected() {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState))
.thenReturn(new CompletableFuture<>());
when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(new CompletableFuture<>());
downloader.start(EMPTY_SYNC_STATE);
assertThat(downloader.getTrailingPeerRequirements())
.contains(new TrailingPeerRequirements(50, 0));
}
@Test
public void shouldNotHaveTrailingPeerRequirementsAfterDownloadCompletes() {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(COMPLETE);
when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(completedFuture(null));
final CompletableFuture<FastSyncState> result = downloader.start(EMPTY_SYNC_STATE);
assertThat(result).isDone();
assertThat(downloader.getTrailingPeerRequirements()).isEmpty();
}
private <T> CompletableFuture<T> completedExceptionally(final Throwable error) {
final CompletableFuture<T> result = new CompletableFuture<>();
result.completeExceptionally(error);

@ -56,7 +56,6 @@ public interface DefaultCommandValues {
// but we use FULL for the moment as Fast is still in progress
SyncMode DEFAULT_SYNC_MODE = SyncMode.FULL;
int DEFAULT_MAX_PEERS = 25;
int MAX_TRAILING_PEERS = Integer.MAX_VALUE;
static Path getDefaultPantheonDataPath(final Object command) {
// this property is retrieved from Gradle tasks or Pantheon running shell script.

@ -902,7 +902,6 @@ public class PantheonCommand implements DefaultCommandValues, Runnable {
private SynchronizerConfiguration buildSyncConfig() {
synchronizerConfigurationBuilder.syncMode(syncMode);
synchronizerConfigurationBuilder.maxTrailingPeers(MAX_TRAILING_PEERS);
return synchronizerConfigurationBuilder.build();
}

Loading…
Cancel
Save