From ef49f9b3e85dc31db8a6a89111cada4a0613b6a5 Mon Sep 17 00:00:00 2001 From: Danno Ferrin Date: Sun, 4 Nov 2018 10:37:56 +0100 Subject: [PATCH] NC-1815 Rinkeby import can stall with too many fragments revision As recommended in #231 alter the API so that the subclasses always return their list from executePeerTask in the `CompletableFuture` and reset the retry in `#executeTask()` when any non-empty list is returned. --- .../eth/manager/AbstractRetryingPeerTask.java | 27 +++++++------------ .../eth/sync/tasks/CompleteBlocksTask.java | 16 +++-------- .../tasks/DownloadHeaderSequenceTask.java | 3 +-- 3 files changed, 15 insertions(+), 31 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java index c6f8a8df27..d86dc22e8a 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java @@ -18,6 +18,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask; import tech.pegasys.pantheon.util.ExceptionUtils; import java.time.Duration; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import org.apache.logging.log4j.LogManager; @@ -25,16 +26,12 @@ import org.apache.logging.log4j.Logger; /** * A task that will retry a fixed number of times before completing the associated CompletableFuture - * exceptionally with a new {@link MaxRetriesReachedException}. + * exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link + * #executePeerTask()} is complete with a non-empty list the retry counter is reset. * - *

As an additional semantic subclasses may call {@link #resetRetryCounter} so that if they have - * partial success they can reset the retry counter and only count zero progress retries against the - * exception limit. If this facility is used only consecutive zero progress retries count against - * the maximum retries limit. - * - * @param The type of the CompletableFuture this task will return. + * @param The type as a typed list that the peer task can get partial or full results in. */ -public abstract class AbstractRetryingPeerTask extends AbstractEthTask { +public abstract class AbstractRetryingPeerTask> extends AbstractEthTask { private static final Logger LOG = LogManager.getLogger(); private final EthContext ethContext; @@ -68,12 +65,16 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { if (error != null) { handleTaskError(error); } else { + // If we get a partial success reset the retry counter. + if (peerResult.size() > 0) { + retryCount = 0; + } executeTask(); } }); } - protected abstract CompletableFuture executePeerTask(); + protected abstract CompletableFuture executePeerTask(); private void handleTaskError(final Throwable error) { final Throwable cause = ExceptionUtils.rootCause(error); @@ -106,13 +107,5 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { ethContext.getScheduler().scheduleFutureTask(this::executeTask, Duration.ofSeconds(1))); } - /** - * Reset the retryCounter. Once called executeTask will get a fresh set of retries to complete the - * task. - */ - protected void resetRetryCounter() { - retryCount = 0; - } - protected abstract boolean isRetryableError(Throwable error); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java index a71ffec344..f67807fb0b 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java @@ -83,7 +83,7 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask> } @Override - protected CompletableFuture executePeerTask() { + protected CompletableFuture> executePeerTask() { return requestBodies().thenCompose(this::processBodiesResult); } @@ -117,26 +117,18 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask> }); } - private CompletableFuture processBodiesResult( + private CompletableFuture> processBodiesResult( final PeerTaskResult> blocksResult) { - final int startingIncompleteHeaders = incompleteHeaders().size(); blocksResult.getResult().forEach((block) -> blocks.put(block.getHeader().getNumber(), block)); - final int endingIncompleteHeaders = incompleteHeaders().size(); - final boolean done = endingIncompleteHeaders == 0; - if (done) { + if (incompleteHeaders().size() == 0) { result .get() .complete( headers.stream().map(h -> blocks.get(h.getNumber())).collect(Collectors.toList())); - } else if (endingIncompleteHeaders < startingIncompleteHeaders) { - // If we made any progress reset the retry counter - resetRetryCounter(); } - final CompletableFuture future = new CompletableFuture<>(); - future.complete(null); - return future; + return CompletableFuture.completedFuture(blocksResult.getResult()); } private List incompleteHeaders() { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 15304d68a6..2b5f249e51 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -106,7 +106,7 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask executePeerTask() { + protected CompletableFuture> executePeerTask() { LOG.debug( "Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber() - 1); final CompletableFuture> task = @@ -163,7 +163,6 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask