|
|
@ -1,17 +1,18 @@ |
|
|
|
package tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils; |
|
|
|
package tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils; |
|
|
|
|
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThatThrownBy; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; |
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; |
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; |
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; |
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; |
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; |
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; |
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; |
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; |
|
|
|
|
|
|
|
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException; |
|
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReference; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import org.junit.Test; |
|
|
|
import org.junit.Test; |
|
|
|
|
|
|
|
|
|
|
@ -20,10 +21,22 @@ import org.junit.Test; |
|
|
|
* |
|
|
|
* |
|
|
|
* @param <T> |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskTest<T, R> { |
|
|
|
public abstract class RetryingMessageTaskTest<T> extends AbstractMessageTaskTest<T, T> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected final int maxRetries; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public RetryingMessageTaskTest() { |
|
|
|
|
|
|
|
this.maxRetries = 3; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
protected void assertResultMatchesExpectation( |
|
|
|
|
|
|
|
final T requestedData, final T response, final EthPeer respondingPeer) { |
|
|
|
|
|
|
|
assertThat(response).isEqualTo(requestedData); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
|
public void doesNotCompleteWhenPeerReturnsPartialResult() |
|
|
|
public void failsWhenPeerRepeatedlyReturnsPartialResult() |
|
|
|
throws ExecutionException, InterruptedException { |
|
|
|
throws ExecutionException, InterruptedException { |
|
|
|
// Setup data to be requested and expected response
|
|
|
|
// Setup data to be requested and expected response
|
|
|
|
|
|
|
|
|
|
|
@ -33,17 +46,19 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT |
|
|
|
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
|
|
|
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
|
|
|
|
|
|
|
|
|
|
|
// Execute task and wait for response
|
|
|
|
// Execute task and wait for response
|
|
|
|
final AtomicBoolean done = new AtomicBoolean(false); |
|
|
|
|
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
final EthTask<R> task = createTask(requestedData); |
|
|
|
final EthTask<T> task = createTask(requestedData); |
|
|
|
final CompletableFuture<R> future = task.run(); |
|
|
|
final CompletableFuture<T> future = task.run(); |
|
|
|
respondingPeer.respondTimes(responder, 20); |
|
|
|
|
|
|
|
future.whenComplete( |
|
|
|
// Respond max times
|
|
|
|
(result, error) -> { |
|
|
|
respondingPeer.respondTimes(responder, maxRetries); |
|
|
|
done.compareAndSet(false, true); |
|
|
|
assertThat(future.isDone()).isFalse(); |
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
// Next retry should fail
|
|
|
|
assertThat(done).isFalse(); |
|
|
|
respondingPeer.respond(responder); |
|
|
|
|
|
|
|
assertThat(future.isDone()).isTrue(); |
|
|
|
|
|
|
|
assertThat(future.isCompletedExceptionally()).isTrue(); |
|
|
|
|
|
|
|
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
@ -52,16 +67,10 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT |
|
|
|
// Setup data to be requested
|
|
|
|
// Setup data to be requested
|
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
|
|
|
|
|
|
|
|
// Execute task and wait for response
|
|
|
|
final EthTask<T> task = createTask(requestedData); |
|
|
|
final AtomicBoolean done = new AtomicBoolean(false); |
|
|
|
final CompletableFuture<T> future = task.run(); |
|
|
|
final EthTask<R> task = createTask(requestedData); |
|
|
|
|
|
|
|
final CompletableFuture<R> future = task.run(); |
|
|
|
assertThat(future.isDone()).isFalse(); |
|
|
|
future.whenComplete( |
|
|
|
|
|
|
|
(result, error) -> { |
|
|
|
|
|
|
|
done.compareAndSet(false, true); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
assertThat(done).isFalse(); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
@ -71,17 +80,10 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT |
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
|
|
|
|
|
|
|
|
// Execute task and wait for response
|
|
|
|
// Execute task and wait for response
|
|
|
|
final AtomicBoolean done = new AtomicBoolean(false); |
|
|
|
final EthTask<T> task = createTask(requestedData); |
|
|
|
final AtomicReference<R> actualResult = new AtomicReference<>(); |
|
|
|
final CompletableFuture<T> future = task.run(); |
|
|
|
final EthTask<R> task = createTask(requestedData); |
|
|
|
|
|
|
|
final CompletableFuture<R> future = task.run(); |
|
|
|
assertThat(future.isDone()).isFalse(); |
|
|
|
future.whenComplete( |
|
|
|
|
|
|
|
(result, error) -> { |
|
|
|
|
|
|
|
actualResult.set(result); |
|
|
|
|
|
|
|
done.compareAndSet(false, true); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
assertThat(done).isFalse(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Setup a peer
|
|
|
|
// Setup a peer
|
|
|
|
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); |
|
|
|
final Responder responder = RespondingEthPeer.blockchainResponder(blockchain); |
|
|
@ -89,7 +91,7 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT |
|
|
|
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
|
|
|
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
|
|
|
respondingPeer.respondWhile(responder, () -> !future.isDone()); |
|
|
|
respondingPeer.respondWhile(responder, () -> !future.isDone()); |
|
|
|
|
|
|
|
|
|
|
|
assertResultMatchesExpectation(requestedData, actualResult.get(), respondingPeer.getEthPeer()); |
|
|
|
assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
@ -101,26 +103,17 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT |
|
|
|
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
|
|
|
EthProtocolManagerTestUtil.createPeer(ethProtocolManager); |
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
|
|
|
|
|
|
|
|
// Execute task and wait for response
|
|
|
|
final EthTask<T> task = createTask(requestedData); |
|
|
|
final AtomicBoolean done = new AtomicBoolean(false); |
|
|
|
final CompletableFuture<T> future = task.run(); |
|
|
|
final AtomicReference<R> actualResult = new AtomicReference<>(); |
|
|
|
|
|
|
|
final EthTask<R> task = createTask(requestedData); |
|
|
|
assertThat(future.isDone()).isFalse(); |
|
|
|
final CompletableFuture<R> future = task.run(); |
|
|
|
|
|
|
|
future.whenComplete( |
|
|
|
|
|
|
|
(result, error) -> { |
|
|
|
|
|
|
|
actualResult.set(result); |
|
|
|
|
|
|
|
done.compareAndSet(false, true); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
assertThat(done).isFalse(); |
|
|
|
|
|
|
|
respondingPeer.respondWhile(responder, () -> !future.isDone()); |
|
|
|
respondingPeer.respondWhile(responder, () -> !future.isDone()); |
|
|
|
|
|
|
|
|
|
|
|
assertResultMatchesExpectation(requestedData, actualResult.get(), respondingPeer.getEthPeer()); |
|
|
|
assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
|
public void doesNotCompleteWhenPeersSendEmptyResponses() |
|
|
|
public void failsWhenPeersSendEmptyResponses() throws ExecutionException, InterruptedException { |
|
|
|
throws ExecutionException, InterruptedException { |
|
|
|
|
|
|
|
// Setup a unresponsive peer
|
|
|
|
// Setup a unresponsive peer
|
|
|
|
final Responder responder = RespondingEthPeer.emptyResponder(); |
|
|
|
final Responder responder = RespondingEthPeer.emptyResponder(); |
|
|
|
final RespondingEthPeer respondingPeer = |
|
|
|
final RespondingEthPeer respondingPeer = |
|
|
@ -129,15 +122,20 @@ public abstract class RetryingMessageTaskTest<T, R> extends AbstractMessageTaskT |
|
|
|
// Setup data to be requested
|
|
|
|
// Setup data to be requested
|
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
final T requestedData = generateDataToBeRequested(); |
|
|
|
|
|
|
|
|
|
|
|
// Execute task and wait for response
|
|
|
|
// Setup and run task
|
|
|
|
final AtomicBoolean done = new AtomicBoolean(false); |
|
|
|
final EthTask<T> task = createTask(requestedData); |
|
|
|
final EthTask<R> task = createTask(requestedData); |
|
|
|
final CompletableFuture<T> future = task.run(); |
|
|
|
final CompletableFuture<R> future = task.run(); |
|
|
|
|
|
|
|
respondingPeer.respondTimes(responder, 20); |
|
|
|
|
|
|
|
future.whenComplete( |
|
|
|
|
|
|
|
(response, error) -> { |
|
|
|
|
|
|
|
done.compareAndSet(false, true); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
assertThat(future.isDone()).isFalse(); |
|
|
|
assertThat(future.isDone()).isFalse(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Respond max times
|
|
|
|
|
|
|
|
respondingPeer.respondTimes(responder, maxRetries); |
|
|
|
|
|
|
|
assertThat(future.isDone()).isFalse(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Next retry should fail
|
|
|
|
|
|
|
|
respondingPeer.respond(responder); |
|
|
|
|
|
|
|
assertThat(future.isDone()).isTrue(); |
|
|
|
|
|
|
|
assertThat(future.isCompletedExceptionally()).isTrue(); |
|
|
|
|
|
|
|
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|