7311: Move GetReceipts to services worker for parallelism

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7638/head
Matilda Clerke 2 months ago
parent 07f3a7e111
commit 493ac91529
  1. 15
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java
  2. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java

@ -145,7 +145,7 @@ public class EthScheduler {
servicesExecutor.execute(command); servicesExecutor.execute(command);
} }
public <T> CompletableFuture<Void> scheduleServiceTask(final Runnable task) { public CompletableFuture<Void> scheduleServiceTask(final Runnable task) {
return CompletableFuture.runAsync(task, servicesExecutor); return CompletableFuture.runAsync(task, servicesExecutor);
} }
@ -156,6 +156,19 @@ public class EthScheduler {
return serviceFuture; return serviceFuture;
} }
public <T> CompletableFuture<T> scheduleServiceTask(final Supplier<CompletableFuture<T>> future) {
final CompletableFuture<T> promise = new CompletableFuture<>();
final Future<?> workerFuture = servicesExecutor.submit(() -> propagateResult(future, promise));
// If returned promise is cancelled, cancel the worker future
promise.whenComplete(
(r, t) -> {
if (t instanceof CancellationException) {
workerFuture.cancel(false);
}
});
return promise;
}
public CompletableFuture<Void> startPipeline(final Pipeline<?> pipeline) { public CompletableFuture<Void> startPipeline(final Pipeline<?> pipeline) {
final CompletableFuture<Void> pipelineFuture = pipeline.start(servicesExecutor); final CompletableFuture<Void> pipelineFuture = pipeline.start(servicesExecutor);
pendingFutures.add(pipelineFuture); pendingFutures.add(pipelineFuture);

@ -37,6 +37,9 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Function; import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DownloadReceiptsStep public class DownloadReceiptsStep
implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> { implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> {
@ -62,7 +65,7 @@ public class DownloadReceiptsStep
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
return ethContext return ethContext
.getScheduler() .getScheduler()
.scheduleSyncWorkerTask( .scheduleServiceTask(
() -> { () -> {
Map<BlockHeader, List<TransactionReceipt>> getReceipts = new HashMap<>(); Map<BlockHeader, List<TransactionReceipt>> getReceipts = new HashMap<>();
do { do {

Loading…
Cancel
Save