NC-1815 Rinkeby import can stall with too many fragments revision

As recommended in #231 alter the API so that the
subclasses always return their list from executePeerTask in the
`CompletableFuture` and reset the retry in `#executeTask()` when any
non-empty list is returned.

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Danno Ferrin 6 years ago committed by GitHub
parent b18c9ccdc0
commit 5c57a3dc44
  1. 27
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java
  2. 16
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java
  3. 3
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java

@ -18,6 +18,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
@ -25,16 +26,12 @@ import org.apache.logging.log4j.Logger;
/**
* A task that will retry a fixed number of times before completing the associated CompletableFuture
* exceptionally with a new {@link MaxRetriesReachedException}.
* exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link
* #executePeerTask()} is complete with a non-empty list the retry counter is reset.
*
* <p>As an additional semantic subclasses may call {@link #resetRetryCounter} so that if they have
* partial success they can reset the retry counter and only count zero progress retries against the
* exception limit. If this facility is used only consecutive zero progress retries count against
* the maximum retries limit.
*
* @param <T> The type of the CompletableFuture this task will return.
* @param <T> The type as a typed list that the peer task can get partial or full results in.
*/
public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends AbstractEthTask<T> {
private static final Logger LOG = LogManager.getLogger();
private final EthContext ethContext;
@ -68,12 +65,16 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
if (error != null) {
handleTaskError(error);
} else {
// If we get a partial success reset the retry counter.
if (peerResult.size() > 0) {
retryCount = 0;
}
executeTask();
}
});
}
protected abstract CompletableFuture<?> executePeerTask();
protected abstract CompletableFuture<T> executePeerTask();
private void handleTaskError(final Throwable error) {
final Throwable cause = ExceptionUtils.rootCause(error);
@ -106,13 +107,5 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
ethContext.getScheduler().scheduleFutureTask(this::executeTask, Duration.ofSeconds(1)));
}
/**
* Reset the retryCounter. Once called executeTask will get a fresh set of retries to complete the
* task.
*/
protected void resetRetryCounter() {
retryCount = 0;
}
protected abstract boolean isRetryableError(Throwable error);
}

@ -83,7 +83,7 @@ public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>>
}
@Override
protected CompletableFuture<?> executePeerTask() {
protected CompletableFuture<List<Block>> executePeerTask() {
return requestBodies().thenCompose(this::processBodiesResult);
}
@ -117,26 +117,18 @@ public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>>
});
}
private CompletableFuture<Void> processBodiesResult(
private CompletableFuture<List<Block>> processBodiesResult(
final PeerTaskResult<List<Block>> blocksResult) {
final int startingIncompleteHeaders = incompleteHeaders().size();
blocksResult.getResult().forEach((block) -> blocks.put(block.getHeader().getNumber(), block));
final int endingIncompleteHeaders = incompleteHeaders().size();
final boolean done = endingIncompleteHeaders == 0;
if (done) {
if (incompleteHeaders().size() == 0) {
result
.get()
.complete(
headers.stream().map(h -> blocks.get(h.getNumber())).collect(Collectors.toList()));
} else if (endingIncompleteHeaders < startingIncompleteHeaders) {
// If we made any progress reset the retry counter
resetRetryCounter();
}
final CompletableFuture<Void> future = new CompletableFuture<>();
future.complete(null);
return future;
return CompletableFuture.completedFuture(blocksResult.getResult());
}
private List<BlockHeader> incompleteHeaders() {

@ -106,7 +106,7 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
}
@Override
protected CompletableFuture<?> executePeerTask() {
protected CompletableFuture<List<BlockHeader>> executePeerTask() {
LOG.debug(
"Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber() - 1);
final CompletableFuture<List<BlockHeader>> task =
@ -163,7 +163,6 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
BlockHeader child = null;
boolean firstSkipped = false;
for (final BlockHeader header : headersResult.getResult()) {
resetRetryCounter();
final int headerIndex =
Ints.checkedCast(
segmentLength - (referenceHeader.getNumber() - header.getNumber()));

Loading…
Cancel
Save