7311: Rework DownloadReceiptsStep

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7638/head
Matilda Clerke 2 months ago
parent aca80585f4
commit 4d59b10c6a
  1. 2
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  3. 67
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java
  4. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java
  5. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java
  6. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java

@ -659,7 +659,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final PeerSelector peerSelector = new DefaultPeerSelector(currentProtocolSpecSupplier); final PeerSelector peerSelector = new DefaultPeerSelector(currentProtocolSpecSupplier);
ethPeers.streamAllPeers().forEach(peerSelector::addPeer); ethPeers.streamAllPeers().forEach(peerSelector::addPeer);
final PeerTaskExecutor peerTaskExecutor = final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(peerSelector, new PeerTaskRequestSender(), metricsSystem); new PeerTaskExecutor(peerSelector, new PeerTaskRequestSender(), scheduler, metricsSystem);
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode()); final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint); final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.manager.peertask; package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.BesuMetricCategory;
@ -37,15 +38,18 @@ public class PeerTaskExecutor {
public static final int NO_RETRIES = 1; public static final int NO_RETRIES = 1;
private final PeerSelector peerSelector; private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender; private final PeerTaskRequestSender requestSender;
private final EthScheduler ethScheduler;
private final LabelledMetric<OperationTimer> requestTimer; private final LabelledMetric<OperationTimer> requestTimer;
public PeerTaskExecutor( public PeerTaskExecutor(
final PeerSelector peerSelector, final PeerSelector peerSelector,
final PeerTaskRequestSender requestSender, final PeerTaskRequestSender requestSender,
final EthScheduler ethScheduler,
final MetricsSystem metricsSystem) { final MetricsSystem metricsSystem) {
this.peerSelector = peerSelector; this.peerSelector = peerSelector;
this.requestSender = requestSender; this.requestSender = requestSender;
this.ethScheduler = ethScheduler;
requestTimer = requestTimer =
metricsSystem.createLabelledTimer( metricsSystem.createLabelledTimer(
BesuMetricCategory.PEERS, BesuMetricCategory.PEERS,
@ -81,7 +85,13 @@ public class PeerTaskExecutor {
} }
public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAsync(final PeerTask<T> peerTask) { public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAsync(final PeerTask<T> peerTask) {
return CompletableFuture.supplyAsync(() -> execute(peerTask)); return ethScheduler.scheduleSyncWorkerTask(
() -> CompletableFuture.completedFuture(execute(peerTask)));
}
public <T> Collection<CompletableFuture<PeerTaskExecutorResult<T>>> executeBatchAsync(
final Collection<PeerTask<T>> peerTasks) {
return peerTasks.stream().map(this::executeAsync).toList();
} }
public <T> PeerTaskExecutorResult<T> executeAgainstPeer( public <T> PeerTaskExecutorResult<T> executeAgainstPeer(

@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
@ -32,11 +33,15 @@ import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function; import java.util.function.Function;
import com.google.common.collect.Lists;
public class DownloadReceiptsStep public class DownloadReceiptsStep
implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> { implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> {
private final EthContext ethContext; private final EthContext ethContext;
@ -61,26 +66,46 @@ public class DownloadReceiptsStep
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
return CompletableFuture.supplyAsync( return CompletableFuture.supplyAsync(
() -> { () -> {
List<BlockWithReceipts> blockWithReceiptsList = new ArrayList<>(headers.size()); Map<BlockHeader, List<TransactionReceipt>> getReceipts = new ConcurrentHashMap<>();
do { do {
GetReceiptsFromPeerTask getReceiptsFromPeerTask = List<List<BlockHeader>> blockHeaderSubLists = Lists.partition(headers, 20);
new GetReceiptsFromPeerTask(headers, new BodyValidator()); List<PeerTask<Map<BlockHeader, List<TransactionReceipt>>>> tasks = new ArrayList<>();
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> getReceiptsResult = for (List<BlockHeader> blockHeaderSubList : blockHeaderSubLists) {
peerTaskExecutor.execute(getReceiptsFromPeerTask); tasks.add(new GetReceiptsFromPeerTask(blockHeaderSubList, new BodyValidator()));
if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS }
&& getReceiptsResult.result().isPresent()) { Collection<
// remove all the headers we found receipts for CompletableFuture<
headers.removeAll(getReceiptsResult.result().get().keySet()); PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>>>>
blockWithReceiptsList.addAll(combineBlocksAndReceipts(blocks, getReceiptsResult.result().get())); taskExecutions = peerTaskExecutor.executeBatchAsync(tasks);
for (CompletableFuture<
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>>>
taskExecution : taskExecutions) {
taskExecution.thenAccept(
(getReceiptsResult) -> {
if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& getReceiptsResult.result().isPresent()) {
Map<BlockHeader, List<TransactionReceipt>> taskResult =
getReceiptsResult.result().get();
taskResult
.keySet()
.forEach(
(blockHeader) ->
getReceipts.merge(
blockHeader,
taskResult.get(blockHeader),
(initialReceipts, newReceipts) -> {
throw new IllegalStateException(
"Unexpectedly got receipts for block header already populated!");
}));
}
});
} }
taskExecutions.forEach(CompletableFuture::join);
// remove all the headers we found receipts for
headers.removeAll(getReceipts.keySet());
// repeat until all headers have receipts // repeat until all headers have receipts
} while (!headers.isEmpty()); } while (!headers.isEmpty());
return combineBlocksAndReceipts(blocks, getReceipts);
// verify that all blocks have receipts
if (blocks.size() != blockWithReceiptsList.size()) {
throw new IllegalStateException("Not all blocks have been matched to receipts!");
}
return blockWithReceiptsList;
}); });
} else { } else {
@ -93,11 +118,19 @@ public class DownloadReceiptsStep
private List<BlockWithReceipts> combineBlocksAndReceipts( private List<BlockWithReceipts> combineBlocksAndReceipts(
final List<Block> blocks, final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader) { final List<Block> blocks, final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader) {
return blocks.stream() return blocks.stream()
.filter((b) -> receiptsByHeader.containsKey(b.getHeader()))
.map( .map(
block -> { block -> {
final List<TransactionReceipt> receipts = final List<TransactionReceipt> receipts =
receiptsByHeader.getOrDefault(block.getHeader(), emptyList()); receiptsByHeader.getOrDefault(block.getHeader(), emptyList());
if (block.getBody().getTransactions().size() != receipts.size()) {
throw new IllegalStateException(
"PeerTask response code was success, but incorrect number of receipts returned. Header hash: "
+ block.getHeader().getHash()
+ ", Transactions: "
+ block.getBody().getTransactions().size()
+ ", receipts: "
+ receipts.size());
}
return new BlockWithReceipts(block, receipts); return new BlockWithReceipts(block, receipts);
}) })
.toList(); .toList();

@ -49,7 +49,8 @@ public class PeerTaskExecutorTest {
@BeforeEach @BeforeEach
public void beforeTest() { public void beforeTest() {
mockCloser = MockitoAnnotations.openMocks(this); mockCloser = MockitoAnnotations.openMocks(this);
peerTaskExecutor = new PeerTaskExecutor(peerSelector, requestSender, new NoOpMetricsSystem()); peerTaskExecutor =
new PeerTaskExecutor(peerSelector, requestSender, null, new NoOpMetricsSystem());
} }
@AfterEach @AfterEach

@ -537,7 +537,7 @@ public class FastSyncActionsTest {
protocolSchedule, protocolSchedule,
protocolContext, protocolContext,
ethContext, ethContext,
new PeerTaskExecutor(null, null, new NoOpMetricsSystem()), new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()),
new SyncState(blockchain, ethContext.getEthPeers(), true, Optional.empty()), new SyncState(blockchain, ethContext.getEthPeers(), true, Optional.empty()),
pivotBlockSelector, pivotBlockSelector,
new NoOpMetricsSystem()); new NoOpMetricsSystem());

@ -111,7 +111,7 @@ public class FastSyncChainDownloaderTest {
protocolSchedule, protocolSchedule,
protocolContext, protocolContext,
ethContext, ethContext,
new PeerTaskExecutor(null, null, new NoOpMetricsSystem()), new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()),
syncState, syncState,
new NoOpMetricsSystem(), new NoOpMetricsSystem(),
new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()), new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()),

Loading…
Cancel
Save