diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/util/BlockImporter.java b/pantheon/src/main/java/tech/pegasys/pantheon/util/BlockImporter.java index 38dfc47502..0cc6c38455 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/util/BlockImporter.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/util/BlockImporter.java @@ -12,15 +12,15 @@ */ package tech.pegasys.pantheon.util; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.logging.log4j.LogManager.getLogger; import tech.pegasys.pantheon.controller.PantheonController; import tech.pegasys.pantheon.ethereum.ProtocolContext; -import tech.pegasys.pantheon.ethereum.blockcreation.AbstractBlockCreator; -import tech.pegasys.pantheon.ethereum.blockcreation.BlockMiner; import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; @@ -31,6 +31,12 @@ import tech.pegasys.pantheon.util.uint.UInt256; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import com.google.common.base.MoreObjects; import org.apache.logging.log4j.Logger; @@ -38,6 +44,12 @@ import org.apache.logging.log4j.Logger; /** Pantheon Block Import Util. */ public class BlockImporter { private static final Logger LOG = getLogger(); + + private final Semaphore blockBacklog = new Semaphore(2); + + private final ExecutorService validationExecutor = Executors.newCachedThreadPool(); + private final ExecutorService importExecutor = Executors.newSingleThreadExecutor(); + /** * Imports blocks that are stored as concatenated RLP sections in the given file into Pantheon's * block storage. @@ -45,15 +57,15 @@ public class BlockImporter { * @param blocks Path to the file containing the blocks * @param pantheonController the PantheonController that defines blockchain behavior * @param the consensus context type - * @param the type of miner being used within the executing pantheon * @return the import result * @throws IOException On Failure */ - public >> - BlockImporter.ImportResult importBlockchain( - final Path blocks, final PantheonController pantheonController) throws IOException { + public BlockImporter.ImportResult importBlockchain( + final Path blocks, final PantheonController pantheonController) throws IOException { final ProtocolSchedule protocolSchedule = pantheonController.getProtocolSchedule(); final ProtocolContext context = pantheonController.getProtocolContext(); + final MutableBlockchain blockchain = context.getBlockchain(); + int count = 0; try (final RawBlockIterator iterator = new RawBlockIterator( @@ -61,9 +73,8 @@ public class BlockImporter { rlp -> BlockHeader.readFrom( rlp, ScheduleBasedBlockHashFunction.create(protocolSchedule)))) { - final MutableBlockchain blockchain = context.getBlockchain(); - int count = 1; BlockHeader previousHeader = null; + CompletableFuture previousBlockFuture = null; while (iterator.hasNext()) { final Block block = iterator.next(); final BlockHeader header = block.getHeader(); @@ -80,31 +91,102 @@ public class BlockImporter { previousHeader = lookupPreviousHeader(blockchain, header); } final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(header.getNumber()); - final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); - final boolean validHeader = - blockHeaderValidator.validateHeader( - header, previousHeader, context, HeaderValidationMode.FULL); - if (!validHeader) { - throw new IllegalStateException( - "Invalid header at block number " + header.getNumber() + "."); + final BlockHeader lastHeader = previousHeader; + + final CompletableFuture validationFuture = + CompletableFuture.runAsync( + () -> validateBlock(protocolSpec, context, lastHeader, header), validationExecutor); + + final CompletableFuture extractingFuture = + CompletableFuture.runAsync(() -> extractSignatures(block)); + + final CompletableFuture calculationFutures; + if (previousBlockFuture == null) { + calculationFutures = extractingFuture; + } else { + calculationFutures = CompletableFuture.allOf(extractingFuture, previousBlockFuture); } - final tech.pegasys.pantheon.ethereum.core.BlockImporter blockImporter = - protocolSpec.getBlockImporter(); - final boolean blockImported = - blockImporter.importBlock(context, block, HeaderValidationMode.NONE); - if (!blockImported) { - throw new IllegalStateException( - "Invalid block at block number " + header.getNumber() + "."); + + try { + blockBacklog.acquire(); + } catch (final InterruptedException e) { + LOG.error("Interrupted adding to backlog.", e); + break; } + previousBlockFuture = + validationFuture.runAfterBothAsync( + calculationFutures, + () -> evaluateBlock(context, block, header, protocolSpec), + importExecutor); + ++count; previousHeader = header; } + if (previousBlockFuture != null) { + previousBlockFuture.join(); + } return new BlockImporter.ImportResult(blockchain.getChainHead().getTotalDifficulty(), count); } finally { + validationExecutor.shutdownNow(); + try { + validationExecutor.awaitTermination(5, SECONDS); + } catch (final Exception e) { + LOG.error("Error shutting down validatorExecutor.", e); + } + importExecutor.shutdownNow(); + try { + importExecutor.awaitTermination(5, SECONDS); + } catch (final Exception e) { + LOG.error("Error shutting down importExecutor", e); + } pantheonController.close(); } } + private void extractSignatures(final Block block) { + final List> futures = + new ArrayList<>(block.getBody().getTransactions().size()); + for (final Transaction tx : block.getBody().getTransactions()) { + futures.add(CompletableFuture.runAsync(tx::getSender, validationExecutor)); + } + for (final CompletableFuture future : futures) { + future.join(); + } + } + + private void validateBlock( + final ProtocolSpec protocolSpec, + final ProtocolContext context, + final BlockHeader previousHeader, + final BlockHeader header) { + final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); + final boolean validHeader = + blockHeaderValidator.validateHeader( + header, previousHeader, context, HeaderValidationMode.FULL); + if (!validHeader) { + throw new IllegalStateException("Invalid header at block number " + header.getNumber() + "."); + } + } + + private void evaluateBlock( + final ProtocolContext context, + final Block block, + final BlockHeader header, + final ProtocolSpec protocolSpec) { + try { + final tech.pegasys.pantheon.ethereum.core.BlockImporter blockImporter = + protocolSpec.getBlockImporter(); + final boolean blockImported = + blockImporter.importBlock(context, block, HeaderValidationMode.NONE); + if (!blockImported) { + throw new IllegalStateException( + "Invalid block at block number " + header.getNumber() + "."); + } + } finally { + blockBacklog.release(); + } + } + private BlockHeader lookupPreviousHeader( final MutableBlockchain blockchain, final BlockHeader header) { return blockchain @@ -121,7 +203,7 @@ public class BlockImporter { public final UInt256 td; - public final int count; + final int count; ImportResult(final UInt256 td, final int count) { this.td = td; diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/util/BlockImporterTest.java b/pantheon/src/test/java/tech/pegasys/pantheon/util/BlockImporterTest.java index 2bc49c2415..262f54df61 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/util/BlockImporterTest.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/util/BlockImporterTest.java @@ -60,7 +60,8 @@ public final class BlockImporterTest { PrivacyParameters.noPrivacy()); final BlockImporter.ImportResult result = blockImporter.importBlockchain(source, targetController); - assertThat(result.count).isEqualTo(1000); + // Don't count the Genesis block + assertThat(result.count).isEqualTo(999); assertThat(result.td).isEqualTo(UInt256.of(21991996248790L)); } @@ -93,6 +94,7 @@ public final class BlockImporterTest { PrivacyParameters.noPrivacy()); final BlockImporter.ImportResult result = blockImporter.importBlockchain(source, controller); - assertThat(result.count).isEqualTo(959); + // Don't count the Genesis block + assertThat(result.count).isEqualTo(958); } }