Pipeline Improvements (#1117)

* Capture input and output metrics for each pipe rather than out input.

* Replace the batching processor with a BatchingReadPipe that wraps a normal pipe and creates batching on the fly.

Avoids needing an extra thread and synchronization overhead to create batches.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 8ef56242dd
commit 30ec5b7ba1
  1. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java
  2. 36
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java
  3. 3
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java
  4. 2
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java
  5. 32
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessor.java
  6. 73
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipe.java
  7. 11
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java
  8. 33
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java
  9. 15
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java
  10. 113
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java
  11. 16
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ReadPipe.java
  12. 60
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessorTest.java
  13. 134
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java
  14. 12
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java
  15. 4
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java
  16. 2
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java
  17. 4
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java
  18. 53
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java
  19. 71
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java
  20. 4
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java

@ -118,7 +118,7 @@ public class EthScheduler {
return serviceFuture;
}
public CompletableFuture<Void> startPipeline(final Pipeline pipeline) {
public CompletableFuture<Void> startPipeline(final Pipeline<?> pipeline) {
final CompletableFuture<Void> pipelineFuture = pipeline.start(servicesExecutor);
serviceFutures.add(pipelineFuture);
pipelineFuture.whenComplete((r, t) -> serviceFutures.remove(pipelineFuture));

@ -23,6 +23,7 @@ import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipe;
import tech.pegasys.pantheon.services.pipeline.Pipeline;
import tech.pegasys.pantheon.services.pipeline.PipelineBuilder;
import tech.pegasys.pantheon.services.pipeline.WritePipe;
import tech.pegasys.pantheon.services.tasks.Task;
import tech.pegasys.pantheon.util.ExceptionUtils;
@ -75,13 +76,13 @@ import org.apache.logging.log4j.Logger;
*/
public class WorldStateDownloadProcess {
private static final Logger LOG = LogManager.getLogger();
private final Pipeline fetchDataPipeline;
private final Pipeline completionPipeline;
private final Pipeline<Task<NodeDataRequest>> fetchDataPipeline;
private final Pipeline<Task<NodeDataRequest>> completionPipeline;
private final WritePipe<Task<NodeDataRequest>> requestsToComplete;
private WorldStateDownloadProcess(
final Pipeline fetchDataPipeline,
final Pipeline completionPipeline,
final Pipeline<Task<NodeDataRequest>> fetchDataPipeline,
final Pipeline<Task<NodeDataRequest>> completionPipeline,
final WritePipe<Task<NodeDataRequest>> requestsToComplete) {
this.fetchDataPipeline = fetchDataPipeline;
this.completionPipeline = completionPipeline;
@ -198,11 +199,20 @@ public class WorldStateDownloadProcess {
MetricCategory.SYNCHRONIZER,
"world_state_pipeline_processed_total",
"Number of entries processed by each world state download pipeline stage",
"step");
"step",
"action");
final Pipe<Task<NodeDataRequest>> requestsToComplete =
new Pipe<>(bufferCapacity, outputCounter.labels("requestDataAvailable"));
final Pipeline fetchDataPipeline =
final Pipeline<Task<NodeDataRequest>> completionPipeline =
PipelineBuilder.<Task<NodeDataRequest>>createPipeline(
"requestDataAvailable", bufferCapacity, outputCounter)
.andFinishWith(
"requestCompleteTask",
task ->
completeTaskStep.markAsCompleteOrFailed(
pivotBlockHeader, downloadState, task));
final Pipe<Task<NodeDataRequest>> requestsToComplete = completionPipeline.getInputPipe();
final Pipeline<Task<NodeDataRequest>> fetchDataPipeline =
createPipelineFrom(
"requestDequeued",
new TaskQueueIterator(downloadState),
@ -213,7 +223,7 @@ public class WorldStateDownloadProcess {
task -> loadLocalDataStep.loadLocalData(task, requestsToComplete),
3,
bufferCapacity)
.inBatches("batchCreate", hashCountPerRequest)
.inBatches(hashCountPerRequest)
.thenProcessAsync(
"batchDownloadData",
requestTasks ->
@ -225,14 +235,6 @@ public class WorldStateDownloadProcess {
.andFinishWith(
"batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put));
final Pipeline completionPipeline =
createPipelineFrom(requestsToComplete, outputCounter)
.andFinishWith(
"requestCompleteTask",
task ->
completeTaskStep.markAsCompleteOrFailed(
pivotBlockHeader, downloadState, task));
return new WorldStateDownloadProcess(
fetchDataPipeline, completionPipeline, requestsToComplete);
}

@ -40,7 +40,8 @@ public class LoadLocalDataStepTest {
private final CodeNodeDataRequest request = NodeDataRequest.createCodeRequest(HASH);
private final Task<NodeDataRequest> task = new StubTask(request);
private final Pipe<Task<NodeDataRequest>> completedTasks = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<Task<NodeDataRequest>> completedTasks =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final LoadLocalDataStep loadLocalDataStep =
new LoadLocalDataStep(worldStateStorage, new NoOpMetricsSystem());

@ -1011,7 +1011,7 @@ public class WorldStateDownloaderTest {
}
private void giveOtherThreadsAGo() {
LockSupport.parkNanos(10);
LockSupport.parkNanos(200);
}
@FunctionalInterface

@ -1,32 +0,0 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.List;
class BatchingProcessor<T> implements Processor<T, List<T>> {
private final int maximumBatchSize;
public BatchingProcessor(final int maximumBatchSize) {
this.maximumBatchSize = maximumBatchSize;
}
@Override
public void processNextInput(final ReadPipe<T> inputPipe, final WritePipe<List<T>> outputPipe) {
final List<T> batch = inputPipe.getBatch(maximumBatchSize);
if (!batch.isEmpty()) {
outputPipe.put(batch);
}
}
}

@ -0,0 +1,73 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import tech.pegasys.pantheon.metrics.Counter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
public class BatchingReadPipe<T> implements ReadPipe<List<T>> {
private final ReadPipe<T> input;
private final int maximumBatchSize;
private final Counter batchCounter;
public BatchingReadPipe(
final ReadPipe<T> input, final int maximumBatchSize, final Counter batchCounter) {
this.input = input;
this.maximumBatchSize = maximumBatchSize;
this.batchCounter = batchCounter;
}
@Override
public boolean hasMore() {
return input.hasMore();
}
@Override
public List<T> get() {
final T firstItem = input.get();
if (firstItem == null) {
// Contract of get is to explicitly return null when no more items are available.
// An empty list is not a suitable thing to return here.
return null;
}
final List<T> batch = new ArrayList<>();
batch.add(firstItem);
input.drainTo(batch, maximumBatchSize - 1);
batchCounter.inc();
return batch;
}
@Override
public List<T> poll() {
final List<T> batch = new ArrayList<>();
input.drainTo(batch, maximumBatchSize);
if (batch.isEmpty()) {
// Poll has to return null if the pipe is empty
return null;
}
batchCounter.inc();
return batch;
}
@Override
public void drainTo(final Collection<List<T>> output, final int maxElements) {
final List<T> nextBatch = poll();
if (nextBatch != null) {
output.add(nextBatch);
}
}
}

@ -12,26 +12,18 @@
*/
package tech.pegasys.pantheon.services.pipeline;
import tech.pegasys.pantheon.metrics.Counter;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
class CompleterStage<T> implements Stage {
private final ReadPipe<T> input;
private final Consumer<T> completer;
private final Counter outputCounter;
private final String name;
private final CompletableFuture<?> future = new CompletableFuture<>();
CompleterStage(
final String name,
final ReadPipe<T> input,
final Consumer<T> completer,
final Counter outputCounter) {
CompleterStage(final String name, final ReadPipe<T> input, final Consumer<T> completer) {
this.input = input;
this.completer = completer;
this.outputCounter = outputCounter;
this.name = name;
}
@ -41,7 +33,6 @@ class CompleterStage<T> implements Stage {
final T value = input.get();
if (value != null) {
completer.accept(value);
outputCounter.inc();
}
}
future.complete(null);

@ -14,9 +14,7 @@ package tech.pegasys.pantheon.services.pipeline;
import tech.pegasys.pantheon.metrics.Counter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@ -40,14 +38,16 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
private static final Logger LOG = LogManager.getLogger();
private final BlockingQueue<T> queue;
private final int capacity;
private final Counter itemCounter;
private final Counter inputCounter;
private final Counter outputCounter;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
public Pipe(final int capacity, final Counter itemCounter) {
public Pipe(final int capacity, final Counter inputCounter, final Counter outputCounter) {
queue = new ArrayBlockingQueue<>(capacity);
this.capacity = capacity;
this.itemCounter = itemCounter;
this.inputCounter = inputCounter;
this.outputCounter = outputCounter;
}
@Override
@ -93,6 +93,7 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
while (hasMore()) {
final T value = queue.poll(1, TimeUnit.SECONDS);
if (value != null) {
outputCounter.inc();
return value;
}
}
@ -104,19 +105,17 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
@Override
public T poll() {
return queue.poll();
final T item = queue.poll();
if (item != null) {
outputCounter.inc();
}
return item;
}
@Override
public List<T> getBatch(final int maximumBatchSize) {
final T nextItem = get();
if (nextItem == null) {
return Collections.emptyList();
}
final List<T> batch = new ArrayList<>();
batch.add(nextItem);
queue.drainTo(batch, maximumBatchSize - 1);
return batch;
public void drainTo(final Collection<T> output, final int maxElements) {
final int count = queue.drainTo(output, maxElements);
outputCounter.inc(count);
}
@Override
@ -124,7 +123,7 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
while (isOpen()) {
try {
if (queue.offer(value, 1, TimeUnit.SECONDS)) {
itemCounter.inc();
inputCounter.inc();
return;
}
} catch (final InterruptedException e) {

@ -28,9 +28,9 @@ import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class Pipeline {
public class Pipeline<I> {
private static final Logger LOG = LogManager.getLogger();
private final Pipe<?> inputPipe;
private final Pipe<I> inputPipe;
private final Collection<Stage> stages;
private final Collection<Pipe<?>> pipes;
private final CompleterStage<?> completerStage;
@ -48,7 +48,7 @@ public class Pipeline {
private volatile List<Future<?>> futures;
Pipeline(
final Pipe<?> inputPipe,
final Pipe<I> inputPipe,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final CompleterStage<?> completerStage) {
@ -58,6 +58,15 @@ public class Pipeline {
this.completerStage = completerStage;
}
/**
* Get the input pipe for this pipeline.
*
* @return the input pipe.
*/
public Pipe<I> getInputPipe() {
return inputPipe;
}
/**
* Starts execution of the pipeline. Each stage in the pipeline requires a dedicated thread from
* the supplied executor service.

@ -37,26 +37,30 @@ import java.util.stream.Stream;
* received by the consumer. The pipeline will halt immediately if an exception is thrown from any
* processing stage.
*
* @param <I> the type of item input to the very start of this pipeline.
* @param <T> the output type of the last stage in the pipeline.
*/
public class PipelineBuilder<T> {
public class PipelineBuilder<I, T> {
private final Pipe<?> inputPipe;
private final Pipe<I> inputPipe;
private final Collection<Stage> stages;
private final Collection<Pipe<?>> pipes;
private final String lastStageName;
private final ReadPipe<T> pipeEnd;
private final int bufferSize;
private final LabelledMetric<Counter> outputCounter;
public PipelineBuilder(
final Pipe<?> inputPipe,
final Pipe<I> inputPipe,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final String lastStageName,
final ReadPipe<T> pipeEnd,
final int bufferSize,
final LabelledMetric<Counter> outputCounter) {
this.outputCounter = outputCounter;
checkArgument(!pipes.isEmpty(), "Must have at least one pipe in a pipeline");
this.lastStageName = lastStageName;
this.outputCounter = outputCounter;
this.inputPipe = inputPipe;
this.stages = stages;
this.pipes = pipes;
@ -72,36 +76,38 @@ public class PipelineBuilder<T> {
* @param sourceName the name of this stage. Used as the label for the output count metric.
* @param source the source to pull items from for processing.
* @param bufferSize the number of items to be buffered between each stage in the pipeline.
* @param outputCounter the counter to increment for each output of a stage. Must have a single
* label which will be filled with the stage name.
* @param itemCounter the counter to increment for each output of a stage. Must accept two labels,
* the stage name and action (output or drained).
* @param <T> the type of items input into the pipeline.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public static <T> PipelineBuilder<T> createPipelineFrom(
public static <T> PipelineBuilder<T, T> createPipelineFrom(
final String sourceName,
final Iterator<T> source,
final int bufferSize,
final LabelledMetric<Counter> outputCounter) {
final Pipe<T> pipe = new Pipe<>(bufferSize, outputCounter.labels(sourceName));
final LabelledMetric<Counter> itemCounter) {
final Pipe<T> pipe = createPipe(bufferSize, sourceName, itemCounter);
final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(sourceName, source, pipe);
return new PipelineBuilder<>(
pipe, singleton(sourceStage), singleton(pipe), pipe, bufferSize, outputCounter);
pipe, singleton(sourceStage), singleton(pipe), sourceName, pipe, bufferSize, itemCounter);
}
/**
* Create a new pipeline that processes inputs added to <i>pipe</i>. The pipeline completes when
* <i>pipe</i> is closed and the last item has been reached the end of the pipeline.
*
* @param pipe the pipe feeding the pipeline.
* @param sourceName the name of this stage. Used as the label for the output count metric.
* @param bufferSize the number of items to be buffered between each stage in the pipeline.
* @param outputCounter the counter to increment for each output of a stage. Must have a single
* label which will be filled with the stage name.
* @param <T> the type of items input into the pipeline.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public static <T> PipelineBuilder<T> createPipelineFrom(
final Pipe<T> pipe, final LabelledMetric<Counter> outputCounter) {
public static <T> PipelineBuilder<T, T> createPipeline(
final String sourceName, final int bufferSize, final LabelledMetric<Counter> outputCounter) {
final Pipe<T> pipe = createPipe(bufferSize, sourceName, outputCounter);
return new PipelineBuilder<>(
pipe, emptyList(), singleton(pipe), pipe, pipe.getCapacity(), outputCounter);
pipe, emptyList(), singleton(pipe), sourceName, pipe, pipe.getCapacity(), outputCounter);
}
/**
@ -113,7 +119,7 @@ public class PipelineBuilder<T> {
* @param <O> the output type for this processing step.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenProcess(
public <O> PipelineBuilder<I, O> thenProcess(
final String stageName, final Function<T, O> processor) {
final Processor<T, O> singleStepStage = new MapProcessor<>(processor);
return addStage(singleStepStage, stageName);
@ -131,7 +137,7 @@ public class PipelineBuilder<T> {
* @param <O> the output type for this processing step.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenProcessInParallel(
public <O> PipelineBuilder<I, O> thenProcessInParallel(
final String stageName, final Function<T, O> processor, final int numberOfThreads) {
return thenProcessInParallel(
stageName, () -> new MapProcessor<>(processor), numberOfThreads, bufferSize);
@ -154,7 +160,7 @@ public class PipelineBuilder<T> {
* @param <O> the output type for this processing step.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenProcessAsync(
public <O> PipelineBuilder<I, O> thenProcessAsync(
final String stageName,
final Function<T, CompletableFuture<O>> processor,
final int maxConcurrency) {
@ -169,14 +175,22 @@ public class PipelineBuilder<T> {
*
* <p>The output buffer size is reduced to <code>bufferSize / maximumBatchSize + 1</code>.
*
* @param stageName the name of this stage. Used as the label for the output count metric.
* @param maximumBatchSize the maximum number of items to include in a batch.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public PipelineBuilder<List<T>> inBatches(final String stageName, final int maximumBatchSize) {
public PipelineBuilder<I, List<T>> inBatches(final int maximumBatchSize) {
checkArgument(maximumBatchSize > 0, "Maximum batch size must be greater than 0");
return addStage(
new BatchingProcessor<>(maximumBatchSize), bufferSize / maximumBatchSize + 1, stageName);
return new PipelineBuilder<>(
inputPipe,
stages,
pipes,
lastStageName,
new BatchingReadPipe<>(
pipeEnd,
maximumBatchSize,
outputCounter.labels(lastStageName + "_outputPipe", "batches")),
bufferSize / maximumBatchSize + 1,
outputCounter);
}
/**
@ -184,7 +198,7 @@ public class PipelineBuilder<T> {
* is called and each item of the {@link Stream} it returns is output as an individual item. The
* returned Stream may be empty to remove an item.
*
* <p>This can be used to reverse the effect of {@link #inBatches(String, int)} with:
* <p>This can be used to reverse the effect of {@link #inBatches(int)} with:
*
* <pre>thenFlatMap(List::stream, newBufferSize)</pre>
*
@ -194,7 +208,7 @@ public class PipelineBuilder<T> {
* @param <O> the type of items to be output from this stage.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenFlatMap(
public <O> PipelineBuilder<I, O> thenFlatMap(
final String stageName, final Function<T, Stream<O>> mapper, final int newBufferSize) {
return addStage(new FlatMapProcessor<>(mapper), newBufferSize, stageName);
}
@ -205,7 +219,7 @@ public class PipelineBuilder<T> {
* returned Stream may be empty to remove an item. Multiple threads process items in the pipeline
* concurrently.
*
* <p>This can be used to reverse the effect of {@link #inBatches(String, int)} with:
* <p>This can be used to reverse the effect of {@link #inBatches(int)} with:
*
* <pre>thenFlatMap(List::stream, newBufferSize)</pre>
*
@ -216,7 +230,7 @@ public class PipelineBuilder<T> {
* @param <O> the type of items to be output from this stage.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenFlatMapInParallel(
public <O> PipelineBuilder<I, O> thenFlatMapInParallel(
final String stageName,
final Function<T, Stream<O>> mapper,
final int numberOfThreads,
@ -232,20 +246,17 @@ public class PipelineBuilder<T> {
* @param completer the {@link Consumer} that accepts the final output of the pipeline.
* @return the constructed pipeline ready to execute.
*/
public Pipeline andFinishWith(final String stageName, final Consumer<T> completer) {
return new Pipeline(
inputPipe,
stages,
pipes,
new CompleterStage<>(stageName, pipeEnd, completer, outputCounter.labels(stageName)));
public Pipeline<I> andFinishWith(final String stageName, final Consumer<T> completer) {
return new Pipeline<>(
inputPipe, stages, pipes, new CompleterStage<>(stageName, pipeEnd, completer));
}
private <O> PipelineBuilder<O> thenProcessInParallel(
private <O> PipelineBuilder<I, O> thenProcessInParallel(
final String stageName,
final Supplier<Processor<T, O>> createProcessor,
final int numberOfThreads,
final int newBufferSize) {
final Pipe<O> newPipeEnd = new Pipe<>(newBufferSize, outputCounter.labels(stageName));
final Pipe<O> newPipeEnd = createPipe(newBufferSize, stageName, outputCounter);
final WritePipe<O> outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads);
final ArrayList<Stage> newStages = new ArrayList<>(stages);
for (int i = 0; i < numberOfThreads; i++) {
@ -254,24 +265,37 @@ public class PipelineBuilder<T> {
newStages.add(processStage);
}
return new PipelineBuilder<>(
inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, newBufferSize, outputCounter);
inputPipe,
newStages,
concat(pipes, newPipeEnd),
stageName,
newPipeEnd,
newBufferSize,
outputCounter);
}
private <O> PipelineBuilder<O> addStage(final Processor<T, O> processor, final String stageName) {
private <O> PipelineBuilder<I, O> addStage(
final Processor<T, O> processor, final String stageName) {
return addStage(processor, bufferSize, stageName);
}
private <O> PipelineBuilder<O> addStage(
private <O> PipelineBuilder<I, O> addStage(
final Processor<T, O> processor, final int newBufferSize, final String stageName) {
final Pipe<O> outputPipe = new Pipe<>(newBufferSize, outputCounter.labels(stageName));
final Pipe<O> outputPipe = createPipe(newBufferSize, stageName, outputCounter);
final Stage processStage = new ProcessingStage<>(stageName, pipeEnd, outputPipe, processor);
return addStage(processStage, outputPipe);
}
private <O> PipelineBuilder<O> addStage(final Stage stage, final Pipe<O> outputPipe) {
private <O> PipelineBuilder<I, O> addStage(final Stage stage, final Pipe<O> outputPipe) {
final List<Stage> newStages = concat(stages, stage);
return new PipelineBuilder<>(
inputPipe, newStages, concat(pipes, outputPipe), outputPipe, bufferSize, outputCounter);
inputPipe,
newStages,
concat(pipes, outputPipe),
stage.getName(),
outputPipe,
bufferSize,
outputCounter);
}
private <X> List<X> concat(final Collection<X> existing, final X newItem) {
@ -279,4 +303,15 @@ public class PipelineBuilder<T> {
newList.add(newItem);
return newList;
}
private static <O> Pipe<O> createPipe(
final int newBufferSize,
final String stageName,
final LabelledMetric<Counter> outputCounter) {
final String labelName = stageName + "_outputPipe";
return new Pipe<>(
newBufferSize,
outputCounter.labels(labelName, "added"),
outputCounter.labels(labelName, "removed"));
}
}

@ -12,7 +12,7 @@
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.List;
import java.util.Collection;
/**
* The interface used to read items from a pipe.
@ -47,15 +47,11 @@ public interface ReadPipe<T> {
T poll();
/**
* Get a batch of values from the pipe containing at most <code>maximumBatchSize</code> items.
* This method will block until at least one item is available but will not wait until the batch
* is full.
* Removes at most the given number of available elements from the pipe and adds them to the given
* collection. This method does not block.
*
* <p>An empty list will be returned if the queue is closed or the thread interrupted while
* waiting for the next value.
*
* @param maximumBatchSize the maximum number of items to read.
* @return the batch that was read.
* @param output the collection to transfer elements into
* @param maxElements the maximum number of elements to transfer
*/
List<T> getBatch(int maximumBatchSize);
void drainTo(Collection<T> output, int maxElements);
}

@ -1,60 +0,0 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import java.util.List;
import org.junit.Test;
public class BatchingProcessorTest {
private final Pipe<Integer> inputPipe = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<List<Integer>> outputPipe = new Pipe<>(10, NO_OP_COUNTER);
private final BatchingProcessor<Integer> stage = new BatchingProcessor<>(3);
@Test
public void shouldCreateBatches() {
for (int i = 1; i <= 8; i++) {
inputPipe.put(i);
}
inputPipe.close();
stage.processNextInput(inputPipe, outputPipe);
assertThat(outputPipe.poll()).isEqualTo(asList(1, 2, 3));
assertThat(outputPipe.poll()).isNull();
stage.processNextInput(inputPipe, outputPipe);
assertThat(outputPipe.poll()).isEqualTo(asList(4, 5, 6));
assertThat(outputPipe.poll()).isNull();
stage.processNextInput(inputPipe, outputPipe);
assertThat(outputPipe.poll()).isEqualTo(asList(7, 8));
assertThat(outputPipe.poll()).isNull();
}
@Test
public void shouldNotOutputItemWhenInputIsClosed() {
@SuppressWarnings("unchecked")
final WritePipe<List<Integer>> outputPipe = mock(WritePipe.class);
inputPipe.close();
stage.processNextInput(inputPipe, outputPipe);
verifyZeroInteractions(outputPipe);
}
}

@ -0,0 +1,134 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import tech.pegasys.pantheon.metrics.Counter;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
public class BatchingReadPipeTest {
private final Pipe<String> source = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Counter batchCounter = mock(Counter.class);
private final BatchingReadPipe<String> batchingPipe =
new BatchingReadPipe<>(source, 3, batchCounter);
@Test
public void shouldGetABatchOfAvailableItems() {
source.put("a");
source.put("b");
source.put("c");
source.put("d");
assertThat(batchingPipe.get()).containsExactly("a", "b", "c");
}
@Test
public void shouldNotWaitToFillBatch() {
source.put("a");
assertThat(batchingPipe.get()).containsExactly("a");
}
@Test
public void shouldPollForNextBatchWhenAvailable() {
source.put("a");
assertThat(batchingPipe.poll()).containsExactly("a");
}
@Test
public void shouldReturnNullFromPollWhenNoItemsAvailable() {
assertThat(batchingPipe.poll()).isNull();
}
@Test
public void shouldHaveMoreItemsWhileSourceHasMoreItems() {
assertThat(batchingPipe.hasMore()).isTrue();
source.put("a");
source.put("b");
source.put("c");
assertThat(batchingPipe.hasMore()).isTrue();
source.close();
assertThat(batchingPipe.hasMore()).isTrue();
assertThat(batchingPipe.get()).containsExactly("a", "b", "c");
assertThat(batchingPipe.hasMore()).isFalse();
assertThat(batchingPipe.get()).isNull();
assertThat(batchingPipe.poll()).isNull();
}
@Test
public void shouldAddAtMostOneItemWhenDraining() {
// Collecting a batch of batches is pretty silly so only ever drain one batch at a time.
source.put("a");
source.put("b");
source.put("c");
source.put("1");
source.put("2");
source.put("3");
final List<List<String>> output = new ArrayList<>();
batchingPipe.drainTo(output, 6);
// Note still only 3 items in the batch.
assertThat(output).containsExactly(asList("a", "b", "c"));
}
@Test
public void shouldCountBatchesReturnedFromGet() {
source.put("a");
source.close();
batchingPipe.get();
assertThat(batchingPipe.get()).isNull();
verify(batchCounter, times(1)).inc();
}
@Test
public void shouldCountBatchesReturnedFromPoll() {
assertThat(batchingPipe.poll()).isNull();
verifyZeroInteractions(batchCounter);
source.put("a");
batchingPipe.poll();
verify(batchCounter, times(1)).inc();
}
@Test
public void shouldCountBatchesReturnedFromDrainTo() {
final List<List<String>> output = new ArrayList<>();
batchingPipe.drainTo(output, 3);
verifyZeroInteractions(batchCounter);
source.put("a");
batchingPipe.drainTo(output, 3);
verify(batchCounter, times(1)).inc();
}
}

@ -13,13 +13,8 @@
package tech.pegasys.pantheon.services.pipeline;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import tech.pegasys.pantheon.metrics.Counter;
import java.util.ArrayList;
import java.util.List;
@ -27,11 +22,9 @@ import org.junit.Test;
public class CompleterStageTest {
private final Pipe<String> pipe = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> pipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final List<String> output = new ArrayList<>();
private final Counter outputCounter = mock(Counter.class);
private final CompleterStage<String> stage =
new CompleterStage<>("name", pipe, output::add, outputCounter);
private final CompleterStage<String> stage = new CompleterStage<>("name", pipe, output::add);
@Test
public void shouldAddItemsToOutputUntilPipeHasNoMore() {
@ -43,6 +36,5 @@ public class CompleterStageTest {
stage.run();
assertThat(output).containsExactly("a", "b", "c");
verify(outputCounter, times(3)).inc();
}
}

@ -26,8 +26,8 @@ import org.junit.Test;
public class FlatMapProcessorTest {
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
@SuppressWarnings("unchecked")
private final Function<String, Stream<String>> mapper = mock(Function.class);

@ -20,7 +20,7 @@ import org.junit.Test;
public class IteratorSourceStageTest {
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final IteratorSourceStage<String> stage =
new IteratorSourceStage<>("name", Iterators.forArray("a", "b", "c", "d"), output);

@ -25,8 +25,8 @@ import org.junit.Test;
public class MapProcessorTest {
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
@SuppressWarnings("unchecked")
private final Function<String, String> processor = mock(Function.class);

@ -19,11 +19,15 @@ import static org.mockito.Mockito.verify;
import tech.pegasys.pantheon.metrics.Counter;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
public class PipeTest {
private final Counter itemCounter = mock(Counter.class);
private final Pipe<String> pipe = new Pipe<>(5, itemCounter);
private final Counter inputCounter = mock(Counter.class);
private final Counter outputCounter = mock(Counter.class);
private final Pipe<String> pipe = new Pipe<>(5, inputCounter, outputCounter);
@Test
public void shouldNotHaveMoreWhenEmptyAndClosed() {
@ -52,19 +56,23 @@ public class PipeTest {
}
@Test
public void shouldLimitBatchMaximumSize() {
public void shouldLimitNumberOfItemsDrained() {
pipe.put("a");
pipe.put("b");
pipe.put("c");
pipe.put("d");
assertThat(pipe.getBatch(3)).containsExactly("a", "b", "c");
final List<String> output = new ArrayList<>();
pipe.drainTo(output, 3);
assertThat(output).containsExactly("a", "b", "c");
}
@Test
public void shouldNotWaitToReachMaximumSizeBeforeReturningBatch() {
pipe.put("a");
assertThat(pipe.getBatch(3)).containsExactly("a");
final List<String> output = new ArrayList<>();
pipe.drainTo(output, 3);
assertThat(output).containsExactly("a");
}
@Test
@ -74,10 +82,39 @@ public class PipeTest {
}
@Test
public void shouldIncrementCounterWhenItemAddedToPipe() {
public void shouldIncrementInputCounterWhenItemAddedToPipe() {
pipe.put("A");
verify(inputCounter).inc();
pipe.put("B");
verify(inputCounter, times(2)).inc();
}
@Test
public void shouldIncrementOutputCounterWhenItemRemovedToPipeWithPoll() {
pipe.put("A");
pipe.put("B");
pipe.poll();
verify(outputCounter).inc();
pipe.poll();
verify(outputCounter, times(2)).inc();
assertThat(pipe.poll()).isNull();
verify(outputCounter, times(2)).inc();
}
@Test
public void shouldIncrementOutputCounterWhenItemRemovedToPipeWithGet() {
pipe.put("A");
verify(itemCounter).inc();
pipe.put("B");
verify(itemCounter, times(2)).inc();
pipe.close();
pipe.get();
verify(outputCounter).inc();
pipe.get();
verify(outputCounter, times(2)).inc();
assertThat(pipe.get()).isNull();
verify(outputCounter, times(2)).inc();
}
}

@ -12,15 +12,15 @@
*/
package tech.pegasys.pantheon.services.pipeline;
import static com.google.common.primitives.Ints.asList;
import static java.util.Arrays.asList;
import static java.util.Collections.synchronizedList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.waitAtMost;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_COUNTER;
import tech.pegasys.pantheon.metrics.Counter;
@ -75,7 +75,7 @@ public class PipelineBuilderTest {
@Test
public void shouldPipeTasksFromSupplierToCompleter() throws Exception {
final List<Integer> output = new ArrayList<>();
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
@ -86,7 +86,7 @@ public class PipelineBuilderTest {
@Test
public void shouldPassInputThroughIntermediateStage() throws Exception {
final List<String> output = new ArrayList<>();
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcess("toString", Object::toString)
.andFinishWith("end", output::add);
@ -100,14 +100,15 @@ public class PipelineBuilderTest {
@Test
public void shouldCombineIntoBatches() throws Exception {
final Pipe<Integer> input = new Pipe<>(20, NO_OP_COUNTER);
tasks.forEachRemaining(input::put);
final BlockingQueue<List<Integer>> output = new ArrayBlockingQueue<>(10);
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom(input, NO_OP_LABELLED_COUNTER)
.inBatches("batch", 6)
final Pipeline<Integer> pipeline =
PipelineBuilder.<Integer>createPipeline("source", 20, NO_OP_LABELLED_COUNTER)
.inBatches(6)
.andFinishWith("end", output::offer);
final Pipe<Integer> input = pipeline.getInputPipe();
tasks.forEachRemaining(input::put);
final CompletableFuture<?> result = pipeline.start(executorService);
assertThat(output.poll(10, SECONDS)).containsExactly(1, 2, 3, 4, 5, 6);
@ -131,7 +132,7 @@ public class PipelineBuilderTest {
@Test
public void shouldProcessAsync() throws Exception {
final List<String> output = new ArrayList<>();
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcessAsync("toString", value -> completedFuture(Integer.toString(value)), 3)
.andFinishWith("end", output::add);
@ -146,7 +147,7 @@ public class PipelineBuilderTest {
public void shouldLimitInFlightProcessesWhenProcessingAsync() throws Exception {
final List<String> output = new ArrayList<>();
final List<CompletableFuture<String>> futures = new CopyOnWriteArrayList<>();
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom(
"input", asList(1, 2, 3, 4, 5, 6, 7).iterator(), 10, NO_OP_LABELLED_COUNTER)
.thenProcessAsync(
@ -184,7 +185,7 @@ public class PipelineBuilderTest {
@Test
public void shouldFlatMapItems() throws Exception {
final List<Integer> output = new ArrayList<>();
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenFlatMap("flatMap", input -> Stream.of(input, input * 2), 20)
.andFinishWith("end", output::add);
@ -201,7 +202,7 @@ public class PipelineBuilderTest {
public void shouldProcessInParallel() throws Exception {
final List<String> output = synchronizedList(new ArrayList<>());
final CountDownLatch latch = new CountDownLatch(1);
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcessInParallel(
"stageName",
@ -236,7 +237,7 @@ public class PipelineBuilderTest {
public void shouldFlatMapInParallel() throws Exception {
final List<String> output = synchronizedList(new ArrayList<>());
final CountDownLatch latch = new CountDownLatch(1);
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenFlatMapInParallel(
"stageName",
@ -276,7 +277,7 @@ public class PipelineBuilderTest {
final AtomicBoolean processorInterrupted = new AtomicBoolean(false);
final List<Integer> output = synchronizedList(new ArrayList<>());
final CountDownLatch startedProcessingValueSix = new CountDownLatch(1);
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcess(
"stageName",
@ -312,7 +313,7 @@ public class PipelineBuilderTest {
final AtomicBoolean processorInterrupted = new AtomicBoolean(false);
final List<Integer> output = synchronizedList(new ArrayList<>());
final CountDownLatch startedProcessingValueSix = new CountDownLatch(1);
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcess(
"stageName",
@ -345,7 +346,7 @@ public class PipelineBuilderTest {
@Test
public void shouldAbortPipelineWhenProcessorThrowsException() {
final RuntimeException expectedError = new RuntimeException("Oops");
final Pipeline pipeline =
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcess(
"stageName",
@ -365,32 +366,38 @@ public class PipelineBuilderTest {
}
@Test
public void shouldTrackTaskCountMetric() throws Exception {
public void shouldTrackTaskCountMetrics() throws Exception {
final Map<String, SimpleCounter> counters = new ConcurrentHashMap<>();
final LabelledMetric<Counter> labelledCounter =
labels -> counters.computeIfAbsent(labels[0], label -> new SimpleCounter());
final Pipeline pipeline =
labels ->
counters.computeIfAbsent(labels[0] + "-" + labels[1], label -> new SimpleCounter());
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, labelledCounter)
.thenProcess("map", Function.identity())
.thenProcessInParallel("parallel", Function.identity(), 3)
.thenProcessAsync("async", CompletableFuture::completedFuture, 3)
.inBatches("batch", 4)
.inBatches(4)
.thenFlatMap("flatMap", List::stream, 10)
.andFinishWith("finish", new ArrayList<>()::add);
pipeline.start(executorService).get(10, SECONDS);
assertThat(counters)
.containsOnlyKeys("input", "map", "parallel", "async", "batch", "flatMap", "finish");
assertThat(counters.get("input").count).hasValue(15);
assertThat(counters.get("map").count).hasValue(15);
assertThat(counters.get("parallel").count).hasValue(15);
assertThat(counters.get("async").count).hasValue(15);
assertThat(counters.get("flatMap").count).hasValue(15);
assertThat(counters.get("finish").count).hasValue(15);
// We don't know how many batches will be produced because it's timing dependent but it must
// be at least 4 to fit all the items and shouldn't be more than the items we put in.
assertThat(counters.get("batch").count).hasValueBetween(4, 15);
final List<String> stepNames = asList("input", "map", "parallel", "async", "flatMap");
final List<String> expectedMetricNames =
Stream.concat(
Stream.of("async_outputPipe-batches"),
stepNames.stream()
.map(stageName -> stageName + "_outputPipe")
.flatMap(
metricName -> Stream.of(metricName + "-added", metricName + "-removed")))
.collect(toList());
assertThat(counters).containsOnlyKeys(expectedMetricNames);
expectedMetricNames.stream()
.filter(name -> !name.endsWith("-batches"))
.forEach(metric -> assertThat(counters.get(metric).count).hasValue(15));
assertThat(counters.get("async_outputPipe-batches").count).hasValueBetween(4, 15);
}
private void waitForSize(final Collection<?> collection, final int targetSize) {

@ -30,8 +30,8 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class ProcessingStageTest {
private final Pipe<String> inputPipe = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> outputPipe = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> inputPipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> outputPipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
@Mock private Processor<String, String> singleStep;
private ProcessingStage<String, String> stage;

Loading…
Cancel
Save