diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java index 8a7fb313a9..c6f8a8df27 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java @@ -23,13 +23,28 @@ import java.util.concurrent.CompletableFuture; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +/** + * A task that will retry a fixed number of times before completing the associated CompletableFuture + * exceptionally with a new {@link MaxRetriesReachedException}. + * + *

As an additional semantic subclasses may call {@link #resetRetryCounter} so that if they have + * partial success they can reset the retry counter and only count zero progress retries against the + * exception limit. If this facility is used only consecutive zero progress retries count against + * the maximum retries limit. + * + * @param The type of the CompletableFuture this task will return. + */ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { private static final Logger LOG = LogManager.getLogger(); private final EthContext ethContext; private final int maxRetries; - private int requestCount = 0; + private int retryCount = 0; + /** + * @param ethContext The context of the current Eth network we are attached to. + * @param maxRetries Maximum number of retries to accept before completing exceptionally. + */ public AbstractRetryingPeerTask(final EthContext ethContext, final int maxRetries) { this.ethContext = ethContext; this.maxRetries = maxRetries; @@ -41,12 +56,12 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { // Return if task is done return; } - if (requestCount > maxRetries) { + if (retryCount > maxRetries) { result.get().completeExceptionally(new MaxRetriesReachedException()); return; } - requestCount += 1; + retryCount += 1; executePeerTask() .whenComplete( (peerResult, error) -> { @@ -77,10 +92,7 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { ethContext .getScheduler() .timeout(waitTask, Duration.ofSeconds(5)) - .whenComplete( - (r, t) -> { - executeTask(); - })); + .whenComplete((r, t) -> executeTask())); return; } @@ -94,5 +106,13 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { ethContext.getScheduler().scheduleFutureTask(this::executeTask, Duration.ofSeconds(1))); } + /** + * Reset the retryCounter. Once called executeTask will get a fresh set of retries to complete the + * task. + */ + protected void resetRetryCounter() { + retryCount = 0; + } + protected abstract boolean isRetryableError(Throwable error); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java index ff0afcbceb..a71ffec344 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java @@ -44,7 +44,7 @@ import org.apache.logging.log4j.Logger; */ public class CompleteBlocksTask extends AbstractRetryingPeerTask> { private static final Logger LOG = LogManager.getLogger(); - private static final int DEFAULT_RETRIES = 20; + private static final int DEFAULT_RETRIES = 3; private final EthContext ethContext; private final ProtocolSchedule protocolSchedule; @@ -119,19 +119,19 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask> private CompletableFuture processBodiesResult( final PeerTaskResult> blocksResult) { - blocksResult - .getResult() - .forEach( - (block) -> { - blocks.put(block.getHeader().getNumber(), block); - }); - - final boolean done = incompleteHeaders().size() == 0; + final int startingIncompleteHeaders = incompleteHeaders().size(); + blocksResult.getResult().forEach((block) -> blocks.put(block.getHeader().getNumber(), block)); + + final int endingIncompleteHeaders = incompleteHeaders().size(); + final boolean done = endingIncompleteHeaders == 0; if (done) { result .get() .complete( headers.stream().map(h -> blocks.get(h.getNumber())).collect(Collectors.toList())); + } else if (endingIncompleteHeaders < startingIncompleteHeaders) { + // If we made any progress reset the retry counter + resetRetryCounter(); } final CompletableFuture future = new CompletableFuture<>(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 9f7ea55f0a..15304d68a6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -47,7 +47,7 @@ import org.apache.logging.log4j.Logger; */ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask> { private static final Logger LOG = LogManager.getLogger(); - private static final int DEFAULT_RETRIES = 20; + private static final int DEFAULT_RETRIES = 3; private final EthContext ethContext; private final ProtocolContext protocolContext; @@ -163,6 +163,7 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask currentMessages = new ArrayList<>(outgoingMessages); @@ -207,8 +206,17 @@ public class RespondingEthPeer { }; } + /** + * Create a responder that only responds with a fixed portion of the available data. + * + * @param portion The portion of the available data to return, from 0 to 1 + */ public static Responder partialResponder( - final Blockchain blockchain, final ProtocolSchedule protocolSchedule) { + final Blockchain blockchain, + final ProtocolSchedule protocolSchedule, + final float portion) { + checkArgument(portion >= 0.0 && portion <= 1.0, "Portion is in the range [0.0..1.0]"); + final Responder fullResponder = blockchainResponder(blockchain); return (cap, msg) -> { final Optional maybeResponse = fullResponder.respond(cap, msg); @@ -225,7 +233,7 @@ public class RespondingEthPeer { final List originalHeaders = Lists.newArrayList(headersMessage.getHeaders(protocolSchedule)); final List partialHeaders = - originalHeaders.subList(0, originalHeaders.size() / 2); + originalHeaders.subList(0, (int) (originalHeaders.size() * portion)); partialResponse = BlockHeadersMessage.create(partialHeaders); } finally { headersMessage.release(); @@ -237,7 +245,7 @@ public class RespondingEthPeer { final List originalBodies = Lists.newArrayList(bodiesMessage.bodies(protocolSchedule)); final List partialBodies = - originalBodies.subList(0, originalBodies.size() / 2); + originalBodies.subList(0, (int) (originalBodies.size() * portion)); partialResponse = BlockBodiesMessage.create(partialBodies); } finally { bodiesMessage.release(); @@ -249,7 +257,7 @@ public class RespondingEthPeer { final List> originalReceipts = Lists.newArrayList(receiptsMessage.receipts()); final List> partialReceipts = - originalReceipts.subList(0, originalReceipts.size() / 2); + originalReceipts.subList(0, (int) (originalReceipts.size() * portion)); partialResponse = ReceiptsMessage.create(partialReceipts); } finally { receiptsMessage.release(); @@ -261,7 +269,7 @@ public class RespondingEthPeer { final List originalNodeData = Lists.newArrayList(nodeDataMessage.nodeData()); final List partialNodeData = - originalNodeData.subList(0, originalNodeData.size() / 2); + originalNodeData.subList(0, (int) (originalNodeData.size() * portion)); partialResponse = NodeDataMessage.create(partialNodeData); } finally { nodeDataMessage.release(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java index 0970f6dede..528fea4da7 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java @@ -25,7 +25,6 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException.Fa import tech.pegasys.pantheon.util.ExceptionUtils; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -39,10 +38,10 @@ import org.junit.Test; */ public abstract class PeerMessageTaskTest extends AbstractMessageTaskTest> { @Test - public void completesWhenPeerReturnsPartialResult() - throws ExecutionException, InterruptedException { + public void completesWhenPeerReturnsPartialResult() { // Setup a partially responsive peer - final Responder responder = RespondingEthPeer.partialResponder(blockchain, protocolSchedule); + final Responder responder = + RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.5f); final RespondingEthPeer respondingEthPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); @@ -67,7 +66,7 @@ public abstract class PeerMessageTaskTest extends AbstractMessageTaskTest extends AbstractMessageTaskTest> task = createTask(requestedData); final CompletableFuture> future = task.run(); final AtomicReference failure = new AtomicReference<>(); - future.whenComplete( - (r, t) -> { - failure.set(t); - }); + future.whenComplete((r, t) -> failure.set(t)); assertThat(future.isCompletedExceptionally()).isTrue(); assertThat(failure.get()).isNotNull(); @@ -108,10 +104,7 @@ public abstract class PeerMessageTaskTest extends AbstractMessageTaskTest> task = createTask(requestedData); final CompletableFuture> future = task.run(); respondingEthPeer.respondWhile(responder, () -> !future.isDone()); - future.whenComplete( - (response, error) -> { - done.compareAndSet(false, true); - }); + future.whenComplete((response, error) -> done.compareAndSet(false, true)); assertThat(future.isDone()).isTrue(); assertThat(future.isCompletedExceptionally()).isFalse(); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java index 46187e7a43..e00405b483 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java @@ -24,7 +24,6 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedEx import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import org.junit.Test; @@ -48,12 +47,13 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest } @Test - public void failsWhenPeerRepeatedlyReturnsPartialResult() - throws ExecutionException, InterruptedException { + public void failsWhenPeerReturnsPartialResultThenStops() { // Setup data to be requested and expected response - // Setup a partially responsive peer - final Responder responder = RespondingEthPeer.partialResponder(blockchain, protocolSchedule); + // Setup a partially responsive peer and a non-responsive peer + final Responder partialResponder = + RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.5f); + final Responder emptyResponder = RespondingEthPeer.emptyResponder(); final RespondingEthPeer respondingPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); @@ -62,20 +62,51 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest final EthTask task = createTask(requestedData); final CompletableFuture future = task.run(); - // Respond max times - respondingPeer.respondTimes(responder, maxRetries); + // Respond once with no data + respondingPeer.respond(emptyResponder); + assertThat(future.isDone()).isFalse(); + + // Respond once with partial data, this should reset failures + respondingPeer.respond(partialResponder); + assertThat(future.isDone()).isFalse(); + + // Respond max times with no data + respondingPeer.respondTimes(emptyResponder, maxRetries); assertThat(future.isDone()).isFalse(); // Next retry should fail - respondingPeer.respond(responder); + respondingPeer.respond(emptyResponder); assertThat(future.isDone()).isTrue(); assertThat(future.isCompletedExceptionally()).isTrue(); assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); } @Test - public void doesNotCompleteWhenPeersAreUnavailable() + public void completesWhenPeerReturnsPartialResult() throws ExecutionException, InterruptedException { + // Setup data to be requested and expected response + + // Setup a partially responsive peer + final RespondingEthPeer respondingPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager); + + // Execute task and wait for response + final T requestedData = generateDataToBeRequested(); + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + + // Respond with partial data up until complete. + respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.25f)); + respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.50f)); + respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 0.75f)); + respondingPeer.respond(RespondingEthPeer.partialResponder(blockchain, protocolSchedule, 1.0f)); + + assertThat(future.isDone()).isTrue(); + assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer()); + } + + @Test + public void doesNotCompleteWhenPeersAreUnavailable() { // Setup data to be requested final T requestedData = generateDataToBeRequested(); @@ -87,7 +118,7 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest @Test public void completesWhenPeersAreTemporarilyUnavailable() - throws ExecutionException, InterruptedException, TimeoutException { + throws ExecutionException, InterruptedException { // Setup data to be requested final T requestedData = generateDataToBeRequested(); @@ -108,7 +139,7 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest @Test public void completeWhenPeersTimeoutTemporarily() - throws ExecutionException, InterruptedException, TimeoutException { + throws ExecutionException, InterruptedException { peerCountToTimeout.set(1); final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); final RespondingEthPeer respondingPeer = @@ -125,7 +156,7 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest } @Test - public void failsWhenPeersSendEmptyResponses() throws ExecutionException, InterruptedException { + public void failsWhenPeersSendEmptyResponses() { // Setup a unresponsive peer final Responder responder = RespondingEthPeer.emptyResponder(); final RespondingEthPeer respondingPeer =