delete the legacy pipelined import chain segment task. (#1003)

This has been replaced by the parallel import chain segment task.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Danno Ferrin 6 years ago committed by GitHub
parent 0fcb3afa24
commit 5ffb304149
  1. 300
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java
  2. 490
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java

@ -1,300 +0,0 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractEthTask;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Supplier;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class PipelinedImportChainSegmentTask<C, B> extends AbstractEthTask<List<B>> {
private static final Logger LOG = LogManager.getLogger();
private final EthContext ethContext;
private final ProtocolContext<C> protocolContext;
private final ProtocolSchedule<C> protocolSchedule;
private final List<B> importedBlocks = new ArrayList<>();
// First header is assumed to already be imported
private final List<BlockHeader> checkpointHeaders;
private final int chunksInTotal;
private final BlockHandler<B> blockHandler;
private final ValidationPolicy validationPolicy;
private final MetricsSystem metricsSystem;
private int chunksIssued;
private int chunksCompleted;
private final int maxActiveChunks;
private final Deque<CompletableFuture<List<BlockHeader>>> downloadAndValidateHeadersTasks =
new ConcurrentLinkedDeque<>();
private final Deque<CompletableFuture<List<B>>> downloadBodiesTasks =
new ConcurrentLinkedDeque<>();
private final Deque<CompletableFuture<List<B>>> validateAndImportBlocksTasks =
new ConcurrentLinkedDeque<>();
protected PipelinedImportChainSegmentTask(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final int maxActiveChunks,
final List<BlockHeader> checkpointHeaders,
final MetricsSystem metricsSystem,
final BlockHandler<B> blockHandler,
final ValidationPolicy validationPolicy) {
super(metricsSystem);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.checkpointHeaders = checkpointHeaders;
this.chunksInTotal = checkpointHeaders.size() - 1;
this.blockHandler = blockHandler;
this.validationPolicy = validationPolicy;
this.metricsSystem = metricsSystem;
this.chunksIssued = 0;
this.chunksCompleted = 0;
this.maxActiveChunks = maxActiveChunks;
}
public static <C, B> PipelinedImportChainSegmentTask<C, B> forCheckpoints(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final int maxActiveChunks,
final MetricsSystem metricsSystem,
final BlockHandler<B> blockHandler,
final ValidationPolicy validationPolicy,
final BlockHeader... checkpointHeaders) {
return forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
maxActiveChunks,
metricsSystem,
blockHandler,
validationPolicy,
Arrays.asList(checkpointHeaders));
}
public static <C, B> PipelinedImportChainSegmentTask<C, B> forCheckpoints(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final int maxActiveChunks,
final MetricsSystem metricsSystem,
final BlockHandler<B> blockHandler,
final ValidationPolicy validationPolicy,
final List<BlockHeader> checkpointHeaders) {
return new PipelinedImportChainSegmentTask<>(
protocolSchedule,
protocolContext,
ethContext,
maxActiveChunks,
checkpointHeaders,
metricsSystem,
blockHandler,
validationPolicy);
}
@Override
protected void executeTask() {
LOG.debug(
"Importing chain segment from {} to {}.",
firstHeader().getNumber(),
lastHeader().getNumber());
for (int i = 0; i < chunksInTotal && i < maxActiveChunks; i++) {
createNextChunkPipeline();
}
}
private void createNextChunkPipeline() {
final BlockHeader firstChunkHeader = checkpointHeaders.get(chunksIssued);
final BlockHeader lastChunkHeader = checkpointHeaders.get(chunksIssued + 1);
final CompletableFuture<List<BlockHeader>> downloadAndValidateHeadersTask =
lastDownloadAndValidateHeadersTask()
.thenCompose((ignore) -> downloadNextHeaders(firstChunkHeader, lastChunkHeader))
.thenCompose(this::validateHeaders);
final CompletableFuture<List<B>> downloadBodiesTask =
downloadAndValidateHeadersTask
.thenCombine(lastDownloadBodiesTask(), (headers, ignored) -> headers)
.thenCompose(this::downloadBlocks);
final CompletableFuture<List<B>> validateAndImportBlocksTask =
downloadBodiesTask
.thenCombine(lastValidateAndImportBlocksTasks(), (blocks, ignored) -> blocks)
.thenCompose(this::validateAndImportBlocks);
validateAndImportBlocksTask.whenComplete(this::completeChunkPipelineAndMaybeLaunchNextOne);
downloadAndValidateHeadersTasks.addLast(downloadAndValidateHeadersTask);
downloadBodiesTasks.addLast(downloadBodiesTask);
validateAndImportBlocksTasks.addLast(validateAndImportBlocksTask);
chunksIssued++;
}
private CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks) {
final Supplier<CompletableFuture<List<B>>> task =
() -> blockHandler.validateAndImportBlocks(blocks);
return executeWorkerSubTask(ethContext.getScheduler(), task);
}
public void completeChunkPipelineAndMaybeLaunchNextOne(
final List<B> blocks, final Throwable throwable) {
if (throwable != null) {
LOG.warn(
"Import of chain segment ({} to {}) failed: {}.",
firstHeader().getNumber(),
lastHeader().getNumber(),
ExceptionUtils.rootCause(throwable).getMessage());
result.get().completeExceptionally(throwable);
} else {
importedBlocks.addAll(blocks);
chunksCompleted++;
LOG.debug("Import chain segment succeeded (chunk {}/{}).", chunksCompleted, chunksInTotal);
if (chunksCompleted == chunksInTotal) {
LOG.info(
"Completed importing chain segment {} to {}",
firstHeader().getNumber(),
lastHeader().getNumber());
result.get().complete(importedBlocks);
} else {
downloadAndValidateHeadersTasks.removeFirst();
downloadBodiesTasks.removeFirst();
validateAndImportBlocksTasks.removeFirst();
if (chunksIssued < chunksInTotal) {
createNextChunkPipeline();
}
}
}
}
private CompletableFuture<List<BlockHeader>> downloadNextHeaders(
final BlockHeader firstChunkHeader, final BlockHeader lastChunkHeader) {
// Download the headers we're missing (between first and last)
LOG.debug(
"Downloading headers {} to {}",
firstChunkHeader.getNumber() + 1,
lastChunkHeader.getNumber());
final int segmentLength =
Math.toIntExact(lastChunkHeader.getNumber() - firstChunkHeader.getNumber() - 1);
if (segmentLength == 0) {
return CompletableFuture.completedFuture(
Lists.newArrayList(firstChunkHeader, lastChunkHeader));
}
final DownloadHeaderSequenceTask<C> task =
DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule,
protocolContext,
ethContext,
lastChunkHeader,
segmentLength,
metricsSystem);
return executeSubTask(task::run)
.thenApply(
headers -> {
final List<BlockHeader> finalHeaders = Lists.newArrayList(firstChunkHeader);
finalHeaders.addAll(headers);
finalHeaders.add(lastChunkHeader);
return finalHeaders;
});
}
private CompletableFuture<List<BlockHeader>> validateHeaders(final List<BlockHeader> headers) {
// First header needs to be validated
return executeWorkerSubTask(
ethContext.getScheduler(),
() -> {
final CompletableFuture<List<BlockHeader>> result = new CompletableFuture<>();
final BlockHeader parentHeader = headers.get(0);
final BlockHeader childHeader = headers.get(1);
final ProtocolSpec<C> protocolSpec =
protocolSchedule.getByBlockNumber(childHeader.getNumber());
final BlockHeaderValidator<C> blockHeaderValidator =
protocolSpec.getBlockHeaderValidator();
if (blockHeaderValidator.validateHeader(
childHeader,
parentHeader,
protocolContext,
validationPolicy.getValidationModeForNextBlock())) {
// The first header will be imported by the previous request range.
result.complete(headers.subList(1, headers.size()));
} else {
result.completeExceptionally(
new InvalidBlockException(
"Provided first header does not connect to last header.",
parentHeader.getNumber(),
parentHeader.getHash()));
}
return result;
});
}
private CompletableFuture<List<B>> downloadBlocks(final List<BlockHeader> headers) {
LOG.debug(
"Downloading bodies {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
return executeSubTask(() -> blockHandler.downloadBlocks(headers));
}
private BlockHeader firstHeader() {
return checkpointHeaders.get(0);
}
private BlockHeader lastHeader() {
return checkpointHeaders.get(checkpointHeaders.size() - 1);
}
private CompletableFuture<List<BlockHeader>> lastDownloadAndValidateHeadersTask() {
if (downloadAndValidateHeadersTasks.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyList());
} else {
return downloadAndValidateHeadersTasks.getLast();
}
}
private CompletableFuture<List<B>> lastDownloadBodiesTask() {
if (downloadBodiesTasks.isEmpty()) {
return CompletableFuture.completedFuture(Lists.newArrayList());
} else {
return downloadBodiesTasks.getLast();
}
}
private CompletableFuture<List<B>> lastValidateAndImportBlocksTasks() {
if (validateAndImportBlocksTasks.isEmpty()) {
return CompletableFuture.completedFuture(Lists.newArrayList());
} else {
return validateAndImportBlocksTasks.getLast();
}
}
}

@ -1,490 +0,0 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import static org.assertj.core.api.Assertions.assertThat;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockBody;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.AbstractMessageTaskTest;
import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncBlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Test;
public class PipelinedImportChainSegmentTaskTest
extends AbstractMessageTaskTest<List<Block>, List<Block>> {
private static final ValidationPolicy DETACHED_ONLY_VALIDATION_POLICY =
() -> HeaderValidationMode.DETACHED_ONLY;
@Override
protected List<Block> generateDataToBeRequested() {
final long chainHead = blockchain.getChainHeadBlockNumber();
final long importSize = 5;
final long startNumber = chainHead - importSize + 1;
final List<Block> blocksToImport = new ArrayList<>();
for (long i = 0; i < importSize; i++) {
blocksToImport.add(getBlockAtNumber(startNumber + i));
}
return blocksToImport;
}
private Block getBlockAtNumber(final long number) {
final BlockHeader header = blockchain.getBlockHeader(number).get();
final BlockBody body = blockchain.getBlockBody(header.getHash()).get();
return new Block(header, body);
}
private CompletableFuture<List<Block>> validateAndImportBlocks(final List<Block> blocks) {
return PersistBlockTask.forSequentialBlocks(
protocolSchedule,
protocolContext,
blocks,
HeaderValidationMode.SKIP_DETACHED,
metricsSystem)
.get();
}
@Override
protected EthTask<List<Block>> createTask(final List<Block> requestedData) {
final Block firstBlock = requestedData.get(0);
final Block lastBlock = requestedData.get(requestedData.size() - 1);
final Block previousBlock = getBlockAtNumber(firstBlock.getHeader().getNumber() - 1);
final MutableBlockchain shortBlockchain = createShortChain(firstBlock.getHeader().getNumber());
final ProtocolContext<Void> modifiedContext =
new ProtocolContext<>(
shortBlockchain,
protocolContext.getWorldStateArchive(),
protocolContext.getConsensusState());
return PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
modifiedContext,
ethContext,
1,
metricsSystem,
createBlockHandler(),
DETACHED_ONLY_VALIDATION_POLICY,
previousBlock.getHeader(),
lastBlock.getHeader());
}
@Override
protected void assertResultMatchesExpectation(
final List<Block> requestedData, final List<Block> response, final EthPeer respondingPeer) {
assertThat(response).isEqualTo(requestedData);
}
@Test
public void betweenContiguousHeadersSucceeds() {
// Setup a responsive peer
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Setup task and expectations
final Block firstBlock = getBlockAtNumber(5L);
final Block secondBlock = getBlockAtNumber(6L);
final List<Block> expectedResult = Collections.singletonList(secondBlock);
final MutableBlockchain shortBlockchain = createShortChain(firstBlock.getHeader().getNumber());
final ProtocolContext<Void> modifiedContext =
new ProtocolContext<>(
shortBlockchain,
protocolContext.getWorldStateArchive(),
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
modifiedContext,
ethContext,
1,
metricsSystem,
createBlockHandler(),
DETACHED_ONLY_VALIDATION_POLICY,
firstBlock.getHeader(),
secondBlock.getHeader());
// Sanity check
assertThat(shortBlockchain.contains(secondBlock.getHash())).isFalse();
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
final AtomicBoolean done = new AtomicBoolean(false);
final CompletableFuture<List<Block>> future = task.run();
respondingPeer.respond(responder);
future.whenComplete(
(result, error) -> {
actualResult.set(result);
done.compareAndSet(false, true);
});
assertThat(done).isTrue();
assertResultMatchesExpectation(expectedResult, actualResult.get(), respondingPeer.getEthPeer());
}
@Test
public void betweenUnconnectedHeadersFails() {
final BlockDataGenerator gen = new BlockDataGenerator();
// Setup a responsive peer
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Setup data
final Block fakeFirstBlock = gen.block(BlockOptions.create().setBlockNumber(5L));
final Block firstBlock = getBlockAtNumber(5L);
final Block secondBlock = getBlockAtNumber(6L);
final Block thirdBlock = getBlockAtNumber(7L);
// Setup task
final MutableBlockchain shortBlockchain = createShortChain(firstBlock.getHeader().getNumber());
final ProtocolContext<Void> modifiedContext =
new ProtocolContext<>(
shortBlockchain,
protocolContext.getWorldStateArchive(),
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
modifiedContext,
ethContext,
1,
metricsSystem,
createBlockHandler(),
DETACHED_ONLY_VALIDATION_POLICY,
fakeFirstBlock.getHeader(),
thirdBlock.getHeader());
// Sanity check
assertThat(shortBlockchain.contains(secondBlock.getHash())).isFalse();
// Execute task and wait for response
final AtomicReference<Throwable> actualError = new AtomicReference<>();
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
final AtomicBoolean done = new AtomicBoolean(false);
final CompletableFuture<List<Block>> future = task.run();
respondingPeer.respond(responder);
future.whenComplete(
(result, error) -> {
actualResult.set(result);
actualError.set(error);
done.compareAndSet(false, true);
});
assertThat(done).isTrue();
assertThat(actualResult.get()).isNull();
assertThat(actualError.get()).hasCauseInstanceOf(InvalidBlockException.class);
}
@Test
public void shouldSyncInSequencesOfChunksSequentially() {
// Setup a responsive peer
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Setup task for three chunks
final List<BlockHeader> checkpointHeaders =
LongStream.range(0, 13)
.filter(n -> n % 4 == 0)
.mapToObj(this::getBlockAtNumber)
.map(Block::getHeader)
.collect(Collectors.toList());
final List<Block> expectedResult =
LongStream.range(1, 13).mapToObj(this::getBlockAtNumber).collect(Collectors.toList());
final MutableBlockchain shortBlockchain = createShortChain(0);
final ProtocolContext<Void> modifiedContext =
new ProtocolContext<>(
shortBlockchain,
protocolContext.getWorldStateArchive(),
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
modifiedContext,
ethContext,
1,
metricsSystem,
createBlockHandler(),
DETACHED_ONLY_VALIDATION_POLICY,
checkpointHeaders);
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
final AtomicBoolean done = new AtomicBoolean(false);
final CompletableFuture<List<Block>> future = task.run();
final CountingResponder countingResponder = CountingResponder.wrap(responder);
// Import first segment's headers and bodies
respondingPeer.respondTimes(countingResponder, 2);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(1);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(1);
// Import second segment's headers and bodies
respondingPeer.respondTimes(countingResponder, 2);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(2);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(2);
// Import third segment's headers and bodies
respondingPeer.respondTimes(countingResponder, 2);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(3);
future.whenComplete(
(result, error) -> {
actualResult.set(result);
done.compareAndSet(false, true);
});
assertThat(done).isTrue();
assertResultMatchesExpectation(expectedResult, actualResult.get(), respondingPeer.getEthPeer());
}
@Test
public void shouldPipelineChainSegmentImportsUpToMaxActiveChunks() {
// Setup a responsive peer
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Setup task and expectations
final List<BlockHeader> checkpointHeaders =
LongStream.range(0, 13)
.filter(n -> n % 4 == 0)
.mapToObj(this::getBlockAtNumber)
.map(Block::getHeader)
.collect(Collectors.toList());
final List<Block> expectedResult =
LongStream.range(1, 13).mapToObj(this::getBlockAtNumber).collect(Collectors.toList());
final MutableBlockchain shortBlockchain = createShortChain(0);
final ProtocolContext<Void> modifiedContext =
new ProtocolContext<>(
shortBlockchain,
protocolContext.getWorldStateArchive(),
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
modifiedContext,
ethContext,
2,
metricsSystem,
createBlockHandler(),
DETACHED_ONLY_VALIDATION_POLICY,
checkpointHeaders);
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
final AtomicBoolean done = new AtomicBoolean(false);
final CompletableFuture<List<Block>> future = task.run();
final CountingResponder countingResponder = CountingResponder.wrap(responder);
// Import first segment's header
respondingPeer.respond(countingResponder);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(1);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(0);
// Import first segment's body and second segment's header
respondingPeer.respond(countingResponder);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(2);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(1);
// Import second segment's body and third segment's header
respondingPeer.respond(countingResponder);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(2);
// Import third segment's body
respondingPeer.respond(countingResponder);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(3);
future.whenComplete(
(result, error) -> {
actualResult.set(result);
done.compareAndSet(false, true);
});
assertThat(done).isTrue();
assertResultMatchesExpectation(expectedResult, actualResult.get(), respondingPeer.getEthPeer());
}
@Test
public void shouldPipelineChainSegmentImportsWithinMaxActiveChunks() {
// Setup a responsive peer
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Setup task and expectations
final List<BlockHeader> checkpointHeaders =
LongStream.range(0, 13)
.filter(n -> n % 4 == 0)
.mapToObj(this::getBlockAtNumber)
.map(Block::getHeader)
.collect(Collectors.toList());
final List<Block> expectedResult =
LongStream.range(1, 13).mapToObj(this::getBlockAtNumber).collect(Collectors.toList());
final MutableBlockchain shortBlockchain = createShortChain(0);
final ProtocolContext<Void> modifiedContext =
new ProtocolContext<>(
shortBlockchain,
protocolContext.getWorldStateArchive(),
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
modifiedContext,
ethContext,
3,
metricsSystem,
createBlockHandler(),
DETACHED_ONLY_VALIDATION_POLICY,
checkpointHeaders);
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
final AtomicBoolean done = new AtomicBoolean(false);
final CompletableFuture<List<Block>> future = task.run();
final CountingResponder countingResponder = CountingResponder.wrap(responder);
// Import first segment's header
respondingPeer.respond(countingResponder);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(1);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(0);
// Import first segment's body and second segment's header
respondingPeer.respond(countingResponder);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(2);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(1);
// Import second segment's body and third segment's header
respondingPeer.respond(countingResponder);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(2);
// Import third segment's body
respondingPeer.respond(countingResponder);
assertThat(countingResponder.getBlockHeaderMessages()).isEqualTo(3);
assertThat(countingResponder.getBlockBodiesMessages()).isEqualTo(3);
future.whenComplete(
(result, error) -> {
actualResult.set(result);
done.compareAndSet(false, true);
});
assertThat(done).isTrue();
assertResultMatchesExpectation(expectedResult, actualResult.get(), respondingPeer.getEthPeer());
}
private MutableBlockchain createShortChain(final long lastBlockToInclude) {
final BlockHeader genesisHeader =
blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get();
final BlockBody genesisBody = blockchain.getBlockBody(genesisHeader.getHash()).get();
final Block genesisBlock = new Block(genesisHeader, genesisBody);
final MutableBlockchain shortChain = createInMemoryBlockchain(genesisBlock);
long nextBlock = genesisHeader.getNumber() + 1;
while (nextBlock <= lastBlockToInclude) {
final BlockHeader header = blockchain.getBlockHeader(nextBlock).get();
final BlockBody body = blockchain.getBlockBody(header.getHash()).get();
final List<TransactionReceipt> receipts = blockchain.getTxReceipts(header.getHash()).get();
final Block block = new Block(header, body);
shortChain.appendBlock(block, receipts);
nextBlock++;
}
return shortChain;
}
private FullSyncBlockHandler<Void> createBlockHandler() {
return new FullSyncBlockHandler<>(protocolSchedule, protocolContext, ethContext, metricsSystem);
}
private static class CountingResponder implements Responder {
private final Responder delegate;
private int getBlockHeaderMessages = 0;
private int getBlockBodiesMessages = 0;
private int getReceiptsMessages = 0;
private int getNodeDataMessages = 0;
private static CountingResponder wrap(final Responder delegate) {
return new CountingResponder(delegate);
}
private CountingResponder(final Responder delegate) {
this.delegate = delegate;
}
@Override
public Optional<MessageData> respond(final Capability cap, final MessageData msg) {
final MessageData response = null;
switch (msg.getCode()) {
case EthPV62.GET_BLOCK_HEADERS:
getBlockHeaderMessages++;
break;
case EthPV62.GET_BLOCK_BODIES:
getBlockBodiesMessages++;
break;
case EthPV63.GET_RECEIPTS:
getReceiptsMessages++;
break;
case EthPV63.GET_NODE_DATA:
getNodeDataMessages++;
break;
}
return delegate.respond(cap, msg);
}
public int getBlockHeaderMessages() {
return getBlockHeaderMessages;
}
public int getBlockBodiesMessages() {
return getBlockBodiesMessages;
}
public int getReceiptsMessages() {
return getReceiptsMessages;
}
public int getNodeDataMessages() {
return getNodeDataMessages;
}
}
}
Loading…
Cancel
Save