From f077206c66e339c8afd12ee8c46dc2a3b8da3fa4 Mon Sep 17 00:00:00 2001 From: Matilda Clerke Date: Wed, 25 Sep 2024 15:38:15 +1000 Subject: [PATCH] 7311: Fix DownloadReceiptsStep when using peer task system Signed-off-by: Matilda Clerke --- .../CheckpointDownloadBlockStep.java | 4 ++ .../sync/fastsync/DownloadReceiptsStep.java | 63 +++++++++++++------ .../fastsync/DownloadReceiptsStepTest.java | 10 ++- 3 files changed, 55 insertions(+), 22 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java index 9b742af9f7..603d4448a6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java @@ -93,6 +93,10 @@ public class CheckpointDownloadBlockStep { .orElseThrow( () -> new IllegalStateException("PeerTask response code was success, but empty")); + if (block.getBody().getTransactions().size() != transactionReceipts.size()) { + throw new IllegalStateException( + "PeerTask response code was success, but incorrect number of receipts returned"); + } BlockWithReceipts blockWithReceipts = new BlockWithReceipts(block, transactionReceipts); futureReceipts.complete(Optional.of(blockWithReceipts)); } else { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java index a9ef69a86d..5e0a44cee2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java @@ -30,9 +30,8 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask; import org.hyperledger.besu.ethereum.mainnet.BodyValidator; import org.hyperledger.besu.plugin.services.MetricsSystem; -import org.hyperledger.besu.util.FutureUtils; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -59,26 +58,41 @@ public class DownloadReceiptsStep @Override public CompletableFuture> apply(final List blocks) { final List headers = blocks.stream().map(Block::getHeader).collect(toList()); - final CompletableFuture>> getReceipts; + final Map> getReceipts; if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { - GetReceiptsFromPeerTask getReceiptsFromPeerTask = - new GetReceiptsFromPeerTask(headers, new BodyValidator()); - PeerTaskExecutorResult>> getReceiptsResult = - peerTaskExecutor.execute(getReceiptsFromPeerTask); - if (getReceiptsResult.getResponseCode() == PeerTaskExecutorResponseCode.SUCCESS - && getReceiptsResult.getResult().isPresent()) { - getReceipts = CompletableFuture.completedFuture(getReceiptsResult.getResult().get()); - } else { - getReceipts = CompletableFuture.completedFuture(Collections.emptyMap()); - } + getReceipts = new HashMap>(); + do { + GetReceiptsFromPeerTask getReceiptsFromPeerTask = + new GetReceiptsFromPeerTask(headers, new BodyValidator()); + PeerTaskExecutorResult>> getReceiptsResult = + peerTaskExecutor.execute(getReceiptsFromPeerTask); + if (getReceiptsResult.getResponseCode() == PeerTaskExecutorResponseCode.SUCCESS + && getReceiptsResult.getResult().isPresent()) { + Map> receiptsResult = + getReceiptsResult.getResult().get(); + receiptsResult + .keySet() + .forEach( + (bh) -> + getReceipts.merge( + bh, + receiptsResult.get(bh), + (initialReceipts, newReceipts) -> { + throw new IllegalStateException( + "Unexpectedly got receipts for block header already populated!"); + })); + } + // remove all the headers we found receipts for + headers.removeAll(getReceipts.keySet()); + // repeat until all headers have receipts + } while (!headers.isEmpty()); + return CompletableFuture.completedFuture(combineBlocksAndReceipts(blocks, getReceipts)); + } else { - getReceipts = GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem).run(); + return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem) + .run() + .thenApply((receipts) -> combineBlocksAndReceipts(blocks, receipts)); } - final CompletableFuture> combineWithBlocks = - getReceipts.thenApply( - receiptsByHeader -> combineBlocksAndReceipts(blocks, receiptsByHeader)); - FutureUtils.propagateCancellation(combineWithBlocks, getReceipts); - return combineWithBlocks; } private List combineBlocksAndReceipts( @@ -88,8 +102,17 @@ public class DownloadReceiptsStep block -> { final List receipts = receiptsByHeader.getOrDefault(block.getHeader(), emptyList()); + if (block.getBody().getTransactions().size() != receipts.size()) { + throw new IllegalStateException( + "PeerTask response code was success, but incorrect number of receipts returned. Header hash: " + + block.getHeader().getHash() + + ", Transactions: " + + block.getBody().getTransactions().size() + + ", receipts: " + + receipts.size()); + } return new BlockWithReceipts(block, receipts); }) - .collect(toList()); + .toList(); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java index c6f4015160..4270251e6c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java @@ -21,10 +21,12 @@ import static org.mockito.Mockito.mock; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture; +import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; @@ -81,7 +83,7 @@ public class DownloadReceiptsStepTest { } @Test - public void shouldDownloadReceiptsForBlocks() throws IllegalAccessException { + public void shouldDownloadReceiptsForBlocks() { DownloadReceiptsStep downloadReceiptsStep = new DownloadReceiptsStep( ethProtocolManager.ethContext(), @@ -106,7 +108,7 @@ public class DownloadReceiptsStepTest { @Test public void shouldDownloadReceiptsForBlocksUsingPeerTaskSystem() - throws IllegalAccessException, ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException { DownloadReceiptsStep downloadReceiptsStep = new DownloadReceiptsStep( ethProtocolManager.ethContext(), @@ -150,6 +152,10 @@ public class DownloadReceiptsStepTest { final Block block = Mockito.mock(Block.class); final BlockHeader blockHeader = Mockito.mock(BlockHeader.class); Mockito.when(block.getHeader()).thenAnswer((invocationOnMock) -> blockHeader); + final BlockBody blockBody = Mockito.mock(BlockBody.class); + Mockito.when(block.getBody()).thenAnswer((invocationOnMock) -> blockBody); + Mockito.when(blockBody.getTransactions()) + .thenAnswer((invocationOnMock) -> List.of(Mockito.mock(Transaction.class))); return block; } }