Add pipeline framework to make parallel processing simpler (#1077)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent d79e96640a
commit fde3a80f85
  1. 2
      metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java
  2. 41
      services/pipeline/build.gradle
  3. 98
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java
  4. 32
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessor.java
  5. 48
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java
  6. 37
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessor.java
  7. 36
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStage.java
  8. 32
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/MapProcessor.java
  9. 135
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java
  10. 140
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java
  11. 246
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java
  12. 36
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java
  13. 19
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Processor.java
  14. 61
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ReadPipe.java
  15. 66
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/SharedWritePipe.java
  16. 53
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/WritePipe.java
  17. 60
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessorTest.java
  18. 48
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java
  19. 57
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java
  20. 42
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java
  21. 54
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java
  22. 83
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java
  23. 375
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java
  24. 77
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java
  25. 40
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/SharedWritePipeTest.java
  26. 16
      services/pipeline/src/test/resources/log4j2.xml
  27. 1
      settings.gradle

@ -29,7 +29,7 @@ import io.prometheus.client.Collector;
public class NoOpMetricsSystem implements MetricsSystem {
private static final Counter NO_OP_COUNTER = new NoOpCounter();
public static final Counter NO_OP_COUNTER = new NoOpCounter();
private static final TimingContext NO_OP_TIMING_CONTEXT = () -> 0;
private static final OperationTimer NO_OP_TIMER = () -> NO_OP_TIMING_CONTEXT;
public static final LabelledMetric<OperationTimer> NO_OP_LABELLED_TIMER = label -> NO_OP_TIMER;

@ -0,0 +1,41 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
apply plugin: 'java-library'
jar {
baseName 'pantheon-pipeline'
manifest {
attributes(
'Specification-Title': baseName,
'Specification-Version': project.version,
'Implementation-Title': baseName,
'Implementation-Version': calculateVersion()
)
}
}
dependencies {
api project(':util')
implementation project(':metrics')
implementation 'org.apache.logging.log4j:log4j-api'
implementation 'com.google.guava:guava'
runtime 'org.apache.logging.log4j:log4j-core'
testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
testImplementation 'org.mockito:mockito-core'
}

@ -0,0 +1,98 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
class AsyncOperationProcessor<I, O> implements Processor<I, O> {
private static final Logger LOG = LogManager.getLogger();
private final Function<I, CompletableFuture<O>> processor;
private final Collection<CompletableFuture<O>> inProgress;
private final int maxConcurrency;
public AsyncOperationProcessor(
final Function<I, CompletableFuture<O>> processor, final int maxConcurrency) {
this.processor = processor;
this.maxConcurrency = maxConcurrency;
this.inProgress = new ArrayList<>(maxConcurrency);
}
@Override
public void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe) {
if (inProgress.size() < maxConcurrency) {
final I value = inputPipe.get();
if (value != null) {
final CompletableFuture<O> future = processor.apply(value);
// When the future completes, interrupt so if we're waiting for new input we wake up and
// schedule the output.
final Thread stageThread = Thread.currentThread();
future.whenComplete((result, error) -> stageThread.interrupt());
inProgress.add(future);
}
outputCompletedTasks(0, outputPipe);
} else {
outputNextCompletedTask(outputPipe);
}
}
@Override
public void finalize(final WritePipe<O> outputPipe) {
while (!inProgress.isEmpty()) {
outputNextCompletedTask(outputPipe);
}
}
private void outputNextCompletedTask(final WritePipe<O> outputPipe) {
try {
waitForAnyFutureToComplete();
outputCompletedTasks(1, outputPipe);
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for processing to complete", e);
} catch (final ExecutionException e) {
LOG.error("Processing failed and we don't handle exceptions properly yet", e);
} catch (final TimeoutException e) {
// Ignore and go back around the loop.
}
}
@SuppressWarnings("rawtypes")
private void waitForAnyFutureToComplete()
throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture.anyOf(inProgress.toArray(new CompletableFuture[0])).get(1, TimeUnit.SECONDS);
}
private void outputCompletedTasks(final int minTasksToOutput, final WritePipe<O> outputPipe) {
int outputTasks = 0;
for (final Iterator<CompletableFuture<O>> i = inProgress.iterator();
i.hasNext() && (outputTasks < minTasksToOutput || outputPipe.hasRemainingCapacity()); ) {
final CompletableFuture<O> process = i.next();
final O result = process.getNow(null);
if (result != null) {
outputPipe.put(result);
i.remove();
outputTasks++;
}
}
}
}

@ -0,0 +1,32 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.List;
class BatchingProcessor<T> implements Processor<T, List<T>> {
private final int maximumBatchSize;
public BatchingProcessor(final int maximumBatchSize) {
this.maximumBatchSize = maximumBatchSize;
}
@Override
public void processNextInput(final ReadPipe<T> inputPipe, final WritePipe<List<T>> outputPipe) {
final List<T> batch = inputPipe.getBatch(maximumBatchSize);
if (!batch.isEmpty()) {
outputPipe.put(batch);
}
}
}

@ -0,0 +1,48 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import tech.pegasys.pantheon.metrics.Counter;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
class CompleterStage<T> implements Runnable {
private final ReadPipe<T> input;
private final Consumer<T> completer;
private final Counter outputCounter;
private final CompletableFuture<?> future = new CompletableFuture<>();
CompleterStage(
final ReadPipe<T> input, final Consumer<T> completer, final Counter outputCounter) {
this.input = input;
this.completer = completer;
this.outputCounter = outputCounter;
}
@Override
public void run() {
while (input.hasMore()) {
final T value = input.get();
if (value != null) {
completer.accept(value);
outputCounter.inc();
}
}
future.complete(null);
}
public CompletableFuture<?> getFuture() {
return future;
}
}

@ -0,0 +1,37 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Stream;
class FlatMapProcessor<I, O> implements Processor<I, O> {
private final Function<I, Stream<O>> mapper;
public FlatMapProcessor(final Function<I, Stream<O>> mapper) {
this.mapper = mapper;
}
@Override
public void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe) {
final I value = inputPipe.get();
if (value != null) {
final Iterator<O> outputs = mapper.apply(value).iterator();
while (outputs.hasNext()) {
outputPipe.put(outputs.next());
}
}
}
}

@ -0,0 +1,36 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.Iterator;
class IteratorSourceStage<T> implements Runnable {
private final Iterator<T> source;
private final Pipe<T> pipe;
IteratorSourceStage(final Iterator<T> source, final Pipe<T> pipe) {
this.source = source;
this.pipe = pipe;
}
@Override
public void run() {
while (pipe.isOpen() && source.hasNext()) {
final T value = source.next();
if (value != null) {
pipe.put(value);
}
}
pipe.close();
}
}

@ -0,0 +1,32 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.function.Function;
class MapProcessor<I, O> implements Processor<I, O> {
private final Function<I, O> processor;
public MapProcessor(final Function<I, O> processor) {
this.processor = processor;
}
@Override
public void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe) {
final I value = inputPipe.get();
if (value != null) {
outputPipe.put(processor.apply(value));
}
}
}

@ -0,0 +1,135 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import tech.pegasys.pantheon.metrics.Counter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* Forms the connection between two pipeline stages. A pipe is essentially a blocking queue with the
* added ability to signal when no further input is available because the pipe has been closed or
* the pipeline aborted.
*
* <p>In most cases a Pipe is used through one of two narrower interfaces it supports {@link
* ReadPipe} and {@link WritePipe}. These are designed to expose only the operations relevant to
* objects either reading from or publishing to the pipe respectively.
*
* @param <T> the type of item that flows through the pipe.
*/
public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
private static final Logger LOG = LogManager.getLogger();
private final BlockingQueue<T> queue;
private final int capacity;
private final Counter itemCounter;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
public Pipe(final int capacity, final Counter itemCounter) {
queue = new ArrayBlockingQueue<>(capacity);
this.capacity = capacity;
this.itemCounter = itemCounter;
}
@Override
public boolean isOpen() {
return !closed.get() && !aborted.get();
}
/**
* Get the number of items that can be queued inside this pipe.
*
* @return the pipe's capacity.
*/
public int getCapacity() {
return capacity;
}
@Override
public boolean hasRemainingCapacity() {
return queue.remainingCapacity() > 0 && isOpen();
}
@Override
public void close() {
closed.set(true);
}
@Override
public void abort() {
aborted.set(true);
}
@Override
public boolean hasMore() {
if (aborted.get()) {
return false;
}
return !closed.get() || !queue.isEmpty();
}
@Override
public T get() {
try {
while (hasMore()) {
final T value = queue.poll(1, TimeUnit.SECONDS);
if (value != null) {
return value;
}
}
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for next item", e);
}
return null;
}
@Override
public T poll() {
return queue.poll();
}
@Override
public List<T> getBatch(final int maximumBatchSize) {
final T nextItem = get();
if (nextItem == null) {
return Collections.emptyList();
}
final List<T> batch = new ArrayList<>();
batch.add(nextItem);
queue.drainTo(batch, maximumBatchSize - 1);
return batch;
}
@Override
public void put(final T value) {
while (isOpen()) {
try {
if (queue.offer(value, 1, TimeUnit.SECONDS)) {
itemCounter.inc();
return;
}
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting to add to output", e);
}
}
}
}

@ -0,0 +1,140 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static java.util.stream.Collectors.toList;
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class Pipeline {
private static final Logger LOG = LogManager.getLogger();
private final Pipe<?> inputPipe;
private final Collection<Runnable> stages;
private final Collection<Pipe<?>> pipes;
private final CompleterStage<?> completerStage;
private final AtomicBoolean started = new AtomicBoolean(false);
/**
* Flags that the pipeline is being completed so that when we abort we can close the streams
* without the completion stage then marking the future successful before we finish the abort
* process and mark it as exceptionally completed. We can't just use synchronized because it winds
* up being the same thread coming in via a callback so already has the lock.
*/
private final AtomicBoolean completing = new AtomicBoolean(false);
private final CompletableFuture<Void> overallFuture = new CompletableFuture<>();
private volatile List<Future<?>> futures;
Pipeline(
final Pipe<?> inputPipe,
final Collection<Runnable> stages,
final Collection<Pipe<?>> pipes,
final CompleterStage<?> completerStage) {
this.inputPipe = inputPipe;
this.stages = stages;
this.pipes = pipes;
this.completerStage = completerStage;
}
/**
* Starts execution of the pipeline. Each stage in the pipeline requires a dedicated thread from
* the supplied executor service.
*
* @param executorService the {@link ExecutorService} to execute each stage in.
* @return a future that will be completed when the pipeline completes. If the pipeline fails or
* is aborted the returned future will be completed exceptionally.
*/
public synchronized CompletableFuture<Void> start(final ExecutorService executorService) {
if (!started.compareAndSet(false, true)) {
return overallFuture;
}
futures =
Stream.concat(stages.stream(), Stream.of(completerStage))
.map(task -> runWithErrorHandling(executorService, task))
.collect(toList());
completerStage
.getFuture()
.whenComplete(
(result, error) -> {
if (completing.compareAndSet(false, true)) {
if (error != null) {
overallFuture.completeExceptionally(error);
} else {
overallFuture.complete(null);
}
}
});
overallFuture.exceptionally(
error -> {
if (ExceptionUtils.rootCause(error) instanceof CancellationException) {
abort();
}
return null;
});
return overallFuture;
}
/**
* Abort execution of this pipeline. The future returned by {@link #start(ExecutorService)} will
* be completed with a {@link CancellationException}.
*
* <p>A best effort is made to halt all processing by the pipeline immediately by interrupting
* each execution thread and pipes connecting each stage will no longer accept or provide further
* items.
*/
public void abort() {
final CancellationException exception = new CancellationException("Pipeline aborted");
abort(exception);
}
private Future<?> runWithErrorHandling(
final ExecutorService executorService, final Runnable task) {
return executorService.submit(
() -> {
try {
task.run();
} catch (final Throwable t) {
LOG.debug("Unhandled exception in pipeline. Aborting.", t);
try {
abort(t);
} catch (final Throwable t2) {
// Seems excessive but exceptions that propagate out of this method won't be logged
// because the executor just completes the future exceptionally and we never
// need to call get on it which would normally expose the error.
LOG.error("Failed to abort pipeline after error", t2);
}
}
});
}
private synchronized void abort(final Throwable error) {
if (completing.compareAndSet(false, true)) {
inputPipe.abort();
pipes.forEach(Pipe::abort);
futures.forEach(future -> future.cancel(true));
overallFuture.completeExceptionally(error);
}
}
}

@ -0,0 +1,246 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
/**
* Supports building a new pipeline. Pipelines are comprised of a source, various processing stages
* and a consumer, each of which run in their own thread.
*
* <p>The pipeline completes when all items from the source have passed through each stage and are
* received by the consumer. The pipeline will halt immediately if an exception is thrown from any
* processing stage.
*
* @param <T> the output type of the last stage in the pipeline.
*/
public class PipelineBuilder<T> {
private final Pipe<?> inputPipe;
private final Collection<Runnable> stages;
private final Collection<Pipe<?>> pipes;
private final ReadPipe<T> pipeEnd;
private final int bufferSize;
private final LabelledMetric<Counter> outputCounter;
public PipelineBuilder(
final Pipe<?> inputPipe,
final Collection<Runnable> stages,
final Collection<Pipe<?>> pipes,
final ReadPipe<T> pipeEnd,
final int bufferSize,
final LabelledMetric<Counter> outputCounter) {
this.outputCounter = outputCounter;
checkArgument(!pipes.isEmpty(), "Must have at least one pipe in a pipeline");
this.inputPipe = inputPipe;
this.stages = stages;
this.pipes = pipes;
this.pipeEnd = pipeEnd;
this.bufferSize = bufferSize;
}
/**
* Create a new pipeline that processes inputs from <i>source</i>. The pipeline completes when
* <i>source</i> returns <code>false</code> from {@link Iterator#hasNext()} and the last item has
* been reached the end of the pipeline.
*
* @param sourceName the name of this stage. Used as the label for the output count metric.
* @param source the source to pull items from for processing.
* @param bufferSize the number of items to be buffered between each stage in the pipeline.
* @param outputCounter the counter to increment for each output of a stage. Must have a single
* label which will be filled with the stage name.
* @param <T> the type of items input into the pipeline.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public static <T> PipelineBuilder<T> createPipelineFrom(
final String sourceName,
final Iterator<T> source,
final int bufferSize,
final LabelledMetric<Counter> outputCounter) {
final Pipe<T> pipe = new Pipe<>(bufferSize, outputCounter.labels(sourceName));
final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(source, pipe);
return new PipelineBuilder<>(
pipe, singleton(sourceStage), singleton(pipe), pipe, bufferSize, outputCounter);
}
/**
* Create a new pipeline that processes inputs added to <i>pipe</i>. The pipeline completes when
* <i>pipe</i> is closed and the last item has been reached the end of the pipeline.
*
* @param pipe the pipe feeding the pipeline.
* @param outputCounter the counter to increment for each output of a stage. Must have a single
* label which will be filled with the stage name.
* @param <T> the type of items input into the pipeline.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public static <T> PipelineBuilder<T> createPipelineFrom(
final Pipe<T> pipe, final LabelledMetric<Counter> outputCounter) {
return new PipelineBuilder<>(
pipe, emptyList(), singleton(pipe), pipe, pipe.getCapacity(), outputCounter);
}
/**
* Adds a 1-to-1 processing stage to the pipeline. A single thread processes each item in the
* pipeline with <i>processor</i> outputting its return value to the next stage.
*
* @param stageName the name of this stage. Used as the label for the output count metric.
* @param processor the processing to apply to each item.
* @param <O> the output type for this processing step.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenProcess(
final String stageName, final Function<T, O> processor) {
final Processor<T, O> singleStepStage = new MapProcessor<>(processor);
return addStage(singleStepStage, stageName);
}
/**
* Adds a 1-to-1 processing stage to the pipeline. Multiple threads processes items in the
* pipeline concurrently with <i>processor</i> outputting its return value to the next stage.
*
* <p>Note: The order of items is not preserved.
*
* @param stageName the name of this stage. Used as the label for the output count metric.
* @param processor the processing to apply to each item.
* @param numberOfThreads the number of threads to use for processing.
* @param <O> the output type for this processing step.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenProcessInParallel(
final String stageName, final Function<T, O> processor, final int numberOfThreads) {
final Pipe<O> newPipeEnd = new Pipe<>(bufferSize, outputCounter.labels(stageName));
final WritePipe<O> outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads);
final ArrayList<Runnable> newStages = new ArrayList<>(stages);
for (int i = 0; i < numberOfThreads; i++) {
final Runnable processStage =
new ProcessingStage<>(pipeEnd, outputPipe, new MapProcessor<>(processor));
newStages.add(processStage);
}
return new PipelineBuilder<>(
inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, bufferSize, outputCounter);
}
/**
* Adds a 1-to-1, asynchronous processing stage to the pipeline. A single thread reads items from
* the input and calls <i>processor</i> to begin processing. While a single thread is used to
* begin processing, up to <i>maxConcurrency</i> items may be in progress concurrently. When the
* returned {@link CompletableFuture} completes successfully the result is passed to the next
* stage.
*
* <p>If the returned {@link CompletableFuture} completes exceptionally the pipeline will abort.
*
* <p>Note: The order of items is not preserved.
*
* @param stageName the name of this stage. Used as the label for the output count metric.
* @param processor the processing to apply to each item.
* @param maxConcurrency the maximum number of items being processed concurrently.
* @param <O> the output type for this processing step.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenProcessAsync(
final String stageName,
final Function<T, CompletableFuture<O>> processor,
final int maxConcurrency) {
return addStage(new AsyncOperationProcessor<>(processor, maxConcurrency), stageName);
}
/**
* Batches items into groups of at most <i>maximumBatchSize</i>. Batches are created eagerly to
* minimize delay so may not be full.
*
* <p>Order of items is preserved.
*
* <p>The output buffer size is reduced to <code>bufferSize / maximumBatchSize + 1</code>.
*
* @param stageName the name of this stage. Used as the label for the output count metric.
* @param maximumBatchSize the maximum number of items to include in a batch.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public PipelineBuilder<List<T>> inBatches(final String stageName, final int maximumBatchSize) {
checkArgument(maximumBatchSize > 0, "Maximum batch size must be greater than 0");
return addStage(
new BatchingProcessor<>(maximumBatchSize), bufferSize / maximumBatchSize + 1, stageName);
}
/**
* Adds a 1-to-many processing stage to the pipeline. For each item in the stream, <i>mapper</i>
* is called and each item of the {@link Stream} it returns is output as an individual item. The
* returned Stream may be empty to remove an item.
*
* <p>This can be used to reverse the effect of {@link #inBatches(String, int)} with:
*
* <pre>thenFlatMap(List::stream, newBufferSize)</pre>
*
* @param stageName the name of this stage. Used as the label for the output count metric.
* @param mapper the function to process each item with.
* @param newBufferSize the output buffer size to use from this stage onwards.
* @param <O> the type of items to be output from this stage.
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public <O> PipelineBuilder<O> thenFlatMap(
final String stageName, final Function<T, Stream<O>> mapper, final int newBufferSize) {
return addStage(new FlatMapProcessor<>(mapper), newBufferSize, stageName);
}
/**
* End the pipeline with a {@link Consumer} that is the last stage of the pipeline.
*
* @param stageName the name of this stage. Used as the label for the output count metric.
* @param completer the {@link Consumer} that accepts the final output of the pipeline.
* @return the constructed pipeline ready to execute.
*/
public Pipeline andFinishWith(final String stageName, final Consumer<T> completer) {
return new Pipeline(
inputPipe,
stages,
pipes,
new CompleterStage<>(pipeEnd, completer, outputCounter.labels(stageName)));
}
private <O> PipelineBuilder<O> addStage(final Processor<T, O> processor, final String stageName) {
return addStage(processor, bufferSize, stageName);
}
private <O> PipelineBuilder<O> addStage(
final Processor<T, O> processor, final int newBufferSize, final String stageName) {
final Pipe<O> outputPipe = new Pipe<>(newBufferSize, outputCounter.labels(stageName));
final Runnable processStage = new ProcessingStage<>(pipeEnd, outputPipe, processor);
return addStage(processStage, outputPipe);
}
private <O> PipelineBuilder<O> addStage(final Runnable stage, final Pipe<O> outputPipe) {
final List<Runnable> newStages = concat(stages, stage);
return new PipelineBuilder<>(
inputPipe, newStages, concat(pipes, outputPipe), outputPipe, bufferSize, outputCounter);
}
private <X> List<X> concat(final Collection<X> existing, final X newItem) {
final List<X> newList = new ArrayList<>(existing);
newList.add(newItem);
return newList;
}
}

@ -0,0 +1,36 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
class ProcessingStage<I, O> implements Runnable {
private final ReadPipe<I> inputPipe;
private final WritePipe<O> outputPipe;
private final Processor<I, O> processor;
public ProcessingStage(
final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe, final Processor<I, O> processor) {
this.inputPipe = inputPipe;
this.outputPipe = outputPipe;
this.processor = processor;
}
@Override
public void run() {
while (inputPipe.hasMore()) {
processor.processNextInput(inputPipe, outputPipe);
}
processor.finalize(outputPipe);
outputPipe.close();
}
}

@ -0,0 +1,19 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
interface Processor<I, O> {
void processNextInput(final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe);
default void finalize(final WritePipe<O> outputPipe) {}
}

@ -0,0 +1,61 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.List;
/**
* The interface used to read items from a pipe.
*
* @param <T> the type of input.
*/
public interface ReadPipe<T> {
/**
* Determines if this pipe has more items to be read. The pipe is considered to have no more items
* when it has either been aborted with {@link WritePipe#abort()} or if all queued items have been
* read and the pipe has been closed with {@link WritePipe#close()}.
*
* @return true if there are more items to process, otherwise false.
*/
boolean hasMore();
/**
* Get and remove the next item from this pipe. This method will block until the next item is
* available but may still return <code>null</code> if the pipe is closed or the thread
* interrupted while waiting.
*
* @return the next item or <code>null</code> if the pipe is closed or the thread interrupted.
*/
T get();
/**
* Get and remove the next item from this pipe without blocking if it is available.
*
* @return the next item or <code>null</code> if the pipe is empty.
*/
T poll();
/**
* Get a batch of values from the pipe containing at most <code>maximumBatchSize</code> items.
* This method will block until at least one item is available but will not wait until the batch
* is full.
*
* <p>An empty list will be returned if the queue is closed or the thread interrupted while
* waiting for the next value.
*
* @param maximumBatchSize the maximum number of items to read.
* @return the batch that was read.
*/
List<T> getBatch(int maximumBatchSize);
}

@ -0,0 +1,66 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A wrapper around an {@link WritePipe} which allows multiple stages to share the same write pipe.
* Most operations simply pass through to the underlying pipe but the underlying pipe is only closed
* when all stages have signalled this pipe should close.
*
* @param <T> the type of item in the pipe.
*/
class SharedWritePipe<T> implements WritePipe<T> {
private final WritePipe<T> delegate;
private final AtomicInteger remainingClosesRequired;
/**
* Creates a new SharedWritePipe.
*
* @param delegate the pipe to wrap.
* @param closesRequired the number of stages this output pipe will be shared with. The underlying
* pipe will only be closed when {@link #close()} is called this many times.
*/
public SharedWritePipe(final WritePipe<T> delegate, final int closesRequired) {
this.delegate = delegate;
this.remainingClosesRequired = new AtomicInteger(closesRequired);
}
@Override
public boolean isOpen() {
return delegate.isOpen();
}
@Override
public void put(final T value) {
delegate.put(value);
}
@Override
public void close() {
if (remainingClosesRequired.decrementAndGet() == 0) {
delegate.close();
}
}
@Override
public void abort() {
delegate.abort();
}
@Override
public boolean hasRemainingCapacity() {
return delegate.hasRemainingCapacity();
}
}

@ -0,0 +1,53 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
/**
* The interface used to add items to a pipe.
*
* @param <T> the type of output.
*/
public interface WritePipe<T> {
/**
* Determine if this pipe is still open and accepting output.
*
* @return true if and only if the pipe is open.
*/
boolean isOpen();
/**
* Adds a new item to the pipe. This method will block until capacity is available in the pipe.
* The item will be discarded if the pipe is closed befoer capacity becomes available.
*
* @param value the value to add to the pipe.
*/
void put(T value);
/**
* Determine if this pipe has capacity to accept another item.
*
* @return true if the pipe has capacity to accept one more item.
*/
boolean hasRemainingCapacity();
/**
* Close this write pipe indicating that no further data will be published to it. When reading
* from the other end of this pipe {@link ReadPipe#hasMore()} will continue to return true until
* all the already queued data has been drained.
*/
void close();
/** Abort this pipe. The pipe is closed and any queued data is discarded. */
void abort();
}

@ -0,0 +1,60 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import java.util.List;
import org.junit.Test;
public class BatchingProcessorTest {
private final Pipe<Integer> inputPipe = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<List<Integer>> outputPipe = new Pipe<>(10, NO_OP_COUNTER);
private final BatchingProcessor<Integer> stage = new BatchingProcessor<>(3);
@Test
public void shouldCreateBatches() {
for (int i = 1; i <= 8; i++) {
inputPipe.put(i);
}
inputPipe.close();
stage.processNextInput(inputPipe, outputPipe);
assertThat(outputPipe.poll()).isEqualTo(asList(1, 2, 3));
assertThat(outputPipe.poll()).isNull();
stage.processNextInput(inputPipe, outputPipe);
assertThat(outputPipe.poll()).isEqualTo(asList(4, 5, 6));
assertThat(outputPipe.poll()).isNull();
stage.processNextInput(inputPipe, outputPipe);
assertThat(outputPipe.poll()).isEqualTo(asList(7, 8));
assertThat(outputPipe.poll()).isNull();
}
@Test
public void shouldNotOutputItemWhenInputIsClosed() {
@SuppressWarnings("unchecked")
final WritePipe<List<Integer>> outputPipe = mock(WritePipe.class);
inputPipe.close();
stage.processNextInput(inputPipe, outputPipe);
verifyZeroInteractions(outputPipe);
}
}

@ -0,0 +1,48 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import tech.pegasys.pantheon.metrics.Counter;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
public class CompleterStageTest {
private final Pipe<String> pipe = new Pipe<>(10, NO_OP_COUNTER);
private final List<String> output = new ArrayList<>();
private final Counter outputCounter = mock(Counter.class);
private final CompleterStage<String> stage =
new CompleterStage<>(pipe, output::add, outputCounter);
@Test
public void shouldAddItemsToOutputUntilPipeHasNoMore() {
pipe.put("a");
pipe.put("b");
pipe.put("c");
pipe.close();
stage.run();
assertThat(output).containsExactly("a", "b", "c");
verify(outputCounter, times(3)).inc();
}
}

@ -0,0 +1,57 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.Test;
public class FlatMapProcessorTest {
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER);
@SuppressWarnings("unchecked")
private final Function<String, Stream<String>> mapper = mock(Function.class);
private final FlatMapProcessor<String, String> stage = new FlatMapProcessor<>(mapper);
@Test
public void shouldOutputItemsFromReturnedStreamIndividually() {
when(mapper.apply("A")).thenReturn(Stream.of("a", "b", "c"));
input.put("A");
stage.processNextInput(input, output);
assertThat(output.poll()).isEqualTo("a");
assertThat(output.poll()).isEqualTo("b");
assertThat(output.poll()).isEqualTo("c");
assertThat(output.poll()).isNull();
verify(mapper).apply("A");
}
@Test
public void shouldSkipProcessingWhenInputIsClosed() {
input.close();
stage.processNextInput(input, output);
verifyZeroInteractions(mapper);
}
}

@ -0,0 +1,42 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static org.assertj.core.api.Assertions.assertThat;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import com.google.common.collect.Iterators;
import org.junit.Test;
public class IteratorSourceStageTest {
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER);
private final IteratorSourceStage<String> stage =
new IteratorSourceStage<>(Iterators.forArray("a", "b", "c", "d"), output);
@Test
public void shouldOutputEntriesThenClosePipe() {
stage.run();
assertThat(output.isOpen()).isFalse();
assertThat(output.hasMore()).isTrue();
assertThat(output.get()).isEqualTo("a");
assertThat(output.hasMore()).isTrue();
assertThat(output.get()).isEqualTo("b");
assertThat(output.hasMore()).isTrue();
assertThat(output.get()).isEqualTo("c");
assertThat(output.hasMore()).isTrue();
assertThat(output.get()).isEqualTo("d");
assertThat(output.hasMore()).isFalse();
}
}

@ -0,0 +1,54 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import java.util.function.Function;
import org.junit.Test;
public class MapProcessorTest {
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER);
@SuppressWarnings("unchecked")
private final Function<String, String> processor = mock(Function.class);
private final MapProcessor<String, String> stage = new MapProcessor<>(processor);
@Test
public void shouldApplyFunctionToItems() {
when(processor.apply("A")).thenReturn("a");
input.put("A");
stage.processNextInput(input, output);
assertThat(output.hasMore()).isTrue();
assertThat(output.get()).isEqualTo("a");
verify(processor).apply("A");
}
@Test
public void shouldSkipProcessingWhenInputIsClosed() {
input.close();
stage.processNextInput(input, output);
verifyZeroInteractions(processor);
}
}

@ -0,0 +1,83 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import tech.pegasys.pantheon.metrics.Counter;
import org.junit.Test;
public class PipeTest {
private final Counter itemCounter = mock(Counter.class);
private final Pipe<String> pipe = new Pipe<>(5, itemCounter);
@Test
public void shouldNotHaveMoreWhenEmptyAndClosed() {
pipe.close();
assertThat(pipe.hasMore()).isFalse();
}
@Test
public void shouldHaveMoreWhenNotEmptyAndClosed() {
pipe.put("A");
pipe.close();
assertThat(pipe.hasMore()).isTrue();
pipe.get();
assertThat(pipe.hasMore()).isFalse();
}
@Test
public void shouldNotHaveMoreWhenAbortedEvenIfNotEmpty() {
pipe.put("A");
pipe.abort();
assertThat(pipe.hasMore()).isFalse();
}
@Test
public void shouldLimitBatchMaximumSize() {
pipe.put("a");
pipe.put("b");
pipe.put("c");
pipe.put("d");
assertThat(pipe.getBatch(3)).containsExactly("a", "b", "c");
}
@Test
public void shouldNotWaitToReachMaximumSizeBeforeReturningBatch() {
pipe.put("a");
assertThat(pipe.getBatch(3)).containsExactly("a");
}
@Test
public void shouldNotBeOpenAfterAbort() {
pipe.abort();
assertThat(pipe.isOpen()).isFalse();
}
@Test
public void shouldIncrementCounterWhenItemAddedToPipe() {
pipe.put("A");
verify(itemCounter).inc();
pipe.put("B");
verify(itemCounter, times(2)).inc();
}
}

@ -0,0 +1,375 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static com.google.common.primitives.Ints.asList;
import static java.util.Collections.synchronizedList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.waitAtMost;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_COUNTER;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.After;
import org.junit.Test;
public class PipelineBuilderTest {
private static final ThreadFactory THREAD_FACTORY =
new ThreadFactoryBuilder()
.setNameFormat(PipelineBuilderTest.class.getSimpleName() + "-%d")
.setDaemon(true)
.build();
private final Iterator<Integer> tasks =
asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15).iterator();
private final ExecutorService executorService = Executors.newCachedThreadPool(THREAD_FACTORY);
@After
public void afterClass() throws Exception {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, SECONDS)) {
fail("Executor service did not shut down cleanly");
}
}
@Test
public void shouldPipeTasksFromSupplierToCompleter() throws Exception {
final List<Integer> output = new ArrayList<>();
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
result.get(10, SECONDS);
assertThat(output).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
}
@Test
public void shouldPassInputThroughIntermediateStage() throws Exception {
final List<String> output = new ArrayList<>();
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcess("toString", Object::toString)
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
result.get(10, SECONDS);
assertThat(output)
.containsExactly(
"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15");
}
@Test
public void shouldCombineIntoBatches() throws Exception {
final Pipe<Integer> input = new Pipe<>(20, NO_OP_COUNTER);
tasks.forEachRemaining(input::put);
final BlockingQueue<List<Integer>> output = new ArrayBlockingQueue<>(10);
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom(input, NO_OP_LABELLED_COUNTER)
.inBatches("batch", 6)
.andFinishWith("end", output::offer);
final CompletableFuture<?> result = pipeline.start(executorService);
assertThat(output.poll(10, SECONDS)).containsExactly(1, 2, 3, 4, 5, 6);
assertThat(output.poll(10, SECONDS)).containsExactly(7, 8, 9, 10, 11, 12);
assertThat(output.poll(10, SECONDS)).containsExactly(13, 14, 15);
assertThat(output).isEmpty();
assertThat(result).isNotDone();
// Should not wait to fill the batch.
input.put(16);
assertThat(output.poll(10, SECONDS)).containsExactly(16);
input.put(17);
assertThat(output.poll(10, SECONDS)).containsExactly(17);
input.close();
result.get(10, SECONDS);
assertThat(output).isEmpty();
}
@Test
public void shouldProcessAsync() throws Exception {
final List<String> output = new ArrayList<>();
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcessAsync("toString", value -> completedFuture(Integer.toString(value)), 3)
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
result.get(10, SECONDS);
assertThat(output)
.containsExactlyInAnyOrder(
"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15");
}
@Test
public void shouldLimitInFlightProcessesWhenProcessingAsync() throws Exception {
final List<String> output = new ArrayList<>();
final List<CompletableFuture<String>> futures = new CopyOnWriteArrayList<>();
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom(
"input", asList(1, 2, 3, 4, 5, 6, 7).iterator(), 10, NO_OP_LABELLED_COUNTER)
.thenProcessAsync(
"createFuture",
value -> {
final CompletableFuture<String> future = new CompletableFuture<>();
futures.add(future);
return future;
},
3)
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
waitForSize(futures, 3);
assertThat(result).isNotDone();
// Completing one task should cause another to be started.
futures.get(1).complete("2");
waitForSize(futures, 4);
futures.get(0).complete("1");
futures.get(2).complete("3");
futures.get(3).complete("4");
waitForSize(futures, 7);
futures.get(4).complete("5");
futures.get(5).complete("6");
futures.get(6).complete("7");
result.get(10, SECONDS);
assertThat(output).containsExactly("2", "1", "3", "4", "5", "6", "7");
}
@Test
public void shouldFlatMapItems() throws Exception {
final List<Integer> output = new ArrayList<>();
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenFlatMap("flatMap", input -> Stream.of(input, input * 2), 20)
.andFinishWith("end", output::add);
pipeline.start(executorService).get(10, SECONDS);
assertThat(output)
.containsExactly(
1, 2, 2, 4, 3, 6, 4, 8, 5, 10, 6, 12, 7, 14, 8, 16, 9, 18, 10, 20, 11, 22, 12, 24, 13,
26, 14, 28, 15, 30);
}
@Test
public void shouldProcessInParallel() throws Exception {
final List<String> output = synchronizedList(new ArrayList<>());
final CountDownLatch latch = new CountDownLatch(1);
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcessInParallel(
"stageName",
value -> {
if (value == 2) {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return value.toString();
},
2)
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
// One thread will block but the other should process the remaining entries.
waitForSize(output, 14);
assertThat(result).isNotDone();
latch.countDown();
result.get(10, SECONDS);
assertThat(output)
.containsExactly(
"1", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "2");
}
@Test
public void shouldAbortPipeline() throws Exception {
final int allowProcessingUpTo = 5;
final AtomicBoolean processorInterrupted = new AtomicBoolean(false);
final List<Integer> output = synchronizedList(new ArrayList<>());
final CountDownLatch startedProcessingValueSix = new CountDownLatch(1);
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcess(
"stageName",
value -> {
if (value > allowProcessingUpTo) {
try {
startedProcessingValueSix.countDown();
Thread.sleep(TimeUnit.MINUTES.toNanos(2));
} catch (final InterruptedException e) {
processorInterrupted.set(true);
}
}
return value;
})
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
startedProcessingValueSix.await(10, SECONDS);
waitForSize(output, allowProcessingUpTo);
pipeline.abort();
assertThatThrownBy(() -> result.get(10, SECONDS)).isInstanceOf(CancellationException.class);
assertThat(output).containsExactly(1, 2, 3, 4, 5);
waitAtMost(10, SECONDS).untilAsserted(() -> assertThat(processorInterrupted).isTrue());
}
@Test
public void shouldAbortPipelineWhenFutureIsCancelled() throws Exception {
final int allowProcessingUpTo = 5;
final AtomicBoolean processorInterrupted = new AtomicBoolean(false);
final List<Integer> output = synchronizedList(new ArrayList<>());
final CountDownLatch startedProcessingValueSix = new CountDownLatch(1);
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcess(
"stageName",
value -> {
if (value > allowProcessingUpTo) {
try {
startedProcessingValueSix.countDown();
Thread.sleep(TimeUnit.MINUTES.toNanos(2));
} catch (final InterruptedException e) {
processorInterrupted.set(true);
}
}
return value;
})
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
startedProcessingValueSix.await(10, SECONDS);
waitForSize(output, allowProcessingUpTo);
result.cancel(false);
assertThatThrownBy(() -> result.get(10, SECONDS)).isInstanceOf(CancellationException.class);
assertThat(output).containsExactly(1, 2, 3, 4, 5);
waitAtMost(10, SECONDS).untilAsserted(() -> assertThat(processorInterrupted).isTrue());
}
@Test
public void shouldAbortPipelineWhenProcessorThrowsException() {
final RuntimeException expectedError = new RuntimeException("Oops");
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER)
.thenProcess(
"stageName",
(Function<Integer, Integer>)
value -> {
throw expectedError;
})
.andFinishWith("end", new ArrayList<Integer>()::add);
final CompletableFuture<?> result = pipeline.start(executorService);
assertThatThrownBy(() -> result.get(10, SECONDS))
.isInstanceOf(ExecutionException.class)
.hasRootCauseExactlyInstanceOf(RuntimeException.class)
.extracting(Throwable::getCause)
.isSameAs(expectedError);
}
@Test
public void shouldTrackTaskCountMetric() throws Exception {
final Map<String, SimpleCounter> counters = new ConcurrentHashMap<>();
final LabelledMetric<Counter> labelledCounter =
labels -> counters.computeIfAbsent(labels[0], label -> new SimpleCounter());
final Pipeline pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, labelledCounter)
.thenProcess("map", Function.identity())
.thenProcessInParallel("parallel", Function.identity(), 3)
.thenProcessAsync("async", CompletableFuture::completedFuture, 3)
.inBatches("batch", 4)
.thenFlatMap("flatMap", List::stream, 10)
.andFinishWith("finish", new ArrayList<>()::add);
pipeline.start(executorService).get(10, SECONDS);
assertThat(counters)
.containsOnlyKeys("input", "map", "parallel", "async", "batch", "flatMap", "finish");
assertThat(counters.get("input").count).hasValue(15);
assertThat(counters.get("map").count).hasValue(15);
assertThat(counters.get("parallel").count).hasValue(15);
assertThat(counters.get("async").count).hasValue(15);
assertThat(counters.get("flatMap").count).hasValue(15);
assertThat(counters.get("finish").count).hasValue(15);
// We don't know how many batches will be produced because it's timing dependent but it must
// be at least 4 to fit all the items and shouldn't be more than the items we put in.
assertThat(counters.get("batch").count).hasValueBetween(4, 15);
}
private void waitForSize(final Collection<?> collection, final int targetSize) {
waitAtMost(10, SECONDS).untilAsserted(() -> assertThat(collection).hasSize(targetSize));
}
private static class SimpleCounter implements Counter {
private final AtomicLong count = new AtomicLong(0);
@Override
public void inc() {
count.incrementAndGet();
}
@Override
public void inc(final long amount) {
count.addAndGet(amount);
}
}
}

@ -0,0 +1,77 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
import java.util.Locale;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class ProcessingStageTest {
private final Pipe<String> inputPipe = new Pipe<>(10, NO_OP_COUNTER);
private final Pipe<String> outputPipe = new Pipe<>(10, NO_OP_COUNTER);
@Mock private Processor<String, String> singleStep;
private ProcessingStage<String, String> stage;
@Before
public void setUp() {
stage = new ProcessingStage<>(inputPipe, outputPipe, singleStep);
doAnswer(
invocation -> {
outputPipe.put(inputPipe.get().toLowerCase(Locale.UK));
return 1;
})
.when(singleStep)
.processNextInput(inputPipe, outputPipe);
}
@Test
public void shouldCallSingleStepStageForEachInput() {
inputPipe.put("A");
inputPipe.put("B");
inputPipe.put("C");
inputPipe.close();
stage.run();
assertThat(outputPipe.poll()).isEqualTo("a");
assertThat(outputPipe.poll()).isEqualTo("b");
assertThat(outputPipe.poll()).isEqualTo("c");
assertThat(outputPipe.poll()).isNull();
verify(singleStep, times(3)).processNextInput(inputPipe, outputPipe);
}
@Test
public void shouldFinalizeSingleStepStageAndCloseOutputPipeWhenInputCloses() {
inputPipe.close();
stage.run();
verify(singleStep).finalize(outputPipe);
verifyNoMoreInteractions(singleStep);
assertThat(outputPipe.isOpen()).isFalse();
}
}

@ -0,0 +1,40 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import org.junit.Test;
public class SharedWritePipeTest {
private static final int CLOSES_REQUIRED = 3;
@SuppressWarnings("unchecked")
private final WritePipe<String> delegate = mock(WritePipe.class);
private final SharedWritePipe<String> pipe = new SharedWritePipe<>(delegate, CLOSES_REQUIRED);
@Test
public void shouldOnlyCloseDelegatePipeWhenCloseCalledSpecifiedNumberOfTimes() {
for (int i = 0; i < CLOSES_REQUIRED - 1; i++) {
pipe.close();
verifyZeroInteractions(delegate);
}
pipe.close();
verify(delegate).close();
}
}

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Properties>
<Property name="root.log.level">INFO</Property>
</Properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSSZZZ} | %t | %-5level | %c{1} | %msg%n" /> </Console>
</Appenders>
<Loggers>
<Root level="${sys:root.log.level}">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>

@ -35,6 +35,7 @@ include 'ethereum:trie'
include 'metrics'
include 'pantheon'
include 'services:kvstore'
include 'services:pipeline'
include 'services:queue'
include 'services:util'
include 'testutil'

Loading…
Cancel
Save