7311: Fix DownloadReceiptsStep when using peer task system

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7638/head
Matilda Clerke 2 months ago
parent 24e73a85c8
commit f077206c66
  1. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java
  2. 49
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java
  3. 10
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java

@ -93,6 +93,10 @@ public class CheckpointDownloadBlockStep {
.orElseThrow(
() ->
new IllegalStateException("PeerTask response code was success, but empty"));
if (block.getBody().getTransactions().size() != transactionReceipts.size()) {
throw new IllegalStateException(
"PeerTask response code was success, but incorrect number of receipts returned");
}
BlockWithReceipts blockWithReceipts = new BlockWithReceipts(block, transactionReceipts);
futureReceipts.complete(Optional.of(blockWithReceipts));
} else {

@ -30,9 +30,8 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.FutureUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -59,26 +58,41 @@ public class DownloadReceiptsStep
@Override
public CompletableFuture<List<BlockWithReceipts>> apply(final List<Block> blocks) {
final List<BlockHeader> headers = blocks.stream().map(Block::getHeader).collect(toList());
final CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> getReceipts;
final Map<BlockHeader, List<TransactionReceipt>> getReceipts;
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
getReceipts = new HashMap<BlockHeader, List<TransactionReceipt>>();
do {
GetReceiptsFromPeerTask getReceiptsFromPeerTask =
new GetReceiptsFromPeerTask(headers, new BodyValidator());
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> getReceiptsResult =
peerTaskExecutor.execute(getReceiptsFromPeerTask);
if (getReceiptsResult.getResponseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& getReceiptsResult.getResult().isPresent()) {
getReceipts = CompletableFuture.completedFuture(getReceiptsResult.getResult().get());
} else {
getReceipts = CompletableFuture.completedFuture(Collections.emptyMap());
Map<BlockHeader, List<TransactionReceipt>> receiptsResult =
getReceiptsResult.getResult().get();
receiptsResult
.keySet()
.forEach(
(bh) ->
getReceipts.merge(
bh,
receiptsResult.get(bh),
(initialReceipts, newReceipts) -> {
throw new IllegalStateException(
"Unexpectedly got receipts for block header already populated!");
}));
}
// remove all the headers we found receipts for
headers.removeAll(getReceipts.keySet());
// repeat until all headers have receipts
} while (!headers.isEmpty());
return CompletableFuture.completedFuture(combineBlocksAndReceipts(blocks, getReceipts));
} else {
getReceipts = GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem).run();
return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem)
.run()
.thenApply((receipts) -> combineBlocksAndReceipts(blocks, receipts));
}
final CompletableFuture<List<BlockWithReceipts>> combineWithBlocks =
getReceipts.thenApply(
receiptsByHeader -> combineBlocksAndReceipts(blocks, receiptsByHeader));
FutureUtils.propagateCancellation(combineWithBlocks, getReceipts);
return combineWithBlocks;
}
private List<BlockWithReceipts> combineBlocksAndReceipts(
@ -88,8 +102,17 @@ public class DownloadReceiptsStep
block -> {
final List<TransactionReceipt> receipts =
receiptsByHeader.getOrDefault(block.getHeader(), emptyList());
if (block.getBody().getTransactions().size() != receipts.size()) {
throw new IllegalStateException(
"PeerTask response code was success, but incorrect number of receipts returned. Header hash: "
+ block.getHeader().getHash()
+ ", Transactions: "
+ block.getBody().getTransactions().size()
+ ", receipts: "
+ receipts.size());
}
return new BlockWithReceipts(block, receipts);
})
.collect(toList());
.toList();
}
}

@ -21,10 +21,12 @@ import static org.mockito.Mockito.mock;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
@ -81,7 +83,7 @@ public class DownloadReceiptsStepTest {
}
@Test
public void shouldDownloadReceiptsForBlocks() throws IllegalAccessException {
public void shouldDownloadReceiptsForBlocks() {
DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(
ethProtocolManager.ethContext(),
@ -106,7 +108,7 @@ public class DownloadReceiptsStepTest {
@Test
public void shouldDownloadReceiptsForBlocksUsingPeerTaskSystem()
throws IllegalAccessException, ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException {
DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(
ethProtocolManager.ethContext(),
@ -150,6 +152,10 @@ public class DownloadReceiptsStepTest {
final Block block = Mockito.mock(Block.class);
final BlockHeader blockHeader = Mockito.mock(BlockHeader.class);
Mockito.when(block.getHeader()).thenAnswer((invocationOnMock) -> blockHeader);
final BlockBody blockBody = Mockito.mock(BlockBody.class);
Mockito.when(block.getBody()).thenAnswer((invocationOnMock) -> blockBody);
Mockito.when(blockBody.getTransactions())
.thenAnswer((invocationOnMock) -> List.of(Mockito.mock(Transaction.class)));
return block;
}
}

Loading…
Cancel
Save