diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 18646a1306..b85ad75736 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -659,7 +659,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides final PeerSelector peerSelector = new DefaultPeerSelector(currentProtocolSpecSupplier); ethPeers.streamAllPeers().forEach(peerSelector::addPeer); final PeerTaskExecutor peerTaskExecutor = - new PeerTaskExecutor(peerSelector, new PeerTaskRequestSender(), metricsSystem); + new PeerTaskExecutor(peerSelector, new PeerTaskRequestSender(), scheduler, metricsSystem); final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode()); final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index 10c882e7e5..c7a7d2079f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.metrics.BesuMetricCategory; @@ -37,15 +38,18 @@ public class PeerTaskExecutor { public static final int NO_RETRIES = 1; private final PeerSelector peerSelector; private final PeerTaskRequestSender requestSender; + private final EthScheduler ethScheduler; private final LabelledMetric requestTimer; public PeerTaskExecutor( final PeerSelector peerSelector, final PeerTaskRequestSender requestSender, + final EthScheduler ethScheduler, final MetricsSystem metricsSystem) { this.peerSelector = peerSelector; this.requestSender = requestSender; + this.ethScheduler = ethScheduler; requestTimer = metricsSystem.createLabelledTimer( BesuMetricCategory.PEERS, @@ -81,7 +85,13 @@ public class PeerTaskExecutor { } public CompletableFuture> executeAsync(final PeerTask peerTask) { - return CompletableFuture.supplyAsync(() -> execute(peerTask)); + return ethScheduler.scheduleSyncWorkerTask( + () -> CompletableFuture.completedFuture(execute(peerTask))); + } + + public Collection>> executeBatchAsync( + final Collection> peerTasks) { + return peerTasks.stream().map(this::executeAsync).toList(); } public PeerTaskExecutorResult executeAgainstPeer( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java index 407effb7e0..c3741292a0 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; @@ -32,11 +33,15 @@ import org.hyperledger.besu.ethereum.mainnet.BodyValidator; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import com.google.common.collect.Lists; + public class DownloadReceiptsStep implements Function, CompletableFuture>> { private final EthContext ethContext; @@ -61,26 +66,46 @@ public class DownloadReceiptsStep if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { return CompletableFuture.supplyAsync( () -> { - List blockWithReceiptsList = new ArrayList<>(headers.size()); + Map> getReceipts = new ConcurrentHashMap<>(); do { - GetReceiptsFromPeerTask getReceiptsFromPeerTask = - new GetReceiptsFromPeerTask(headers, new BodyValidator()); - PeerTaskExecutorResult>> getReceiptsResult = - peerTaskExecutor.execute(getReceiptsFromPeerTask); - if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS - && getReceiptsResult.result().isPresent()) { - // remove all the headers we found receipts for - headers.removeAll(getReceiptsResult.result().get().keySet()); - blockWithReceiptsList.addAll(combineBlocksAndReceipts(blocks, getReceiptsResult.result().get())); + List> blockHeaderSubLists = Lists.partition(headers, 20); + List>>> tasks = new ArrayList<>(); + for (List blockHeaderSubList : blockHeaderSubLists) { + tasks.add(new GetReceiptsFromPeerTask(blockHeaderSubList, new BodyValidator())); + } + Collection< + CompletableFuture< + PeerTaskExecutorResult>>>> + taskExecutions = peerTaskExecutor.executeBatchAsync(tasks); + for (CompletableFuture< + PeerTaskExecutorResult>>> + taskExecution : taskExecutions) { + taskExecution.thenAccept( + (getReceiptsResult) -> { + if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS + && getReceiptsResult.result().isPresent()) { + Map> taskResult = + getReceiptsResult.result().get(); + taskResult + .keySet() + .forEach( + (blockHeader) -> + getReceipts.merge( + blockHeader, + taskResult.get(blockHeader), + (initialReceipts, newReceipts) -> { + throw new IllegalStateException( + "Unexpectedly got receipts for block header already populated!"); + })); + } + }); } + taskExecutions.forEach(CompletableFuture::join); + // remove all the headers we found receipts for + headers.removeAll(getReceipts.keySet()); // repeat until all headers have receipts } while (!headers.isEmpty()); - - // verify that all blocks have receipts - if (blocks.size() != blockWithReceiptsList.size()) { - throw new IllegalStateException("Not all blocks have been matched to receipts!"); - } - return blockWithReceiptsList; + return combineBlocksAndReceipts(blocks, getReceipts); }); } else { @@ -93,11 +118,19 @@ public class DownloadReceiptsStep private List combineBlocksAndReceipts( final List blocks, final Map> receiptsByHeader) { return blocks.stream() - .filter((b) -> receiptsByHeader.containsKey(b.getHeader())) .map( block -> { final List receipts = receiptsByHeader.getOrDefault(block.getHeader(), emptyList()); + if (block.getBody().getTransactions().size() != receipts.size()) { + throw new IllegalStateException( + "PeerTask response code was success, but incorrect number of receipts returned. Header hash: " + + block.getHeader().getHash() + + ", Transactions: " + + block.getBody().getTransactions().size() + + ", receipts: " + + receipts.size()); + } return new BlockWithReceipts(block, receipts); }) .toList(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java index 15b1747bc7..9dbba7ef63 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -49,7 +49,8 @@ public class PeerTaskExecutorTest { @BeforeEach public void beforeTest() { mockCloser = MockitoAnnotations.openMocks(this); - peerTaskExecutor = new PeerTaskExecutor(peerSelector, requestSender, new NoOpMetricsSystem()); + peerTaskExecutor = + new PeerTaskExecutor(peerSelector, requestSender, null, new NoOpMetricsSystem()); } @AfterEach diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index 7af807c1c7..6d194cb8e0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -537,7 +537,7 @@ public class FastSyncActionsTest { protocolSchedule, protocolContext, ethContext, - new PeerTaskExecutor(null, null, new NoOpMetricsSystem()), + new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()), new SyncState(blockchain, ethContext.getEthPeers(), true, Optional.empty()), pivotBlockSelector, new NoOpMetricsSystem()); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java index 0e5b5ec2c7..da82034eaa 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java @@ -111,7 +111,7 @@ public class FastSyncChainDownloaderTest { protocolSchedule, protocolContext, ethContext, - new PeerTaskExecutor(null, null, new NoOpMetricsSystem()), + new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()), syncState, new NoOpMetricsSystem(), new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()),