Do parallel extract signatures in the parallel block importer. (#844)

* Do parallel extract signatures in the parallel block importer.

* remove the extraction from FullSyncBlockHandler

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Danno Ferrin 6 years ago committed by GitHub
parent c4a0d69af9
commit 5effe956ee
  1. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java
  2. 5
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java
  3. 21
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java
  4. 62
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelExtractTxSignaturesTask.java
  5. 13
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java

@ -23,4 +23,6 @@ public interface BlockHandler<B> {
CompletableFuture<List<B>> validateAndImportBlocks(final List<B> blocks);
long extractBlockNumber(final B block);
CompletableFuture<Void> executeParallelCalculations(List<B> blocks);
}

@ -129,4 +129,9 @@ public class FastSyncBlockHandler<C> implements BlockHandler<BlockWithReceipts>
public long extractBlockNumber(final BlockWithReceipts blockWithReceipt) {
return blockWithReceipt.getHeader().getNumber();
}
@Override
public CompletableFuture<Void> executeParallelCalculations(final List<BlockWithReceipts> blocks) {
return CompletableFuture.completedFuture(null);
}
}

@ -17,6 +17,7 @@ import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PersistBlockTask;
@ -25,6 +26,7 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -68,8 +70,7 @@ public class FullSyncBlockHandler<C> implements BlockHandler<Block> {
@Override
public CompletableFuture<List<Block>> downloadBlocks(final List<BlockHeader> headers) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, ethTasksTimer)
.run()
.thenCompose(this::extractTransactionSenders);
.run();
}
@Override
@ -77,17 +78,15 @@ public class FullSyncBlockHandler<C> implements BlockHandler<Block> {
return block.getHeader().getNumber();
}
private CompletableFuture<List<Block>> extractTransactionSenders(final List<Block> blocks) {
LOG.debug(
"Extracting sender {} to {}",
blocks.get(0).getHeader().getNumber(),
blocks.get(blocks.size() - 1).getHeader().getNumber());
@Override
public CompletableFuture<Void> executeParallelCalculations(final List<Block> blocks) {
final EthScheduler ethScheduler = ethContext.getScheduler();
final List<CompletableFuture<?>> calculations = new ArrayList<>();
for (final Block block : blocks) {
for (final Transaction transaction : block.getBody().getTransactions()) {
// This method internally performs the transaction sender extraction.
transaction.getSender();
for (final Transaction tx : block.getBody().getTransactions()) {
calculations.add(ethScheduler.scheduleComputationTask(tx::getSender));
}
}
return CompletableFuture.completedFuture(blocks);
return CompletableFuture.allOf(calculations.toArray(new CompletableFuture<?>[0]));
}
}

@ -0,0 +1,62 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask;
import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
class ParallelExtractTxSignaturesTask<B> extends AbstractPipelinedTask<List<B>, List<B>> {
private static final Logger LOG = LogManager.getLogger();
private final BlockHandler<B> blockHandler;
ParallelExtractTxSignaturesTask(
final BlockHandler<B> blockHandler,
final BlockingQueue<List<B>> inboundQueue,
final int outboundBacklogSize,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(inboundQueue, outboundBacklogSize, ethTasksTimer);
this.blockHandler = blockHandler;
}
@Override
protected Optional<List<B>> processStep(
final List<B> bodies, final Optional<List<B>> previousBodies) {
LOG.trace(
"Calculating fields for transactions between {} to {}",
blockHandler.extractBlockNumber(bodies.get(0)),
blockHandler.extractBlockNumber(bodies.get(bodies.size() - 1)));
try {
blockHandler.executeParallelCalculations(bodies).get();
} catch (final InterruptedException | ExecutionException e) {
result.get().completeExceptionally(e);
return Optional.empty();
}
LOG.debug(
"Calculated fields for transactions between {} to {}",
blockHandler.extractBlockNumber(bodies.get(0)),
blockHandler.extractBlockNumber(bodies.get(bodies.size() - 1)));
return Optional.of(bodies);
}
}

@ -123,10 +123,13 @@ public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<B
final ParallelDownloadBodiesTask<B> downloadBodiesTask =
new ParallelDownloadBodiesTask<>(
blockHandler, validateHeadersTask.getOutboundQueue(), maxActiveChunks, ethTasksTimer);
final ParallelExtractTxSignaturesTask<B> extractTxSignaturesTask =
new ParallelExtractTxSignaturesTask<>(
blockHandler, downloadBodiesTask.getOutboundQueue(), maxActiveChunks, ethTasksTimer);
final ParallelValidateAndImportBodiesTask<B> validateAndImportBodiesTask =
new ParallelValidateAndImportBodiesTask<>(
blockHandler,
downloadBodiesTask.getOutboundQueue(),
extractTxSignaturesTask.getOutboundQueue(),
Integer.MAX_VALUE,
ethTasksTimer);
@ -141,6 +144,9 @@ public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<B
final CompletableFuture<?> downloadBodiesFuture =
scheduler.scheduleServiceTask(downloadBodiesTask);
registerSubTask(downloadBodiesFuture);
final CompletableFuture<?> extractTxSignaturesFuture =
scheduler.scheduleServiceTask(extractTxSignaturesTask);
registerSubTask(extractTxSignaturesFuture);
final CompletableFuture<List<List<B>>> validateBodiesFuture =
scheduler.scheduleServiceTask(validateAndImportBodiesTask);
registerSubTask(validateBodiesFuture);
@ -149,7 +155,8 @@ public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<B
downloadHeadersTask.shutdown();
downloadHeaderFuture.thenRun(validateHeadersTask::shutdown);
validateHeaderFuture.thenRun(downloadBodiesTask::shutdown);
downloadBodiesFuture.thenRun(validateAndImportBodiesTask::shutdown);
downloadBodiesFuture.thenRun(extractTxSignaturesTask::shutdown);
extractTxSignaturesFuture.thenRun(validateAndImportBodiesTask::shutdown);
final BiConsumer<? super Object, ? super Throwable> cancelOnException =
(s, e) -> {
@ -157,6 +164,7 @@ public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<B
downloadHeadersTask.cancel();
validateHeadersTask.cancel();
downloadBodiesTask.cancel();
extractTxSignaturesTask.cancel();
validateAndImportBodiesTask.cancel();
result.get().completeExceptionally(e);
}
@ -165,6 +173,7 @@ public class ParallelImportChainSegmentTask<C, B> extends AbstractEthTask<List<B
downloadHeaderFuture.whenComplete(cancelOnException);
validateHeaderFuture.whenComplete(cancelOnException);
downloadBodiesFuture.whenComplete(cancelOnException);
extractTxSignaturesFuture.whenComplete(cancelOnException);
validateBodiesFuture.whenComplete(
(r, e) -> {
if (e != null) {

Loading…
Cancel
Save