[NC-2100] Parallel Processing File Import Performance (#683)

* reactive by hand

* restore final checks

* rename variable

* explicitly shutdown the executors.

* fix log4j traps
try CompletableFuture.allOf

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Danno Ferrin 6 years ago committed by GitHub
parent 7c835db064
commit 0fe372a22f
  1. 112
      pantheon/src/main/java/tech/pegasys/pantheon/util/BlockImporter.java
  2. 6
      pantheon/src/test/java/tech/pegasys/pantheon/util/BlockImporterTest.java

@ -12,15 +12,15 @@
*/
package tech.pegasys.pantheon.util;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.logging.log4j.LogManager.getLogger;
import tech.pegasys.pantheon.controller.PantheonController;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.blockcreation.AbstractBlockCreator;
import tech.pegasys.pantheon.ethereum.blockcreation.BlockMiner;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
@ -31,6 +31,12 @@ import tech.pegasys.pantheon.util.uint.UInt256;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import com.google.common.base.MoreObjects;
import org.apache.logging.log4j.Logger;
@ -38,6 +44,12 @@ import org.apache.logging.log4j.Logger;
/** Pantheon Block Import Util. */
public class BlockImporter {
private static final Logger LOG = getLogger();
private final Semaphore blockBacklog = new Semaphore(2);
private final ExecutorService validationExecutor = Executors.newCachedThreadPool();
private final ExecutorService importExecutor = Executors.newSingleThreadExecutor();
/**
* Imports blocks that are stored as concatenated RLP sections in the given file into Pantheon's
* block storage.
@ -45,15 +57,15 @@ public class BlockImporter {
* @param blocks Path to the file containing the blocks
* @param pantheonController the PantheonController that defines blockchain behavior
* @param <C> the consensus context type
* @param <M> the type of miner being used within the executing pantheon
* @return the import result
* @throws IOException On Failure
*/
public <C, M extends BlockMiner<C, ? extends AbstractBlockCreator<C>>>
BlockImporter.ImportResult importBlockchain(
public <C> BlockImporter.ImportResult importBlockchain(
final Path blocks, final PantheonController<C> pantheonController) throws IOException {
final ProtocolSchedule<C> protocolSchedule = pantheonController.getProtocolSchedule();
final ProtocolContext<C> context = pantheonController.getProtocolContext();
final MutableBlockchain blockchain = context.getBlockchain();
int count = 0;
try (final RawBlockIterator iterator =
new RawBlockIterator(
@ -61,9 +73,8 @@ public class BlockImporter {
rlp ->
BlockHeader.readFrom(
rlp, ScheduleBasedBlockHashFunction.create(protocolSchedule)))) {
final MutableBlockchain blockchain = context.getBlockchain();
int count = 1;
BlockHeader previousHeader = null;
CompletableFuture<Void> previousBlockFuture = null;
while (iterator.hasNext()) {
final Block block = iterator.next();
final BlockHeader header = block.getHeader();
@ -80,14 +91,89 @@ public class BlockImporter {
previousHeader = lookupPreviousHeader(blockchain, header);
}
final ProtocolSpec<C> protocolSpec = protocolSchedule.getByBlockNumber(header.getNumber());
final BlockHeader lastHeader = previousHeader;
final CompletableFuture<Void> validationFuture =
CompletableFuture.runAsync(
() -> validateBlock(protocolSpec, context, lastHeader, header), validationExecutor);
final CompletableFuture<Void> extractingFuture =
CompletableFuture.runAsync(() -> extractSignatures(block));
final CompletableFuture<Void> calculationFutures;
if (previousBlockFuture == null) {
calculationFutures = extractingFuture;
} else {
calculationFutures = CompletableFuture.allOf(extractingFuture, previousBlockFuture);
}
try {
blockBacklog.acquire();
} catch (final InterruptedException e) {
LOG.error("Interrupted adding to backlog.", e);
break;
}
previousBlockFuture =
validationFuture.runAfterBothAsync(
calculationFutures,
() -> evaluateBlock(context, block, header, protocolSpec),
importExecutor);
++count;
previousHeader = header;
}
if (previousBlockFuture != null) {
previousBlockFuture.join();
}
return new BlockImporter.ImportResult(blockchain.getChainHead().getTotalDifficulty(), count);
} finally {
validationExecutor.shutdownNow();
try {
validationExecutor.awaitTermination(5, SECONDS);
} catch (final Exception e) {
LOG.error("Error shutting down validatorExecutor.", e);
}
importExecutor.shutdownNow();
try {
importExecutor.awaitTermination(5, SECONDS);
} catch (final Exception e) {
LOG.error("Error shutting down importExecutor", e);
}
pantheonController.close();
}
}
private void extractSignatures(final Block block) {
final List<CompletableFuture<Void>> futures =
new ArrayList<>(block.getBody().getTransactions().size());
for (final Transaction tx : block.getBody().getTransactions()) {
futures.add(CompletableFuture.runAsync(tx::getSender, validationExecutor));
}
for (final CompletableFuture<Void> future : futures) {
future.join();
}
}
private <C> void validateBlock(
final ProtocolSpec<C> protocolSpec,
final ProtocolContext<C> context,
final BlockHeader previousHeader,
final BlockHeader header) {
final BlockHeaderValidator<C> blockHeaderValidator = protocolSpec.getBlockHeaderValidator();
final boolean validHeader =
blockHeaderValidator.validateHeader(
header, previousHeader, context, HeaderValidationMode.FULL);
if (!validHeader) {
throw new IllegalStateException(
"Invalid header at block number " + header.getNumber() + ".");
throw new IllegalStateException("Invalid header at block number " + header.getNumber() + ".");
}
}
private <C> void evaluateBlock(
final ProtocolContext<C> context,
final Block block,
final BlockHeader header,
final ProtocolSpec<C> protocolSpec) {
try {
final tech.pegasys.pantheon.ethereum.core.BlockImporter<C> blockImporter =
protocolSpec.getBlockImporter();
final boolean blockImported =
@ -96,12 +182,8 @@ public class BlockImporter {
throw new IllegalStateException(
"Invalid block at block number " + header.getNumber() + ".");
}
++count;
previousHeader = header;
}
return new BlockImporter.ImportResult(blockchain.getChainHead().getTotalDifficulty(), count);
} finally {
pantheonController.close();
blockBacklog.release();
}
}
@ -121,7 +203,7 @@ public class BlockImporter {
public final UInt256 td;
public final int count;
final int count;
ImportResult(final UInt256 td, final int count) {
this.td = td;

@ -60,7 +60,8 @@ public final class BlockImporterTest {
PrivacyParameters.noPrivacy());
final BlockImporter.ImportResult result =
blockImporter.importBlockchain(source, targetController);
assertThat(result.count).isEqualTo(1000);
// Don't count the Genesis block
assertThat(result.count).isEqualTo(999);
assertThat(result.td).isEqualTo(UInt256.of(21991996248790L));
}
@ -93,6 +94,7 @@ public final class BlockImporterTest {
PrivacyParameters.noPrivacy());
final BlockImporter.ImportResult result = blockImporter.importBlockchain(source, controller);
assertThat(result.count).isEqualTo(959);
// Don't count the Genesis block
assertThat(result.count).isEqualTo(958);
}
}

Loading…
Cancel
Save