[NC-1558] Retry tasks a max number of times before failing (#74)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by GitHub
parent aeaa661603
commit 7e30513cce
  1. 11
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java
  2. 3
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/exceptions/EthTaskException.java
  3. 8
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/exceptions/MaxRetriesReachedException.java
  4. 16
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java
  5. 24
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java
  6. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java
  7. 118
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java
  8. 14
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskWithResultsTest.java
  9. 7
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java
  10. 12
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java
  11. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java

@ -1,5 +1,6 @@
package tech.pegasys.pantheon.ethereum.eth.manager;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.util.ExceptionUtils;
@ -14,9 +15,12 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
private static final Logger LOG = LogManager.getLogger();
private final EthContext ethContext;
private final int maxRetries;
private int requestCount = 0;
public AbstractRetryingPeerTask(final EthContext ethContext) {
public AbstractRetryingPeerTask(final EthContext ethContext, final int maxRetries) {
this.ethContext = ethContext;
this.maxRetries = maxRetries;
}
@Override
@ -25,7 +29,12 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
// Return if task is done
return;
}
if (requestCount > maxRetries) {
result.get().completeExceptionally(new MaxRetriesReachedException());
return;
}
requestCount += 1;
executePeerTask()
.whenComplete(
(peerResult, error) -> {

@ -17,6 +17,7 @@ public class EthTaskException extends RuntimeException {
PEER_DISCONNECTED,
NO_AVAILABLE_PEERS,
PEER_BREACHED_PROTOCOL,
INCOMPLETE_RESULTS
INCOMPLETE_RESULTS,
MAX_RETRIES_REACHED
}
}

@ -0,0 +1,8 @@
package tech.pegasys.pantheon.ethereum.eth.manager.exceptions;
/**
 * Task failure reported with {@code FailureReason.MAX_RETRIES_REACHED}.
 *
 * <p>{@code AbstractRetryingPeerTask} completes its result future exceptionally with an instance of
 * this class once its request count exceeds the configured maximum number of retries.
 */
public class MaxRetriesReachedException extends EthTaskException {
/** Creates the exception with the failure reason fixed to {@code MAX_RETRIES_REACHED}. */
public MaxRetriesReachedException() {
super(FailureReason.MAX_RETRIES_REACHED);
}
}

@ -32,6 +32,7 @@ import org.apache.logging.log4j.Logger;
*/
public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>> {
private static final Logger LOG = LogManager.getLogger();
private static final int DEFAULT_RETRIES = 3;
private final EthContext ethContext;
private final ProtocolSchedule<C> protocolSchedule;
@ -43,8 +44,9 @@ public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>>
private CompleteBlocksTask(
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final List<BlockHeader> headers) {
super(ethContext);
final List<BlockHeader> headers,
final int maxRetries) {
super(ethContext, maxRetries);
checkArgument(headers.size() > 0, "Must supply a non-empty headers list");
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
@ -53,11 +55,19 @@ public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>>
this.blocks = new HashMap<>();
}
/**
 * Builds a {@link CompleteBlocksTask} for the supplied headers using a caller-chosen retry limit.
 *
 * <p>All arguments are handed to the private constructor unchanged; that constructor rejects an
 * empty {@code headers} list.
 *
 * @param protocolSchedule the protocol schedule passed through to the task
 * @param ethContext the eth context passed through to the task
 * @param headers the headers the task is created for; must not be empty
 * @param maxRetries the retry limit forwarded to the retrying base task
 * @param <C> the type parameter of the protocol schedule
 * @return the newly created task
 */
public static <C> CompleteBlocksTask<C> forHeaders(
    final ProtocolSchedule<C> protocolSchedule,
    final EthContext ethContext,
    final List<BlockHeader> headers,
    final int maxRetries) {
  final CompleteBlocksTask<C> task =
      new CompleteBlocksTask<>(protocolSchedule, ethContext, headers, maxRetries);
  return task;
}
public static <C> CompleteBlocksTask<C> forHeaders(
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final List<BlockHeader> headers) {
return new CompleteBlocksTask<>(protocolSchedule, ethContext, headers);
return new CompleteBlocksTask<>(protocolSchedule, ethContext, headers, DEFAULT_RETRIES);
}
@Override

@ -35,6 +35,7 @@ import org.apache.logging.log4j.Logger;
*/
public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List<BlockHeader>> {
private static final Logger LOG = LogManager.getLogger();
private static final int DEFAULT_RETRIES = 3;
private final EthContext ethContext;
private final ProtocolContext<C> protocolContext;
@ -52,8 +53,9 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final BlockHeader referenceHeader,
final int segmentLength) {
super(ethContext);
final int segmentLength,
final int maxRetries) {
super(ethContext, maxRetries);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
@ -65,6 +67,17 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
lastFilledHeaderIndex = segmentLength;
}
/**
 * Builds a {@link DownloadHeaderSequenceTask} using a caller-chosen retry limit.
 *
 * <p>All arguments are handed to the constructor unchanged and in the same order.
 *
 * @param protocolSchedule the protocol schedule passed through to the task
 * @param protocolContext the protocol context passed through to the task
 * @param ethContext the eth context passed through to the task
 * @param referenceHeader the reference header passed through to the task
 * @param segmentLength the segment length passed through to the task
 * @param maxRetries the retry limit forwarded to the retrying base task
 * @param <C> the type parameter shared by the protocol schedule and context
 * @return the newly created task
 */
public static <C> DownloadHeaderSequenceTask<C> endingAtHeader(
    final ProtocolSchedule<C> protocolSchedule,
    final ProtocolContext<C> protocolContext,
    final EthContext ethContext,
    final BlockHeader referenceHeader,
    final int segmentLength,
    final int maxRetries) {
  final DownloadHeaderSequenceTask<C> task =
      new DownloadHeaderSequenceTask<>(
          protocolSchedule,
          protocolContext,
          ethContext,
          referenceHeader,
          segmentLength,
          maxRetries);
  return task;
}
public static <C> DownloadHeaderSequenceTask<C> endingAtHeader(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
@ -72,7 +85,12 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
final BlockHeader referenceHeader,
final int segmentLength) {
return new DownloadHeaderSequenceTask<>(
protocolSchedule, protocolContext, ethContext, referenceHeader, segmentLength);
protocolSchedule,
protocolContext,
ethContext,
referenceHeader,
segmentLength,
DEFAULT_RETRIES);
}
@Override

@ -23,6 +23,10 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* @param <T> The type of data being requested from the network
* @param <R> The type of data returned from the network
*/
public abstract class AbstractMessageTaskTest<T, R> {
protected static Blockchain blockchain;
protected static ProtocolSchedule<Void> protocolSchedule;

@ -1,17 +1,18 @@
package tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
@ -20,10 +21,22 @@ import org.junit.Test;
*
* @param <T>
*/
public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskTest<T, R> {
public abstract class RetryingMessageTaskTest<T> extends AbstractMessageTaskTest<T, T> {
protected final int maxRetries;
/** Sets the retry limit that the tests in this class respond against to 3. */
public RetryingMessageTaskTest() {
this.maxRetries = 3;
}
/**
 * Checks that the task's response is equal to the data that was requested.
 *
 * @param requestedData the data the task was asked to retrieve
 * @param response the result produced by the task
 * @param respondingPeer the peer that served the request (not used by this check)
 */
@Override
protected void assertResultMatchesExpectation(
final T requestedData, final T response, final EthPeer respondingPeer) {
assertThat(response).isEqualTo(requestedData);
}
@Test
public void doesNotCompleteWhenPeerReturnsPartialResult()
public void failsWhenPeerRepeatedlyReturnsPartialResult()
throws ExecutionException, InterruptedException {
// Setup data to be requested and expected response
@ -33,17 +46,19 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Execute task and wait for response
final AtomicBoolean done = new AtomicBoolean(false);
final T requestedData = generateDataToBeRequested();
final EthTask<R> task = createTask(requestedData);
final CompletableFuture<R> future = task.run();
respondingPeer.respondTimes(responder, 20);
future.whenComplete(
(result, error) -> {
done.compareAndSet(false, true);
});
assertThat(done).isFalse();
final EthTask<T> task = createTask(requestedData);
final CompletableFuture<T> future = task.run();
// Respond max times
respondingPeer.respondTimes(responder, maxRetries);
assertThat(future.isDone()).isFalse();
// Next retry should fail
respondingPeer.respond(responder);
assertThat(future.isDone()).isTrue();
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
@Test
@ -52,16 +67,10 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT
// Setup data to be requested
final T requestedData = generateDataToBeRequested();
// Execute task and wait for response
final AtomicBoolean done = new AtomicBoolean(false);
final EthTask<R> task = createTask(requestedData);
final CompletableFuture<R> future = task.run();
future.whenComplete(
(result, error) -> {
done.compareAndSet(false, true);
});
assertThat(done).isFalse();
final EthTask<T> task = createTask(requestedData);
final CompletableFuture<T> future = task.run();
assertThat(future.isDone()).isFalse();
}
@Test
@ -71,17 +80,10 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT
final T requestedData = generateDataToBeRequested();
// Execute task and wait for response
final AtomicBoolean done = new AtomicBoolean(false);
final AtomicReference<R> actualResult = new AtomicReference<>();
final EthTask<R> task = createTask(requestedData);
final CompletableFuture<R> future = task.run();
future.whenComplete(
(result, error) -> {
actualResult.set(result);
done.compareAndSet(false, true);
});
assertThat(done).isFalse();
final EthTask<T> task = createTask(requestedData);
final CompletableFuture<T> future = task.run();
assertThat(future.isDone()).isFalse();
// Setup a peer
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
@ -89,7 +91,7 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
respondingPeer.respondWhile(responder, () -> !future.isDone());
assertResultMatchesExpectation(requestedData, actualResult.get(), respondingPeer.getEthPeer());
assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer());
}
@Test
@ -101,26 +103,17 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
final T requestedData = generateDataToBeRequested();
// Execute task and wait for response
final AtomicBoolean done = new AtomicBoolean(false);
final AtomicReference<R> actualResult = new AtomicReference<>();
final EthTask<R> task = createTask(requestedData);
final CompletableFuture<R> future = task.run();
future.whenComplete(
(result, error) -> {
actualResult.set(result);
done.compareAndSet(false, true);
});
assertThat(done).isFalse();
final EthTask<T> task = createTask(requestedData);
final CompletableFuture<T> future = task.run();
assertThat(future.isDone()).isFalse();
respondingPeer.respondWhile(responder, () -> !future.isDone());
assertResultMatchesExpectation(requestedData, actualResult.get(), respondingPeer.getEthPeer());
assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer());
}
@Test
public void doesNotCompleteWhenPeersSendEmptyResponses()
throws ExecutionException, InterruptedException {
public void failsWhenPeersSendEmptyResponses() throws ExecutionException, InterruptedException {
// Set up an unresponsive peer
final Responder responder = RespondingEthPeer.emptyResponder();
final RespondingEthPeer respondingPeer =
@ -129,15 +122,20 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT
// Setup data to be requested
final T requestedData = generateDataToBeRequested();
// Execute task and wait for response
final AtomicBoolean done = new AtomicBoolean(false);
final EthTask<R> task = createTask(requestedData);
final CompletableFuture<R> future = task.run();
respondingPeer.respondTimes(responder, 20);
future.whenComplete(
(response, error) -> {
done.compareAndSet(false, true);
});
// Setup and run task
final EthTask<T> task = createTask(requestedData);
final CompletableFuture<T> future = task.run();
assertThat(future.isDone()).isFalse();
// Respond max times
respondingPeer.respondTimes(responder, maxRetries);
assertThat(future.isDone()).isFalse();
// Next retry should fail
respondingPeer.respond(responder);
assertThat(future.isDone()).isTrue();
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
}

@ -1,14 +0,0 @@
package tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
/**
 * Specialization of {@code RetryingMessageTaskTest} for tasks whose result type is the same as the
 * requested data type.
 *
 * @param <T> the type of both the requested data and the task result
 */
public abstract class RetryingMessageTaskWithResultsTest<T> extends RetryingMessageTaskTest<T, T> {
/** Checks that the task's response is equal to the data that was requested. */
@Override
protected void assertResultMatchesExpectation(
final T requestedData, final T response, final EthPeer respondingPeer) {
assertThat(response).isEqualTo(requestedData);
}
}

@ -4,13 +4,13 @@ import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockBody;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskWithResultsTest;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class CompleteBlocksTaskTest extends RetryingMessageTaskWithResultsTest<List<Block>> {
public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>> {
@Override
protected List<Block> generateDataToBeRequested() {
@ -28,6 +28,7 @@ public class CompleteBlocksTaskTest extends RetryingMessageTaskWithResultsTest<L
protected EthTask<List<Block>> createTask(final List<Block> requestedData) {
final List<BlockHeader> headersToComplete =
requestedData.stream().map(Block::getHeader).collect(Collectors.toList());
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headersToComplete);
return CompleteBlocksTask.forHeaders(
protocolSchedule, ethContext, headersToComplete, maxRetries);
}
}

@ -2,13 +2,12 @@ package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskWithResultsTest;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest;
import java.util.ArrayList;
import java.util.List;
public class DownloadHeaderSequenceTaskTest
extends RetryingMessageTaskWithResultsTest<List<BlockHeader>> {
public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest<List<BlockHeader>> {
@Override
protected List<BlockHeader> generateDataToBeRequested() {
@ -26,6 +25,11 @@ public class DownloadHeaderSequenceTaskTest
final BlockHeader lastHeader = requestedData.get(requestedData.size() - 1);
final BlockHeader referenceHeader = blockchain.getBlockHeader(lastHeader.getNumber() + 1).get();
return DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule, protocolContext, ethContext, referenceHeader, requestedData.size());
protocolSchedule,
protocolContext,
ethContext,
referenceHeader,
requestedData.size(),
maxRetries);
}
}

@ -14,7 +14,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.AbstractMessageTaskTest;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
@ -38,7 +38,7 @@ import java.util.stream.LongStream;
import org.junit.Test;
public class PipelinedImportChainSegmentTaskTest
extends RetryingMessageTaskTest<List<Block>, List<Block>> {
extends AbstractMessageTaskTest<List<Block>, List<Block>> {
@Override
protected List<Block> generateDataToBeRequested() {

Loading…
Cancel
Save