|
|
|
@ -37,9 +37,6 @@ import java.util.Map; |
|
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
|
import java.util.function.Function; |
|
|
|
|
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
public class DownloadReceiptsStep |
|
|
|
|
implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> { |
|
|
|
|
|
|
|
|
@ -65,37 +62,8 @@ public class DownloadReceiptsStep |
|
|
|
|
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { |
|
|
|
|
return ethContext |
|
|
|
|
.getScheduler() |
|
|
|
|
.scheduleServiceTask( |
|
|
|
|
() -> { |
|
|
|
|
Map<BlockHeader, List<TransactionReceipt>> getReceipts = new HashMap<>(); |
|
|
|
|
do { |
|
|
|
|
GetReceiptsFromPeerTask task = |
|
|
|
|
new GetReceiptsFromPeerTask(headers, new BodyValidator()); |
|
|
|
|
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> |
|
|
|
|
getReceiptsResult = peerTaskExecutor.execute(task); |
|
|
|
|
if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS |
|
|
|
|
&& getReceiptsResult.result().isPresent()) { |
|
|
|
|
Map<BlockHeader, List<TransactionReceipt>> taskResult = |
|
|
|
|
getReceiptsResult.result().get(); |
|
|
|
|
taskResult |
|
|
|
|
.keySet() |
|
|
|
|
.forEach( |
|
|
|
|
(blockHeader) -> |
|
|
|
|
getReceipts.merge( |
|
|
|
|
blockHeader, |
|
|
|
|
taskResult.get(blockHeader), |
|
|
|
|
(initialReceipts, newReceipts) -> { |
|
|
|
|
throw new IllegalStateException( |
|
|
|
|
"Unexpectedly got receipts for block header already populated!"); |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
|
// remove all the headers we found receipts for
|
|
|
|
|
headers.removeAll(getReceipts.keySet()); |
|
|
|
|
// repeat until all headers have receipts
|
|
|
|
|
} while (!headers.isEmpty()); |
|
|
|
|
return CompletableFuture.completedFuture( |
|
|
|
|
combineBlocksAndReceipts(blocks, getReceipts)); |
|
|
|
|
}); |
|
|
|
|
.scheduleServiceTask(() -> getReceiptsWithPeerTaskSystem(headers)) |
|
|
|
|
.thenApply((receipts) -> combineBlocksAndReceipts(blocks, receipts)); |
|
|
|
|
|
|
|
|
|
} else { |
|
|
|
|
return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem) |
|
|
|
@ -104,6 +72,35 @@ public class DownloadReceiptsStep |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> |
|
|
|
|
getReceiptsWithPeerTaskSystem(final List<BlockHeader> headers) { |
|
|
|
|
Map<BlockHeader, List<TransactionReceipt>> getReceipts = new HashMap<>(); |
|
|
|
|
do { |
|
|
|
|
GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(headers, new BodyValidator()); |
|
|
|
|
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> getReceiptsResult = |
|
|
|
|
peerTaskExecutor.execute(task); |
|
|
|
|
if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS |
|
|
|
|
&& getReceiptsResult.result().isPresent()) { |
|
|
|
|
Map<BlockHeader, List<TransactionReceipt>> taskResult = getReceiptsResult.result().get(); |
|
|
|
|
taskResult |
|
|
|
|
.keySet() |
|
|
|
|
.forEach( |
|
|
|
|
(blockHeader) -> |
|
|
|
|
getReceipts.merge( |
|
|
|
|
blockHeader, |
|
|
|
|
taskResult.get(blockHeader), |
|
|
|
|
(initialReceipts, newReceipts) -> { |
|
|
|
|
throw new IllegalStateException( |
|
|
|
|
"Unexpectedly got receipts for block header already populated!"); |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
|
// remove all the headers we found receipts for
|
|
|
|
|
headers.removeAll(getReceipts.keySet()); |
|
|
|
|
// repeat until all headers have receipts
|
|
|
|
|
} while (!headers.isEmpty()); |
|
|
|
|
return CompletableFuture.completedFuture(getReceipts); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private List<BlockWithReceipts> combineBlocksAndReceipts( |
|
|
|
|
final List<Block> blocks, final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader) { |
|
|
|
|
return blocks.stream() |
|
|
|
|