diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java index ee8e7d1c52..199c865345 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java @@ -1,5 +1,6 @@ package tech.pegasys.pantheon.ethereum.eth.manager; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask; import tech.pegasys.pantheon.util.ExceptionUtils; @@ -14,9 +15,12 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { private static final Logger LOG = LogManager.getLogger(); private final EthContext ethContext; + private final int maxRetries; + private int requestCount = 0; - public AbstractRetryingPeerTask(final EthContext ethContext) { + public AbstractRetryingPeerTask(final EthContext ethContext, final int maxRetries) { this.ethContext = ethContext; + this.maxRetries = maxRetries; } @Override @@ -25,7 +29,12 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { // Return if task is done return; } + if (requestCount > maxRetries) { + result.get().completeExceptionally(new MaxRetriesReachedException()); + return; + } + requestCount += 1; executePeerTask() .whenComplete( (peerResult, error) -> { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/exceptions/EthTaskException.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/exceptions/EthTaskException.java index 912ced6472..7591056a63 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/exceptions/EthTaskException.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/exceptions/EthTaskException.java @@ -17,6 +17,7 @@ public class EthTaskException extends RuntimeException { PEER_DISCONNECTED, NO_AVAILABLE_PEERS, PEER_BREACHED_PROTOCOL, - INCOMPLETE_RESULTS + INCOMPLETE_RESULTS, + MAX_RETRIES_REACHED } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/exceptions/MaxRetriesReachedException.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/exceptions/MaxRetriesReachedException.java new file mode 100644 index 0000000000..0d1598dd4a --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/exceptions/MaxRetriesReachedException.java @@ -0,0 +1,8 @@ +package tech.pegasys.pantheon.ethereum.eth.manager.exceptions; + +public class MaxRetriesReachedException extends EthTaskException { + + public MaxRetriesReachedException() { + super(FailureReason.MAX_RETRIES_REACHED); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java index ad1d4a1b7e..28a5db4578 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java @@ -32,6 +32,7 @@ import org.apache.logging.log4j.Logger; */ public class CompleteBlocksTask extends AbstractRetryingPeerTask> { private static final Logger LOG = LogManager.getLogger(); + private static final int DEFAULT_RETRIES = 3; private final EthContext ethContext; private final ProtocolSchedule protocolSchedule; @@ -43,8 +44,9 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask> private CompleteBlocksTask( final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final List headers) { - super(ethContext); + final List headers, + final int maxRetries) { + super(ethContext, maxRetries); checkArgument(headers.size() > 0, "Must supply a non-empty headers list"); this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; @@ -53,11 +55,19 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask> this.blocks = new HashMap<>(); } + public static CompleteBlocksTask forHeaders( + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final List headers, + final int maxRetries) { + return new CompleteBlocksTask<>(protocolSchedule, ethContext, headers, maxRetries); + } + public static CompleteBlocksTask forHeaders( final ProtocolSchedule protocolSchedule, final EthContext ethContext, final List headers) { - return new CompleteBlocksTask<>(protocolSchedule, ethContext, headers); + return new CompleteBlocksTask<>(protocolSchedule, ethContext, headers, DEFAULT_RETRIES); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 99a03371f5..de0cf9b148 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.Logger; */ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask> { private static final Logger LOG = LogManager.getLogger(); + private static final int DEFAULT_RETRIES = 3; private final EthContext ethContext; private final ProtocolContext protocolContext; @@ -52,8 +53,9 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask protocolContext, final EthContext ethContext, final BlockHeader referenceHeader, - final int segmentLength) { - super(ethContext); + final int segmentLength, + final int maxRetries) { + super(ethContext, maxRetries); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; @@ -65,6 +67,17 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask DownloadHeaderSequenceTask endingAtHeader( + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final BlockHeader referenceHeader, + final int segmentLength, + final int maxRetries) { + return new DownloadHeaderSequenceTask<>( + protocolSchedule, protocolContext, ethContext, referenceHeader, segmentLength, maxRetries); + } + public static DownloadHeaderSequenceTask endingAtHeader( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, @@ -72,7 +85,12 @@ public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask( - protocolSchedule, protocolContext, ethContext, referenceHeader, segmentLength); + protocolSchedule, + protocolContext, + ethContext, + referenceHeader, + segmentLength, + DEFAULT_RETRIES); } @Override diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java index 4f27a1cde6..2435ba5d83 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java @@ -23,6 +23,10 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +/** + * @param The type of data being requested from the network + * @param The type of data returned from the network + */ public abstract class AbstractMessageTaskTest { protected static Blockchain blockchain; protected static ProtocolSchedule protocolSchedule; diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java index 04bf1506d9..0ec68ad3a5 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java @@ -1,17 +1,18 @@ package tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; @@ -20,10 +21,22 @@ import org.junit.Test; * * @param */ -public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest { +public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest { + + protected final int maxRetries; + + public RetryingMessageTaskTest() { + this.maxRetries = 3; + } + + @Override + protected void assertResultMatchesExpectation( + final T requestedData, final T response, final EthPeer respondingPeer) { + assertThat(response).isEqualTo(requestedData); + } @Test - public void doesNotCompleteWhenPeerReturnsPartialResult() + public void failsWhenPeerRepeatedlyReturnsPartialResult() throws ExecutionException, InterruptedException { // Setup data to be requested and expected response @@ -33,17 +46,19 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskT EthProtocolManagerTestUtil.createPeer(ethProtocolManager); // Execute task and wait for response - final AtomicBoolean done = new AtomicBoolean(false); final T requestedData = generateDataToBeRequested(); - final EthTask task = createTask(requestedData); - final CompletableFuture future = task.run(); - respondingPeer.respondTimes(responder, 20); - future.whenComplete( - (result, error) -> { - done.compareAndSet(false, true); - }); - - assertThat(done).isFalse(); + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + + // Respond max times + respondingPeer.respondTimes(responder, maxRetries); + assertThat(future.isDone()).isFalse(); + + // Next retry should fail + respondingPeer.respond(responder); + assertThat(future.isDone()).isTrue(); + assertThat(future.isCompletedExceptionally()).isTrue(); + assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); } @Test @@ -52,16 +67,10 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskT // Setup data to be requested final T requestedData = generateDataToBeRequested(); - // Execute task and wait for response - final AtomicBoolean done = new AtomicBoolean(false); - final EthTask task = createTask(requestedData); - final CompletableFuture future = task.run(); - future.whenComplete( - (result, error) -> { - done.compareAndSet(false, true); - }); - - assertThat(done).isFalse(); + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + + assertThat(future.isDone()).isFalse(); } @Test @@ -71,17 +80,10 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskT final T requestedData = generateDataToBeRequested(); // Execute task and wait for response - final AtomicBoolean done = new AtomicBoolean(false); - final AtomicReference actualResult = new AtomicReference<>(); - final EthTask task = createTask(requestedData); - final CompletableFuture future = task.run(); - future.whenComplete( - (result, error) -> { - actualResult.set(result); - done.compareAndSet(false, true); - }); - - assertThat(done).isFalse(); + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + + assertThat(future.isDone()).isFalse(); // Setup a peer final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); @@ -89,7 +91,7 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskT EthProtocolManagerTestUtil.createPeer(ethProtocolManager); respondingPeer.respondWhile(responder, () -> !future.isDone()); - assertResultMatchesExpectation(requestedData, actualResult.get(), respondingPeer.getEthPeer()); + assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer()); } @Test @@ -101,26 +103,17 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskT EthProtocolManagerTestUtil.createPeer(ethProtocolManager); final T requestedData = generateDataToBeRequested(); - // Execute task and wait for response - final AtomicBoolean done = new AtomicBoolean(false); - final AtomicReference actualResult = new AtomicReference<>(); - final EthTask task = createTask(requestedData); - final CompletableFuture future = task.run(); - future.whenComplete( - (result, error) -> { - actualResult.set(result); - done.compareAndSet(false, true); - }); - - assertThat(done).isFalse(); + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + + assertThat(future.isDone()).isFalse(); respondingPeer.respondWhile(responder, () -> !future.isDone()); - assertResultMatchesExpectation(requestedData, actualResult.get(), respondingPeer.getEthPeer()); + assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer()); } @Test - public void doesNotCompleteWhenPeersSendEmptyResponses() - throws ExecutionException, InterruptedException { + public void failsWhenPeersSendEmptyResponses() throws ExecutionException, InterruptedException { // Setup a unresponsive peer final Responder responder = RespondingEthPeer.emptyResponder(); final RespondingEthPeer respondingPeer = @@ -129,15 +122,20 @@ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskT // Setup data to be requested final T requestedData = generateDataToBeRequested(); - // Execute task and wait for response - final AtomicBoolean done = new AtomicBoolean(false); - final EthTask task = createTask(requestedData); - final CompletableFuture future = task.run(); - respondingPeer.respondTimes(responder, 20); - future.whenComplete( - (response, error) -> { - done.compareAndSet(false, true); - }); + // Setup and run task + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + assertThat(future.isDone()).isFalse(); + + // Respond max times + respondingPeer.respondTimes(responder, maxRetries); + assertThat(future.isDone()).isFalse(); + + // Next retry should fail + respondingPeer.respond(responder); + assertThat(future.isDone()).isTrue(); + assertThat(future.isCompletedExceptionally()).isTrue(); + assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskWithResultsTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskWithResultsTest.java deleted file mode 100644 index 466dad6e2d..0000000000 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskWithResultsTest.java +++ /dev/null @@ -1,14 +0,0 @@ -package tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils; - -import static org.assertj.core.api.Assertions.assertThat; - -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; - -public abstract class RetryingMessageTaskWithResultsTest extends RetryingMessageTaskTest { - - @Override - protected void assertResultMatchesExpectation( - final T requestedData, final T response, final EthPeer respondingPeer) { - assertThat(response).isEqualTo(requestedData); - } -} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java index f83b61a791..8602bb6ff1 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java @@ -4,13 +4,13 @@ import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockBody; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; -import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskWithResultsTest; +import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -public class CompleteBlocksTaskTest extends RetryingMessageTaskWithResultsTest> { +public class CompleteBlocksTaskTest extends RetryingMessageTaskTest> { @Override protected List generateDataToBeRequested() { @@ -28,6 +28,7 @@ public class CompleteBlocksTaskTest extends RetryingMessageTaskWithResultsTest> createTask(final List requestedData) { final List headersToComplete = requestedData.stream().map(Block::getHeader).collect(Collectors.toList()); - return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headersToComplete); + return CompleteBlocksTask.forHeaders( + protocolSchedule, ethContext, headersToComplete, maxRetries); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java index bcddeb0e75..dee1a49d4e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java @@ -2,13 +2,12 @@ package tech.pegasys.pantheon.ethereum.eth.sync.tasks; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; -import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskWithResultsTest; +import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; import java.util.ArrayList; import java.util.List; -public class DownloadHeaderSequenceTaskTest - extends RetryingMessageTaskWithResultsTest> { +public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest> { @Override protected List generateDataToBeRequested() { @@ -26,6 +25,11 @@ public class DownloadHeaderSequenceTaskTest final BlockHeader lastHeader = requestedData.get(requestedData.size() - 1); final BlockHeader referenceHeader = blockchain.getBlockHeader(lastHeader.getNumber() + 1).get(); return DownloadHeaderSequenceTask.endingAtHeader( - protocolSchedule, protocolContext, ethContext, referenceHeader, requestedData.size()); + protocolSchedule, + protocolContext, + ethContext, + referenceHeader, + requestedData.size(), + maxRetries); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java index 4e6394dcdf..6938ebd9c4 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java @@ -14,7 +14,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; -import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; +import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.AbstractMessageTaskTest; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; @@ -38,7 +38,7 @@ import java.util.stream.LongStream; import org.junit.Test; public class PipelinedImportChainSegmentTaskTest - extends RetryingMessageTaskTest, List> { + extends AbstractMessageTaskTest, List> { @Override protected List generateDataToBeRequested() {