Pipeline chain download - fetch and import data (#1207)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 362219d908
commit bac2828313
  1. 66
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStep.java
  2. 6
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointRange.java
  3. 72
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointRangeHeaders.java
  4. 47
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloadBodiesStep.java
  5. 79
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloadHeadersStep.java
  6. 22
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java
  7. 37
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/BlockWithReceipts.java
  8. 64
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java
  9. 60
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastImportBlocksStep.java
  10. 12
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java
  11. 93
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  12. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java
  13. 20
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  14. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java
  15. 91
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStepTest.java
  16. 107
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloadHeadersStepTest.java
  17. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java
  18. 88
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java
  19. 95
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastImportBlocksStepTest.java
  20. 11
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java
  21. 14
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java
  22. 22
      util/src/main/java/tech/pegasys/pantheon/util/FutureUtils.java
  23. 35
      util/src/test/java/tech/pegasys/pantheon/util/FutureUtilsTest.java

@ -0,0 +1,66 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import java.util.function.Function;
import java.util.stream.Stream;
public class CheckpointHeaderValidationStep<C>
implements Function<CheckpointRangeHeaders, Stream<BlockHeader>> {
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final ValidationPolicy validationPolicy;
public CheckpointHeaderValidationStep(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final ValidationPolicy validationPolicy) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.validationPolicy = validationPolicy;
}
@Override
public Stream<BlockHeader> apply(final CheckpointRangeHeaders checkpointRangeHeaders) {
final BlockHeader expectedParent = checkpointRangeHeaders.getCheckpointRange().getStart();
final BlockHeader firstHeaderToImport = checkpointRangeHeaders.getFirstHeaderToImport();
if (isValid(expectedParent, firstHeaderToImport)) {
return checkpointRangeHeaders.getHeadersToImport().stream();
} else {
throw new InvalidBlockException(
"Provided first header does not connect to last header.",
expectedParent.getNumber(),
expectedParent.getHash());
}
}
private boolean isValid(final BlockHeader expectedParent, final BlockHeader firstHeaderToImport) {
final BlockHeaderValidator<C> validator =
protocolSchedule
.getByBlockNumber(firstHeaderToImport.getNumber())
.getBlockHeaderValidator();
return validator.validateHeader(
firstHeaderToImport,
expectedParent,
protocolContext,
validationPolicy.getValidationModeForNextBlock());
}
}

@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import static java.lang.Math.toIntExact;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import java.util.Objects;
@ -59,4 +61,8 @@ public class CheckpointRange {
.add("end", end.getNumber())
.toString();
}
public int getSegmentLength() {
return toIntExact(end.getNumber() - start.getNumber());
}
}

@ -0,0 +1,72 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import static com.google.common.base.Preconditions.checkArgument;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import java.util.List;
import java.util.Objects;
import com.google.common.base.MoreObjects;
public class CheckpointRangeHeaders {
private final CheckpointRange checkpointRange;
private final List<BlockHeader> headersToImport;
public CheckpointRangeHeaders(
final CheckpointRange checkpointRange, final List<BlockHeader> headersToImport) {
checkArgument(!headersToImport.isEmpty(), "Must have at least one header to import");
this.checkpointRange = checkpointRange;
this.headersToImport = headersToImport;
}
public CheckpointRange getCheckpointRange() {
return checkpointRange;
}
public List<BlockHeader> getHeadersToImport() {
return headersToImport;
}
public BlockHeader getFirstHeaderToImport() {
return headersToImport.get(0);
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final CheckpointRangeHeaders that = (CheckpointRangeHeaders) o;
return Objects.equals(checkpointRange, that.checkpointRange)
&& Objects.equals(headersToImport, that.headersToImport);
}
@Override
public int hashCode() {
return Objects.hash(checkpointRange, headersToImport);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("checkpointRange", checkpointRange)
.add("headersToImport", headersToImport)
.toString();
}
}

@ -0,0 +1,47 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class DownloadBodiesStep<C>
implements Function<List<BlockHeader>, CompletableFuture<List<Block>>> {
private final ProtocolSchedule<C> protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
public DownloadBodiesStep(
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
}
@Override
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem)
.run();
}
}

@ -0,0 +1,79 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DownloadHeaderSequenceTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.FutureUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class DownloadHeadersStep<C>
implements Function<CheckpointRange, CompletableFuture<CheckpointRangeHeaders>> {
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final ValidationPolicy validationPolicy;
private final MetricsSystem metricsSystem;
public DownloadHeadersStep(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final ValidationPolicy validationPolicy,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.validationPolicy = validationPolicy;
this.metricsSystem = metricsSystem;
}
@Override
public CompletableFuture<CheckpointRangeHeaders> apply(final CheckpointRange checkpointRange) {
final CompletableFuture<List<BlockHeader>> taskFuture = downloadHeaders(checkpointRange);
final CompletableFuture<CheckpointRangeHeaders> processedFuture =
taskFuture.thenApply(headers -> processHeaders(checkpointRange, headers));
FutureUtils.propagateCancellation(processedFuture, taskFuture);
return processedFuture;
}
private CompletableFuture<List<BlockHeader>> downloadHeaders(
final CheckpointRange checkpointRange) {
return DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule,
protocolContext,
ethContext,
checkpointRange.getEnd(),
// -1 because we don't want to request the range starting header
checkpointRange.getSegmentLength() - 1,
validationPolicy,
metricsSystem)
.run();
}
private CheckpointRangeHeaders processHeaders(
final CheckpointRange checkpointRange, final List<BlockHeader> headers) {
final List<BlockHeader> headersToImport = new ArrayList<>(headers);
headersToImport.add(checkpointRange.getEnd());
return new CheckpointRangeHeaders(checkpointRange, headersToImport);
}
}

@ -18,6 +18,10 @@ import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;
import java.util.Optional;
@ -37,15 +41,27 @@ public class PipelineChainDownloader<C> implements ChainDownloader {
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final Counter pipelineCompleteCounter;
private final Counter pipelineErrorCounter;
private Pipeline<?> currentDownloadPipeline;
public PipelineChainDownloader(
final SyncTargetManager<C> syncTargetManager,
final DownloadPipelineFactory downloadPipelineFactory,
final EthScheduler scheduler) {
final EthScheduler scheduler,
final MetricsSystem metricsSystem) {
this.syncTargetManager = syncTargetManager;
this.downloadPipelineFactory = downloadPipelineFactory;
this.scheduler = scheduler;
final LabelledMetric<Counter> labelledCounter =
metricsSystem.createLabelledCounter(
MetricCategory.SYNCHRONIZER,
"chain_download_pipeline_restarts",
"Number of times the chain download pipeline has been restarted",
"reason");
pipelineCompleteCounter = labelledCounter.labels("complete");
pipelineErrorCounter = labelledCounter.labels("error");
}
@Override
@ -72,7 +88,8 @@ public class PipelineChainDownloader<C> implements ChainDownloader {
private CompletableFuture<Void> selectSyncTargetAndDownload() {
return syncTargetManager
.findSyncTarget(Optional.empty())
.thenCompose(this::startDownloadForSyncTarget);
.thenCompose(this::startDownloadForSyncTarget)
.thenRun(pipelineCompleteCounter::inc);
}
private CompletionStage<Void> repeatUnlessDownloadComplete(
@ -87,6 +104,7 @@ public class PipelineChainDownloader<C> implements ChainDownloader {
private CompletionStage<Void> handleFailedDownload(final Throwable error) {
LOG.debug("Chain download failed. Will restart if required.", error);
pipelineErrorCounter.inc();
if (!cancelled.get() && syncTargetManager.shouldContinueDownloading()) {
// Drop the error, allowing the normal looping logic to retry.
return completedFuture(null);

@ -14,9 +14,13 @@ package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import java.util.List;
import java.util.Objects;
import com.google.common.base.MoreObjects;
class BlockWithReceipts {
private final Block block;
@ -38,4 +42,37 @@ class BlockWithReceipts {
public List<TransactionReceipt> getReceipts() {
return receipts;
}
public long getNumber() {
return block.getHeader().getNumber();
}
public Hash getHash() {
return block.getHash();
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final BlockWithReceipts that = (BlockWithReceipts) o;
return Objects.equals(block, that.block) && Objects.equals(receipts, that.receipts);
}
@Override
public int hashCode() {
return Objects.hash(block, receipts);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("block", block)
.add("receipts", receipts)
.toString();
}
}

@ -0,0 +1,64 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.FutureUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class DownloadReceiptsStep
implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> {
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
public DownloadReceiptsStep(final EthContext ethContext, final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
}
@Override
public CompletableFuture<List<BlockWithReceipts>> apply(final List<Block> blocks) {
final List<BlockHeader> headers = blocks.stream().map(Block::getHeader).collect(toList());
final CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> getReceipts =
GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem).run();
final CompletableFuture<List<BlockWithReceipts>> combineWithBlocks =
getReceipts.thenApply(
receiptsByHeader -> combineBlocksAndReceipts(blocks, receiptsByHeader));
FutureUtils.propagateCancellation(combineWithBlocks, getReceipts);
return combineWithBlocks;
}
private List<BlockWithReceipts> combineBlocksAndReceipts(
final List<Block> blocks, final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader) {
return blocks.stream()
.map(
block -> {
final List<TransactionReceipt> receipts =
receiptsByHeader.getOrDefault(block.getHeader(), emptyList());
return new BlockWithReceipts(block, receipts);
})
.collect(toList());
}
}

@ -0,0 +1,60 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import java.util.List;
import java.util.function.Consumer;
public class FastImportBlocksStep<C> implements Consumer<List<BlockWithReceipts>> {
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final ValidationPolicy validationPolicy;
public FastImportBlocksStep(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final ValidationPolicy validationPolicy) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.validationPolicy = validationPolicy;
}
@Override
public void accept(final List<BlockWithReceipts> blocksWithReceipts) {
for (final BlockWithReceipts blockWithReceipts : blocksWithReceipts) {
if (!importBlock(blockWithReceipts)) {
throw new InvalidBlockException(
"Failed to import block",
blockWithReceipts.getHeader().getNumber(),
blockWithReceipts.getHash());
}
}
}
private boolean importBlock(final BlockWithReceipts blockWithReceipts) {
final BlockImporter<C> importer =
protocolSchedule.getByBlockNumber(blockWithReceipts.getNumber()).getBlockImporter();
return importer.fastImportBlock(
protocolContext,
blockWithReceipts.getBlock(),
blockWithReceipts.getReceipts(),
validationPolicy.getValidationModeForNextBlock());
}
}

@ -45,9 +45,15 @@ public class FastSyncChainDownloader {
if (USE_PIPELINE_DOWNLOADER) {
return new PipelineChainDownloader<>(
syncTargetManager,
new FastSyncDownloadPipelineFactory(
config, protocolSchedule, ethContext, pivotBlockHeader, metricsSystem),
ethContext.getScheduler());
new FastSyncDownloadPipelineFactory<>(
config,
protocolSchedule,
protocolContext,
ethContext,
pivotBlockHeader,
metricsSystem),
ethContext.getScheduler(),
metricsSystem);
}
return new EthTaskChainDownloader<>(

@ -12,15 +12,26 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.DETACHED_ONLY;
import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.LIGHT_DETACHED_ONLY;
import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.LIGHT_SKIP_DETACHED;
import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.SKIP_DETACHED;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.CheckpointHeaderFetcher;
import tech.pegasys.pantheon.ethereum.eth.sync.CheckpointHeaderValidationStep;
import tech.pegasys.pantheon.ethereum.eth.sync.CheckpointRangeSource;
import tech.pegasys.pantheon.ethereum.eth.sync.DownloadBodiesStep;
import tech.pegasys.pantheon.ethereum.eth.sync.DownloadHeadersStep;
import tech.pegasys.pantheon.ethereum.eth.sync.DownloadPipelineFactory;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;
@ -29,54 +40,98 @@ import tech.pegasys.pantheon.services.pipeline.PipelineBuilder;
import java.time.Duration;
import java.util.Optional;
public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory {
public class FastSyncDownloadPipelineFactory<C> implements DownloadPipelineFactory {
private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule<?> protocolSchedule;
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final BlockHeader pivotBlockHeader;
private final MetricsSystem metricsSystem;
private final FastSyncValidationPolicy attachedValidationPolicy;
private final FastSyncValidationPolicy detachedValidationPolicy;
public FastSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<?> protocolSchedule,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final BlockHeader pivotBlockHeader,
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.pivotBlockHeader = pivotBlockHeader;
this.metricsSystem = metricsSystem;
final LabelledMetric<Counter> fastSyncValidationCounter =
metricsSystem.createLabelledCounter(
MetricCategory.SYNCHRONIZER,
"fast_sync_validation_mode",
"Number of blocks validated using light vs full validation during fast sync",
"validationMode");
attachedValidationPolicy =
new FastSyncValidationPolicy(
this.syncConfig.fastSyncFullValidationRate(),
LIGHT_SKIP_DETACHED,
SKIP_DETACHED,
fastSyncValidationCounter);
detachedValidationPolicy =
new FastSyncValidationPolicy(
this.syncConfig.fastSyncFullValidationRate(),
LIGHT_DETACHED_ONLY,
DETACHED_ONLY,
fastSyncValidationCounter);
}
@Override
public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target) {
final int downloaderParallelism = syncConfig.downloaderParallelism();
final int headerRequestSize = syncConfig.downloaderHeaderRequestSize();
final int singleHeaderBufferSize = downloaderParallelism * headerRequestSize * 10;
final int singleHeaderBufferSize = headerRequestSize * downloaderParallelism;
final CheckpointRangeSource checkpointRangeSource =
new CheckpointRangeSource(
new CheckpointHeaderFetcher(
syncConfig,
protocolSchedule,
ethContext,
Optional.of(pivotBlockHeader),
metricsSystem),
this::shouldContinueDownloadingFromPeer,
ethContext.getScheduler(),
target.peer(),
target.commonAncestor(),
syncConfig.downloaderCheckpointTimeoutsPermitted(),
Duration.ofSeconds(5));
final DownloadHeadersStep<C> downloadHeadersStep =
new DownloadHeadersStep<>(
protocolSchedule, protocolContext, ethContext, detachedValidationPolicy, metricsSystem);
final CheckpointHeaderValidationStep<C> validateHeadersJoinUpStep =
new CheckpointHeaderValidationStep<>(
protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep<C> downloadBodiesStep =
new DownloadBodiesStep<>(protocolSchedule, ethContext, metricsSystem);
final DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(ethContext, metricsSystem);
final FastImportBlocksStep<C> importBlockStep =
new FastImportBlocksStep<>(protocolSchedule, protocolContext, attachedValidationPolicy);
// TODO: Use async preserving order when that's ready.
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",
new CheckpointRangeSource(
new CheckpointHeaderFetcher(
syncConfig,
protocolSchedule,
ethContext,
Optional.of(pivotBlockHeader),
metricsSystem),
this::shouldContinueDownloadingFromPeer,
ethContext.getScheduler(),
target.peer(),
target.commonAncestor(),
syncConfig.downloaderCheckpointTimeoutsPermitted(),
Duration.ofSeconds(5)),
singleHeaderBufferSize,
checkpointRangeSource,
downloaderParallelism,
metricsSystem.createLabelledCounter(
MetricCategory.SYNCHRONIZER,
"chain_download_pipeline_processed_total",
"Number of entries process by each chain download pipeline stage",
"step",
"action"))
.andFinishWith("complete", result -> {});
.thenProcessAsync("downloadHeaders", downloadHeadersStep, downloaderParallelism)
.thenFlatMap("validateHeadersJoin", validateHeadersJoinUpStep, singleHeaderBufferSize)
.inBatches(headerRequestSize)
.thenProcessAsync("downloadBodies", downloadBodiesStep, downloaderParallelism)
.thenProcessAsync("downloadReceipts", downloadReceiptsStep, downloaderParallelism)
.andFinishWith("importBlock", importBlockStep);
}
private boolean shouldContinueDownloadingFromPeer(

@ -53,6 +53,10 @@ public class DeterministicEthScheduler extends EthScheduler {
executors.forEach(MockExecutorService::runPendingFutures);
}
public long getPendingFuturesCount() {
return executors.stream().mapToLong(MockExecutorService::getPendingFuturesCount).sum();
}
public void disableAutoRun() {
executors.forEach(e -> e.setAutoRun(false));
}

@ -76,7 +76,7 @@ public class EthProtocolManagerTestUtil {
// Utility to prevent scheduler from automatically running submitted tasks
public static void disableEthSchedulerAutoRun(final EthProtocolManager ethProtocolManager) {
EthScheduler scheduler = ethProtocolManager.ethContext().getScheduler();
final EthScheduler scheduler = ethProtocolManager.ethContext().getScheduler();
checkArgument(
scheduler instanceof DeterministicEthScheduler,
"EthProtocolManager must be set up with "
@ -89,7 +89,7 @@ public class EthProtocolManagerTestUtil {
// Works with {@code disableEthSchedulerAutoRun} - tasks will only be pending if
// autoRun has been disabled.
public static void runPendingFutures(final EthProtocolManager ethProtocolManager) {
EthScheduler scheduler = ethProtocolManager.ethContext().getScheduler();
final EthScheduler scheduler = ethProtocolManager.ethContext().getScheduler();
checkArgument(
scheduler instanceof DeterministicEthScheduler,
"EthProtocolManager must be set up with "
@ -98,6 +98,22 @@ public class EthProtocolManagerTestUtil {
((DeterministicEthScheduler) scheduler).runPendingFutures();
}
/**
* Gets the number of pending tasks submitted to the EthScheduler.
*
* <p>Works with {@code disableEthSchedulerAutoRun} - tasks will only be pending if autoRun has
* been disabled.
*/
public static long getPendingFuturesCount(final EthProtocolManager ethProtocolManager) {
final EthScheduler scheduler = ethProtocolManager.ethContext().getScheduler();
checkArgument(
scheduler instanceof DeterministicEthScheduler,
"EthProtocolManager must be set up with "
+ DeterministicEthScheduler.class.getSimpleName()
+ " in order to manually run pending futures.");
return ((DeterministicEthScheduler) scheduler).getPendingFuturesCount();
}
public static void broadcastMessage(
final EthProtocolManager ethProtocolManager,
final RespondingEthPeer peer,

@ -46,6 +46,10 @@ public class MockExecutorService implements ExecutorService {
currentTasks.forEach(ExecutorTask::run);
}
public long getPendingFuturesCount() {
return tasks.stream().filter(ExecutorTask::isPending).count();
}
public void runPendingFuturesInSeparateThreads(final ExecutorService executorService) {
final List<ExecutorTask<?>> currentTasks = new ArrayList<>(tasks);
currentTasks.forEach(task -> executorService.execute(task::run));

@ -0,0 +1,91 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.DETACHED_ONLY;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import java.util.stream.Stream;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class CheckpointHeaderValidationStepTest {
@Mock private ProtocolSchedule<Void> protocolSchedule;
@Mock private ProtocolSpec<Void> protocolSpec;
@Mock private ProtocolContext<Void> protocolContext;
@Mock private BlockHeaderValidator<Void> headerValidator;
@Mock private ValidationPolicy validationPolicy;
private final BlockDataGenerator gen = new BlockDataGenerator();
private CheckpointHeaderValidationStep<Void> validationStep;
private final BlockHeader checkpointStart = gen.header(10);
private final BlockHeader firstHeader = gen.header(11);
private final CheckpointRangeHeaders rangeHeaders =
new CheckpointRangeHeaders(
new CheckpointRange(checkpointStart, gen.header(13)),
asList(firstHeader, gen.header(12), gen.header(13)));
@Before
public void setUp() {
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec);
when(protocolSpec.getBlockHeaderValidator()).thenReturn(headerValidator);
when(validationPolicy.getValidationModeForNextBlock()).thenReturn(DETACHED_ONLY);
validationStep =
new CheckpointHeaderValidationStep<>(protocolSchedule, protocolContext, validationPolicy);
}
@Test
public void shouldValidateFirstHeaderAgainstCheckpointStartHeader() {
when(headerValidator.validateHeader(
firstHeader, checkpointStart, protocolContext, DETACHED_ONLY))
.thenReturn(true);
final Stream<BlockHeader> result = validationStep.apply(rangeHeaders);
verify(protocolSchedule).getByBlockNumber(firstHeader.getNumber());
verify(validationPolicy).getValidationModeForNextBlock();
verify(headerValidator)
.validateHeader(firstHeader, checkpointStart, protocolContext, DETACHED_ONLY);
verifyNoMoreInteractions(headerValidator, validationPolicy);
assertThat(result).containsExactlyElementsOf(rangeHeaders.getHeadersToImport());
}
@Test
public void shouldThrowExceptionWhenValidationFails() {
when(headerValidator.validateHeader(
firstHeader, checkpointStart, protocolContext, DETACHED_ONLY))
.thenReturn(false);
assertThatThrownBy(() -> validationStep.apply(rangeHeaders))
.isInstanceOf(InvalidBlockException.class);
}
}

@ -0,0 +1,107 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class DownloadHeadersStepTest {
private static ProtocolSchedule<Void> protocolSchedule;
private static ProtocolContext<Void> protocolContext;
private static MutableBlockchain blockchain;
private EthProtocolManager ethProtocolManager;
private DownloadHeadersStep<Void> downloader;
private CheckpointRange checkpointRange;
@BeforeClass
public static void setUpClass() {
final BlockchainSetupUtil<Void> setupUtil = BlockchainSetupUtil.forTesting();
setupUtil.importFirstBlocks(20);
protocolSchedule = setupUtil.getProtocolSchedule();
protocolContext = setupUtil.getProtocolContext();
blockchain = protocolContext.getBlockchain();
}
@Before
public void setUp() {
ethProtocolManager =
EthProtocolManagerTestUtil.create(blockchain, protocolContext.getWorldStateArchive());
downloader =
new DownloadHeadersStep<>(
protocolSchedule,
protocolContext,
ethProtocolManager.ethContext(),
() -> HeaderValidationMode.DETACHED_ONLY,
new NoOpMetricsSystem());
checkpointRange =
new CheckpointRange(
blockchain.getBlockHeader(1).get(), blockchain.getBlockHeader(10).get());
}
@Test
public void shouldRetrieveHeadersForCheckpointRange() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CompletableFuture<CheckpointRangeHeaders> result = downloader.apply(checkpointRange);
peer.respond(RespondingEthPeer.blockchainResponder(blockchain));
// The start of the range should have been imported as part of the previous batch hence 2-10.
assertThat(result)
.isCompletedWithValue(new CheckpointRangeHeaders(checkpointRange, headersFromChain(2, 10)));
}
@Test
public void shouldCancelRequestToPeerWhenReturnedFutureIsCancelled() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CompletableFuture<CheckpointRangeHeaders> result = this.downloader.apply(checkpointRange);
result.cancel(true);
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
peer.respond(RespondingEthPeer.blockchainResponder(blockchain));
assertThat(EthProtocolManagerTestUtil.getPendingFuturesCount(ethProtocolManager)).isZero();
}
private List<BlockHeader> headersFromChain(final long startNumber, final long endNumber) {
final List<BlockHeader> headers = new ArrayList<>();
for (long i = startNumber; i <= endNumber; i++) {
headers.add(blockchain.getBlockHeader(i).get());
}
return Collections.unmodifiableList(headers);
}
}

@ -27,6 +27,7 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;
import java.util.Optional;
@ -57,7 +58,8 @@ public class PipelineChainDownloaderTest {
@Before
public void setUp() {
chainDownloader =
new PipelineChainDownloader<>(syncTargetManager, downloadPipelineFactory, scheduler);
new PipelineChainDownloader<>(
syncTargetManager, downloadPipelineFactory, scheduler, new NoOpMetricsSystem());
}
@Test

@ -0,0 +1,88 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class DownloadReceiptsStepTest {
private static ProtocolContext<Void> protocolContext;
private static MutableBlockchain blockchain;
private EthProtocolManager ethProtocolManager;
private DownloadReceiptsStep downloadReceiptsStep;
@BeforeClass
public static void setUpClass() {
final BlockchainSetupUtil<Void> setupUtil = BlockchainSetupUtil.forTesting();
setupUtil.importFirstBlocks(20);
protocolContext = setupUtil.getProtocolContext();
blockchain = setupUtil.getBlockchain();
}
@Before
public void setUp() {
ethProtocolManager =
EthProtocolManagerTestUtil.create(blockchain, protocolContext.getWorldStateArchive());
downloadReceiptsStep =
new DownloadReceiptsStep(ethProtocolManager.ethContext(), new NoOpMetricsSystem());
}
@Test
public void shouldDownloadReceiptsForBlocks() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final List<Block> blocks = asList(block(1), block(2), block(3), block(4));
final CompletableFuture<List<BlockWithReceipts>> result = downloadReceiptsStep.apply(blocks);
peer.respond(RespondingEthPeer.blockchainResponder(blockchain));
assertThat(result)
.isCompletedWithValue(
asList(
blockWithReceipts(1),
blockWithReceipts(2),
blockWithReceipts(3),
blockWithReceipts(4)));
}
private Block block(final long number) {
final BlockHeader header = blockchain.getBlockHeader(number).get();
return new Block(header, blockchain.getBlockBody(header.getHash()).get());
}
private BlockWithReceipts blockWithReceipts(final long number) {
final Block block = block(number);
final List<TransactionReceipt> receipts = blockchain.getTxReceipts(block.getHash()).get();
return new BlockWithReceipts(block, receipts);
}
}

@ -0,0 +1,95 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.FULL;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class FastImportBlocksStepTest {
@Mock private ProtocolSchedule<Void> protocolSchedule;
@Mock private ProtocolSpec<Void> protocolSpec;
@Mock private ProtocolContext<Void> protocolContext;
@Mock private BlockImporter<Void> blockImporter;
@Mock private ValidationPolicy validationPolicy;
private final BlockDataGenerator gen = new BlockDataGenerator();
private FastImportBlocksStep<Void> importBlocksStep;
@Before
public void setUp() {
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec);
when(protocolSpec.getBlockImporter()).thenReturn(blockImporter);
when(validationPolicy.getValidationModeForNextBlock()).thenReturn(FULL);
importBlocksStep =
new FastImportBlocksStep<>(protocolSchedule, protocolContext, validationPolicy);
}
@Test
public void shouldImportBlocks() {
final List<Block> blocks = gen.blockSequence(5);
final List<BlockWithReceipts> blocksWithReceipts =
blocks.stream()
.map(block -> new BlockWithReceipts(block, gen.receipts(block)))
.collect(toList());
for (final BlockWithReceipts blockWithReceipts : blocksWithReceipts) {
when(blockImporter.fastImportBlock(
protocolContext, blockWithReceipts.getBlock(), blockWithReceipts.getReceipts(), FULL))
.thenReturn(true);
}
importBlocksStep.accept(blocksWithReceipts);
for (final BlockWithReceipts blockWithReceipts : blocksWithReceipts) {
verify(protocolSchedule).getByBlockNumber(blockWithReceipts.getNumber());
}
verify(validationPolicy, times(blocks.size())).getValidationModeForNextBlock();
}
@Test
public void shouldThrowExceptionWhenValidationFails() {
final Block block = gen.block();
final BlockWithReceipts blockWithReceipts = new BlockWithReceipts(block, gen.receipts(block));
when(blockImporter.fastImportBlock(
protocolContext, block, blockWithReceipts.getReceipts(), FULL))
.thenReturn(false);
assertThatThrownBy(() -> importBlocksStep.accept(singletonList(blockWithReceipts)))
.isInstanceOf(InvalidBlockException.class);
}
}

@ -37,7 +37,6 @@ import org.apache.logging.log4j.Logger;
public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
private static final Logger LOG = LogManager.getLogger();
private final BlockingQueue<T> queue;
private final int capacity;
private final Counter inputCounter;
private final Counter outputCounter;
private final Counter abortedItemCounter;
@ -50,7 +49,6 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
final Counter outputCounter,
final Counter abortedItemCounter) {
queue = new ArrayBlockingQueue<>(capacity);
this.capacity = capacity;
this.inputCounter = inputCounter;
this.outputCounter = outputCounter;
this.abortedItemCounter = abortedItemCounter;
@ -66,15 +64,6 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
return aborted.get();
}
/**
* Get the number of items that can be queued inside this pipe.
*
* @return the pipe's capacity.
*/
public int getCapacity() {
return capacity;
}
@Override
public boolean hasRemainingCapacity() {
return queue.remainingCapacity() > 0 && isOpen();

@ -107,7 +107,7 @@ public class PipelineBuilder<I, T> {
final String sourceName, final int bufferSize, final LabelledMetric<Counter> outputCounter) {
final Pipe<T> pipe = createPipe(bufferSize, sourceName, outputCounter);
return new PipelineBuilder<>(
pipe, emptyList(), singleton(pipe), sourceName, pipe, pipe.getCapacity(), outputCounter);
pipe, emptyList(), singleton(pipe), sourceName, pipe, bufferSize, outputCounter);
}
/**
@ -213,7 +213,7 @@ public class PipelineBuilder<I, T> {
pipeEnd,
maximumBatchSize,
outputCounter.labels(lastStageName + "_outputPipe", "batches")),
bufferSize / maximumBatchSize + 1,
(int) Math.ceil(((double) bufferSize) / maximumBatchSize),
outputCounter);
}
@ -307,18 +307,14 @@ public class PipelineBuilder<I, T> {
final Processor<T, O> processor, final int newBufferSize, final String stageName) {
final Pipe<O> outputPipe = createPipe(newBufferSize, stageName, outputCounter);
final Stage processStage = new ProcessingStage<>(stageName, pipeEnd, outputPipe, processor);
return addStage(processStage, outputPipe);
}
private <O> PipelineBuilder<I, O> addStage(final Stage stage, final Pipe<O> outputPipe) {
final List<Stage> newStages = concat(stages, stage);
final List<Stage> newStages = concat(stages, processStage);
return new PipelineBuilder<>(
inputPipe,
newStages,
concat(pipes, outputPipe),
stage.getName(),
processStage.getName(),
outputPipe,
bufferSize,
newBufferSize,
outputCounter);
}

@ -14,6 +14,7 @@ package tech.pegasys.pantheon.util;
import static java.util.concurrent.CompletableFuture.completedFuture;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
@ -109,4 +110,25 @@ public class FutureUtils {
to.completeExceptionally(t);
}
}
/**
* Propagates cancellation, and only cancellation, from one future to another.
*
* <p>When <code>from</code> is completed with a {@link
* java.util.concurrent.CancellationException} {@link java.util.concurrent.Future#cancel(boolean)}
* will be called on <code>to</code>, allowing interruption if the future is currently running.
*
* @param from the CompletableFuture to take cancellation from
* @param to the CompletableFuture to propagate cancellation to
*/
public static void propagateCancellation(
final CompletableFuture<?> from, final CompletableFuture<?> to) {
from.exceptionally(
error -> {
if (error instanceof CancellationException) {
to.cancel(true);
}
return null;
});
}
}

@ -19,6 +19,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose;
import static tech.pegasys.pantheon.util.FutureUtils.propagateCancellation;
import static tech.pegasys.pantheon.util.FutureUtils.propagateResult;
import java.util.concurrent.CompletableFuture;
@ -74,6 +75,40 @@ public class FutureUtilsTest {
assertCompletedExceptionally(output, ERROR);
}
@Test
@SuppressWarnings("unchecked")
public void shouldPropagateCancellation() {
final CompletableFuture<String> input = new CompletableFuture<>();
final CompletableFuture<String> output = mock(CompletableFuture.class);
propagateCancellation(input, output);
input.cancel(true);
verify(output).cancel(true);
}
@Test
public void shouldNotPropagateExceptionsOtherThanCancellationWhenPropagatingCancellation() {
final CompletableFuture<String> input = new CompletableFuture<>();
final CompletableFuture<String> output = new CompletableFuture<>();
propagateCancellation(input, output);
assertThat(output).isNotDone();
input.completeExceptionally(ERROR);
assertThat(output).isNotDone();
}
@Test
public void shouldNotPropagateResultsWhenPropagatingCancellation() {
final CompletableFuture<String> input = new CompletableFuture<>();
final CompletableFuture<String> output = new CompletableFuture<>();
propagateCancellation(input, output);
assertThat(output).isNotDone();
input.complete("foo");
assertThat(output).isNotDone();
}
@Test
public void shouldComposeExceptionallyWhenErrorOccurs() {
final Function<Throwable, CompletionStage<String>> errorHandler = mockFunction();

Loading…
Cancel
Save