From 5effe956eebe8162af9fc264930564a76ebb6738 Mon Sep 17 00:00:00 2001 From: Danno Ferrin Date: Wed, 20 Feb 2019 13:53:53 -0700 Subject: [PATCH] Do parallel extract signatures in the parallel block importer. (#844) * Do parallel extract signatures in the parallel block importer. * remove the extraction from FullSyncBlockHandler Signed-off-by: Adrian Sutton --- .../ethereum/eth/sync/BlockHandler.java | 2 + .../sync/fastsync/FastSyncBlockHandler.java | 5 ++ .../sync/fullsync/FullSyncBlockHandler.java | 21 +++---- .../ParallelExtractTxSignaturesTask.java | 62 +++++++++++++++++++ .../tasks/ParallelImportChainSegmentTask.java | 13 +++- 5 files changed, 90 insertions(+), 13 deletions(-) create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelExtractTxSignaturesTask.java diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java index b02e2089ea..0b36dd1dcf 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java @@ -23,4 +23,6 @@ public interface BlockHandler { CompletableFuture> validateAndImportBlocks(final List blocks); long extractBlockNumber(final B block); + + CompletableFuture executeParallelCalculations(List blocks); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java index ce9382a6a4..dea0d41a81 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java @@ -129,4 +129,9 @@ public class FastSyncBlockHandler implements BlockHandler public long extractBlockNumber(final BlockWithReceipts blockWithReceipt) { return blockWithReceipt.getHeader().getNumber(); } + + @Override + public CompletableFuture executeParallelCalculations(final List blocks) { + return CompletableFuture.completedFuture(null); + } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java index 5c9571af44..5551bab73b 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java @@ -17,6 +17,7 @@ import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask; @@ -25,6 +26,7 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -68,8 +70,7 @@ public class FullSyncBlockHandler implements BlockHandler { @Override public CompletableFuture> downloadBlocks(final List headers) { return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, ethTasksTimer) - .run() - .thenCompose(this::extractTransactionSenders); + .run(); } @Override @@ -77,17 +78,15 @@ public class FullSyncBlockHandler implements BlockHandler { return block.getHeader().getNumber(); } - private CompletableFuture> extractTransactionSenders(final List blocks) { - LOG.debug( - "Extracting sender {} to {}", - blocks.get(0).getHeader().getNumber(), - blocks.get(blocks.size() - 1).getHeader().getNumber()); + @Override + public CompletableFuture executeParallelCalculations(final List blocks) { + final EthScheduler ethScheduler = ethContext.getScheduler(); + final List> calculations = new ArrayList<>(); for (final Block block : blocks) { - for (final Transaction transaction : block.getBody().getTransactions()) { - // This method internally performs the transaction sender extraction. - transaction.getSender(); + for (final Transaction tx : block.getBody().getTransactions()) { + calculations.add(ethScheduler.scheduleComputationTask(tx::getSender)); } } - return CompletableFuture.completedFuture(blocks); + return CompletableFuture.allOf(calculations.toArray(new CompletableFuture[0])); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelExtractTxSignaturesTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelExtractTxSignaturesTask.java new file mode 100644 index 0000000000..5844f90bbe --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelExtractTxSignaturesTask.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask; +import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +class ParallelExtractTxSignaturesTask extends AbstractPipelinedTask, List> { + private static final Logger LOG = LogManager.getLogger(); + + private final BlockHandler blockHandler; + + ParallelExtractTxSignaturesTask( + final BlockHandler blockHandler, + final BlockingQueue> inboundQueue, + final int outboundBacklogSize, + final LabelledMetric ethTasksTimer) { + super(inboundQueue, outboundBacklogSize, ethTasksTimer); + this.blockHandler = blockHandler; + } + + @Override + protected Optional> processStep( + final List bodies, final Optional> previousBodies) { + LOG.trace( + "Calculating fields for transactions between {} to {}", + blockHandler.extractBlockNumber(bodies.get(0)), + blockHandler.extractBlockNumber(bodies.get(bodies.size() - 1))); + + try { + blockHandler.executeParallelCalculations(bodies).get(); + } catch (final InterruptedException | ExecutionException e) { + result.get().completeExceptionally(e); + return Optional.empty(); + } + LOG.debug( + "Calculated fields for transactions between {} to {}", + blockHandler.extractBlockNumber(bodies.get(0)), + blockHandler.extractBlockNumber(bodies.get(bodies.size() - 1))); + return Optional.of(bodies); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java index 8a0107d417..06ce255f55 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java @@ -123,10 +123,13 @@ public class ParallelImportChainSegmentTask extends AbstractEthTask downloadBodiesTask = new ParallelDownloadBodiesTask<>( blockHandler, validateHeadersTask.getOutboundQueue(), maxActiveChunks, ethTasksTimer); + final ParallelExtractTxSignaturesTask extractTxSignaturesTask = + new ParallelExtractTxSignaturesTask<>( + blockHandler, downloadBodiesTask.getOutboundQueue(), maxActiveChunks, ethTasksTimer); final ParallelValidateAndImportBodiesTask validateAndImportBodiesTask = new ParallelValidateAndImportBodiesTask<>( blockHandler, - downloadBodiesTask.getOutboundQueue(), + extractTxSignaturesTask.getOutboundQueue(), Integer.MAX_VALUE, ethTasksTimer); @@ -141,6 +144,9 @@ public class ParallelImportChainSegmentTask extends AbstractEthTask downloadBodiesFuture = scheduler.scheduleServiceTask(downloadBodiesTask); registerSubTask(downloadBodiesFuture); + final CompletableFuture extractTxSignaturesFuture = + scheduler.scheduleServiceTask(extractTxSignaturesTask); + registerSubTask(extractTxSignaturesFuture); final CompletableFuture>> validateBodiesFuture = scheduler.scheduleServiceTask(validateAndImportBodiesTask); registerSubTask(validateBodiesFuture); @@ -149,7 +155,8 @@ public class ParallelImportChainSegmentTask extends AbstractEthTask cancelOnException = (s, e) -> { @@ -157,6 +164,7 @@ public class ParallelImportChainSegmentTask extends AbstractEthTask extends AbstractEthTask { if (e != null) {