NC-1815 Rinkeby import can stall with too many fragments (#255)

Add tests that hit the retry failure case

Move "complete what we just got" semantic into the
DownloadHeaderSequenceTask.  Previously it was completing everything
in the chunk it was assigned.
Danno Ferrin 6 years ago committed by GitHub
parent c8baa9ae76
commit fc4be9e6c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java
  2. 75
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java
  3. 16
      ethereum/eth/src/test/resources/log4j2.xml

@ -114,7 +114,7 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
return task.whenComplete( return task.whenComplete(
(r, t) -> { (r, t) -> {
// We're done if we've filled all requested headers // We're done if we've filled all requested headers
if (r != null && r.size() == segmentLength) { if (lastFilledHeaderIndex == 0) {
LOG.debug( LOG.debug(
"Finished downloading headers from {} to {}.", "Finished downloading headers from {} to {}.",
startingBlockNumber, startingBlockNumber,
@ -162,6 +162,7 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
final CompletableFuture<List<BlockHeader>> future = new CompletableFuture<>(); final CompletableFuture<List<BlockHeader>> future = new CompletableFuture<>();
BlockHeader child = null; BlockHeader child = null;
boolean firstSkipped = false; boolean firstSkipped = false;
final int previousHeaderIndex = lastFilledHeaderIndex;
for (final BlockHeader header : headersResult.getResult()) { for (final BlockHeader header : headersResult.getResult()) {
final int headerIndex = final int headerIndex =
Ints.checkedCast( Ints.checkedCast(
@ -191,7 +192,7 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
lastFilledHeaderIndex = headerIndex; lastFilledHeaderIndex = headerIndex;
child = header; child = header;
} }
future.complete(asList(headers).subList(lastFilledHeaderIndex, segmentLength)); future.complete(asList(headers).subList(lastFilledHeaderIndex, previousHeaderIndex));
return future; return future;
}); });
} }

@ -12,12 +12,29 @@
*/ */
package tech.pegasys.pantheon.ethereum.eth.sync.tasks; package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import com.google.common.collect.Streams;
import org.junit.Test;
public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest<List<BlockHeader>> { public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest<List<BlockHeader>> {
@ -44,4 +61,62 @@ public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest<List
requestedData.size(), requestedData.size(),
maxRetries); maxRetries);
} }
@Test
public void failsWhenPeerReturnsOnlyReferenceHeader() {
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Execute task and wait for response
BlockHeader referenceHeader = blockchain.getChainHeadHeader();
final EthTask<List<BlockHeader>> task =
DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule, protocolContext, ethContext, referenceHeader, 10, maxRetries);
final CompletableFuture<List<BlockHeader>> future = task.run();
// Respond with only the reference header
final Responder responder =
(cap, message) ->
Optional.of(BlockHeadersMessage.create(Collections.singletonList(referenceHeader)));
respondingPeer.respondWhile(responder, () -> !future.isDone());
assertThat(future.isDone()).isTrue();
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
@Test
public void failsWhenPeerReturnsOnlySubsetOfHeaders() {
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Execute task and wait for response
BlockHeader referenceHeader = blockchain.getChainHeadHeader();
final EthTask<List<BlockHeader>> task =
DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule, protocolContext, ethContext, referenceHeader, 10, maxRetries);
final CompletableFuture<List<BlockHeader>> future = task.run();
// Filter response to include only reference header and previous header
final Responder fullResponder = RespondingEthPeer.blockchainResponder(blockchain);
final Responder responder =
(cap, message) -> {
Optional<MessageData> fullResponse = fullResponder.respond(cap, message);
if (!fullResponse.isPresent() || message.getCode() != EthPV62.GET_BLOCK_HEADERS) {
return fullResponse;
}
BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(fullResponse.get());
// Filter for a subset of headers
List<BlockHeader> headerSubset =
Streams.stream(headersMessage.getHeaders(protocolSchedule))
.filter(h -> h.getNumber() >= referenceHeader.getNumber() - 1L)
.collect(Collectors.toList());
return Optional.of(BlockHeadersMessage.create(headerSubset));
};
respondingPeer.respondTimes(responder, 100);
assertThat(future.isDone()).isTrue();
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
} }

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Properties>
<Property name="root.log.level">INFO</Property>
</Properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSSZZZ} | %t | %-5level | %c{1} | %msg%n" /> </Console>
</Appenders>
<Loggers>
<Root level="${sys:root.log.level}">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>
Loading…
Cancel
Save