diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java index 5f540b7f64..8ee7ef414a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java @@ -37,9 +37,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class DownloadReceiptsStep implements Function, CompletableFuture>> { @@ -65,37 +62,8 @@ public class DownloadReceiptsStep if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { return ethContext .getScheduler() - .scheduleServiceTask( - () -> { - Map> getReceipts = new HashMap<>(); - do { - GetReceiptsFromPeerTask task = - new GetReceiptsFromPeerTask(headers, new BodyValidator()); - PeerTaskExecutorResult>> - getReceiptsResult = peerTaskExecutor.execute(task); - if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS - && getReceiptsResult.result().isPresent()) { - Map> taskResult = - getReceiptsResult.result().get(); - taskResult - .keySet() - .forEach( - (blockHeader) -> - getReceipts.merge( - blockHeader, - taskResult.get(blockHeader), - (initialReceipts, newReceipts) -> { - throw new IllegalStateException( - "Unexpectedly got receipts for block header already populated!"); - })); - } - // remove all the headers we found receipts for - headers.removeAll(getReceipts.keySet()); - // repeat until all headers have receipts - } while (!headers.isEmpty()); - return CompletableFuture.completedFuture( - combineBlocksAndReceipts(blocks, getReceipts)); - }); + .scheduleServiceTask(() -> getReceiptsWithPeerTaskSystem(headers)) + .thenApply((receipts) -> combineBlocksAndReceipts(blocks, receipts)); } else { return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem) @@ -104,6 +72,35 @@ public class DownloadReceiptsStep } } + private CompletableFuture>> + getReceiptsWithPeerTaskSystem(final List headers) { + Map> getReceipts = new HashMap<>(); + do { + GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(headers, new BodyValidator()); + PeerTaskExecutorResult>> getReceiptsResult = + peerTaskExecutor.execute(task); + if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS + && getReceiptsResult.result().isPresent()) { + Map> taskResult = getReceiptsResult.result().get(); + taskResult + .keySet() + .forEach( + (blockHeader) -> + getReceipts.merge( + blockHeader, + taskResult.get(blockHeader), + (initialReceipts, newReceipts) -> { + throw new IllegalStateException( + "Unexpectedly got receipts for block header already populated!"); + })); + } + // remove all the headers we found receipts for + headers.removeAll(getReceipts.keySet()); + // repeat until all headers have receipts + } while (!headers.isEmpty()); + return CompletableFuture.completedFuture(getReceipts); + } + private List combineBlocksAndReceipts( final List blocks, final Map> receiptsByHeader) { return blocks.stream()