From 30ec5b7ba195e3a335db45c93fd4d6ba1eb6c1ce Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Tue, 19 Mar 2019 10:44:57 +1000 Subject: [PATCH] Pipeline Improvements (#1117) * Capture input and output metrics for each pipe rather than out input. * Replace the batching processor with a BatchingReadPipe that wraps a normal pipe and creates batching on the fly. Avoids needing an extra thread and synchronization overhead to create batches. Signed-off-by: Adrian Sutton --- .../ethereum/eth/manager/EthScheduler.java | 2 +- .../worldstate/WorldStateDownloadProcess.java | 36 ++--- .../worldstate/LoadLocalDataStepTest.java | 3 +- .../worldstate/WorldStateDownloaderTest.java | 2 +- .../services/pipeline/BatchingProcessor.java | 32 ----- .../services/pipeline/BatchingReadPipe.java | 73 ++++++++++ .../services/pipeline/CompleterStage.java | 11 +- .../pantheon/services/pipeline/Pipe.java | 33 +++-- .../pantheon/services/pipeline/Pipeline.java | 15 +- .../services/pipeline/PipelineBuilder.java | 113 ++++++++++----- .../pantheon/services/pipeline/ReadPipe.java | 16 +-- .../pipeline/BatchingProcessorTest.java | 60 -------- .../pipeline/BatchingReadPipeTest.java | 134 ++++++++++++++++++ .../services/pipeline/CompleterStageTest.java | 12 +- .../pipeline/FlatMapProcessorTest.java | 4 +- .../pipeline/IteratorSourceStageTest.java | 2 +- .../services/pipeline/MapProcessorTest.java | 4 +- .../pantheon/services/pipeline/PipeTest.java | 53 +++++-- .../pipeline/PipelineBuilderTest.java | 71 +++++----- .../pipeline/ProcessingStageTest.java | 4 +- 20 files changed, 432 insertions(+), 248 deletions(-) delete mode 100644 services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessor.java create mode 100644 services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipe.java delete mode 100644 services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessorTest.java create mode 100644 services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java index 75fb142a39..0337c00f2d 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java @@ -118,7 +118,7 @@ public class EthScheduler { return serviceFuture; } - public CompletableFuture startPipeline(final Pipeline pipeline) { + public CompletableFuture startPipeline(final Pipeline pipeline) { final CompletableFuture pipelineFuture = pipeline.start(servicesExecutor); serviceFutures.add(pipelineFuture); pipelineFuture.whenComplete((r, t) -> serviceFutures.remove(pipelineFuture)); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java index 503c92e8bb..52da694bc9 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java @@ -23,6 +23,7 @@ import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.services.pipeline.Pipe; import tech.pegasys.pantheon.services.pipeline.Pipeline; +import tech.pegasys.pantheon.services.pipeline.PipelineBuilder; import tech.pegasys.pantheon.services.pipeline.WritePipe; import tech.pegasys.pantheon.services.tasks.Task; import tech.pegasys.pantheon.util.ExceptionUtils; @@ -75,13 +76,13 @@ import org.apache.logging.log4j.Logger; */ public class WorldStateDownloadProcess { private static final Logger LOG = LogManager.getLogger(); - private final Pipeline fetchDataPipeline; - private final Pipeline completionPipeline; + private final Pipeline> fetchDataPipeline; + private final Pipeline> completionPipeline; private final WritePipe> requestsToComplete; private WorldStateDownloadProcess( - final Pipeline fetchDataPipeline, - final Pipeline completionPipeline, + final Pipeline> fetchDataPipeline, + final Pipeline> completionPipeline, final WritePipe> requestsToComplete) { this.fetchDataPipeline = fetchDataPipeline; this.completionPipeline = completionPipeline; @@ -198,11 +199,20 @@ public class WorldStateDownloadProcess { MetricCategory.SYNCHRONIZER, "world_state_pipeline_processed_total", "Number of entries processed by each world state download pipeline stage", - "step"); + "step", + "action"); - final Pipe> requestsToComplete = - new Pipe<>(bufferCapacity, outputCounter.labels("requestDataAvailable")); - final Pipeline fetchDataPipeline = + final Pipeline> completionPipeline = + PipelineBuilder.>createPipeline( + "requestDataAvailable", bufferCapacity, outputCounter) + .andFinishWith( + "requestCompleteTask", + task -> + completeTaskStep.markAsCompleteOrFailed( + pivotBlockHeader, downloadState, task)); + + final Pipe> requestsToComplete = completionPipeline.getInputPipe(); + final Pipeline> fetchDataPipeline = createPipelineFrom( "requestDequeued", new TaskQueueIterator(downloadState), @@ -213,7 +223,7 @@ public class WorldStateDownloadProcess { task -> loadLocalDataStep.loadLocalData(task, requestsToComplete), 3, bufferCapacity) - .inBatches("batchCreate", hashCountPerRequest) + .inBatches(hashCountPerRequest) .thenProcessAsync( "batchDownloadData", requestTasks -> @@ -225,14 +235,6 @@ public class WorldStateDownloadProcess { .andFinishWith( "batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put)); - final Pipeline completionPipeline = - createPipelineFrom(requestsToComplete, outputCounter) - .andFinishWith( - "requestCompleteTask", - task -> - completeTaskStep.markAsCompleteOrFailed( - pivotBlockHeader, downloadState, task)); - return new WorldStateDownloadProcess( fetchDataPipeline, completionPipeline, requestsToComplete); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java index eb6336aadc..06f08ce624 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java @@ -40,7 +40,8 @@ public class LoadLocalDataStepTest { private final CodeNodeDataRequest request = NodeDataRequest.createCodeRequest(HASH); private final Task task = new StubTask(request); - private final Pipe> completedTasks = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe> completedTasks = + new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); private final LoadLocalDataStep loadLocalDataStep = new LoadLocalDataStep(worldStateStorage, new NoOpMetricsSystem()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index e4a9a0ef71..b82b895292 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -1011,7 +1011,7 @@ public class WorldStateDownloaderTest { } private void giveOtherThreadsAGo() { - LockSupport.parkNanos(10); + LockSupport.parkNanos(200); } @FunctionalInterface diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessor.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessor.java deleted file mode 100644 index 2d91cf4531..0000000000 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessor.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.services.pipeline; - -import java.util.List; - -class BatchingProcessor implements Processor> { - - private final int maximumBatchSize; - - public BatchingProcessor(final int maximumBatchSize) { - this.maximumBatchSize = maximumBatchSize; - } - - @Override - public void processNextInput(final ReadPipe inputPipe, final WritePipe> outputPipe) { - final List batch = inputPipe.getBatch(maximumBatchSize); - if (!batch.isEmpty()) { - outputPipe.put(batch); - } - } -} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipe.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipe.java new file mode 100644 index 0000000000..3deb4dd75c --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipe.java @@ -0,0 +1,73 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import tech.pegasys.pantheon.metrics.Counter; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class BatchingReadPipe implements ReadPipe> { + + private final ReadPipe input; + private final int maximumBatchSize; + private final Counter batchCounter; + + public BatchingReadPipe( + final ReadPipe input, final int maximumBatchSize, final Counter batchCounter) { + this.input = input; + this.maximumBatchSize = maximumBatchSize; + this.batchCounter = batchCounter; + } + + @Override + public boolean hasMore() { + return input.hasMore(); + } + + @Override + public List get() { + final T firstItem = input.get(); + if (firstItem == null) { + // Contract of get is to explicitly return null when no more items are available. + // An empty list is not a suitable thing to return here. + return null; + } + final List batch = new ArrayList<>(); + batch.add(firstItem); + input.drainTo(batch, maximumBatchSize - 1); + batchCounter.inc(); + return batch; + } + + @Override + public List poll() { + final List batch = new ArrayList<>(); + input.drainTo(batch, maximumBatchSize); + if (batch.isEmpty()) { + // Poll has to return null if the pipe is empty + return null; + } + batchCounter.inc(); + return batch; + } + + @Override + public void drainTo(final Collection> output, final int maxElements) { + final List nextBatch = poll(); + if (nextBatch != null) { + output.add(nextBatch); + } + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java index 0a14b36642..5851e01f87 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java @@ -12,26 +12,18 @@ */ package tech.pegasys.pantheon.services.pipeline; -import tech.pegasys.pantheon.metrics.Counter; - import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; class CompleterStage implements Stage { private final ReadPipe input; private final Consumer completer; - private final Counter outputCounter; private final String name; private final CompletableFuture future = new CompletableFuture<>(); - CompleterStage( - final String name, - final ReadPipe input, - final Consumer completer, - final Counter outputCounter) { + CompleterStage(final String name, final ReadPipe input, final Consumer completer) { this.input = input; this.completer = completer; - this.outputCounter = outputCounter; this.name = name; } @@ -41,7 +33,6 @@ class CompleterStage implements Stage { final T value = input.get(); if (value != null) { completer.accept(value); - outputCounter.inc(); } } future.complete(null); diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java index 44d94d9769..3939cfea5a 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java @@ -14,9 +14,7 @@ package tech.pegasys.pantheon.services.pipeline; import tech.pegasys.pantheon.metrics.Counter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.Collection; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -40,14 +38,16 @@ public class Pipe implements ReadPipe, WritePipe { private static final Logger LOG = LogManager.getLogger(); private final BlockingQueue queue; private final int capacity; - private final Counter itemCounter; + private final Counter inputCounter; + private final Counter outputCounter; private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean aborted = new AtomicBoolean(); - public Pipe(final int capacity, final Counter itemCounter) { + public Pipe(final int capacity, final Counter inputCounter, final Counter outputCounter) { queue = new ArrayBlockingQueue<>(capacity); this.capacity = capacity; - this.itemCounter = itemCounter; + this.inputCounter = inputCounter; + this.outputCounter = outputCounter; } @Override @@ -93,6 +93,7 @@ public class Pipe implements ReadPipe, WritePipe { while (hasMore()) { final T value = queue.poll(1, TimeUnit.SECONDS); if (value != null) { + outputCounter.inc(); return value; } } @@ -104,19 +105,17 @@ public class Pipe implements ReadPipe, WritePipe { @Override public T poll() { - return queue.poll(); + final T item = queue.poll(); + if (item != null) { + outputCounter.inc(); + } + return item; } @Override - public List getBatch(final int maximumBatchSize) { - final T nextItem = get(); - if (nextItem == null) { - return Collections.emptyList(); - } - final List batch = new ArrayList<>(); - batch.add(nextItem); - queue.drainTo(batch, maximumBatchSize - 1); - return batch; + public void drainTo(final Collection output, final int maxElements) { + final int count = queue.drainTo(output, maxElements); + outputCounter.inc(count); } @Override @@ -124,7 +123,7 @@ public class Pipe implements ReadPipe, WritePipe { while (isOpen()) { try { if (queue.offer(value, 1, TimeUnit.SECONDS)) { - itemCounter.inc(); + inputCounter.inc(); return; } } catch (final InterruptedException e) { diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java index 8cb3441604..83f0dd01fd 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java @@ -28,9 +28,9 @@ import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class Pipeline { +public class Pipeline { private static final Logger LOG = LogManager.getLogger(); - private final Pipe inputPipe; + private final Pipe inputPipe; private final Collection stages; private final Collection> pipes; private final CompleterStage completerStage; @@ -48,7 +48,7 @@ public class Pipeline { private volatile List> futures; Pipeline( - final Pipe inputPipe, + final Pipe inputPipe, final Collection stages, final Collection> pipes, final CompleterStage completerStage) { @@ -58,6 +58,15 @@ public class Pipeline { this.completerStage = completerStage; } + /** + * Get the input pipe for this pipeline. + * + * @return the input pipe. + */ + public Pipe getInputPipe() { + return inputPipe; + } + /** * Starts execution of the pipeline. Each stage in the pipeline requires a dedicated thread from * the supplied executor service. diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java index 483ea46cc9..c33f239a7d 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java @@ -37,26 +37,30 @@ import java.util.stream.Stream; * received by the consumer. The pipeline will halt immediately if an exception is thrown from any * processing stage. * + * @param the type of item input to the very start of this pipeline. * @param the output type of the last stage in the pipeline. */ -public class PipelineBuilder { +public class PipelineBuilder { - private final Pipe inputPipe; + private final Pipe inputPipe; private final Collection stages; private final Collection> pipes; + private final String lastStageName; private final ReadPipe pipeEnd; private final int bufferSize; private final LabelledMetric outputCounter; public PipelineBuilder( - final Pipe inputPipe, + final Pipe inputPipe, final Collection stages, final Collection> pipes, + final String lastStageName, final ReadPipe pipeEnd, final int bufferSize, final LabelledMetric outputCounter) { - this.outputCounter = outputCounter; checkArgument(!pipes.isEmpty(), "Must have at least one pipe in a pipeline"); + this.lastStageName = lastStageName; + this.outputCounter = outputCounter; this.inputPipe = inputPipe; this.stages = stages; this.pipes = pipes; @@ -72,36 +76,38 @@ public class PipelineBuilder { * @param sourceName the name of this stage. Used as the label for the output count metric. * @param source the source to pull items from for processing. * @param bufferSize the number of items to be buffered between each stage in the pipeline. - * @param outputCounter the counter to increment for each output of a stage. Must have a single - * label which will be filled with the stage name. + * @param itemCounter the counter to increment for each output of a stage. Must accept two labels, + * the stage name and action (output or drained). * @param the type of items input into the pipeline. * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ - public static PipelineBuilder createPipelineFrom( + public static PipelineBuilder createPipelineFrom( final String sourceName, final Iterator source, final int bufferSize, - final LabelledMetric outputCounter) { - final Pipe pipe = new Pipe<>(bufferSize, outputCounter.labels(sourceName)); + final LabelledMetric itemCounter) { + final Pipe pipe = createPipe(bufferSize, sourceName, itemCounter); final IteratorSourceStage sourceStage = new IteratorSourceStage<>(sourceName, source, pipe); return new PipelineBuilder<>( - pipe, singleton(sourceStage), singleton(pipe), pipe, bufferSize, outputCounter); + pipe, singleton(sourceStage), singleton(pipe), sourceName, pipe, bufferSize, itemCounter); } /** * Create a new pipeline that processes inputs added to pipe. The pipeline completes when * pipe is closed and the last item has been reached the end of the pipeline. * - * @param pipe the pipe feeding the pipeline. + * @param sourceName the name of this stage. Used as the label for the output count metric. + * @param bufferSize the number of items to be buffered between each stage in the pipeline. * @param outputCounter the counter to increment for each output of a stage. Must have a single * label which will be filled with the stage name. * @param the type of items input into the pipeline. * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ - public static PipelineBuilder createPipelineFrom( - final Pipe pipe, final LabelledMetric outputCounter) { + public static PipelineBuilder createPipeline( + final String sourceName, final int bufferSize, final LabelledMetric outputCounter) { + final Pipe pipe = createPipe(bufferSize, sourceName, outputCounter); return new PipelineBuilder<>( - pipe, emptyList(), singleton(pipe), pipe, pipe.getCapacity(), outputCounter); + pipe, emptyList(), singleton(pipe), sourceName, pipe, pipe.getCapacity(), outputCounter); } /** @@ -113,7 +119,7 @@ public class PipelineBuilder { * @param the output type for this processing step. * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ - public PipelineBuilder thenProcess( + public PipelineBuilder thenProcess( final String stageName, final Function processor) { final Processor singleStepStage = new MapProcessor<>(processor); return addStage(singleStepStage, stageName); @@ -131,7 +137,7 @@ public class PipelineBuilder { * @param the output type for this processing step. * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ - public PipelineBuilder thenProcessInParallel( + public PipelineBuilder thenProcessInParallel( final String stageName, final Function processor, final int numberOfThreads) { return thenProcessInParallel( stageName, () -> new MapProcessor<>(processor), numberOfThreads, bufferSize); @@ -154,7 +160,7 @@ public class PipelineBuilder { * @param the output type for this processing step. * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ - public PipelineBuilder thenProcessAsync( + public PipelineBuilder thenProcessAsync( final String stageName, final Function> processor, final int maxConcurrency) { @@ -169,14 +175,22 @@ public class PipelineBuilder { * *

The output buffer size is reduced to bufferSize / maximumBatchSize + 1. * - * @param stageName the name of this stage. Used as the label for the output count metric. * @param maximumBatchSize the maximum number of items to include in a batch. * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ - public PipelineBuilder> inBatches(final String stageName, final int maximumBatchSize) { + public PipelineBuilder> inBatches(final int maximumBatchSize) { checkArgument(maximumBatchSize > 0, "Maximum batch size must be greater than 0"); - return addStage( - new BatchingProcessor<>(maximumBatchSize), bufferSize / maximumBatchSize + 1, stageName); + return new PipelineBuilder<>( + inputPipe, + stages, + pipes, + lastStageName, + new BatchingReadPipe<>( + pipeEnd, + maximumBatchSize, + outputCounter.labels(lastStageName + "_outputPipe", "batches")), + bufferSize / maximumBatchSize + 1, + outputCounter); } /** @@ -184,7 +198,7 @@ public class PipelineBuilder { * is called and each item of the {@link Stream} it returns is output as an individual item. The * returned Stream may be empty to remove an item. * - *

This can be used to reverse the effect of {@link #inBatches(String, int)} with: + *

This can be used to reverse the effect of {@link #inBatches(int)} with: * *

thenFlatMap(List::stream, newBufferSize)
* @@ -194,7 +208,7 @@ public class PipelineBuilder { * @param the type of items to be output from this stage. * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ - public PipelineBuilder thenFlatMap( + public PipelineBuilder thenFlatMap( final String stageName, final Function> mapper, final int newBufferSize) { return addStage(new FlatMapProcessor<>(mapper), newBufferSize, stageName); } @@ -205,7 +219,7 @@ public class PipelineBuilder { * returned Stream may be empty to remove an item. Multiple threads process items in the pipeline * concurrently. * - *

This can be used to reverse the effect of {@link #inBatches(String, int)} with: + *

This can be used to reverse the effect of {@link #inBatches(int)} with: * *

thenFlatMap(List::stream, newBufferSize)
* @@ -216,7 +230,7 @@ public class PipelineBuilder { * @param the type of items to be output from this stage. * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ - public PipelineBuilder thenFlatMapInParallel( + public PipelineBuilder thenFlatMapInParallel( final String stageName, final Function> mapper, final int numberOfThreads, @@ -232,20 +246,17 @@ public class PipelineBuilder { * @param completer the {@link Consumer} that accepts the final output of the pipeline. * @return the constructed pipeline ready to execute. */ - public Pipeline andFinishWith(final String stageName, final Consumer completer) { - return new Pipeline( - inputPipe, - stages, - pipes, - new CompleterStage<>(stageName, pipeEnd, completer, outputCounter.labels(stageName))); + public Pipeline andFinishWith(final String stageName, final Consumer completer) { + return new Pipeline<>( + inputPipe, stages, pipes, new CompleterStage<>(stageName, pipeEnd, completer)); } - private PipelineBuilder thenProcessInParallel( + private PipelineBuilder thenProcessInParallel( final String stageName, final Supplier> createProcessor, final int numberOfThreads, final int newBufferSize) { - final Pipe newPipeEnd = new Pipe<>(newBufferSize, outputCounter.labels(stageName)); + final Pipe newPipeEnd = createPipe(newBufferSize, stageName, outputCounter); final WritePipe outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads); final ArrayList newStages = new ArrayList<>(stages); for (int i = 0; i < numberOfThreads; i++) { @@ -254,24 +265,37 @@ public class PipelineBuilder { newStages.add(processStage); } return new PipelineBuilder<>( - inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, newBufferSize, outputCounter); + inputPipe, + newStages, + concat(pipes, newPipeEnd), + stageName, + newPipeEnd, + newBufferSize, + outputCounter); } - private PipelineBuilder addStage(final Processor processor, final String stageName) { + private PipelineBuilder addStage( + final Processor processor, final String stageName) { return addStage(processor, bufferSize, stageName); } - private PipelineBuilder addStage( + private PipelineBuilder addStage( final Processor processor, final int newBufferSize, final String stageName) { - final Pipe outputPipe = new Pipe<>(newBufferSize, outputCounter.labels(stageName)); + final Pipe outputPipe = createPipe(newBufferSize, stageName, outputCounter); final Stage processStage = new ProcessingStage<>(stageName, pipeEnd, outputPipe, processor); return addStage(processStage, outputPipe); } - private PipelineBuilder addStage(final Stage stage, final Pipe outputPipe) { + private PipelineBuilder addStage(final Stage stage, final Pipe outputPipe) { final List newStages = concat(stages, stage); return new PipelineBuilder<>( - inputPipe, newStages, concat(pipes, outputPipe), outputPipe, bufferSize, outputCounter); + inputPipe, + newStages, + concat(pipes, outputPipe), + stage.getName(), + outputPipe, + bufferSize, + outputCounter); } private List concat(final Collection existing, final X newItem) { @@ -279,4 +303,15 @@ public class PipelineBuilder { newList.add(newItem); return newList; } + + private static Pipe createPipe( + final int newBufferSize, + final String stageName, + final LabelledMetric outputCounter) { + final String labelName = stageName + "_outputPipe"; + return new Pipe<>( + newBufferSize, + outputCounter.labels(labelName, "added"), + outputCounter.labels(labelName, "removed")); + } } diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ReadPipe.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ReadPipe.java index 026d95f64f..cab3413329 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ReadPipe.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ReadPipe.java @@ -12,7 +12,7 @@ */ package tech.pegasys.pantheon.services.pipeline; -import java.util.List; +import java.util.Collection; /** * The interface used to read items from a pipe. @@ -47,15 +47,11 @@ public interface ReadPipe { T poll(); /** - * Get a batch of values from the pipe containing at most maximumBatchSize items. - * This method will block until at least one item is available but will not wait until the batch - * is full. + * Removes at most the given number of available elements from the pipe and adds them to the given + * collection. This method does not block. * - *

An empty list will be returned if the queue is closed or the thread interrupted while - * waiting for the next value. - * - * @param maximumBatchSize the maximum number of items to read. - * @return the batch that was read. + * @param output the collection to transfer elements into + * @param maxElements the maximum number of elements to transfer */ - List getBatch(int maximumBatchSize); + void drainTo(Collection output, int maxElements); } diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessorTest.java deleted file mode 100644 index 21fb184c8a..0000000000 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessorTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.services.pipeline; - -import static java.util.Arrays.asList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyZeroInteractions; -import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; - -import java.util.List; - -import org.junit.Test; - -public class BatchingProcessorTest { - - private final Pipe inputPipe = new Pipe<>(10, NO_OP_COUNTER); - private final Pipe> outputPipe = new Pipe<>(10, NO_OP_COUNTER); - private final BatchingProcessor stage = new BatchingProcessor<>(3); - - @Test - public void shouldCreateBatches() { - for (int i = 1; i <= 8; i++) { - inputPipe.put(i); - } - inputPipe.close(); - - stage.processNextInput(inputPipe, outputPipe); - - assertThat(outputPipe.poll()).isEqualTo(asList(1, 2, 3)); - assertThat(outputPipe.poll()).isNull(); - - stage.processNextInput(inputPipe, outputPipe); - assertThat(outputPipe.poll()).isEqualTo(asList(4, 5, 6)); - assertThat(outputPipe.poll()).isNull(); - - stage.processNextInput(inputPipe, outputPipe); - assertThat(outputPipe.poll()).isEqualTo(asList(7, 8)); - assertThat(outputPipe.poll()).isNull(); - } - - @Test - public void shouldNotOutputItemWhenInputIsClosed() { - @SuppressWarnings("unchecked") - final WritePipe> outputPipe = mock(WritePipe.class); - inputPipe.close(); - stage.processNextInput(inputPipe, outputPipe); - verifyZeroInteractions(outputPipe); - } -} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java new file mode 100644 index 0000000000..f4b6a7402c --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java @@ -0,0 +1,134 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; + +import tech.pegasys.pantheon.metrics.Counter; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +public class BatchingReadPipeTest { + + private final Pipe source = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Counter batchCounter = mock(Counter.class); + private final BatchingReadPipe batchingPipe = + new BatchingReadPipe<>(source, 3, batchCounter); + + @Test + public void shouldGetABatchOfAvailableItems() { + source.put("a"); + source.put("b"); + source.put("c"); + source.put("d"); + + assertThat(batchingPipe.get()).containsExactly("a", "b", "c"); + } + + @Test + public void shouldNotWaitToFillBatch() { + source.put("a"); + assertThat(batchingPipe.get()).containsExactly("a"); + } + + @Test + public void shouldPollForNextBatchWhenAvailable() { + source.put("a"); + assertThat(batchingPipe.poll()).containsExactly("a"); + } + + @Test + public void shouldReturnNullFromPollWhenNoItemsAvailable() { + assertThat(batchingPipe.poll()).isNull(); + } + + @Test + public void shouldHaveMoreItemsWhileSourceHasMoreItems() { + assertThat(batchingPipe.hasMore()).isTrue(); + + source.put("a"); + source.put("b"); + source.put("c"); + + assertThat(batchingPipe.hasMore()).isTrue(); + + source.close(); + + assertThat(batchingPipe.hasMore()).isTrue(); + + assertThat(batchingPipe.get()).containsExactly("a", "b", "c"); + + assertThat(batchingPipe.hasMore()).isFalse(); + + assertThat(batchingPipe.get()).isNull(); + assertThat(batchingPipe.poll()).isNull(); + } + + @Test + public void shouldAddAtMostOneItemWhenDraining() { + // Collecting a batch of batches is pretty silly so only ever drain one batch at a time. + source.put("a"); + source.put("b"); + source.put("c"); + source.put("1"); + source.put("2"); + source.put("3"); + + final List> output = new ArrayList<>(); + batchingPipe.drainTo(output, 6); + // Note still only 3 items in the batch. + assertThat(output).containsExactly(asList("a", "b", "c")); + } + + @Test + public void shouldCountBatchesReturnedFromGet() { + source.put("a"); + source.close(); + batchingPipe.get(); + assertThat(batchingPipe.get()).isNull(); + + verify(batchCounter, times(1)).inc(); + } + + @Test + public void shouldCountBatchesReturnedFromPoll() { + assertThat(batchingPipe.poll()).isNull(); + verifyZeroInteractions(batchCounter); + + source.put("a"); + batchingPipe.poll(); + + verify(batchCounter, times(1)).inc(); + } + + @Test + public void shouldCountBatchesReturnedFromDrainTo() { + final List> output = new ArrayList<>(); + batchingPipe.drainTo(output, 3); + verifyZeroInteractions(batchCounter); + + source.put("a"); + batchingPipe.drainTo(output, 3); + + verify(batchCounter, times(1)).inc(); + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java index e7a1919ad7..01007c14c8 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java @@ -13,13 +13,8 @@ package tech.pegasys.pantheon.services.pipeline; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; -import tech.pegasys.pantheon.metrics.Counter; - import java.util.ArrayList; import java.util.List; @@ -27,11 +22,9 @@ import org.junit.Test; public class CompleterStageTest { - private final Pipe pipe = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe pipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); private final List output = new ArrayList<>(); - private final Counter outputCounter = mock(Counter.class); - private final CompleterStage stage = - new CompleterStage<>("name", pipe, output::add, outputCounter); + private final CompleterStage stage = new CompleterStage<>("name", pipe, output::add); @Test public void shouldAddItemsToOutputUntilPipeHasNoMore() { @@ -43,6 +36,5 @@ public class CompleterStageTest { stage.run(); assertThat(output).containsExactly("a", "b", "c"); - verify(outputCounter, times(3)).inc(); } } diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java index 127f2ae30d..ab1de8f8dd 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java @@ -26,8 +26,8 @@ import org.junit.Test; public class FlatMapProcessorTest { - private final Pipe input = new Pipe<>(10, NO_OP_COUNTER); - private final Pipe output = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); @SuppressWarnings("unchecked") private final Function> mapper = mock(Function.class); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java index 3f47246723..c9a28a5edf 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java @@ -20,7 +20,7 @@ import org.junit.Test; public class IteratorSourceStageTest { - private final Pipe output = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); private final IteratorSourceStage stage = new IteratorSourceStage<>("name", Iterators.forArray("a", "b", "c", "d"), output); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java index 3ccb84aef4..a8cedb1821 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java @@ -25,8 +25,8 @@ import org.junit.Test; public class MapProcessorTest { - private final Pipe input = new Pipe<>(10, NO_OP_COUNTER); - private final Pipe output = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); @SuppressWarnings("unchecked") private final Function processor = mock(Function.class); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java index bb163dd3fa..0ca2a1bd35 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java @@ -19,11 +19,15 @@ import static org.mockito.Mockito.verify; import tech.pegasys.pantheon.metrics.Counter; +import java.util.ArrayList; +import java.util.List; + import org.junit.Test; public class PipeTest { - private final Counter itemCounter = mock(Counter.class); - private final Pipe pipe = new Pipe<>(5, itemCounter); + private final Counter inputCounter = mock(Counter.class); + private final Counter outputCounter = mock(Counter.class); + private final Pipe pipe = new Pipe<>(5, inputCounter, outputCounter); @Test public void shouldNotHaveMoreWhenEmptyAndClosed() { @@ -52,19 +56,23 @@ public class PipeTest { } @Test - public void shouldLimitBatchMaximumSize() { + public void shouldLimitNumberOfItemsDrained() { pipe.put("a"); pipe.put("b"); pipe.put("c"); pipe.put("d"); - assertThat(pipe.getBatch(3)).containsExactly("a", "b", "c"); + final List output = new ArrayList<>(); + pipe.drainTo(output, 3); + assertThat(output).containsExactly("a", "b", "c"); } @Test public void shouldNotWaitToReachMaximumSizeBeforeReturningBatch() { pipe.put("a"); - assertThat(pipe.getBatch(3)).containsExactly("a"); + final List output = new ArrayList<>(); + pipe.drainTo(output, 3); + assertThat(output).containsExactly("a"); } @Test @@ -74,10 +82,39 @@ public class PipeTest { } @Test - public void shouldIncrementCounterWhenItemAddedToPipe() { + public void shouldIncrementInputCounterWhenItemAddedToPipe() { + pipe.put("A"); + verify(inputCounter).inc(); + pipe.put("B"); + verify(inputCounter, times(2)).inc(); + } + + @Test + public void shouldIncrementOutputCounterWhenItemRemovedToPipeWithPoll() { + pipe.put("A"); + pipe.put("B"); + pipe.poll(); + verify(outputCounter).inc(); + + pipe.poll(); + verify(outputCounter, times(2)).inc(); + + assertThat(pipe.poll()).isNull(); + verify(outputCounter, times(2)).inc(); + } + + @Test + public void shouldIncrementOutputCounterWhenItemRemovedToPipeWithGet() { pipe.put("A"); - verify(itemCounter).inc(); pipe.put("B"); - verify(itemCounter, times(2)).inc(); + pipe.close(); + pipe.get(); + verify(outputCounter).inc(); + + pipe.get(); + verify(outputCounter, times(2)).inc(); + + assertThat(pipe.get()).isNull(); + verify(outputCounter, times(2)).inc(); } } diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java index 011d209e88..a109b7ed42 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java @@ -12,15 +12,15 @@ */ package tech.pegasys.pantheon.services.pipeline; -import static com.google.common.primitives.Ints.asList; +import static java.util.Arrays.asList; import static java.util.Collections.synchronizedList; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.waitAtMost; -import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_COUNTER; import tech.pegasys.pantheon.metrics.Counter; @@ -75,7 +75,7 @@ public class PipelineBuilderTest { @Test public void shouldPipeTasksFromSupplierToCompleter() throws Exception { final List output = new ArrayList<>(); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) .andFinishWith("end", output::add); final CompletableFuture result = pipeline.start(executorService); @@ -86,7 +86,7 @@ public class PipelineBuilderTest { @Test public void shouldPassInputThroughIntermediateStage() throws Exception { final List output = new ArrayList<>(); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) .thenProcess("toString", Object::toString) .andFinishWith("end", output::add); @@ -100,14 +100,15 @@ public class PipelineBuilderTest { @Test public void shouldCombineIntoBatches() throws Exception { - final Pipe input = new Pipe<>(20, NO_OP_COUNTER); - tasks.forEachRemaining(input::put); final BlockingQueue> output = new ArrayBlockingQueue<>(10); - final Pipeline pipeline = - PipelineBuilder.createPipelineFrom(input, NO_OP_LABELLED_COUNTER) - .inBatches("batch", 6) + final Pipeline pipeline = + PipelineBuilder.createPipeline("source", 20, NO_OP_LABELLED_COUNTER) + .inBatches(6) .andFinishWith("end", output::offer); + final Pipe input = pipeline.getInputPipe(); + tasks.forEachRemaining(input::put); + final CompletableFuture result = pipeline.start(executorService); assertThat(output.poll(10, SECONDS)).containsExactly(1, 2, 3, 4, 5, 6); @@ -131,7 +132,7 @@ public class PipelineBuilderTest { @Test public void shouldProcessAsync() throws Exception { final List output = new ArrayList<>(); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) .thenProcessAsync("toString", value -> completedFuture(Integer.toString(value)), 3) .andFinishWith("end", output::add); @@ -146,7 +147,7 @@ public class PipelineBuilderTest { public void shouldLimitInFlightProcessesWhenProcessingAsync() throws Exception { final List output = new ArrayList<>(); final List> futures = new CopyOnWriteArrayList<>(); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom( "input", asList(1, 2, 3, 4, 5, 6, 7).iterator(), 10, NO_OP_LABELLED_COUNTER) .thenProcessAsync( @@ -184,7 +185,7 @@ public class PipelineBuilderTest { @Test public void shouldFlatMapItems() throws Exception { final List output = new ArrayList<>(); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) .thenFlatMap("flatMap", input -> Stream.of(input, input * 2), 20) .andFinishWith("end", output::add); @@ -201,7 +202,7 @@ public class PipelineBuilderTest { public void shouldProcessInParallel() throws Exception { final List output = synchronizedList(new ArrayList<>()); final CountDownLatch latch = new CountDownLatch(1); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) .thenProcessInParallel( "stageName", @@ -236,7 +237,7 @@ public class PipelineBuilderTest { public void shouldFlatMapInParallel() throws Exception { final List output = synchronizedList(new ArrayList<>()); final CountDownLatch latch = new CountDownLatch(1); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) .thenFlatMapInParallel( "stageName", @@ -276,7 +277,7 @@ public class PipelineBuilderTest { final AtomicBoolean processorInterrupted = new AtomicBoolean(false); final List output = synchronizedList(new ArrayList<>()); final CountDownLatch startedProcessingValueSix = new CountDownLatch(1); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) .thenProcess( "stageName", @@ -312,7 +313,7 @@ public class PipelineBuilderTest { final AtomicBoolean processorInterrupted = new AtomicBoolean(false); final List output = synchronizedList(new ArrayList<>()); final CountDownLatch startedProcessingValueSix = new CountDownLatch(1); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) .thenProcess( "stageName", @@ -345,7 +346,7 @@ public class PipelineBuilderTest { @Test public void shouldAbortPipelineWhenProcessorThrowsException() { final RuntimeException expectedError = new RuntimeException("Oops"); - final Pipeline pipeline = + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) .thenProcess( "stageName", @@ -365,32 +366,38 @@ public class PipelineBuilderTest { } @Test - public void shouldTrackTaskCountMetric() throws Exception { + public void shouldTrackTaskCountMetrics() throws Exception { final Map counters = new ConcurrentHashMap<>(); final LabelledMetric labelledCounter = - labels -> counters.computeIfAbsent(labels[0], label -> new SimpleCounter()); - final Pipeline pipeline = + labels -> + counters.computeIfAbsent(labels[0] + "-" + labels[1], label -> new SimpleCounter()); + final Pipeline pipeline = PipelineBuilder.createPipelineFrom("input", tasks, 10, labelledCounter) .thenProcess("map", Function.identity()) .thenProcessInParallel("parallel", Function.identity(), 3) .thenProcessAsync("async", CompletableFuture::completedFuture, 3) - .inBatches("batch", 4) + .inBatches(4) .thenFlatMap("flatMap", List::stream, 10) .andFinishWith("finish", new ArrayList<>()::add); pipeline.start(executorService).get(10, SECONDS); - assertThat(counters) - .containsOnlyKeys("input", "map", "parallel", "async", "batch", "flatMap", "finish"); - assertThat(counters.get("input").count).hasValue(15); - assertThat(counters.get("map").count).hasValue(15); - assertThat(counters.get("parallel").count).hasValue(15); - assertThat(counters.get("async").count).hasValue(15); - assertThat(counters.get("flatMap").count).hasValue(15); - assertThat(counters.get("finish").count).hasValue(15); - // We don't know how many batches will be produced because it's timing dependent but it must - // be at least 4 to fit all the items and shouldn't be more than the items we put in. - assertThat(counters.get("batch").count).hasValueBetween(4, 15); + final List stepNames = asList("input", "map", "parallel", "async", "flatMap"); + final List expectedMetricNames = + Stream.concat( + Stream.of("async_outputPipe-batches"), + stepNames.stream() + .map(stageName -> stageName + "_outputPipe") + .flatMap( + metricName -> Stream.of(metricName + "-added", metricName + "-removed"))) + .collect(toList()); + assertThat(counters).containsOnlyKeys(expectedMetricNames); + + expectedMetricNames.stream() + .filter(name -> !name.endsWith("-batches")) + .forEach(metric -> assertThat(counters.get(metric).count).hasValue(15)); + + assertThat(counters.get("async_outputPipe-batches").count).hasValueBetween(4, 15); } private void waitForSize(final Collection collection, final int targetSize) { diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java index d800b442bf..593fcd0dc6 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java @@ -30,8 +30,8 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class ProcessingStageTest { - private final Pipe inputPipe = new Pipe<>(10, NO_OP_COUNTER); - private final Pipe outputPipe = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe inputPipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe outputPipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); @Mock private Processor singleStep; private ProcessingStage stage;