From 4404cac632a7c5b50e6bff7f72cbe82d91aa7fc3 Mon Sep 17 00:00:00 2001 From: Danno Ferrin Date: Wed, 20 Feb 2019 12:35:36 -0700 Subject: [PATCH] Parallel downloader should stop on puts if requested. (#927) * move to an offer() instead of a put() on the downloader so that when the task is stopped the put will see that it is done and not wait forever. * remove peer based focus of the task. Let subtasks pick their peers. Signed-off-by: Adrian Sutton --- ...erTask.java => AbstractPipelinedTask.java} | 78 ++++++++++--------- .../tasks/ParallelDownloadBodiesTask.java | 13 +--- .../tasks/ParallelDownloadHeadersTask.java | 13 ++-- .../tasks/ParallelImportChainSegmentTask.java | 19 ++--- .../ParallelValidateAndImportBodiesTask.java | 11 +-- .../tasks/ParallelValidateHeadersTask.java | 13 +--- 6 files changed, 66 insertions(+), 81 deletions(-) rename ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/{AbstractPipelinedPeerTask.java => AbstractPipelinedTask.java} (58%) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java similarity index 58% rename from ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedPeerTask.java rename to ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java index 7aeadb626e..a3b1275530 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java @@ -12,8 +12,6 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager.task; -import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; @@ -28,61 +26,69 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public abstract class AbstractPipelinedPeerTask extends AbstractPeerTask> { +public abstract class AbstractPipelinedTask extends AbstractEthTask> { private static final Logger LOG = LogManager.getLogger(); static final int TIMEOUT_MS = 1000; - private BlockingQueue inboundQueue; - private BlockingQueue outboundQueue; - private List results; + private final BlockingQueue inboundQueue; + private final BlockingQueue outboundQueue; + private final List results; private boolean shuttingDown = false; - private AtomicReference processingException = new AtomicReference<>(null); + private final AtomicReference processingException = new AtomicReference<>(null); - protected AbstractPipelinedPeerTask( + protected AbstractPipelinedTask( final BlockingQueue inboundQueue, final int outboundBacklogSize, - final EthContext ethContext, final LabelledMetric ethTasksTimer) { - super(ethContext, ethTasksTimer); + super(ethTasksTimer); this.inboundQueue = inboundQueue; outboundQueue = new LinkedBlockingQueue<>(outboundBacklogSize); results = new ArrayList<>(); } @Override - protected void executeTaskWithPeer(final EthPeer peer) { + protected void executeTask() { Optional previousInput = Optional.empty(); - while (!isDone() && processingException.get() == null) { - if (shuttingDown && inboundQueue.isEmpty()) { - break; - } - final I input; - try { - input = inboundQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS); - if (input == null) { - // timed out waiting for a result + try { + while (!isDone() && processingException.get() == null) { + if (shuttingDown && inboundQueue.isEmpty()) { + break; + } + final I input; + try { + input = inboundQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (input == null) { + // timed out waiting for a result + continue; + } + } catch (final InterruptedException e) { + // this is expected continue; } - } catch (final InterruptedException e) { - // this is expected - continue; + final Optional output = processStep(input, previousInput); + output.ifPresent( + o -> { + while (!isDone()) { + try { + if (outboundQueue.offer(o, 1, TimeUnit.SECONDS)) { + results.add(o); + break; + } + } catch (final InterruptedException e) { + processingException.compareAndSet(null, e); + break; + } + } + }); + previousInput = Optional.of(input); } - final Optional output = processStep(input, previousInput, peer); - output.ifPresent( - o -> { - try { - outboundQueue.put(o); - } catch (final InterruptedException e) { - processingException.compareAndSet(null, e); - } - results.add(o); - }); - previousInput = Optional.of(input); + } catch (final RuntimeException e) { + processingException.compareAndSet(null, e); } if (processingException.get() == null) { - result.get().complete(new PeerTaskResult<>(peer, results)); + result.get().complete(results); } else { result.get().completeExceptionally(processingException.get()); } @@ -105,5 +111,5 @@ public abstract class AbstractPipelinedPeerTask extends AbstractPeerTask processStep(I input, Optional previousInput, EthPeer peer); + protected abstract Optional processStep(I input, Optional previousInput); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadBodiesTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadBodiesTask.java index b2ab6ad3b6..8b4e5b3405 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadBodiesTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadBodiesTask.java @@ -13,9 +13,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.tasks; import tech.pegasys.pantheon.ethereum.core.BlockHeader; -import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask; import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; @@ -29,7 +27,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class ParallelDownloadBodiesTask - extends AbstractPipelinedPeerTask, List> { + extends AbstractPipelinedTask, List> { private static final Logger LOG = LogManager.getLogger(); private final BlockHandler blockHandler; @@ -38,18 +36,15 @@ public class ParallelDownloadBodiesTask final BlockHandler blockHandler, final BlockingQueue> inboundQueue, final int outboundBacklogSize, - final EthContext ethContext, final LabelledMetric ethTasksTimer) { - super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer); + super(inboundQueue, outboundBacklogSize, ethTasksTimer); this.blockHandler = blockHandler; } @Override protected Optional> processStep( - final List headers, - final Optional> previousHeaders, - final EthPeer peer) { + final List headers, final Optional> previousHeaders) { LOG.trace( "Downloading bodies {} to {}", headers.get(0).getNumber(), diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java index 78cb303a49..9c18238dbf 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelDownloadHeadersTask.java @@ -15,8 +15,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.tasks; import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; @@ -32,11 +31,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class ParallelDownloadHeadersTask - extends AbstractPipelinedPeerTask> { + extends AbstractPipelinedTask> { private static final Logger LOG = LogManager.getLogger(); private final ProtocolSchedule protocolSchedule; private final ProtocolContext protocolContext; + private final EthContext ethContext; ParallelDownloadHeadersTask( final BlockingQueue inboundQueue, @@ -45,17 +45,17 @@ public class ParallelDownloadHeadersTask final ProtocolContext protocolContext, final EthContext ethContext, final LabelledMetric ethTasksTimer) { - super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer); + super(inboundQueue, outboundBacklogSize, ethTasksTimer); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; + this.ethContext = ethContext; } @Override protected Optional> processStep( final BlockHeader nextCheckpointHeader, - final Optional previousCheckpointHeader, - final EthPeer peer) { + final Optional previousCheckpointHeader) { if (!previousCheckpointHeader.isPresent()) { return Optional.empty(); } @@ -73,7 +73,6 @@ public class ParallelDownloadHeadersTask nextCheckpointHeader, segmentLength, ethTasksTimer); - downloadTask.assignPeer(peer); final CompletableFuture> headerFuture = executeSubTask(downloadTask::run); final List headers = Lists.newArrayList(previousCheckpointHeader.get()); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java index caa2f6d130..8a0107d417 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java @@ -17,7 +17,6 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractEthTask; -import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask; import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; @@ -120,21 +119,15 @@ public class ParallelImportChainSegmentTask extends AbstractEthTask downloadBodiesTask = new ParallelDownloadBodiesTask<>( - blockHandler, - validateHeadersTask.getOutboundQueue(), - maxActiveChunks, - ethContext, - ethTasksTimer); + blockHandler, validateHeadersTask.getOutboundQueue(), maxActiveChunks, ethTasksTimer); final ParallelValidateAndImportBodiesTask validateAndImportBodiesTask = new ParallelValidateAndImportBodiesTask<>( blockHandler, downloadBodiesTask.getOutboundQueue(), Integer.MAX_VALUE, - ethContext, ethTasksTimer); // Start the pipeline. @@ -148,15 +141,15 @@ public class ParallelImportChainSegmentTask extends AbstractEthTask downloadBodiesFuture = scheduler.scheduleServiceTask(downloadBodiesTask); registerSubTask(downloadBodiesFuture); - final CompletableFuture>>> validateBodiesFuture = + final CompletableFuture>> validateBodiesFuture = scheduler.scheduleServiceTask(validateAndImportBodiesTask); registerSubTask(validateBodiesFuture); // Hook in pipeline completion signaling. downloadHeadersTask.shutdown(); - downloadHeaderFuture.thenRun(() -> validateHeadersTask.shutdown()); - validateHeaderFuture.thenRun(() -> downloadBodiesTask.shutdown()); - downloadBodiesFuture.thenRun(() -> validateAndImportBodiesTask.shutdown()); + downloadHeaderFuture.thenRun(validateHeadersTask::shutdown); + validateHeaderFuture.thenRun(downloadBodiesTask::shutdown); + downloadBodiesFuture.thenRun(validateAndImportBodiesTask::shutdown); final BiConsumer cancelOnException = (s, e) -> { @@ -179,7 +172,7 @@ public class ParallelImportChainSegmentTask extends AbstractEthTask importedBlocks = - validateBodiesFuture.get().getResult().stream() + validateBodiesFuture.get().stream() .flatMap(Collection::stream) .collect(Collectors.toList()); result.get().complete(importedBlocks); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java index a2eafd4c08..51637b3a83 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java @@ -12,9 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.eth.sync.tasks; -import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask; import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; @@ -29,7 +27,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class ParallelValidateAndImportBodiesTask - extends AbstractPipelinedPeerTask, List> { + extends AbstractPipelinedTask, List> { private static final Logger LOG = LogManager.getLogger(); private final BlockHandler blockHandler; @@ -38,16 +36,15 @@ public class ParallelValidateAndImportBodiesTask final BlockHandler blockHandler, final BlockingQueue> inboundQueue, final int outboundBacklogSize, - final EthContext ethContext, final LabelledMetric ethTasksTimer) { - super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer); + super(inboundQueue, outboundBacklogSize, ethTasksTimer); this.blockHandler = blockHandler; } @Override protected Optional> processStep( - final List blocks, final Optional> previousBlocks, final EthPeer peer) { + final List blocks, final Optional> previousBlocks) { final long firstBlock = blockHandler.extractBlockNumber(blocks.get(0)); final long lastBlock = blockHandler.extractBlockNumber(blocks.get(blocks.size() - 1)); LOG.debug("Starting import of chain segment {} to {}", firstBlock, lastBlock); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateHeadersTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateHeadersTask.java index 5feeb5947b..0401d3b894 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateHeadersTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateHeadersTask.java @@ -14,9 +14,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.tasks; import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.core.BlockHeader; -import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; -import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; -import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask; import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; @@ -33,7 +31,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class ParallelValidateHeadersTask - extends AbstractPipelinedPeerTask, List> { + extends AbstractPipelinedTask, List> { private static final Logger LOG = LogManager.getLogger(); private final ProtocolSchedule protocolSchedule; @@ -46,9 +44,8 @@ public class ParallelValidateHeadersTask final int outboundBacklogSize, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, - final EthContext ethContext, final LabelledMetric ethTasksTimer) { - super(inboundQueue, outboundBacklogSize, ethContext, ethTasksTimer); + super(inboundQueue, outboundBacklogSize, ethTasksTimer); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; @@ -57,9 +54,7 @@ public class ParallelValidateHeadersTask @Override protected Optional> processStep( - final List headers, - final Optional> previousHeaders, - final EthPeer peer) { + final List headers, final Optional> previousHeaders) { LOG.debug( "Validating Headers {} to {}", headers.get(0).getNumber(),