From 493ac9152945ef8e23a15e00a625f05bef6ffc26 Mon Sep 17 00:00:00 2001 From: Matilda Clerke Date: Mon, 7 Oct 2024 16:48:13 +1100 Subject: [PATCH] 7311: Move GetReceipts to services worker for parallelism Signed-off-by: Matilda Clerke --- .../besu/ethereum/eth/manager/EthScheduler.java | 15 ++++++++++++++- .../eth/sync/fastsync/DownloadReceiptsStep.java | 5 ++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java index 1e2f3eb6ab..8c90993c68 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java @@ -145,7 +145,7 @@ public class EthScheduler { servicesExecutor.execute(command); } - public CompletableFuture scheduleServiceTask(final Runnable task) { + public CompletableFuture scheduleServiceTask(final Runnable task) { return CompletableFuture.runAsync(task, servicesExecutor); } @@ -156,6 +156,19 @@ public class EthScheduler { return serviceFuture; } + public CompletableFuture scheduleServiceTask(final Supplier> future) { + final CompletableFuture promise = new CompletableFuture<>(); + final Future workerFuture = servicesExecutor.submit(() -> propagateResult(future, promise)); + // If returned promise is cancelled, cancel the worker future + promise.whenComplete( + (r, t) -> { + if (t instanceof CancellationException) { + workerFuture.cancel(false); + } + }); + return promise; + } + public CompletableFuture startPipeline(final Pipeline pipeline) { final CompletableFuture pipelineFuture = pipeline.start(servicesExecutor); pendingFutures.add(pipelineFuture); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java index 4c60ac1d19..5f540b7f64 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java @@ -37,6 +37,9 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class DownloadReceiptsStep implements Function, CompletableFuture>> { @@ -62,7 +65,7 @@ public class DownloadReceiptsStep if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { return ethContext .getScheduler() - .scheduleSyncWorkerTask( + .scheduleServiceTask( () -> { Map> getReceipts = new HashMap<>(); do {