Pipeline based full sync (#1291)

Introduce a pipeline based full sync process.  Currently toggled off but can be enabled via a --X option.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent fbf5db7828
commit a4b473ad6f
  1. 38
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcher.java
  2. 29
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointRange.java
  3. 28
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointRangeSource.java
  4. 67
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloadHeadersStep.java
  5. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java
  6. 26
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java
  7. 11
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  8. 33
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/ExtractTxSignaturesTask.java
  9. 50
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullImportBlockStep.java
  10. 24
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java
  11. 119
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java
  12. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java
  13. 52
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderFetcherTest.java
  14. 17
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointRangeSourceTest.java
  15. 31
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloadHeadersStepTest.java
  16. 73
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullImportBlockStepTest.java
  17. 119
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java
  18. 6
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessor.java

@ -28,36 +28,42 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class CheckpointHeaderFetcher {
private static final Logger LOG = LogManager.getLogger();
private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule<?> protocolSchedule;
private final EthContext ethContext;
private final Optional<BlockHeader> lastCheckpointHeader;
// The checkpoint we're aiming to reach at the end of this sync.
private final Optional<BlockHeader> finalCheckpointHeader;
private final MetricsSystem metricsSystem;
public CheckpointHeaderFetcher(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final Optional<BlockHeader> lastCheckpointHeader,
final Optional<BlockHeader> finalCheckpointHeader,
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.lastCheckpointHeader = lastCheckpointHeader;
this.finalCheckpointHeader = finalCheckpointHeader;
this.metricsSystem = metricsSystem;
}
public CompletableFuture<List<BlockHeader>> getNextCheckpointHeaders(
final EthPeer peer, final BlockHeader lastHeader) {
final EthPeer peer, final BlockHeader previousCheckpointHeader) {
final int skip = syncConfig.downloaderChainSegmentSize() - 1;
final int maximumHeaderRequestSize = syncConfig.downloaderHeaderRequestSize();
final long previousCheckpointNumber = previousCheckpointHeader.getNumber();
final int additionalHeaderCount;
if (lastCheckpointHeader.isPresent()) {
final BlockHeader targetHeader = lastCheckpointHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - lastHeader.getNumber();
if (finalCheckpointHeader.isPresent()) {
final BlockHeader targetHeader = finalCheckpointHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - previousCheckpointNumber;
if (blocksUntilTarget <= 0) {
return completedFuture(emptyList());
}
@ -70,7 +76,7 @@ public class CheckpointHeaderFetcher {
additionalHeaderCount = maximumHeaderRequestSize;
}
return requestHeaders(peer, lastHeader, additionalHeaderCount, skip);
return requestHeaders(peer, previousCheckpointHeader, additionalHeaderCount, skip);
}
private CompletableFuture<List<BlockHeader>> requestHeaders(
@ -78,6 +84,11 @@ public class CheckpointHeaderFetcher {
final BlockHeader referenceHeader,
final int headerCount,
final int skip) {
LOG.debug(
"Requesting {} checkpoint headers, starting from {}, {} blocks apart",
headerCount,
referenceHeader.getNumber(),
skip);
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
@ -100,4 +111,15 @@ public class CheckpointHeaderFetcher {
}
return headers;
}
public boolean nextCheckpointEndsAtChainHead(
final EthPeer peer, final BlockHeader previousCheckpointHeader) {
if (finalCheckpointHeader.isPresent()) {
return false;
}
final int skip = syncConfig.downloaderChainSegmentSize() - 1;
final long peerEstimatedHeight = peer.chainState().getEstimatedHeight();
final long previousCheckpointNumber = previousCheckpointHeader.getNumber();
return previousCheckpointNumber + skip >= peerEstimatedHeight;
}
}

@ -17,24 +17,38 @@ import static java.lang.Math.toIntExact;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import java.util.Objects;
import java.util.Optional;
import com.google.common.base.MoreObjects;
public class CheckpointRange {
private final BlockHeader start;
private final BlockHeader end;
private final Optional<BlockHeader> end;
public CheckpointRange(final BlockHeader start) {
this.start = start;
this.end = Optional.empty();
}
public CheckpointRange(final BlockHeader start, final BlockHeader end) {
this.start = start;
this.end = end;
this.end = Optional.of(end);
}
public BlockHeader getStart() {
return start;
}
public boolean hasEnd() {
return end.isPresent();
}
public BlockHeader getEnd() {
return end;
return end.get();
}
public int getSegmentLengthExclusive() {
return toIntExact(end.get().getNumber() - start.getNumber() - 1);
}
@Override
@ -56,13 +70,6 @@ public class CheckpointRange {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("start", start.getNumber())
.add("end", end.getNumber())
.toString();
}
public int getSegmentLength() {
return toIntExact(end.getNumber() - start.getNumber());
return MoreObjects.toStringHelper(this).add("start", start).add("end", end).toString();
}
}

@ -46,11 +46,29 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
private final Queue<CheckpointRange> retrievedRanges = new ArrayDeque<>();
private BlockHeader lastRangeEnd;
private boolean reachedEndOfCheckpoints = false;
private Optional<CompletableFuture<List<BlockHeader>>> pendingCheckpointsRequest =
Optional.empty();
private int requestFailureCount = 0;
public CheckpointRangeSource(
final CheckpointHeaderFetcher checkpointFetcher,
final SyncTargetChecker syncTargetChecker,
final EthScheduler ethScheduler,
final EthPeer peer,
final BlockHeader commonAncestor,
final int checkpointTimeoutsPermitted) {
this(
checkpointFetcher,
syncTargetChecker,
ethScheduler,
peer,
commonAncestor,
checkpointTimeoutsPermitted,
Duration.ofSeconds(5));
}
CheckpointRangeSource(
final CheckpointHeaderFetcher checkpointFetcher,
final SyncTargetChecker syncTargetChecker,
final EthScheduler ethScheduler,
@ -71,7 +89,8 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
public boolean hasNext() {
return !retrievedRanges.isEmpty()
|| (requestFailureCount < checkpointTimeoutsPermitted
&& syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd));
&& syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd)
&& !reachedEndOfCheckpoints);
}
@Override
@ -82,6 +101,13 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
if (pendingCheckpointsRequest.isPresent()) {
return getCheckpointRangeFromPendingRequest();
}
if (reachedEndOfCheckpoints) {
return null;
}
if (checkpointFetcher.nextCheckpointEndsAtChainHead(peer, lastRangeEnd)) {
reachedEndOfCheckpoints = true;
return new CheckpointRange(lastRangeEnd);
}
pendingCheckpointsRequest = Optional.of(getNextCheckpointHeaders());
return getCheckpointRangeFromPendingRequest();
}

@ -12,9 +12,14 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import static java.util.Collections.emptyList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DownloadHeaderSequenceTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
@ -25,13 +30,17 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DownloadHeadersStep<C>
implements Function<CheckpointRange, CompletableFuture<CheckpointRangeHeaders>> {
private static final Logger LOG = LogManager.getLogger();
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final ValidationPolicy validationPolicy;
private final int headerRequestSize;
private final MetricsSystem metricsSystem;
public DownloadHeadersStep(
@ -39,11 +48,13 @@ public class DownloadHeadersStep<C>
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final ValidationPolicy validationPolicy,
final int headerRequestSize,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.validationPolicy = validationPolicy;
this.headerRequestSize = headerRequestSize;
this.metricsSystem = metricsSystem;
}
@ -58,22 +69,50 @@ public class DownloadHeadersStep<C>
private CompletableFuture<List<BlockHeader>> downloadHeaders(
final CheckpointRange checkpointRange) {
return DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule,
protocolContext,
ethContext,
checkpointRange.getEnd(),
// -1 because we don't want to request the range starting header
checkpointRange.getSegmentLength() - 1,
validationPolicy,
metricsSystem)
.run();
if (checkpointRange.hasEnd()) {
LOG.debug(
"Downloading headers for range {} to {}",
checkpointRange.getStart().getNumber(),
checkpointRange.getEnd().getNumber());
if (checkpointRange.getSegmentLengthExclusive() == 0) {
// There are no extra headers to download.
return completedFuture(emptyList());
}
return DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule,
protocolContext,
ethContext,
checkpointRange.getEnd(),
checkpointRange.getSegmentLengthExclusive(),
validationPolicy,
metricsSystem)
.run();
} else {
LOG.debug("Downloading headers starting from {}", checkpointRange.getStart().getNumber());
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
checkpointRange.getStart().getHash(),
checkpointRange.getStart().getNumber(),
headerRequestSize,
metricsSystem)
.run()
.thenApply(PeerTaskResult::getResult);
}
}
private CheckpointRangeHeaders processHeaders(
final CheckpointRange checkpointRange, final List<BlockHeader> headers) {
final List<BlockHeader> headersToImport = new ArrayList<>(headers);
headersToImport.add(checkpointRange.getEnd());
return new CheckpointRangeHeaders(checkpointRange, headersToImport);
if (checkpointRange.hasEnd()) {
final List<BlockHeader> headersToImport = new ArrayList<>(headers);
headersToImport.add(checkpointRange.getEnd());
return new CheckpointRangeHeaders(checkpointRange, headersToImport);
} else {
List<BlockHeader> headersToImport = headers;
if (!headers.isEmpty() && headers.get(0).equals(checkpointRange.getStart())) {
headersToImport = headers.subList(1, headers.size());
}
return new CheckpointRangeHeaders(checkpointRange, headersToImport);
}
}
}

@ -101,11 +101,11 @@ public class PipelineChainDownloader<C> implements ChainDownloader {
private CompletionStage<Void> repeatUnlessDownloadComplete(
@SuppressWarnings("unused") final Void result) {
syncState.clearSyncTarget();
if (syncTargetManager.shouldContinueDownloading()) {
return performDownload();
} else {
LOG.info("Chain download complete");
syncState.clearSyncTarget();
return completedFuture(null);
}
}

@ -60,6 +60,7 @@ public class SynchronizerConfiguration {
private final int transactionsParallelism;
private final int computationParallelism;
private final int maxTrailingPeers;
private final boolean piplineDownloaderForFullSyncEnabled;
private final long worldStateMinMillisBeforeStalling;
private SynchronizerConfiguration(
@ -81,7 +82,8 @@ public class SynchronizerConfiguration {
final int downloaderParallelism,
final int transactionsParallelism,
final int computationParallelism,
final int maxTrailingPeers) {
final int maxTrailingPeers,
final boolean piplineDownloaderForFullSyncEnabled) {
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount;
@ -101,6 +103,7 @@ public class SynchronizerConfiguration {
this.transactionsParallelism = transactionsParallelism;
this.computationParallelism = computationParallelism;
this.maxTrailingPeers = maxTrailingPeers;
this.piplineDownloaderForFullSyncEnabled = piplineDownloaderForFullSyncEnabled;
}
public static Builder builder() {
@ -207,6 +210,10 @@ public class SynchronizerConfiguration {
return maxTrailingPeers;
}
public boolean isPiplineDownloaderForFullSyncEnabled() {
return piplineDownloaderForFullSyncEnabled;
}
public static class Builder {
private SyncMode syncMode = SyncMode.FULL;
private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS;
@ -364,6 +371,14 @@ public class SynchronizerConfiguration {
"Minimum time in ms without progress before considering a world state download as stalled (default: ${DEFAULT-VALUE})")
private long worldStateMinMillisBeforeStalling = DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING;
@CommandLine.Option(
names = "--Xsynchronizer-pipeline-full-sync-enabled",
hidden = true,
defaultValue = "false",
paramLabel = "<BOOLEAN>",
description = "Enable the pipeline based chain downloader during full synchronization")
private Boolean piplineDownloaderForFullSyncEnabled = false;
public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance;
return this;
@ -465,6 +480,12 @@ public class SynchronizerConfiguration {
return this;
}
public Builder piplineDownloaderForFullSyncEnabled(
final Boolean piplineDownloaderForFullSyncEnabled) {
this.piplineDownloaderForFullSyncEnabled = piplineDownloaderForFullSyncEnabled;
return this;
}
public SynchronizerConfiguration build() {
return new SynchronizerConfiguration(
fastSyncPivotDistance,
@ -485,7 +506,8 @@ public class SynchronizerConfiguration {
downloaderParallelism,
transactionsParallelism,
computationParallelism,
maxTrailingPeers);
maxTrailingPeers,
piplineDownloaderForFullSyncEnabled);
}
}
}

@ -37,7 +37,6 @@ import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;
import tech.pegasys.pantheon.services.pipeline.PipelineBuilder;
import java.time.Duration;
import java.util.Optional;
public class FastSyncDownloadPipelineFactory<C> implements DownloadPipelineFactory {
@ -100,11 +99,15 @@ public class FastSyncDownloadPipelineFactory<C> implements DownloadPipelineFacto
ethContext.getScheduler(),
target.peer(),
target.commonAncestor(),
syncConfig.downloaderCheckpointTimeoutsPermitted(),
Duration.ofSeconds(5));
syncConfig.downloaderCheckpointTimeoutsPermitted());
final DownloadHeadersStep<C> downloadHeadersStep =
new DownloadHeadersStep<>(
protocolSchedule, protocolContext, ethContext, detachedValidationPolicy, metricsSystem);
protocolSchedule,
protocolContext,
ethContext,
detachedValidationPolicy,
headerRequestSize,
metricsSystem);
final CheckpointHeaderValidationStep<C> validateHeadersJoinUpStep =
new CheckpointHeaderValidationStep<>(
protocolSchedule, protocolContext, detachedValidationPolicy);

@ -0,0 +1,33 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
public class ExtractTxSignaturesTask implements Function<List<Block>, Stream<Block>> {
@Override
public Stream<Block> apply(final List<Block> blocks) {
return blocks.stream().map(this::extractSignatures);
}
private Block extractSignatures(final Block block) {
block.getBody().getTransactions().forEach(Transaction::getSender);
return block;
}
}

@ -0,0 +1,50 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class FullImportBlockStep<C> implements Consumer<Block> {
private static final Logger LOG = LogManager.getLogger();
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
public FullImportBlockStep(
final ProtocolSchedule<C> protocolSchedule, final ProtocolContext<C> protocolContext) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
}
@Override
public void accept(final Block block) {
final long blockNumber = block.getHeader().getNumber();
final BlockImporter<C> importer =
protocolSchedule.getByBlockNumber(blockNumber).getBlockImporter();
if (!importer.importBlock(protocolContext, block, HeaderValidationMode.SKIP_DETACHED)) {
throw new InvalidBlockException("Failed to import block", blockNumber, block.getHash());
}
if (blockNumber % 200 == 0) {
LOG.info("Import reached block {}", blockNumber);
}
}
}

@ -17,12 +17,17 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.ChainDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.CheckpointHeaderManager;
import tech.pegasys.pantheon.ethereum.eth.sync.EthTaskChainDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.PipelineChainDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class FullSyncChainDownloader {
private static final Logger LOG = LogManager.getLogger();
private FullSyncChainDownloader() {}
@ -33,12 +38,27 @@ public class FullSyncChainDownloader {
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem) {
final FullSyncTargetManager<C> syncTargetManager =
new FullSyncTargetManager<>(
config, protocolSchedule, protocolContext, ethContext, metricsSystem);
if (config.isPiplineDownloaderForFullSyncEnabled()) {
LOG.info("Using PipelineChainDownloader");
return new PipelineChainDownloader<>(
syncState,
syncTargetManager,
new FullSyncDownloadPipelineFactory<>(
config, protocolSchedule, protocolContext, ethContext, metricsSystem),
ethContext.getScheduler(),
metricsSystem);
}
return new EthTaskChainDownloader<>(
config,
ethContext,
syncState,
new FullSyncTargetManager<>(
config, protocolSchedule, protocolContext, ethContext, metricsSystem),
syncTargetManager,
new CheckpointHeaderManager<>(
config, protocolContext, ethContext, syncState, protocolSchedule, metricsSystem),
new FullSyncBlockImportTaskFactory<>(

@ -0,0 +1,119 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.CheckpointHeaderFetcher;
import tech.pegasys.pantheon.ethereum.eth.sync.CheckpointHeaderValidationStep;
import tech.pegasys.pantheon.ethereum.eth.sync.CheckpointRangeSource;
import tech.pegasys.pantheon.ethereum.eth.sync.DownloadBodiesStep;
import tech.pegasys.pantheon.ethereum.eth.sync.DownloadHeadersStep;
import tech.pegasys.pantheon.ethereum.eth.sync.DownloadPipelineFactory;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;
import tech.pegasys.pantheon.services.pipeline.PipelineBuilder;
import java.util.Optional;
public class FullSyncDownloadPipelineFactory<C> implements DownloadPipelineFactory {
private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final ValidationPolicy detachedValidationPolicy =
() -> HeaderValidationMode.DETACHED_ONLY;
private final BetterSyncTargetEvaluator betterSyncTargetEvaluator;
public FullSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
}
@Override
public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target) {
final int downloaderParallelism = syncConfig.downloaderParallelism();
final int headerRequestSize = syncConfig.downloaderHeaderRequestSize();
final int singleHeaderBufferSize = headerRequestSize * downloaderParallelism;
final CheckpointRangeSource checkpointRangeSource =
new CheckpointRangeSource(
new CheckpointHeaderFetcher(
syncConfig, protocolSchedule, ethContext, Optional.empty(), metricsSystem),
this::shouldContinueDownloadingFromPeer,
ethContext.getScheduler(),
target.peer(),
target.commonAncestor(),
syncConfig.downloaderCheckpointTimeoutsPermitted());
final DownloadHeadersStep<C> downloadHeadersStep =
new DownloadHeadersStep<>(
protocolSchedule,
protocolContext,
ethContext,
detachedValidationPolicy,
headerRequestSize,
metricsSystem);
final CheckpointHeaderValidationStep<C> validateHeadersJoinUpStep =
new CheckpointHeaderValidationStep<>(
protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep<C> downloadBodiesStep =
new DownloadBodiesStep<>(protocolSchedule, ethContext, metricsSystem);
final ExtractTxSignaturesTask extractTxSignaturesTask = new ExtractTxSignaturesTask();
final FullImportBlockStep<C> importBlockStep =
new FullImportBlockStep<>(protocolSchedule, protocolContext);
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",
checkpointRangeSource,
downloaderParallelism,
metricsSystem.createLabelledCounter(
MetricCategory.SYNCHRONIZER,
"chain_download_pipeline_processed_total",
"Number of entries process by each chain download pipeline stage",
"step",
"action"))
.thenProcessAsyncOrdered("downloadHeaders", downloadHeadersStep, downloaderParallelism)
.thenFlatMap("validateHeadersJoin", validateHeadersJoinUpStep, singleHeaderBufferSize)
.inBatches(headerRequestSize)
.thenProcessAsyncOrdered("downloadBodies", downloadBodiesStep, downloaderParallelism)
.thenFlatMap("extractTxSignatures", extractTxSignaturesTask, singleHeaderBufferSize)
.andFinishWith("importBlock", importBlockStep);
}
private boolean shouldContinueDownloadingFromPeer(
final EthPeer peer, final BlockHeader lastCheckpointHeader) {
final boolean caughtUpToPeer =
peer.chainState().getEstimatedHeight() <= lastCheckpointHeader.getNumber();
return !peer.isDisconnected()
&& !caughtUpToPeer
&& !betterSyncTargetEvaluator.shouldSwitchSyncTarget(peer);
}
}

@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Arrays.asList;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
@ -82,6 +83,7 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
this.validationPolicy = validationPolicy;
this.metricsSystem = metricsSystem;
checkArgument(segmentLength > 0, "Segment length must not be 0");
startingBlockNumber = referenceHeader.getNumber() - segmentLength;
headers = new BlockHeader[segmentLength];
lastFilledHeaderIndex = segmentLength;

@ -42,6 +42,8 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class CheckpointHeaderFetcherTest {
private static final int SEGMENT_SIZE = 5;
private static Blockchain blockchain;
private static ProtocolSchedule<Void> protocolSchedule;
private static ProtocolContext<Void> protocolContext;
@ -66,7 +68,9 @@ public class CheckpointHeaderFetcherTest {
blockchain, protocolContext.getWorldStateArchive(), () -> false);
responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
respondingPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
respondingPeer =
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, blockchain.getChainHeadBlockNumber());
}
@Test
@ -141,12 +145,56 @@ public class CheckpointHeaderFetcherTest {
assertThat(result).isCompletedWithValue(emptyList());
}
@Test
public void nextCheckpointShouldEndAtChainHeadWhenNextCheckpointHeaderIsAfterHead() {
final long remoteChainHeight = blockchain.getChainHeadBlockNumber();
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.empty());
assertThat(
checkpointHeaderFetcher.nextCheckpointEndsAtChainHead(
respondingPeer.getEthPeer(), header(remoteChainHeight - SEGMENT_SIZE + 1)))
.isTrue();
}
@Test
public void nextCheckpointShouldNotEndAtChainHeadWhenAFinalCheckpointHeaderIsSpecified() {
final long remoteChainHeight = blockchain.getChainHeadBlockNumber();
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(remoteChainHeight)));
assertThat(
checkpointHeaderFetcher.nextCheckpointEndsAtChainHead(
respondingPeer.getEthPeer(), header(remoteChainHeight - SEGMENT_SIZE + 1)))
.isFalse();
}
@Test
public void shouldReturnRemoteChainHeadWhenNextCheckpointHeaderIsTheRemoteHead() {
final long remoteChainHeight = blockchain.getChainHeadBlockNumber();
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.empty());
assertThat(
checkpointHeaderFetcher.nextCheckpointEndsAtChainHead(
respondingPeer.getEthPeer(), header(remoteChainHeight - SEGMENT_SIZE)))
.isFalse();
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(
respondingPeer.getEthPeer(), header(remoteChainHeight - SEGMENT_SIZE));
respondingPeer.respond(responder);
assertThat(result).isCompletedWithValue(singletonList(header(remoteChainHeight)));
}
private CheckpointHeaderFetcher createCheckpointHeaderFetcher(
final Optional<BlockHeader> targetHeader) {
final EthContext ethContext = ethProtocolManager.ethContext();
return new CheckpointHeaderFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderChainSegmentSize(SEGMENT_SIZE)
.downloaderHeadersRequestSize(3)
.build(),
protocolSchedule,

@ -174,12 +174,14 @@ public class CheckpointRangeSourceTest {
assertThat(source.next()).isEqualTo(new CheckpointRange(commonAncestor, header(15)));
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(checkpointFetcher).nextCheckpointEndsAtChainHead(peer, commonAncestor);
assertThat(source.next()).isEqualTo(new CheckpointRange(header(15), header(20)));
verifyNoMoreInteractions(checkpointFetcher);
assertThat(source.next()).isEqualTo(new CheckpointRange(header(20), header(25)));
verify(checkpointFetcher).getNextCheckpointHeaders(peer, header(20));
verify(checkpointFetcher).nextCheckpointEndsAtChainHead(peer, header(20));
assertThat(source.next()).isEqualTo(new CheckpointRange(header(25), header(30)));
verifyNoMoreInteractions(checkpointFetcher);
@ -209,6 +211,7 @@ public class CheckpointRangeSourceTest {
assertThat(source.next()).isNull();
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(checkpointFetcher).nextCheckpointEndsAtChainHead(peer, commonAncestor);
assertThat(source.next()).isNull();
verifyNoMoreInteractions(checkpointFetcher);
@ -241,6 +244,20 @@ public class CheckpointRangeSourceTest {
verify(checkpointFetcher, times(2)).getNextCheckpointHeaders(peer, commonAncestor);
}
@Test
public void shouldReturnUnboundedCheckpointRangeWhenNextCheckpointEndsAtChainHead() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, commonAncestor))
.thenReturn(true);
when(checkpointFetcher.nextCheckpointEndsAtChainHead(peer, commonAncestor)).thenReturn(true);
assertThat(source).hasNext();
assertThat(source.next()).isEqualTo(new CheckpointRange(commonAncestor));
// Once we've sent an open-ended range we shouldn't have any more ranges.
assertThat(source).isExhausted();
assertThat(source.next()).isNull();
}
private BlockHeader header(final int number) {
return new BlockHeaderTestFixture().number(number).buildHeader();
}

@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.sync;
import static org.assertj.core.api.Assertions.assertThat;
import static tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.blockchainResponder;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
@ -36,6 +37,7 @@ import org.junit.Test;
public class DownloadHeadersStepTest {
private static final int HEADER_REQUEST_SIZE = 200;
private static ProtocolSchedule<Void> protocolSchedule;
private static ProtocolContext<Void> protocolContext;
private static MutableBlockchain blockchain;
@ -63,6 +65,7 @@ public class DownloadHeadersStepTest {
protocolContext,
ethProtocolManager.ethContext(),
() -> HeaderValidationMode.DETACHED_ONLY,
HEADER_REQUEST_SIZE,
new NoOpMetricsSystem());
checkpointRange =
@ -75,7 +78,7 @@ public class DownloadHeadersStepTest {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CompletableFuture<CheckpointRangeHeaders> result = downloader.apply(checkpointRange);
peer.respond(RespondingEthPeer.blockchainResponder(blockchain));
peer.respond(blockchainResponder(blockchain));
// The start of the range should have been imported as part of the previous batch hence 2-10.
assertThat(result)
@ -92,11 +95,35 @@ public class DownloadHeadersStepTest {
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
peer.respond(RespondingEthPeer.blockchainResponder(blockchain));
peer.respond(blockchainResponder(blockchain));
assertThat(EthProtocolManagerTestUtil.getPendingFuturesCount(ethProtocolManager)).isZero();
}
@Test
public void shouldReturnOnlyEndHeaderWhenCheckpointRangeHasLengthOfOne() {
final CheckpointRange checkpointRange =
new CheckpointRange(blockchain.getBlockHeader(3).get(), blockchain.getBlockHeader(4).get());
final CompletableFuture<CheckpointRangeHeaders> result = this.downloader.apply(checkpointRange);
assertThat(result)
.isCompletedWithValue(new CheckpointRangeHeaders(checkpointRange, headersFromChain(4, 4)));
}
@Test
public void shouldGetRemainingHeadersWhenRangeHasNoEnd() {
final CheckpointRange checkpointRange = new CheckpointRange(blockchain.getBlockHeader(3).get());
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CompletableFuture<CheckpointRangeHeaders> result = this.downloader.apply(checkpointRange);
peer.respond(blockchainResponder(blockchain));
assertThat(result)
.isCompletedWithValue(new CheckpointRangeHeaders(checkpointRange, headersFromChain(4, 19)));
}
private List<BlockHeader> headersFromChain(final long startNumber, final long endNumber) {
final List<BlockHeader> headers = new ArrayList<>();
for (long i = startNumber; i <= endNumber; i++) {

@ -0,0 +1,73 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.SKIP_DETACHED;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class FullImportBlockStepTest {
@Mock private ProtocolSchedule<Void> protocolSchedule;
@Mock private ProtocolSpec<Void> protocolSpec;
@Mock private ProtocolContext<Void> protocolContext;
@Mock private BlockImporter<Void> blockImporter;
private final BlockDataGenerator gen = new BlockDataGenerator();
private FullImportBlockStep<Void> importBlocksStep;
@Before
public void setUp() {
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec);
when(protocolSpec.getBlockImporter()).thenReturn(blockImporter);
importBlocksStep = new FullImportBlockStep<>(protocolSchedule, protocolContext);
}
@Test
public void shouldImportBlock() {
final Block block = gen.block();
when(blockImporter.importBlock(protocolContext, block, SKIP_DETACHED)).thenReturn(true);
importBlocksStep.accept(block);
verify(protocolSchedule).getByBlockNumber(block.getHeader().getNumber());
verify(blockImporter).importBlock(protocolContext, block, SKIP_DETACHED);
}
@Test
public void shouldThrowExceptionWhenValidationFails() {
final Block block = gen.block();
when(blockImporter.importBlock(protocolContext, block, SKIP_DETACHED)).thenReturn(false);
assertThatThrownBy(() -> importBlocksStep.accept(block))
.isInstanceOf(InvalidBlockException.class);
}
}

@ -13,6 +13,8 @@
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assumptions.assumeThatObject;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
@ -58,9 +60,24 @@ import java.util.function.Function;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class FullSyncChainDownloaderTest {
@Parameter public boolean usePipelineDownloader;
@Parameter(1)
public String name;
@Parameters(name = "{1}")
public static Object[][] params() {
return new Object[][] {{false, "EthTask"}, {true, "Pipeline"}};
}
protected ProtocolSchedule<Void> protocolSchedule;
protected EthProtocolManager ethProtocolManager;
protected EthContext ethContext;
@ -93,19 +110,21 @@ public class FullSyncChainDownloaderTest {
syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers());
}
// A few tests break encapsulation and access the "current task", hence casting to a concrete type
@SuppressWarnings("unchecked")
private EthTaskChainDownloader<Void> downloader(final SynchronizerConfiguration syncConfig) {
return (EthTaskChainDownloader<Void>)
FullSyncChainDownloader.create(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) {
return FullSyncChainDownloader.create(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
}
private EthTaskChainDownloader<Void> downloader() {
final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder().build();
private ChainDownloader downloader() {
final SynchronizerConfiguration syncConfig = syncConfigBuilder().build();
return downloader(syncConfig);
}
private SynchronizerConfiguration.Builder syncConfigBuilder() {
return SynchronizerConfiguration.builder()
.piplineDownloaderForFullSyncEnabled(usePipelineDownloader);
}
@Test
public void syncsToBetterChain_multipleSegments() {
otherBlockchainSetup.importFirstBlocks(15);
@ -118,7 +137,7 @@ public class FullSyncChainDownloaderTest {
final Responder responder = RespondingEthPeer.blockchainResponder(otherBlockchain);
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().downloaderChainSegmentSize(10).build();
syncConfigBuilder().downloaderChainSegmentSize(10).build();
final ChainDownloader downloader = downloader(syncConfig);
downloader.start();
@ -144,7 +163,7 @@ public class FullSyncChainDownloaderTest {
final Responder responder = RespondingEthPeer.blockchainResponder(otherBlockchain);
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().downloaderChainSegmentSize(10).build();
syncConfigBuilder().downloaderChainSegmentSize(10).build();
final ChainDownloader downloader = downloader(syncConfig);
downloader.start();
@ -170,7 +189,7 @@ public class FullSyncChainDownloaderTest {
final Responder responder = RespondingEthPeer.blockchainResponder(otherBlockchain);
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().downloaderChainSegmentSize(4).build();
syncConfigBuilder().downloaderChainSegmentSize(4).build();
final ChainDownloader downloader = downloader(syncConfig);
downloader.start();
@ -229,12 +248,15 @@ public class FullSyncChainDownloaderTest {
final Responder responder = RespondingEthPeer.blockchainResponder(otherBlockchain);
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().downloaderChainSegmentSize(10).build();
syncConfigBuilder().downloaderChainSegmentSize(10).build();
final ChainDownloader downloader = downloader(syncConfig);
downloader.start();
peer.respondWhileOtherThreadsWork(
responder, () -> localBlockchain.getChainHeadBlockNumber() < targetBlock);
responder,
() ->
localBlockchain.getChainHeadBlockNumber() < targetBlock
|| syncState.syncTarget().isPresent());
// Synctarget should not exist as chain has fully downloaded.
assertThat(syncState.syncTarget().isPresent()).isFalse();
@ -295,11 +317,11 @@ public class FullSyncChainDownloaderTest {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, localTd.plus(100), 50);
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
syncConfigBuilder()
.downloaderChainSegmentSize(5)
.downloaderChangeTargetThresholdByHeight(10)
.build();
final EthTaskChainDownloader<Void> downloader = downloader(syncConfig);
final ChainDownloader downloader = downloader(syncConfig);
downloader.start();
// Process until the sync target is selected
@ -310,11 +332,7 @@ public class FullSyncChainDownloaderTest {
// Update Peer B so that its a better target and send some responses to push logic forward
peerB.getEthPeer().chainState().update(gen.hash(), 100);
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
while (downloader.getCurrentTask() == firstTask) {
RespondingEthPeer.respondOnce(responder, peerA, peerB);
}
processUntilSyncTargetChecked(responder, downloader, peerA, peerB);
assertThat(syncState.syncTarget()).isPresent();
assertThat(syncState.syncTarget().get().peer()).isEqualTo(peerB.getEthPeer());
@ -332,11 +350,11 @@ public class FullSyncChainDownloaderTest {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, localTd.plus(100));
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
syncConfigBuilder()
.downloaderChainSegmentSize(5)
.downloaderChangeTargetThresholdByHeight(1000)
.build();
final EthTaskChainDownloader<Void> downloader = downloader(syncConfig);
final ChainDownloader downloader = downloader(syncConfig);
downloader.start();
// Process until the sync target is selected
@ -347,11 +365,7 @@ public class FullSyncChainDownloaderTest {
// Update otherPeer so that its a better target, but under the threshold to switch
otherPeer.getEthPeer().chainState().update(gen.hash(), 100);
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
while (downloader.getCurrentTask() == firstTask) {
RespondingEthPeer.respondOnce(responder, bestPeer, otherPeer);
}
processUntilSyncTargetChecked(responder, downloader, bestPeer, otherPeer);
assertThat(syncState.syncTarget()).isPresent();
assertThat(syncState.syncTarget().get().peer()).isEqualTo(bestPeer.getEthPeer());
@ -369,11 +383,11 @@ public class FullSyncChainDownloaderTest {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, localTd.plus(100));
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
syncConfigBuilder()
.downloaderChainSegmentSize(5)
.downloaderChangeTargetThresholdByTd(UInt256.of(10))
.build();
final EthTaskChainDownloader<Void> downloader = downloader(syncConfig);
final ChainDownloader downloader = downloader(syncConfig);
downloader.start();
// Process until the sync target is selected
@ -388,11 +402,7 @@ public class FullSyncChainDownloaderTest {
.updateForAnnouncedBlock(
gen.header(), localBlockchain.getChainHead().getTotalDifficulty().plus(300));
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
while (downloader.getCurrentTask() == firstTask) {
RespondingEthPeer.respondOnce(responder, peerA, peerB);
}
processUntilSyncTargetChecked(responder, downloader, peerA, peerB);
assertThat(syncState.syncTarget()).isPresent();
assertThat(syncState.syncTarget().get().peer()).isEqualTo(peerB.getEthPeer());
@ -414,11 +424,11 @@ public class FullSyncChainDownloaderTest {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, localTd.plus(100));
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
syncConfigBuilder()
.downloaderChainSegmentSize(5)
.downloaderChangeTargetThresholdByTd(UInt256.of(100_000_000L))
.build();
final EthTaskChainDownloader<Void> downloader = downloader(syncConfig);
final ChainDownloader downloader = downloader(syncConfig);
downloader.start();
// Process until the sync target is selected
@ -432,11 +442,7 @@ public class FullSyncChainDownloaderTest {
bestPeer.getEthPeer().chainState().updateForAnnouncedBlock(newBestBlock, localTd.plus(201));
otherPeer.getEthPeer().chainState().updateForAnnouncedBlock(newBestBlock, localTd.plus(300));
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
while (downloader.getCurrentTask() == firstTask) {
RespondingEthPeer.respondOnce(responder, bestPeer, otherPeer);
}
processUntilSyncTargetChecked(responder, downloader, bestPeer, otherPeer);
assertThat(syncState.syncTarget()).isPresent();
assertThat(syncState.syncTarget().get().peer()).isEqualTo(bestPeer.getEthPeer());
@ -452,10 +458,7 @@ public class FullSyncChainDownloaderTest {
assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber());
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build();
syncConfigBuilder().downloaderChainSegmentSize(5).downloaderHeadersRequestSize(3).build();
final ChainDownloader downloader = downloader(syncConfig);
final long bestPeerChainHead = otherBlockchain.getChainHeadBlockNumber();
@ -521,10 +524,7 @@ public class FullSyncChainDownloaderTest {
assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber());
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build();
syncConfigBuilder().downloaderChainSegmentSize(5).downloaderHeadersRequestSize(3).build();
final ChainDownloader downloader = downloader(syncConfig);
// Setup the best peer we should use as our sync target
@ -559,7 +559,12 @@ public class FullSyncChainDownloaderTest {
assertThat(syncState.syncTarget()).isPresent();
assertThat(syncState.syncTarget().get().peer()).isEqualTo(bestPeer.getEthPeer());
int count = 0;
while (localBlockchain.getChainHeadBlockNumber() < bestPeerChainHead) {
if (count > 10_000) {
fail("Did not reach chain head soon enough");
}
count++;
// Check that any requests for checkpoint headers are only sent to the best peer
final long checkpointRequestsToOtherPeers =
otherPeers.stream()
@ -596,4 +601,20 @@ public class FullSyncChainDownloaderTest {
}
return shortChain;
}
@SuppressWarnings("unchecked")
private void processUntilSyncTargetChecked(
final Responder responder,
final ChainDownloader rawDownloader,
final RespondingEthPeer... peers) {
// This breaks encapsulation and depends on the particular semantics of EthTaskChainDownloader
// These cases are now handled by unit tests in BetterSyncTargetEvaluatorTest
assumeThatObject(rawDownloader).isInstanceOf(EthTaskChainDownloader.class);
final EthTaskChainDownloader<Void> downloader = (EthTaskChainDownloader<Void>) rawDownloader;
// Process through first task cycle
final CompletableFuture<?> firstTask = downloader.getCurrentTask();
while (downloader.getCurrentTask() == firstTask) {
RespondingEthPeer.respondOnce(responder, peers);
}
}
}

@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Stream;
@ -28,10 +27,7 @@ class FlatMapProcessor<I, O> implements Processor<I, O> {
public void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe) {
final I value = inputPipe.get();
if (value != null) {
final Iterator<O> outputs = mapper.apply(value).iterator();
while (outputs.hasNext()) {
outputPipe.put(outputs.next());
}
mapper.apply(value).forEach(outputPipe::put);
}
}
}

Loading…
Cancel
Save