NC-1815 Rinkeby import can stall with too many fragments (#231)

A more robust fix: be tolerant of partial success.  Instead of counting
every call as a limited retry, change the semantics of
AbstractRetryingPeerTask such that its retryCounter may be reset on a
partial success.  Hence the limit is on consecutive zero-progress
retries, and large blocks dribbled in one at a time are not penalized.

Undo temp fix.  Restore retries to 3 in a row.
Danno Ferrin 6 years ago committed by Adrian Sutton
parent c86193b423
commit 94f6d4e1a3
  1. 34
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java
  2. 16
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java
  3. 3
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java
  4. 26
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java
  5. 19
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java
  6. 55
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java

@ -23,13 +23,28 @@ import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* A task that will retry a fixed number of times before completing the associated CompletableFuture
* exceptionally with a new {@link MaxRetriesReachedException}.
*
* <p>As an additional semantic, subclasses may call {@link #resetRetryCounter} so that if they
* have partial success they can reset the retry counter and only count zero-progress retries
* against the exception limit. If this facility is used, only consecutive zero-progress retries
* count against the maximum retries limit.
*
* @param <T> The type of the CompletableFuture this task will return.
*/
public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
private static final Logger LOG = LogManager.getLogger();
private final EthContext ethContext;
private final int maxRetries;
private int requestCount = 0;
private int retryCount = 0;
/**
* @param ethContext The context of the current Eth network we are attached to.
* @param maxRetries Maximum number of retries to accept before completing exceptionally.
*/
public AbstractRetryingPeerTask(final EthContext ethContext, final int maxRetries) {
this.ethContext = ethContext;
this.maxRetries = maxRetries;
@ -41,12 +56,12 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
// Return if task is done
return;
}
if (requestCount > maxRetries) {
if (retryCount > maxRetries) {
result.get().completeExceptionally(new MaxRetriesReachedException());
return;
}
requestCount += 1;
retryCount += 1;
executePeerTask()
.whenComplete(
(peerResult, error) -> {
@ -77,10 +92,7 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
ethContext
.getScheduler()
.timeout(waitTask, Duration.ofSeconds(5))
.whenComplete(
(r, t) -> {
executeTask();
}));
.whenComplete((r, t) -> executeTask()));
return;
}
@ -94,5 +106,13 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
ethContext.getScheduler().scheduleFutureTask(this::executeTask, Duration.ofSeconds(1)));
}
/**
* Reset the retryCounter. Once called, executeTask will get a fresh allowance of {@code
* maxRetries} consecutive zero-progress retries before it completes the task exceptionally with a
* {@link MaxRetriesReachedException}. Subclasses should call this whenever a retry makes partial
* progress, so that slow-but-steady progress (e.g. large blocks dribbled in one at a time) is not
* penalized by the consecutive-retry limit.
*/
protected void resetRetryCounter() {
retryCount = 0;
}
protected abstract boolean isRetryableError(Throwable error);
}

@ -44,7 +44,7 @@ import org.apache.logging.log4j.Logger;
*/
public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>> {
private static final Logger LOG = LogManager.getLogger();
private static final int DEFAULT_RETRIES = 20;
private static final int DEFAULT_RETRIES = 3;
private final EthContext ethContext;
private final ProtocolSchedule<C> protocolSchedule;
@ -119,19 +119,19 @@ public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>>
private CompletableFuture<Void> processBodiesResult(
final PeerTaskResult<List<Block>> blocksResult) {
blocksResult
.getResult()
.forEach(
(block) -> {
blocks.put(block.getHeader().getNumber(), block);
});
final int startingIncompleteHeaders = incompleteHeaders().size();
blocksResult.getResult().forEach((block) -> blocks.put(block.getHeader().getNumber(), block));
final boolean done = incompleteHeaders().size() == 0;
final int endingIncompleteHeaders = incompleteHeaders().size();
final boolean done = endingIncompleteHeaders == 0;
if (done) {
result
.get()
.complete(
headers.stream().map(h -> blocks.get(h.getNumber())).collect(Collectors.toList()));
} else if (endingIncompleteHeaders < startingIncompleteHeaders) {
// If we made any progress, reset the retry counter.
resetRetryCounter();
}
final CompletableFuture<Void> future = new CompletableFuture<>();

@ -47,7 +47,7 @@ import org.apache.logging.log4j.Logger;
*/
public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List<BlockHeader>> {
private static final Logger LOG = LogManager.getLogger();
private static final int DEFAULT_RETRIES = 20;
private static final int DEFAULT_RETRIES = 3;
private final EthContext ethContext;
private final ProtocolContext<C> protocolContext;
@ -163,6 +163,7 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
BlockHeader child = null;
boolean firstSkipped = false;
for (final BlockHeader header : headersResult.getResult()) {
resetRetryCounter();
final int headerIndex =
Ints.checkedCast(
segmentLength - (referenceHeader.getNumber() - header.getNumber()));

@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;
import static com.google.common.base.Preconditions.checkArgument;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.BlockBody;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
@ -143,10 +145,7 @@ public class RespondingEthPeer {
}
}
/**
* @param responder
* @return True if any requests were processed
*/
/** @return True if any requests were processed */
public boolean respond(final Responder responder) {
// Respond to queued messages
final List<OutgoingMessage> currentMessages = new ArrayList<>(outgoingMessages);
@ -207,8 +206,17 @@ public class RespondingEthPeer {
};
}
/**
* Create a responder that only responds with a fixed portion of the available data.
*
* @param portion The portion of the available data to return, from 0 to 1
*/
public static <C> Responder partialResponder(
final Blockchain blockchain, final ProtocolSchedule<C> protocolSchedule) {
final Blockchain blockchain,
final ProtocolSchedule<C> protocolSchedule,
final float portion) {
checkArgument(portion >= 0.0 && portion <= 1.0, "Portion is in the range [0.0..1.0]");
final Responder fullResponder = blockchainResponder(blockchain);
return (cap, msg) -> {
final Optional<MessageData> maybeResponse = fullResponder.respond(cap, msg);
@ -225,7 +233,7 @@ public class RespondingEthPeer {
final List<BlockHeader> originalHeaders =
Lists.newArrayList(headersMessage.getHeaders(protocolSchedule));
final List<BlockHeader> partialHeaders =
originalHeaders.subList(0, originalHeaders.size() / 2);
originalHeaders.subList(0, (int) (originalHeaders.size() * portion));
partialResponse = BlockHeadersMessage.create(partialHeaders);
} finally {
headersMessage.release();
@ -237,7 +245,7 @@ public class RespondingEthPeer {
final List<BlockBody> originalBodies =
Lists.newArrayList(bodiesMessage.bodies(protocolSchedule));
final List<BlockBody> partialBodies =
originalBodies.subList(0, originalBodies.size() / 2);
originalBodies.subList(0, (int) (originalBodies.size() * portion));
partialResponse = BlockBodiesMessage.create(partialBodies);
} finally {
bodiesMessage.release();
@ -249,7 +257,7 @@ public class RespondingEthPeer {
final List<List<TransactionReceipt>> originalReceipts =
Lists.newArrayList(receiptsMessage.receipts());
final List<List<TransactionReceipt>> partialReceipts =
originalReceipts.subList(0, originalReceipts.size() / 2);
originalReceipts.subList(0, (int) (originalReceipts.size() * portion));
partialResponse = ReceiptsMessage.create(partialReceipts);
} finally {
receiptsMessage.release();
@ -261,7 +269,7 @@ public class RespondingEthPeer {
final List<BytesValue> originalNodeData =
Lists.newArrayList(nodeDataMessage.nodeData());
final List<BytesValue> partialNodeData =
originalNodeData.subList(0, originalNodeData.size() / 2);
originalNodeData.subList(0, (int) (originalNodeData.size() * portion));
partialResponse = NodeDataMessage.create(partialNodeData);
} finally {
nodeDataMessage.release();

@ -25,7 +25,6 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException.Fa
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -39,10 +38,10 @@ import org.junit.Test;
*/
public abstract class PeerMessageTaskTest<T> extends AbstractMessageTaskTest<T, PeerTaskResult<T>> {
@Test
public void completesWhenPeerReturnsPartialResult()
throws ExecutionException, InterruptedException {
public void completesWhenPeerReturnsPartialResult() {
// Setup a partially responsive peer
final Responder responder = RespondingEthPeer.partialResponder(blockchain, protocolSchedule);
final Responder responder =
RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.5f);
final RespondingEthPeer respondingEthPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
@ -67,7 +66,7 @@ public abstract class PeerMessageTaskTest<T> extends AbstractMessageTaskTest<T,
}
@Test
public void failsWhenNoPeersAreAvailable() throws ExecutionException, InterruptedException {
public void failsWhenNoPeersAreAvailable() {
// Setup data to be requested
final T requestedData = generateDataToBeRequested();
@ -75,10 +74,7 @@ public abstract class PeerMessageTaskTest<T> extends AbstractMessageTaskTest<T,
final EthTask<PeerTaskResult<T>> task = createTask(requestedData);
final CompletableFuture<PeerTaskResult<T>> future = task.run();
final AtomicReference<Throwable> failure = new AtomicReference<>();
future.whenComplete(
(r, t) -> {
failure.set(t);
});
future.whenComplete((r, t) -> failure.set(t));
assertThat(future.isCompletedExceptionally()).isTrue();
assertThat(failure.get()).isNotNull();
@ -108,10 +104,7 @@ public abstract class PeerMessageTaskTest<T> extends AbstractMessageTaskTest<T,
final EthTask<PeerTaskResult<T>> task = createTask(requestedData);
final CompletableFuture<PeerTaskResult<T>> future = task.run();
respondingEthPeer.respondWhile(responder, () -> !future.isDone());
future.whenComplete(
(response, error) -> {
done.compareAndSet(false, true);
});
future.whenComplete((response, error) -> done.compareAndSet(false, true));
assertThat(future.isDone()).isTrue();
assertThat(future.isCompletedExceptionally()).isFalse();
}

@ -24,7 +24,6 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedEx
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
@ -48,12 +47,13 @@ public abstract class RetryingMessageTaskTest<T> extends AbstractMessageTaskTest
}
@Test
public void failsWhenPeerRepeatedlyReturnsPartialResult()
throws ExecutionException, InterruptedException {
public void failsWhenPeerReturnsPartialResultThenStops() {
// Setup data to be requested and expected response
// Setup a partially responsive peer
final Responder responder = RespondingEthPeer.partialResponder(blockchain, protocolSchedule);
// Setup a partially responsive peer and a non-responsive peer
final Responder partialResponder =
RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.5f);
final Responder emptyResponder = RespondingEthPeer.emptyResponder();
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
@ -62,20 +62,51 @@ public abstract class RetryingMessageTaskTest<T> extends AbstractMessageTaskTest
final EthTask<T> task = createTask(requestedData);
final CompletableFuture<T> future = task.run();
// Respond max times
respondingPeer.respondTimes(responder, maxRetries);
// Respond once with no data
respondingPeer.respond(emptyResponder);
assertThat(future.isDone()).isFalse();
// Respond once with partial data, this should reset failures
respondingPeer.respond(partialResponder);
assertThat(future.isDone()).isFalse();
// Respond max times with no data
respondingPeer.respondTimes(emptyResponder, maxRetries);
assertThat(future.isDone()).isFalse();
// Next retry should fail
respondingPeer.respond(responder);
respondingPeer.respond(emptyResponder);
assertThat(future.isDone()).isTrue();
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
@Test
public void doesNotCompleteWhenPeersAreUnavailable()
public void completesWhenPeerReturnsPartialResult()
throws ExecutionException, InterruptedException {
// Setup data to be requested and expected response
// Setup a partially responsive peer
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Execute task and wait for response
final T requestedData = generateDataToBeRequested();
final EthTask<T> task = createTask(requestedData);
final CompletableFuture<T> future = task.run();
// Respond with partial data up until complete.
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.25f));
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.50f));
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.75f));
respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 1.0f));
assertThat(future.isDone()).isTrue();
assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer());
}
@Test
public void doesNotCompleteWhenPeersAreUnavailable() {
// Setup data to be requested
final T requestedData = generateDataToBeRequested();
@ -87,7 +118,7 @@ public abstract class RetryingMessageTaskTest<T> extends AbstractMessageTaskTest
@Test
public void completesWhenPeersAreTemporarilyUnavailable()
throws ExecutionException, InterruptedException, TimeoutException {
throws ExecutionException, InterruptedException {
// Setup data to be requested
final T requestedData = generateDataToBeRequested();
@ -108,7 +139,7 @@ public abstract class RetryingMessageTaskTest<T> extends AbstractMessageTaskTest
@Test
public void completeWhenPeersTimeoutTemporarily()
throws ExecutionException, InterruptedException, TimeoutException {
throws ExecutionException, InterruptedException {
peerCountToTimeout.set(1);
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
final RespondingEthPeer respondingPeer =
@ -125,7 +156,7 @@ public abstract class RetryingMessageTaskTest<T> extends AbstractMessageTaskTest
}
@Test
public void failsWhenPeersSendEmptyResponses() throws ExecutionException, InterruptedException {
public void failsWhenPeersSendEmptyResponses() {
// Set up an unresponsive peer
final Responder responder = RespondingEthPeer.emptyResponder();
final RespondingEthPeer respondingPeer =

Loading…
Cancel
Save