Replace FutureUtil method with standard library equivalent (#390)

Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
pull/658/head
Ratan Rai Sur 5 years ago committed by GitHub
parent 96122e05d7
commit fbdd5301b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractEthTask.java
  2. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/GetBlockFromPeerTask.java
  3. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java
  4. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java
  5. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java
  6. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockConfirmer.java
  7. 13
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSourceTest.java
  8. 9
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java
  9. 14
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java
  10. 13
      util/src/main/java/org/hyperledger/besu/util/FutureUtils.java
  11. 2
      util/src/test/java/org/hyperledger/besu/util/FutureUtilsTest.java

@ -14,8 +14,6 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import static org.hyperledger.besu.util.FutureUtils.completedExceptionally;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
@ -127,7 +125,7 @@ public abstract class AbstractEthTask<T> implements EthTask<T> {
subTaskFuture.whenComplete((r, t) -> subTaskFutures.remove(subTaskFuture));
return subTaskFuture;
} else {
return completedExceptionally(new CancellationException());
return CompletableFuture.failedFuture(new CancellationException());
}
}
}

@ -14,8 +14,6 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import static org.hyperledger.besu.util.FutureUtils.completedExceptionally;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
@ -102,7 +100,7 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
private CompletableFuture<PeerTaskResult<List<Block>>> completeBlock(
final PeerTaskResult<List<BlockHeader>> headerResult) {
if (headerResult.getResult().isEmpty()) {
return completedExceptionally(new IncompleteResultsException());
return CompletableFuture.failedFuture(new IncompleteResultsException());
}
return executeSubTask(

@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.sync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.hyperledger.besu.util.FutureUtils.completedExceptionally;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
@ -134,7 +133,7 @@ public class PipelineChainDownloader<C> implements ChainDownloader {
logDownloadFailure("Chain download failed.", error);
// Propagate the error out, terminating this chain download.
return completedExceptionally(error);
return CompletableFuture.failedFuture(error);
}
private void logDownloadFailure(final String message, final Throwable error) {
@ -152,7 +151,8 @@ public class PipelineChainDownloader<C> implements ChainDownloader {
private synchronized CompletionStage<Void> startDownloadForSyncTarget(final SyncTarget target) {
if (cancelled.get()) {
return completedExceptionally(new CancellationException("Chain download was cancelled"));
return CompletableFuture.failedFuture(
new CancellationException("Chain download was cancelled"));
}
syncState.setSyncTarget(target.peer(), target.commonAncestor());
currentDownloadPipeline = downloadPipelineFactory.createDownloadPipelineForSyncTarget(target);

@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.hyperledger.besu.util.FutureUtils.completedExceptionally;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import org.hyperledger.besu.ethereum.ProtocolContext;
@ -103,7 +102,7 @@ public class FastSyncActions<C> {
if (ExceptionUtils.rootCause(throwable) instanceof TimeoutException) {
return waitForAnyPeer();
}
return completedExceptionally(throwable);
return CompletableFuture.failedFuture(throwable);
});
}

@ -14,7 +14,6 @@
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import static org.hyperledger.besu.util.FutureUtils.completedExceptionally;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
@ -90,7 +89,7 @@ public class FastSyncDownloader<C> {
private CompletableFuture<FastSyncState> handleFailure(final Throwable error) {
trailingPeerRequirements = Optional.empty();
if (ExceptionUtils.rootCause(error) instanceof FastSyncException) {
return completedExceptionally(error);
return CompletableFuture.failedFuture(error);
} else if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) {
LOG.warn(
"Fast sync was unable to download the world state. Retrying with a new pivot block.");
@ -151,7 +150,8 @@ public class FastSyncDownloader<C> {
// after the stop method had called cancel.
synchronized (this) {
if (!running.get()) {
return completedExceptionally(new CancellationException("FastSyncDownloader stopped"));
return CompletableFuture.failedFuture(
new CancellationException("FastSyncDownloader stopped"));
}
final CompletableFuture<Void> worldStateFuture =
worldStateDownloader.run(currentState.getPivotBlockHeader().get());

@ -158,7 +158,7 @@ class PivotBlockConfirmer<C> {
private CompletableFuture<BlockHeader> executePivotQuery(final long blockNumber) {
if (isCancelled.get() || result.isDone()) {
// Stop loop if this task is done
return FutureUtils.completedExceptionally(new CancellationException());
return CompletableFuture.failedFuture(new CancellationException());
}
final Optional<RetryingGetHeaderFromPeerByNumberTask> query = createPivotQuery(blockNumber);

@ -19,7 +19,6 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.util.FutureUtils.completedExceptionally;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@ -95,7 +94,7 @@ public class CheckpointRangeSourceTest {
public void shouldNotHaveNextWhenNoMoreCheckpointsAvailableAndRetryLimitReached() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(any(), any())).thenReturn(true);
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
.thenReturn(completedExceptionally(new TimeoutException()));
.thenReturn(CompletableFuture.failedFuture(new TimeoutException()));
for (int i = 1; i <= CHECKPOINT_TIMEOUTS_PERMITTED; i++) {
assertThat(source).hasNext();
@ -140,7 +139,7 @@ public class CheckpointRangeSourceTest {
public void shouldDelayBeforeRetryingRequestForCheckpointHeadersAfterFailure() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(any(), any())).thenReturn(true);
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
.thenReturn(completedExceptionally(new RuntimeException("Nope")));
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("Nope")));
assertThat(source.next()).isNull();
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
@ -151,10 +150,10 @@ public class CheckpointRangeSourceTest {
public void shouldResetCheckpointFailureCountWhenMoreCheckpointsReceived() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(any(), any())).thenReturn(true);
when(checkpointFetcher.getNextCheckpointHeaders(any(), any()))
.thenReturn(completedExceptionally(new TimeoutException()))
.thenReturn(completedExceptionally(new TimeoutException()))
.thenReturn(CompletableFuture.failedFuture(new TimeoutException()))
.thenReturn(CompletableFuture.failedFuture(new TimeoutException()))
.thenReturn(completedFuture(singletonList(header(15))))
.thenReturn(completedExceptionally(new TimeoutException()));
.thenReturn(CompletableFuture.failedFuture(new TimeoutException()));
assertThat(source.next()).isNull(); // Fail
assertThat(source.next()).isNull(); // Fail
@ -234,7 +233,7 @@ public class CheckpointRangeSourceTest {
@Test
public void shouldSendNewRequestIfRequestForHeadersFails() {
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
.thenReturn(completedExceptionally(new NoAvailablePeersException()))
.thenReturn(CompletableFuture.failedFuture(new NoAvailablePeersException()))
.thenReturn(completedFuture(asList(header(15), header(20))));
// Returns null when the first request fails

@ -132,7 +132,8 @@ public class FastSyncDownloaderTest {
@Test
public void shouldAbortIfWaitForSuitablePeersFails() {
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedExceptionally(new FastSyncException(FastSyncError.UNEXPECTED_ERROR)));
.thenReturn(
CompletableFuture.failedFuture(new FastSyncException(FastSyncError.UNEXPECTED_ERROR)));
final CompletableFuture<FastSyncState> result = downloader.start();
@ -483,12 +484,6 @@ public class FastSyncDownloaderTest {
Assertions.assertThat(downloader.calculateTrailingPeerRequirements()).isEmpty();
}
private <T> CompletableFuture<T> completedExceptionally(final Throwable error) {
final CompletableFuture<T> result = new CompletableFuture<>();
result.completeExceptionally(error);
return result;
}
private <T> void assertCompletedExceptionally(
final CompletableFuture<T> future, final FastSyncError expectedError) {
assertThat(future).isCompletedExceptionally();

@ -37,7 +37,6 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.Di
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.util.FutureUtils;
import org.hyperledger.besu.util.Subscribers;
import java.util.ArrayList;
@ -118,7 +117,7 @@ public class RlpxAgent {
public CompletableFuture<Integer> start() {
if (!started.compareAndSet(false, true)) {
return FutureUtils.completedExceptionally(
return CompletableFuture.failedFuture(
new IllegalStateException(
"Unable to start an already started " + getClass().getSimpleName()));
}
@ -141,7 +140,7 @@ public class RlpxAgent {
public CompletableFuture<Void> stop() {
if (!started.get() || !stopped.compareAndSet(false, true)) {
return FutureUtils.completedExceptionally(
return CompletableFuture.failedFuture(
new IllegalStateException("Illegal attempt to stop " + getClass().getSimpleName()));
}
@ -196,7 +195,7 @@ public class RlpxAgent {
public CompletableFuture<PeerConnection> connect(final Peer peer) {
// Check if we're ready to establish connections
if (!localNode.isReady()) {
return FutureUtils.completedExceptionally(
return CompletableFuture.failedFuture(
new IllegalStateException(
"Cannot connect before "
+ this.getClass().getSimpleName()
@ -208,7 +207,7 @@ public class RlpxAgent {
final String errorMsg =
"Attempt to connect to peer with no listening port: " + enode.toString();
LOG.warn(errorMsg);
return FutureUtils.completedExceptionally((new IllegalArgumentException(errorMsg)));
return CompletableFuture.failedFuture((new IllegalArgumentException(errorMsg)));
}
// Shortcut checks if we're already connected
@ -223,12 +222,11 @@ public class RlpxAgent {
+ maxConnections
+ "). Cannot connect to peer: "
+ peer;
return FutureUtils.completedExceptionally(new IllegalStateException(errorMsg));
return CompletableFuture.failedFuture(new IllegalStateException(errorMsg));
}
// Check permissions
if (!peerPermissions.allowNewOutboundConnectionTo(peer)) {
return FutureUtils.completedExceptionally(
peerPermissions.newOutboundConnectionException(peer));
return CompletableFuture.failedFuture(peerPermissions.newOutboundConnectionException(peer));
}
// Initiate connection or return existing connection

@ -24,19 +24,6 @@ import java.util.function.Supplier;
public class FutureUtils {
/**
* Creates a {@link CompletableFuture} that is exceptionally completed by <code>error</code>.
*
* @param error the error to exceptionally complete the future with
* @param <T> the type of CompletableFuture
* @return a CompletableFuture exceptionally completed by <code>error</code>.
*/
public static <T> CompletableFuture<T> completedExceptionally(final Throwable error) {
final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(error);
return future;
}
/**
* Returns a new CompletionStage that, when the provided stage completes exceptionally, is
* executed with the provided stage's exception as the argument to the supplied function.

@ -37,7 +37,7 @@ public class FutureUtilsTest {
@Test
public void shouldCreateExceptionallyCompletedFuture() {
final CompletableFuture<Void> future = FutureUtils.completedExceptionally(ERROR);
final CompletableFuture<Void> future = CompletableFuture.failedFuture(ERROR);
assertCompletedExceptionally(future, ERROR);
}

Loading…
Cancel
Save