[NC-2138] Complete fast sync chain download (#701)

* Download receipts
* store blocks without processing transactions
* Only apply light validation during fast sync.
* Mark fast sync chain download as complete when pivot block has been reached.

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 1489fa5543
commit 254661683b
  1. 12
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/BlockHeaderValidator.java
  2. 6
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/HeaderValidationMode.java
  3. 6
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/mainnet/MainnetBlockImporter.java
  4. 9
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainDownloader.java
  5. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SyncTargetManager.java
  6. 41
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/BlockWithReceipts.java
  7. 3
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncActions.java
  8. 111
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java
  9. 17
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java
  10. 14
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncTargetManager.java
  11. 88
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java
  12. 9
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloader.java
  13. 5
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManager.java
  14. 118
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java
  15. 49
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java

@ -41,6 +41,18 @@ public class BlockHeaderValidator<C> {
switch (mode) {
case NONE:
return true;
case LIGHT_DETACHED_ONLY:
return applyRules(
header,
parent,
protocolContext,
rule -> rule.includeInLightValidation() && rule.isDetachedSupported());
case LIGHT_SKIP_DETACHED:
return applyRules(
header,
parent,
protocolContext,
rule -> rule.includeInLightValidation() && !rule.isDetachedSupported());
case LIGHT:
return applyRules(header, parent, protocolContext, Rule::includeInLightValidation);
case DETACHED_ONLY:

@ -16,6 +16,12 @@ public enum HeaderValidationMode {
/** No Validation. data must be pre-validated */
NONE,
/** Skip proof of work validation */
LIGHT_DETACHED_ONLY,
/** Skip proof of work validation */
LIGHT_SKIP_DETACHED,
/** Skip proof of work validation */
LIGHT,

@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.mainnet;
import static org.apache.logging.log4j.LogManager.getLogger;
import tech.pegasys.pantheon.ethereum.BlockValidator;
import tech.pegasys.pantheon.ethereum.BlockValidator.BlockProcessingOutputs;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
@ -25,12 +23,8 @@ import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.Logger;
public class MainnetBlockImporter<C> implements BlockImporter<C> {
private static final Logger LOG = getLogger();
final BlockValidator<C> blockValidator;
public MainnetBlockImporter(final BlockValidator<C> blockValidator) {

@ -49,6 +49,7 @@ public class ChainDownloader<C> {
private final CheckpointHeaderManager<C> checkpointHeaderManager;
private final BlockImportTaskFactory blockImportTaskFactory;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final CompletableFuture<Void> downloadFuture = new CompletableFuture<>();
private int chainSegmentTimeouts = 0;
@ -73,9 +74,10 @@ public class ChainDownloader<C> {
this.blockImportTaskFactory = blockImportTaskFactory;
}
public void start() {
public CompletableFuture<Void> start() {
if (started.compareAndSet(false, true)) {
executeDownload();
return downloadFuture;
} else {
throw new IllegalStateException(
"Attempt to start an already started " + this.getClass().getSimpleName() + ".");
@ -110,8 +112,11 @@ public class ChainDownloader<C> {
ethContext
.getScheduler()
.scheduleFutureTask(this::executeDownload, Duration.ofSeconds(2));
} else {
} else if (syncTargetManager.shouldContinueDownloading()) {
executeDownload();
} else {
LOG.info("Chain download complete");
downloadFuture.complete(null);
}
});
return currentTask;

@ -126,4 +126,6 @@ public abstract class SyncTargetManager<C> {
}
public abstract boolean shouldSwitchSyncTarget(final SyncTarget currentTarget);
public abstract boolean shouldContinueDownloading();
}

@ -0,0 +1,41 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import java.util.List;
class BlockWithReceipts {
private final Block block;
private final List<TransactionReceipt> receipts;
BlockWithReceipts(final Block block, final List<TransactionReceipt> receipts) {
this.block = block;
this.receipts = receipts;
}
public BlockHeader getHeader() {
return block.getHeader();
}
public Block getBlock() {
return block;
}
public List<TransactionReceipt> getReceipts() {
return receipts;
}
}

@ -158,7 +158,6 @@ public class FastSyncActions<C> {
syncState,
ethTasksTimer,
currentState.getPivotBlockHeader().get());
downloader.start();
return new CompletableFuture<>();
return downloader.start();
}
}

@ -0,0 +1,111 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;
import static java.util.Collections.emptyList;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class FastSyncBlockHandler<C> implements BlockHandler<BlockWithReceipts> {
private static final Logger LOG = LogManager.getLogger();
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final LabelledMetric<OperationTimer> ethTasksTimer;
public FastSyncBlockHandler(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
}
@Override
public CompletableFuture<List<BlockWithReceipts>> downloadBlocks(
final List<BlockHeader> headers) {
return downloadBodies(headers)
.thenCombine(downloadReceipts(headers), this::combineBlocksAndReceipts);
}
private CompletableFuture<List<Block>> downloadBodies(final List<BlockHeader> headers) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, ethTasksTimer)
.run();
}
private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> downloadReceipts(
final List<BlockHeader> headers) {
return GetReceiptsFromPeerTask.forHeaders(ethContext, headers, ethTasksTimer)
.run()
.thenApply(PeerTaskResult::getResult);
}
private List<BlockWithReceipts> combineBlocksAndReceipts(
final List<Block> blocks, final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader) {
return blocks
.stream()
.map(
block -> {
final List<TransactionReceipt> receipts =
receiptsByHeader.getOrDefault(block.getHeader(), emptyList());
return new BlockWithReceipts(block, receipts);
})
.collect(Collectors.toList());
}
@Override
public CompletableFuture<List<BlockWithReceipts>> validateAndImportBlocks(
final List<BlockWithReceipts> blocksWithReceipts) {
LOG.debug(
"Storing blocks {} to {}",
blocksWithReceipts.get(0).getHeader().getNumber(),
blocksWithReceipts.get(blocksWithReceipts.size() - 1).getHeader().getNumber());
blocksWithReceipts.forEach(
block -> {
final BlockImporter<C> blockImporter =
protocolSchedule.getByBlockNumber(block.getHeader().getNumber()).getBlockImporter();
// TODO: This is still doing full ommer validation. Is that required?
blockImporter.fastImportBlock(
protocolContext,
block.getBlock(),
block.getReceipts(),
HeaderValidationMode.LIGHT_SKIP_DETACHED);
});
return CompletableFuture.completedFuture(blocksWithReceipts);
}
}

@ -22,12 +22,14 @@ import tech.pegasys.pantheon.ethereum.eth.sync.ChainDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class FastSyncChainDownloader<C> {
private final ChainDownloader<C> chainDownloader;
@ -77,8 +79,8 @@ public class FastSyncChainDownloader<C> {
this::importBlocksForCheckpoints);
}
public void start() {
chainDownloader.start();
public CompletableFuture<Void> start() {
return chainDownloader.start();
}
private CompletableFuture<List<Block>> importBlocksForCheckpoints(
@ -91,14 +93,21 @@ public class FastSyncChainDownloader<C> {
return CompletableFuture.completedFuture(emptyList());
}
}
final PipelinedImportChainSegmentTask<C> importTask =
final PipelinedImportChainSegmentTask<C, BlockWithReceipts> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
config.downloaderParallelism(),
ethTasksTimer,
new FastSyncBlockHandler<>(
protocolSchedule, protocolContext, ethContext, ethTasksTimer),
HeaderValidationMode.LIGHT_DETACHED_ONLY,
checkpointHeaders);
return importTask.run();
return importTask
.run()
.thenApply(
results ->
results.stream().map(BlockWithReceipts::getBlock).collect(Collectors.toList()));
}
}

@ -29,6 +29,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.Discon
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -39,6 +40,7 @@ class FastSyncTargetManager<C> extends SyncTargetManager<C> {
private static final Logger LOG = LogManager.getLogger();
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final BlockHeader pivotBlockHeader;
@ -53,6 +55,7 @@ class FastSyncTargetManager<C> extends SyncTargetManager<C> {
final BlockHeader pivotBlockHeader) {
super(config, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
this.pivotBlockHeader = pivotBlockHeader;
@ -89,7 +92,7 @@ class FastSyncTargetManager<C> extends SyncTargetManager<C> {
.timeout(task)
.thenApply(
result -> {
if (result.size() != 1 || !result.get(0).equals(pivotBlockHeader)) {
if (peerHasDifferentPivotBlock(result)) {
bestPeer.disconnect(DisconnectReason.USELESS_PEER);
return Optional.<EthPeer>empty();
} else {
@ -103,8 +106,17 @@ class FastSyncTargetManager<C> extends SyncTargetManager<C> {
});
}
private boolean peerHasDifferentPivotBlock(final List<BlockHeader> result) {
return result.size() != 1 || !result.get(0).equals(pivotBlockHeader);
}
@Override
public boolean shouldSwitchSyncTarget(final SyncTarget currentTarget) {
return false;
}
@Override
public boolean shouldContinueDownloading() {
return !protocolContext.getBlockchain().getChainHeadHash().equals(pivotBlockHeader.getHash());
}
}

@ -0,0 +1,88 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class FullSyncBlockHandler<C> implements BlockHandler<Block> {
private static final Logger LOG = LogManager.getLogger();
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final LabelledMetric<OperationTimer> ethTasksTimer;
public FullSyncBlockHandler(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
}
@Override
public CompletableFuture<List<Block>> validateAndImportBlocks(final List<Block> blocks) {
LOG.debug(
"Validating and importing {} to {}",
blocks.get(0).getHeader().getNumber(),
blocks.get(blocks.size() - 1).getHeader().getNumber());
return PersistBlockTask.forSequentialBlocks(
protocolSchedule,
protocolContext,
blocks,
HeaderValidationMode.SKIP_DETACHED,
ethTasksTimer)
.get();
}
@Override
public CompletableFuture<List<Block>> downloadBlocks(final List<BlockHeader> headers) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, ethTasksTimer)
.run()
.thenCompose(this::extractTransactionSenders);
}
private CompletableFuture<List<Block>> extractTransactionSenders(final List<Block> blocks) {
LOG.debug(
"Extracting sender {} to {}",
blocks.get(0).getHeader().getNumber(),
blocks.get(blocks.size() - 1).getHeader().getNumber());
for (final Block block : blocks) {
for (final Transaction transaction : block.getBody().getTransactions()) {
// This method internally performs the transaction sender extraction.
transaction.getSender();
}
}
return CompletableFuture.completedFuture(blocks);
}
}

@ -23,6 +23,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
@ -31,8 +32,11 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class FullSyncDownloader<C> {
private static final Logger LOG = LogManager.getLogger();
private final ChainDownloader<C> chainDownloader;
private final SynchronizerConfiguration config;
private final ProtocolSchedule<C> protocolSchedule;
@ -89,13 +93,16 @@ public class FullSyncDownloader<C> {
ethTasksTimer);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final PipelinedImportChainSegmentTask<C> importTask =
final PipelinedImportChainSegmentTask<C, Block> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
config.downloaderParallelism(),
ethTasksTimer,
new FullSyncBlockHandler<>(
protocolSchedule, protocolContext, ethContext, ethTasksTimer),
HeaderValidationMode.DETACHED_ONLY,
checkpointHeaders);
importedBlocks = importTask.run();
}

@ -107,4 +107,9 @@ class FullSyncTargetManager<C> extends SyncTargetManager<C> {
})
.orElse(false);
}
@Override
public boolean shouldContinueDownloading() {
return true;
}
}

@ -13,9 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractEthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
@ -40,29 +38,29 @@ import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Block>> {
public class PipelinedImportChainSegmentTask<C, B> extends AbstractEthTask<List<B>> {
private static final Logger LOG = LogManager.getLogger();
private final EthContext ethContext;
private final ProtocolContext<C> protocolContext;
private final ProtocolSchedule<C> protocolSchedule;
private final List<Block> importedBlocks = new ArrayList<>();
private final List<B> importedBlocks = new ArrayList<>();
private final LabelledMetric<OperationTimer> ethTasksTimer;
// First header is assumed to already be imported
private final List<BlockHeader> checkpointHeaders;
private final int chunksInTotal;
private final BlockHandler<B> blockHandler;
private final HeaderValidationMode headerValidationMode;
private int chunksIssued;
private int chunksCompleted;
private final int maxActiveChunks;
private final Deque<CompletableFuture<List<BlockHeader>>> downloadAndValidateHeadersTasks =
new ConcurrentLinkedDeque<>();
private final Deque<CompletableFuture<List<Block>>> downloadBodiesTasks =
private final Deque<CompletableFuture<List<B>>> downloadBodiesTasks =
new ConcurrentLinkedDeque<>();
private final Deque<CompletableFuture<List<Block>>> extractTransactionSendersTasks =
new ConcurrentLinkedDeque<>();
private final Deque<CompletableFuture<List<Block>>> validateAndImportBlocksTasks =
private final Deque<CompletableFuture<List<B>>> validateAndImportBlocksTasks =
new ConcurrentLinkedDeque<>();
protected PipelinedImportChainSegmentTask(
@ -71,7 +69,9 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
final EthContext ethContext,
final int maxActiveChunks,
final List<BlockHeader> checkpointHeaders,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final LabelledMetric<OperationTimer> ethTasksTimer,
final BlockHandler<B> blockHandler,
final HeaderValidationMode headerValidationMode) {
super(ethTasksTimer);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
@ -79,17 +79,21 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
this.ethTasksTimer = ethTasksTimer;
this.checkpointHeaders = checkpointHeaders;
this.chunksInTotal = checkpointHeaders.size() - 1;
this.blockHandler = blockHandler;
this.chunksIssued = 0;
this.chunksCompleted = 0;
this.maxActiveChunks = maxActiveChunks;
this.headerValidationMode = headerValidationMode;
}
public static <C> PipelinedImportChainSegmentTask<C> forCheckpoints(
public static <C, B> PipelinedImportChainSegmentTask<C, B> forCheckpoints(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final int maxActiveChunks,
final LabelledMetric<OperationTimer> ethTasksTimer,
final BlockHandler<B> blockHandler,
final HeaderValidationMode headerValidationMode,
final BlockHeader... checkpointHeaders) {
return forCheckpoints(
protocolSchedule,
@ -97,15 +101,19 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
ethContext,
maxActiveChunks,
ethTasksTimer,
blockHandler,
headerValidationMode,
Arrays.asList(checkpointHeaders));
}
public static <C> PipelinedImportChainSegmentTask<C> forCheckpoints(
public static <C, B> PipelinedImportChainSegmentTask<C, B> forCheckpoints(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final int maxActiveChunks,
final LabelledMetric<OperationTimer> ethTasksTimer,
final BlockHandler<B> blockHandler,
final HeaderValidationMode headerValidationMode,
final List<BlockHeader> checkpointHeaders) {
return new PipelinedImportChainSegmentTask<>(
protocolSchedule,
@ -113,7 +121,9 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
ethContext,
maxActiveChunks,
checkpointHeaders,
ethTasksTimer);
ethTasksTimer,
blockHandler,
headerValidationMode);
}
@Override
@ -135,29 +145,30 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
lastDownloadAndValidateHeadersTask()
.thenCompose((ignore) -> downloadNextHeaders(firstChunkHeader, lastChunkHeader))
.thenCompose(this::validateHeaders);
final CompletableFuture<List<Block>> downloadBodiesTask =
final CompletableFuture<List<B>> downloadBodiesTask =
downloadAndValidateHeadersTask
.thenCombine(lastDownloadBodiesTask(), (headers, ignored) -> headers)
.thenCompose(this::downloadBlocks);
final CompletableFuture<List<Block>> extractTransactionSendersTask =
final CompletableFuture<List<B>> validateAndImportBlocksTask =
downloadBodiesTask
.thenCombine(lastExtractTransactionSendersTasks(), (blocks, ignored) -> blocks)
.thenCompose(this::extractTransactionSenders);
final CompletableFuture<List<Block>> validateAndImportBlocksTask =
extractTransactionSendersTask
.thenCombine(lastValidateAndImportBlocksTasks(), (blocks, ignored) -> blocks)
.thenCompose(this::validateAndImportBlocks);
validateAndImportBlocksTask.whenComplete(this::completeChunkPipelineAndMaybeLaunchNextOne);
downloadAndValidateHeadersTasks.addLast(downloadAndValidateHeadersTask);
downloadBodiesTasks.addLast(downloadBodiesTask);
extractTransactionSendersTasks.addLast(extractTransactionSendersTask);
validateAndImportBlocksTasks.addLast(validateAndImportBlocksTask);
chunksIssued++;
}
private CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks) {
final Supplier<CompletableFuture<List<B>>> task =
() -> blockHandler.validateAndImportBlocks(blocks);
return executeWorkerSubTask(ethContext.getScheduler(), task);
}
public void completeChunkPipelineAndMaybeLaunchNextOne(
final List<Block> blocks, final Throwable throwable) {
final List<B> blocks, final Throwable throwable) {
if (throwable != null) {
LOG.warn(
"Import of chain segment ({} to {}) failed: {}.",
@ -167,15 +178,8 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
result.get().completeExceptionally(throwable);
} else {
importedBlocks.addAll(blocks);
final BlockHeader firstHeader = blocks.get(0).getHeader();
final BlockHeader lastHeader = blocks.get(blocks.size() - 1).getHeader();
chunksCompleted++;
LOG.debug(
"Import chain segment from {} to {} succeeded (chunk {}/{}).",
firstHeader.getNumber(),
lastHeader.getNumber(),
chunksCompleted,
chunksInTotal);
LOG.debug("Import chain segment succeeded (chunk {}/{}).", chunksCompleted, chunksInTotal);
if (chunksCompleted == chunksInTotal) {
LOG.info(
"Completed importing chain segment {} to {}",
@ -185,7 +189,6 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
} else {
downloadAndValidateHeadersTasks.removeFirst();
downloadBodiesTasks.removeFirst();
extractTransactionSendersTasks.removeFirst();
validateAndImportBlocksTasks.removeFirst();
if (chunksIssued < chunksInTotal) {
createNextChunkPipeline();
@ -238,7 +241,7 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
final BlockHeaderValidator<C> blockHeaderValidator =
protocolSpec.getBlockHeaderValidator();
if (blockHeaderValidator.validateHeader(
childHeader, parentHeader, protocolContext, HeaderValidationMode.DETACHED_ONLY)) {
childHeader, parentHeader, protocolContext, headerValidationMode)) {
// The first header will be imported by the previous request range.
result.complete(headers.subList(1, headers.size()));
} else {
@ -252,49 +255,12 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
});
}
private CompletableFuture<List<Block>> downloadBlocks(final List<BlockHeader> headers) {
private CompletableFuture<List<B>> downloadBlocks(final List<BlockHeader> headers) {
LOG.debug(
"Downloading bodies {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
final CompleteBlocksTask<C> task =
CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, ethTasksTimer);
return executeSubTask(task::run);
}
private CompletableFuture<List<Block>> validateAndImportBlocks(final List<Block> blocks) {
LOG.debug(
"Validating and importing {} to {}",
blocks.get(0).getHeader().getNumber(),
blocks.get(blocks.size() - 1).getHeader().getNumber());
final Supplier<CompletableFuture<List<Block>>> task =
PersistBlockTask.forSequentialBlocks(
protocolSchedule,
protocolContext,
blocks,
HeaderValidationMode.SKIP_DETACHED,
ethTasksTimer);
return executeWorkerSubTask(ethContext.getScheduler(), task);
}
private CompletableFuture<List<Block>> extractTransactionSenders(final List<Block> blocks) {
LOG.debug(
"Extracting sender {} to {}",
blocks.get(0).getHeader().getNumber(),
blocks.get(blocks.size() - 1).getHeader().getNumber());
return executeWorkerSubTask(
ethContext.getScheduler(),
() -> {
final CompletableFuture<List<Block>> result = new CompletableFuture<>();
for (final Block block : blocks) {
for (final Transaction transaction : block.getBody().getTransactions()) {
// This method internally performs the transaction sender extraction.
transaction.getSender();
}
}
result.complete(blocks);
return result;
});
return executeSubTask(() -> blockHandler.downloadBlocks(headers));
}
private BlockHeader firstHeader() {
@ -313,7 +279,7 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
}
}
private CompletableFuture<List<Block>> lastDownloadBodiesTask() {
private CompletableFuture<List<B>> lastDownloadBodiesTask() {
if (downloadBodiesTasks.isEmpty()) {
return CompletableFuture.completedFuture(Lists.newArrayList());
} else {
@ -321,7 +287,7 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
}
}
private CompletableFuture<List<Block>> lastValidateAndImportBlocksTasks() {
private CompletableFuture<List<B>> lastValidateAndImportBlocksTasks() {
if (validateAndImportBlocksTasks.isEmpty()) {
return CompletableFuture.completedFuture(Lists.newArrayList());
} else {
@ -329,11 +295,9 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
}
}
private CompletableFuture<List<Block>> lastExtractTransactionSendersTasks() {
if (extractTransactionSendersTasks.isEmpty()) {
return CompletableFuture.completedFuture(Lists.newArrayList());
} else {
return extractTransactionSendersTasks.getLast();
}
public interface BlockHandler<B> {
CompletableFuture<List<B>> downloadBlocks(final List<BlockHeader> headers);
CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks);
}
}

@ -31,7 +31,9 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.AbstractMessageTaskTest;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncBlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
@ -68,6 +70,16 @@ public class PipelinedImportChainSegmentTaskTest
return new Block(header, body);
}
private CompletableFuture<List<Block>> validateAndImportBlocks(final List<Block> blocks) {
return PersistBlockTask.forSequentialBlocks(
protocolSchedule,
protocolContext,
blocks,
HeaderValidationMode.SKIP_DETACHED,
ethTasksTimer)
.get();
}
@Override
protected EthTask<List<Block>> createTask(final List<Block> requestedData) {
final Block firstBlock = requestedData.get(0);
@ -85,6 +97,8 @@ public class PipelinedImportChainSegmentTaskTest
ethContext,
1,
ethTasksTimer,
createBlockHandler(),
HeaderValidationMode.DETACHED_ONLY,
previousBlock.getHeader(),
lastBlock.getHeader());
}
@ -119,6 +133,8 @@ public class PipelinedImportChainSegmentTaskTest
ethContext,
1,
ethTasksTimer,
createBlockHandler(),
HeaderValidationMode.DETACHED_ONLY,
firstBlock.getHeader(),
secondBlock.getHeader());
@ -169,6 +185,8 @@ public class PipelinedImportChainSegmentTaskTest
ethContext,
1,
ethTasksTimer,
createBlockHandler(),
HeaderValidationMode.DETACHED_ONLY,
fakeFirstBlock.getHeader(),
thirdBlock.getHeader());
@ -218,7 +236,14 @@ public class PipelinedImportChainSegmentTaskTest
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule, modifiedContext, ethContext, 1, ethTasksTimer, checkpointHeaders);
protocolSchedule,
modifiedContext,
ethContext,
1,
ethTasksTimer,
createBlockHandler(),
HeaderValidationMode.DETACHED_ONLY,
checkpointHeaders);
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
@ -274,7 +299,14 @@ public class PipelinedImportChainSegmentTaskTest
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule, modifiedContext, ethContext, 2, ethTasksTimer, checkpointHeaders);
protocolSchedule,
modifiedContext,
ethContext,
2,
ethTasksTimer,
createBlockHandler(),
HeaderValidationMode.DETACHED_ONLY,
checkpointHeaders);
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
@ -334,7 +366,14 @@ public class PipelinedImportChainSegmentTaskTest
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule, modifiedContext, ethContext, 3, ethTasksTimer, checkpointHeaders);
protocolSchedule,
modifiedContext,
ethContext,
3,
ethTasksTimer,
createBlockHandler(),
HeaderValidationMode.DETACHED_ONLY,
checkpointHeaders);
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
@ -388,6 +427,10 @@ public class PipelinedImportChainSegmentTaskTest
return shortChain;
}
private FullSyncBlockHandler<Void> createBlockHandler() {
return new FullSyncBlockHandler<>(protocolSchedule, protocolContext, ethContext, ethTasksTimer);
}
private static class CountingResponder implements Responder {
private final Responder delegate;

Loading…
Cancel
Save