diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 2b5f249e51..ef4cd750f0 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -114,7 +114,7 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask { // We're done if we've filled all requested headers - if (r != null && r.size() == segmentLength) { + if (lastFilledHeaderIndex == 0) { LOG.debug( "Finished downloading headers from {} to {}.", startingBlockNumber, @@ -162,6 +162,7 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask> future = new CompletableFuture<>(); BlockHeader child = null; boolean firstSkipped = false; + final int previousHeaderIndex = lastFilledHeaderIndex; for (final BlockHeader header : headersResult.getResult()) { final int headerIndex = Ints.checkedCast( @@ -191,7 +192,7 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask> { @@ -44,4 +61,62 @@ public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest> task = + DownloadHeaderSequenceTask.endingAtHeader( + protocolSchedule, protocolContext, ethContext, referenceHeader, 10, maxRetries); + final CompletableFuture> future = task.run(); + + // Respond with only the reference header + final Responder responder = + (cap, message) -> + Optional.of(BlockHeadersMessage.create(Collections.singletonList(referenceHeader))); + respondingPeer.respondWhile(responder, () -> !future.isDone()); + + assertThat(future.isDone()).isTrue(); + assertThat(future.isCompletedExceptionally()).isTrue(); + assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); + } + + @Test + public void failsWhenPeerReturnsOnlySubsetOfHeaders() { + final RespondingEthPeer respondingPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager); + + // Execute task and wait for response + BlockHeader referenceHeader = blockchain.getChainHeadHeader(); + final EthTask> task = + DownloadHeaderSequenceTask.endingAtHeader( + protocolSchedule, protocolContext, ethContext, referenceHeader, 10, maxRetries); + final CompletableFuture> future = task.run(); + + // Filter response to include only reference header and previous header + final Responder fullResponder = RespondingEthPeer.blockchainResponder(blockchain); + final Responder responder = + (cap, message) -> { + Optional fullResponse = fullResponder.respond(cap, message); + if (!fullResponse.isPresent() || message.getCode() != EthPV62.GET_BLOCK_HEADERS) { + return fullResponse; + } + BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(fullResponse.get()); + // Filter for a subset of headers + List headerSubset = + Streams.stream(headersMessage.getHeaders(protocolSchedule)) + .filter(h -> h.getNumber() >= referenceHeader.getNumber() - 1L) + .collect(Collectors.toList()); + return Optional.of(BlockHeadersMessage.create(headerSubset)); + }; + respondingPeer.respondTimes(responder, 100); + + assertThat(future.isDone()).isTrue(); + assertThat(future.isCompletedExceptionally()).isTrue(); + assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); + } } diff --git a/ethereum/eth/src/test/resources/log4j2.xml b/ethereum/eth/src/test/resources/log4j2.xml new file mode 100644 index 0000000000..af05117015 --- /dev/null +++ b/ethereum/eth/src/test/resources/log4j2.xml @@ -0,0 +1,16 @@ + + + + INFO + + + + + + + + + + + +