diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java index 52a965e2b1..3c1242fe71 100644 --- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java +++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java @@ -29,7 +29,7 @@ import io.prometheus.client.Collector; public class NoOpMetricsSystem implements MetricsSystem { - private static final Counter NO_OP_COUNTER = new NoOpCounter(); + public static final Counter NO_OP_COUNTER = new NoOpCounter(); private static final TimingContext NO_OP_TIMING_CONTEXT = () -> 0; private static final OperationTimer NO_OP_TIMER = () -> NO_OP_TIMING_CONTEXT; public static final LabelledMetric NO_OP_LABELLED_TIMER = label -> NO_OP_TIMER; diff --git a/services/pipeline/build.gradle b/services/pipeline/build.gradle new file mode 100644 index 0000000000..f5f687727e --- /dev/null +++ b/services/pipeline/build.gradle @@ -0,0 +1,41 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +apply plugin: 'java-library' + +jar { + baseName 'pantheon-pipeline' + manifest { + attributes( + 'Specification-Title': baseName, + 'Specification-Version': project.version, + 'Implementation-Title': baseName, + 'Implementation-Version': calculateVersion() + ) + } +} + +dependencies { + api project(':util') + implementation project(':metrics') + + implementation 'org.apache.logging.log4j:log4j-api' + implementation 'com.google.guava:guava' + + runtime 'org.apache.logging.log4j:log4j-core' + + testImplementation 'junit:junit' + testImplementation 'org.assertj:assertj-core' + testImplementation 'org.awaitility:awaitility' + testImplementation 'org.mockito:mockito-core' +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java new file mode 100644 index 0000000000..f33341daf2 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java @@ -0,0 +1,98 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +class AsyncOperationProcessor implements Processor { + private static final Logger LOG = LogManager.getLogger(); + private final Function> processor; + private final Collection> inProgress; + private final int maxConcurrency; + + public AsyncOperationProcessor( + final Function> processor, final int maxConcurrency) { + this.processor = processor; + this.maxConcurrency = maxConcurrency; + this.inProgress = new ArrayList<>(maxConcurrency); + } + + @Override + public void processNextInput(final ReadPipe inputPipe, final WritePipe outputPipe) { + if (inProgress.size() < maxConcurrency) { + final I value = inputPipe.get(); + if (value != null) { + final CompletableFuture future = processor.apply(value); + // When the future completes, interrupt so if we're waiting for new input we wake up and + // schedule the output. + final Thread stageThread = Thread.currentThread(); + future.whenComplete((result, error) -> stageThread.interrupt()); + inProgress.add(future); + } + + outputCompletedTasks(0, outputPipe); + } else { + outputNextCompletedTask(outputPipe); + } + } + + @Override + public void finalize(final WritePipe outputPipe) { + while (!inProgress.isEmpty()) { + outputNextCompletedTask(outputPipe); + } + } + + private void outputNextCompletedTask(final WritePipe outputPipe) { + try { + waitForAnyFutureToComplete(); + outputCompletedTasks(1, outputPipe); + } catch (final InterruptedException e) { + LOG.trace("Interrupted while waiting for processing to complete", e); + } catch (final ExecutionException e) { + LOG.error("Processing failed and we don't handle exceptions properly yet", e); + } catch (final TimeoutException e) { + // Ignore and go back around the loop. + } + } + + @SuppressWarnings("rawtypes") + private void waitForAnyFutureToComplete() + throws InterruptedException, ExecutionException, TimeoutException { + CompletableFuture.anyOf(inProgress.toArray(new CompletableFuture[0])).get(1, TimeUnit.SECONDS); + } + + private void outputCompletedTasks(final int minTasksToOutput, final WritePipe outputPipe) { + int outputTasks = 0; + for (final Iterator> i = inProgress.iterator(); + i.hasNext() && (outputTasks < minTasksToOutput || outputPipe.hasRemainingCapacity()); ) { + final CompletableFuture process = i.next(); + final O result = process.getNow(null); + if (result != null) { + outputPipe.put(result); + i.remove(); + outputTasks++; + } + } + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessor.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessor.java new file mode 100644 index 0000000000..2d91cf4531 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessor.java @@ -0,0 +1,32 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import java.util.List; + +class BatchingProcessor implements Processor> { + + private final int maximumBatchSize; + + public BatchingProcessor(final int maximumBatchSize) { + this.maximumBatchSize = maximumBatchSize; + } + + @Override + public void processNextInput(final ReadPipe inputPipe, final WritePipe> outputPipe) { + final List batch = inputPipe.getBatch(maximumBatchSize); + if (!batch.isEmpty()) { + outputPipe.put(batch); + } + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java new file mode 100644 index 0000000000..4d83fcc058 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import tech.pegasys.pantheon.metrics.Counter; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +class CompleterStage implements Runnable { + private final ReadPipe input; + private final Consumer completer; + private final Counter outputCounter; + private final CompletableFuture future = new CompletableFuture<>(); + + CompleterStage( + final ReadPipe input, final Consumer completer, final Counter outputCounter) { + this.input = input; + this.completer = completer; + this.outputCounter = outputCounter; + } + + @Override + public void run() { + while (input.hasMore()) { + final T value = input.get(); + if (value != null) { + completer.accept(value); + outputCounter.inc(); + } + } + future.complete(null); + } + + public CompletableFuture getFuture() { + return future; + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessor.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessor.java new file mode 100644 index 0000000000..b791e7ecdd --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessor.java @@ -0,0 +1,37 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import java.util.Iterator; +import java.util.function.Function; +import java.util.stream.Stream; + +class FlatMapProcessor implements Processor { + + private final Function> mapper; + + public FlatMapProcessor(final Function> mapper) { + this.mapper = mapper; + } + + @Override + public void processNextInput(final ReadPipe inputPipe, final WritePipe outputPipe) { + final I value = inputPipe.get(); + if (value != null) { + final Iterator outputs = mapper.apply(value).iterator(); + while (outputs.hasNext()) { + outputPipe.put(outputs.next()); + } + } + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStage.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStage.java new file mode 100644 index 0000000000..a393476e8b --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStage.java @@ -0,0 +1,36 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import java.util.Iterator; + +class IteratorSourceStage implements Runnable { + private final Iterator source; + private final Pipe pipe; + + IteratorSourceStage(final Iterator source, final Pipe pipe) { + this.source = source; + this.pipe = pipe; + } + + @Override + public void run() { + while (pipe.isOpen() && source.hasNext()) { + final T value = source.next(); + if (value != null) { + pipe.put(value); + } + } + pipe.close(); + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/MapProcessor.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/MapProcessor.java new file mode 100644 index 0000000000..720d146473 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/MapProcessor.java @@ -0,0 +1,32 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import java.util.function.Function; + +class MapProcessor implements Processor { + + private final Function processor; + + public MapProcessor(final Function processor) { + this.processor = processor; + } + + @Override + public void processNextInput(final ReadPipe inputPipe, final WritePipe outputPipe) { + final I value = inputPipe.get(); + if (value != null) { + outputPipe.put(processor.apply(value)); + } + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java new file mode 100644 index 0000000000..44d94d9769 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java @@ -0,0 +1,135 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import tech.pegasys.pantheon.metrics.Counter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Forms the connection between two pipeline stages. A pipe is essentially a blocking queue with the + * added ability to signal when no further input is available because the pipe has been closed or + * the pipeline aborted. + * + *

In most cases a Pipe is used through one of two narrower interfaces it supports {@link + * ReadPipe} and {@link WritePipe}. These are designed to expose only the operations relevant to + * objects either reading from or publishing to the pipe respectively. + * + * @param the type of item that flows through the pipe. + */ +public class Pipe implements ReadPipe, WritePipe { + private static final Logger LOG = LogManager.getLogger(); + private final BlockingQueue queue; + private final int capacity; + private final Counter itemCounter; + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicBoolean aborted = new AtomicBoolean(); + + public Pipe(final int capacity, final Counter itemCounter) { + queue = new ArrayBlockingQueue<>(capacity); + this.capacity = capacity; + this.itemCounter = itemCounter; + } + + @Override + public boolean isOpen() { + return !closed.get() && !aborted.get(); + } + + /** + * Get the number of items that can be queued inside this pipe. + * + * @return the pipe's capacity. + */ + public int getCapacity() { + return capacity; + } + + @Override + public boolean hasRemainingCapacity() { + return queue.remainingCapacity() > 0 && isOpen(); + } + + @Override + public void close() { + closed.set(true); + } + + @Override + public void abort() { + aborted.set(true); + } + + @Override + public boolean hasMore() { + if (aborted.get()) { + return false; + } + return !closed.get() || !queue.isEmpty(); + } + + @Override + public T get() { + try { + while (hasMore()) { + final T value = queue.poll(1, TimeUnit.SECONDS); + if (value != null) { + return value; + } + } + } catch (final InterruptedException e) { + LOG.trace("Interrupted while waiting for next item", e); + } + return null; + } + + @Override + public T poll() { + return queue.poll(); + } + + @Override + public List getBatch(final int maximumBatchSize) { + final T nextItem = get(); + if (nextItem == null) { + return Collections.emptyList(); + } + final List batch = new ArrayList<>(); + batch.add(nextItem); + queue.drainTo(batch, maximumBatchSize - 1); + return batch; + } + + @Override + public void put(final T value) { + while (isOpen()) { + try { + if (queue.offer(value, 1, TimeUnit.SECONDS)) { + itemCounter.inc(); + return; + } + } catch (final InterruptedException e) { + LOG.trace("Interrupted while waiting to add to output", e); + } + } + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java new file mode 100644 index 0000000000..c4cc851474 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java @@ -0,0 +1,140 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static java.util.stream.Collectors.toList; + +import tech.pegasys.pantheon.util.ExceptionUtils; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class Pipeline { + private static final Logger LOG = LogManager.getLogger(); + private final Pipe inputPipe; + private final Collection stages; + private final Collection> pipes; + private final CompleterStage completerStage; + private final AtomicBoolean started = new AtomicBoolean(false); + + /** + * Flags that the pipeline is being completed so that when we abort we can close the streams + * without the completion stage then marking the future successful before we finish the abort + * process and mark it as exceptionally completed. We can't just use synchronized because it winds + * up being the same thread coming in via a callback so already has the lock. + */ + private final AtomicBoolean completing = new AtomicBoolean(false); + + private final CompletableFuture overallFuture = new CompletableFuture<>(); + private volatile List> futures; + + Pipeline( + final Pipe inputPipe, + final Collection stages, + final Collection> pipes, + final CompleterStage completerStage) { + this.inputPipe = inputPipe; + this.stages = stages; + this.pipes = pipes; + this.completerStage = completerStage; + } + + /** + * Starts execution of the pipeline. Each stage in the pipeline requires a dedicated thread from + * the supplied executor service. + * + * @param executorService the {@link ExecutorService} to execute each stage in. + * @return a future that will be completed when the pipeline completes. If the pipeline fails or + * is aborted the returned future will be completed exceptionally. + */ + public synchronized CompletableFuture start(final ExecutorService executorService) { + if (!started.compareAndSet(false, true)) { + return overallFuture; + } + futures = + Stream.concat(stages.stream(), Stream.of(completerStage)) + .map(task -> runWithErrorHandling(executorService, task)) + .collect(toList()); + completerStage + .getFuture() + .whenComplete( + (result, error) -> { + if (completing.compareAndSet(false, true)) { + if (error != null) { + overallFuture.completeExceptionally(error); + } else { + overallFuture.complete(null); + } + } + }); + overallFuture.exceptionally( + error -> { + if (ExceptionUtils.rootCause(error) instanceof CancellationException) { + abort(); + } + return null; + }); + return overallFuture; + } + + /** + * Abort execution of this pipeline. The future returned by {@link #start(ExecutorService)} will + * be completed with a {@link CancellationException}. + * + *

A best effort is made to halt all processing by the pipeline immediately by interrupting + * each execution thread and pipes connecting each stage will no longer accept or provide further + * items. + */ + public void abort() { + final CancellationException exception = new CancellationException("Pipeline aborted"); + abort(exception); + } + + private Future runWithErrorHandling( + final ExecutorService executorService, final Runnable task) { + return executorService.submit( + () -> { + try { + task.run(); + } catch (final Throwable t) { + LOG.debug("Unhandled exception in pipeline. Aborting.", t); + try { + abort(t); + } catch (final Throwable t2) { + // Seems excessive but exceptions that propagate out of this method won't be logged + // because the executor just completes the future exceptionally and we never + // need to call get on it which would normally expose the error. + LOG.error("Failed to abort pipeline after error", t2); + } + } + }); + } + + private synchronized void abort(final Throwable error) { + if (completing.compareAndSet(false, true)) { + inputPipe.abort(); + pipes.forEach(Pipe::abort); + futures.forEach(future -> future.cancel(true)); + overallFuture.completeExceptionally(error); + } + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java new file mode 100644 index 0000000000..666378b103 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java @@ -0,0 +1,246 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Collections.emptyList; +import static java.util.Collections.singleton; + +import tech.pegasys.pantheon.metrics.Counter; +import tech.pegasys.pantheon.metrics.LabelledMetric; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * Supports building a new pipeline. Pipelines are comprised of a source, various processing stages + * and a consumer, each of which run in their own thread. + * + *

The pipeline completes when all items from the source have passed through each stage and are + * received by the consumer. The pipeline will halt immediately if an exception is thrown from any + * processing stage. + * + * @param the output type of the last stage in the pipeline. + */ +public class PipelineBuilder { + + private final Pipe inputPipe; + private final Collection stages; + private final Collection> pipes; + private final ReadPipe pipeEnd; + private final int bufferSize; + private final LabelledMetric outputCounter; + + public PipelineBuilder( + final Pipe inputPipe, + final Collection stages, + final Collection> pipes, + final ReadPipe pipeEnd, + final int bufferSize, + final LabelledMetric outputCounter) { + this.outputCounter = outputCounter; + checkArgument(!pipes.isEmpty(), "Must have at least one pipe in a pipeline"); + this.inputPipe = inputPipe; + this.stages = stages; + this.pipes = pipes; + this.pipeEnd = pipeEnd; + this.bufferSize = bufferSize; + } + + /** + * Create a new pipeline that processes inputs from source. The pipeline completes when + * source returns false from {@link Iterator#hasNext()} and the last item has + * been reached the end of the pipeline. + * + * @param sourceName the name of this stage. Used as the label for the output count metric. + * @param source the source to pull items from for processing. + * @param bufferSize the number of items to be buffered between each stage in the pipeline. + * @param outputCounter the counter to increment for each output of a stage. Must have a single + * label which will be filled with the stage name. + * @param the type of items input into the pipeline. + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public static PipelineBuilder createPipelineFrom( + final String sourceName, + final Iterator source, + final int bufferSize, + final LabelledMetric outputCounter) { + final Pipe pipe = new Pipe<>(bufferSize, outputCounter.labels(sourceName)); + final IteratorSourceStage sourceStage = new IteratorSourceStage<>(source, pipe); + return new PipelineBuilder<>( + pipe, singleton(sourceStage), singleton(pipe), pipe, bufferSize, outputCounter); + } + + /** + * Create a new pipeline that processes inputs added to pipe. The pipeline completes when + * pipe is closed and the last item has been reached the end of the pipeline. + * + * @param pipe the pipe feeding the pipeline. + * @param outputCounter the counter to increment for each output of a stage. Must have a single + * label which will be filled with the stage name. + * @param the type of items input into the pipeline. + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public static PipelineBuilder createPipelineFrom( + final Pipe pipe, final LabelledMetric outputCounter) { + return new PipelineBuilder<>( + pipe, emptyList(), singleton(pipe), pipe, pipe.getCapacity(), outputCounter); + } + + /** + * Adds a 1-to-1 processing stage to the pipeline. A single thread processes each item in the + * pipeline with processor outputting its return value to the next stage. + * + * @param stageName the name of this stage. Used as the label for the output count metric. + * @param processor the processing to apply to each item. + * @param the output type for this processing step. + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public PipelineBuilder thenProcess( + final String stageName, final Function processor) { + final Processor singleStepStage = new MapProcessor<>(processor); + return addStage(singleStepStage, stageName); + } + + /** + * Adds a 1-to-1 processing stage to the pipeline. Multiple threads processes items in the + * pipeline concurrently with processor outputting its return value to the next stage. + * + *

Note: The order of items is not preserved. + * + * @param stageName the name of this stage. Used as the label for the output count metric. + * @param processor the processing to apply to each item. + * @param numberOfThreads the number of threads to use for processing. + * @param the output type for this processing step. + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public PipelineBuilder thenProcessInParallel( + final String stageName, final Function processor, final int numberOfThreads) { + final Pipe newPipeEnd = new Pipe<>(bufferSize, outputCounter.labels(stageName)); + final WritePipe outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads); + final ArrayList newStages = new ArrayList<>(stages); + for (int i = 0; i < numberOfThreads; i++) { + final Runnable processStage = + new ProcessingStage<>(pipeEnd, outputPipe, new MapProcessor<>(processor)); + newStages.add(processStage); + } + return new PipelineBuilder<>( + inputPipe, newStages, concat(pipes, newPipeEnd), newPipeEnd, bufferSize, outputCounter); + } + + /** + * Adds a 1-to-1, asynchronous processing stage to the pipeline. A single thread reads items from + * the input and calls processor to begin processing. While a single thread is used to + * begin processing, up to maxConcurrency items may be in progress concurrently. When the + * returned {@link CompletableFuture} completes successfully the result is passed to the next + * stage. + * + *

If the returned {@link CompletableFuture} completes exceptionally the pipeline will abort. + * + *

Note: The order of items is not preserved. + * + * @param stageName the name of this stage. Used as the label for the output count metric. + * @param processor the processing to apply to each item. + * @param maxConcurrency the maximum number of items being processed concurrently. + * @param the output type for this processing step. + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public PipelineBuilder thenProcessAsync( + final String stageName, + final Function> processor, + final int maxConcurrency) { + return addStage(new AsyncOperationProcessor<>(processor, maxConcurrency), stageName); + } + + /** + * Batches items into groups of at most maximumBatchSize. Batches are created eagerly to + * minimize delay so may not be full. + * + *

Order of items is preserved. + * + *

The output buffer size is reduced to bufferSize / maximumBatchSize + 1. + * + * @param stageName the name of this stage. Used as the label for the output count metric. + * @param maximumBatchSize the maximum number of items to include in a batch. + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public PipelineBuilder> inBatches(final String stageName, final int maximumBatchSize) { + checkArgument(maximumBatchSize > 0, "Maximum batch size must be greater than 0"); + return addStage( + new BatchingProcessor<>(maximumBatchSize), bufferSize / maximumBatchSize + 1, stageName); + } + + /** + * Adds a 1-to-many processing stage to the pipeline. For each item in the stream, mapper + * is called and each item of the {@link Stream} it returns is output as an individual item. The + * returned Stream may be empty to remove an item. + * + *

This can be used to reverse the effect of {@link #inBatches(String, int)} with: + * + *

thenFlatMap(List::stream, newBufferSize)
+ * + * @param stageName the name of this stage. Used as the label for the output count metric. + * @param mapper the function to process each item with. + * @param newBufferSize the output buffer size to use from this stage onwards. + * @param the type of items to be output from this stage. + * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. + */ + public PipelineBuilder thenFlatMap( + final String stageName, final Function> mapper, final int newBufferSize) { + return addStage(new FlatMapProcessor<>(mapper), newBufferSize, stageName); + } + + /** + * End the pipeline with a {@link Consumer} that is the last stage of the pipeline. + * + * @param stageName the name of this stage. Used as the label for the output count metric. + * @param completer the {@link Consumer} that accepts the final output of the pipeline. + * @return the constructed pipeline ready to execute. + */ + public Pipeline andFinishWith(final String stageName, final Consumer completer) { + return new Pipeline( + inputPipe, + stages, + pipes, + new CompleterStage<>(pipeEnd, completer, outputCounter.labels(stageName))); + } + + private PipelineBuilder addStage(final Processor processor, final String stageName) { + return addStage(processor, bufferSize, stageName); + } + + private PipelineBuilder addStage( + final Processor processor, final int newBufferSize, final String stageName) { + final Pipe outputPipe = new Pipe<>(newBufferSize, outputCounter.labels(stageName)); + final Runnable processStage = new ProcessingStage<>(pipeEnd, outputPipe, processor); + return addStage(processStage, outputPipe); + } + + private PipelineBuilder addStage(final Runnable stage, final Pipe outputPipe) { + final List newStages = concat(stages, stage); + return new PipelineBuilder<>( + inputPipe, newStages, concat(pipes, outputPipe), outputPipe, bufferSize, outputCounter); + } + + private List concat(final Collection existing, final X newItem) { + final List newList = new ArrayList<>(existing); + newList.add(newItem); + return newList; + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java new file mode 100644 index 0000000000..1f73c7021f --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java @@ -0,0 +1,36 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +class ProcessingStage implements Runnable { + + private final ReadPipe inputPipe; + private final WritePipe outputPipe; + private final Processor processor; + + public ProcessingStage( + final ReadPipe inputPipe, final WritePipe outputPipe, final Processor processor) { + this.inputPipe = inputPipe; + this.outputPipe = outputPipe; + this.processor = processor; + } + + @Override + public void run() { + while (inputPipe.hasMore()) { + processor.processNextInput(inputPipe, outputPipe); + } + processor.finalize(outputPipe); + outputPipe.close(); + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Processor.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Processor.java new file mode 100644 index 0000000000..ab515e3fc7 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Processor.java @@ -0,0 +1,19 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +interface Processor { + void processNextInput(final ReadPipe inputPipe, final WritePipe outputPipe); + + default void finalize(final WritePipe outputPipe) {} +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ReadPipe.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ReadPipe.java new file mode 100644 index 0000000000..026d95f64f --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ReadPipe.java @@ -0,0 +1,61 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import java.util.List; + +/** + * The interface used to read items from a pipe. + * + * @param the type of input. + */ +public interface ReadPipe { + + /** + * Determines if this pipe has more items to be read. The pipe is considered to have no more items + * when it has either been aborted with {@link WritePipe#abort()} or if all queued items have been + * read and the pipe has been closed with {@link WritePipe#close()}. + * + * @return true if there are more items to process, otherwise false. + */ + boolean hasMore(); + + /** + * Get and remove the next item from this pipe. This method will block until the next item is + * available but may still return null if the pipe is closed or the thread + * interrupted while waiting. + * + * @return the next item or null if the pipe is closed or the thread interrupted. + */ + T get(); + + /** + * Get and remove the next item from this pipe without blocking if it is available. + * + * @return the next item or null if the pipe is empty. + */ + T poll(); + + /** + * Get a batch of values from the pipe containing at most maximumBatchSize items. + * This method will block until at least one item is available but will not wait until the batch + * is full. + * + *

An empty list will be returned if the queue is closed or the thread interrupted while + * waiting for the next value. + * + * @param maximumBatchSize the maximum number of items to read. + * @return the batch that was read. + */ + List getBatch(int maximumBatchSize); +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/SharedWritePipe.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/SharedWritePipe.java new file mode 100644 index 0000000000..bdc3b2436f --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/SharedWritePipe.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A wrapper around an {@link WritePipe} which allows multiple stages to share the same write pipe. + * Most operations simply pass through to the underlying pipe but the underlying pipe is only closed + * when all stages have signalled this pipe should close. + * + * @param the type of item in the pipe. + */ +class SharedWritePipe implements WritePipe { + private final WritePipe delegate; + private final AtomicInteger remainingClosesRequired; + + /** + * Creates a new SharedWritePipe. + * + * @param delegate the pipe to wrap. + * @param closesRequired the number of stages this output pipe will be shared with. The underlying + * pipe will only be closed when {@link #close()} is called this many times. + */ + public SharedWritePipe(final WritePipe delegate, final int closesRequired) { + this.delegate = delegate; + this.remainingClosesRequired = new AtomicInteger(closesRequired); + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public void put(final T value) { + delegate.put(value); + } + + @Override + public void close() { + if (remainingClosesRequired.decrementAndGet() == 0) { + delegate.close(); + } + } + + @Override + public void abort() { + delegate.abort(); + } + + @Override + public boolean hasRemainingCapacity() { + return delegate.hasRemainingCapacity(); + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/WritePipe.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/WritePipe.java new file mode 100644 index 0000000000..ea07a3c609 --- /dev/null +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/WritePipe.java @@ -0,0 +1,53 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +/** + * The interface used to add items to a pipe. + * + * @param the type of output. + */ +public interface WritePipe { + + /** + * Determine if this pipe is still open and accepting output. + * + * @return true if and only if the pipe is open. + */ + boolean isOpen(); + + /** + * Adds a new item to the pipe. This method will block until capacity is available in the pipe. + * The item will be discarded if the pipe is closed befoer capacity becomes available. + * + * @param value the value to add to the pipe. + */ + void put(T value); + + /** + * Determine if this pipe has capacity to accept another item. + * + * @return true if the pipe has capacity to accept one more item. + */ + boolean hasRemainingCapacity(); + + /** + * Close this write pipe indicating that no further data will be published to it. When reading + * from the other end of this pipe {@link ReadPipe#hasMore()} will continue to return true until + * all the already queued data has been drained. + */ + void close(); + + /** Abort this pipe. The pipe is closed and any queued data is discarded. */ + void abort(); +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessorTest.java new file mode 100644 index 0000000000..21fb184c8a --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingProcessorTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; + +import java.util.List; + +import org.junit.Test; + +public class BatchingProcessorTest { + + private final Pipe inputPipe = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe> outputPipe = new Pipe<>(10, NO_OP_COUNTER); + private final BatchingProcessor stage = new BatchingProcessor<>(3); + + @Test + public void shouldCreateBatches() { + for (int i = 1; i <= 8; i++) { + inputPipe.put(i); + } + inputPipe.close(); + + stage.processNextInput(inputPipe, outputPipe); + + assertThat(outputPipe.poll()).isEqualTo(asList(1, 2, 3)); + assertThat(outputPipe.poll()).isNull(); + + stage.processNextInput(inputPipe, outputPipe); + assertThat(outputPipe.poll()).isEqualTo(asList(4, 5, 6)); + assertThat(outputPipe.poll()).isNull(); + + stage.processNextInput(inputPipe, outputPipe); + assertThat(outputPipe.poll()).isEqualTo(asList(7, 8)); + assertThat(outputPipe.poll()).isNull(); + } + + @Test + public void shouldNotOutputItemWhenInputIsClosed() { + @SuppressWarnings("unchecked") + final WritePipe> outputPipe = mock(WritePipe.class); + inputPipe.close(); + stage.processNextInput(inputPipe, outputPipe); + verifyZeroInteractions(outputPipe); + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java new file mode 100644 index 0000000000..ddf9dcbe81 --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; + +import tech.pegasys.pantheon.metrics.Counter; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +public class CompleterStageTest { + + private final Pipe pipe = new Pipe<>(10, NO_OP_COUNTER); + private final List output = new ArrayList<>(); + private final Counter outputCounter = mock(Counter.class); + private final CompleterStage stage = + new CompleterStage<>(pipe, output::add, outputCounter); + + @Test + public void shouldAddItemsToOutputUntilPipeHasNoMore() { + pipe.put("a"); + pipe.put("b"); + pipe.put("c"); + pipe.close(); + + stage.run(); + + assertThat(output).containsExactly("a", "b", "c"); + verify(outputCounter, times(3)).inc(); + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java new file mode 100644 index 0000000000..127f2ae30d --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; + +import java.util.function.Function; +import java.util.stream.Stream; + +import org.junit.Test; + +public class FlatMapProcessorTest { + + private final Pipe input = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe output = new Pipe<>(10, NO_OP_COUNTER); + + @SuppressWarnings("unchecked") + private final Function> mapper = mock(Function.class); + + private final FlatMapProcessor stage = new FlatMapProcessor<>(mapper); + + @Test + public void shouldOutputItemsFromReturnedStreamIndividually() { + when(mapper.apply("A")).thenReturn(Stream.of("a", "b", "c")); + input.put("A"); + + stage.processNextInput(input, output); + + assertThat(output.poll()).isEqualTo("a"); + assertThat(output.poll()).isEqualTo("b"); + assertThat(output.poll()).isEqualTo("c"); + assertThat(output.poll()).isNull(); + verify(mapper).apply("A"); + } + + @Test + public void shouldSkipProcessingWhenInputIsClosed() { + input.close(); + stage.processNextInput(input, output); + verifyZeroInteractions(mapper); + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java new file mode 100644 index 0000000000..202ad5d088 --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; + +import com.google.common.collect.Iterators; +import org.junit.Test; + +public class IteratorSourceStageTest { + + private final Pipe output = new Pipe<>(10, NO_OP_COUNTER); + + private final IteratorSourceStage stage = + new IteratorSourceStage<>(Iterators.forArray("a", "b", "c", "d"), output); + + @Test + public void shouldOutputEntriesThenClosePipe() { + stage.run(); + assertThat(output.isOpen()).isFalse(); + assertThat(output.hasMore()).isTrue(); + assertThat(output.get()).isEqualTo("a"); + assertThat(output.hasMore()).isTrue(); + assertThat(output.get()).isEqualTo("b"); + assertThat(output.hasMore()).isTrue(); + assertThat(output.get()).isEqualTo("c"); + assertThat(output.hasMore()).isTrue(); + assertThat(output.get()).isEqualTo("d"); + assertThat(output.hasMore()).isFalse(); + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java new file mode 100644 index 0000000000..3ccb84aef4 --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; + +import java.util.function.Function; + +import org.junit.Test; + +public class MapProcessorTest { + + private final Pipe input = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe output = new Pipe<>(10, NO_OP_COUNTER); + + @SuppressWarnings("unchecked") + private final Function processor = mock(Function.class); + + private final MapProcessor stage = new MapProcessor<>(processor); + + @Test + public void shouldApplyFunctionToItems() { + when(processor.apply("A")).thenReturn("a"); + input.put("A"); + + stage.processNextInput(input, output); + + assertThat(output.hasMore()).isTrue(); + assertThat(output.get()).isEqualTo("a"); + verify(processor).apply("A"); + } + + @Test + public void shouldSkipProcessingWhenInputIsClosed() { + input.close(); + stage.processNextInput(input, output); + verifyZeroInteractions(processor); + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java new file mode 100644 index 0000000000..bb163dd3fa --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import tech.pegasys.pantheon.metrics.Counter; + +import org.junit.Test; + +public class PipeTest { + private final Counter itemCounter = mock(Counter.class); + private final Pipe pipe = new Pipe<>(5, itemCounter); + + @Test + public void shouldNotHaveMoreWhenEmptyAndClosed() { + pipe.close(); + assertThat(pipe.hasMore()).isFalse(); + } + + @Test + public void shouldHaveMoreWhenNotEmptyAndClosed() { + pipe.put("A"); + pipe.close(); + + assertThat(pipe.hasMore()).isTrue(); + + pipe.get(); + + assertThat(pipe.hasMore()).isFalse(); + } + + @Test + public void shouldNotHaveMoreWhenAbortedEvenIfNotEmpty() { + pipe.put("A"); + pipe.abort(); + + assertThat(pipe.hasMore()).isFalse(); + } + + @Test + public void shouldLimitBatchMaximumSize() { + pipe.put("a"); + pipe.put("b"); + pipe.put("c"); + pipe.put("d"); + + assertThat(pipe.getBatch(3)).containsExactly("a", "b", "c"); + } + + @Test + public void shouldNotWaitToReachMaximumSizeBeforeReturningBatch() { + pipe.put("a"); + assertThat(pipe.getBatch(3)).containsExactly("a"); + } + + @Test + public void shouldNotBeOpenAfterAbort() { + pipe.abort(); + assertThat(pipe.isOpen()).isFalse(); + } + + @Test + public void shouldIncrementCounterWhenItemAddedToPipe() { + pipe.put("A"); + verify(itemCounter).inc(); + pipe.put("B"); + verify(itemCounter, times(2)).inc(); + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java new file mode 100644 index 0000000000..c01ab867dd --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java @@ -0,0 +1,375 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static com.google.common.primitives.Ints.asList; +import static java.util.Collections.synchronizedList; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.waitAtMost; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_COUNTER; + +import tech.pegasys.pantheon.metrics.Counter; +import tech.pegasys.pantheon.metrics.LabelledMetric; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Stream; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.junit.After; +import org.junit.Test; + +public class PipelineBuilderTest { + + private static final ThreadFactory THREAD_FACTORY = + new ThreadFactoryBuilder() + .setNameFormat(PipelineBuilderTest.class.getSimpleName() + "-%d") + .setDaemon(true) + .build(); + private final Iterator tasks = + asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15).iterator(); + + private final ExecutorService executorService = Executors.newCachedThreadPool(THREAD_FACTORY); + + @After + public void afterClass() throws Exception { + executorService.shutdownNow(); + if (!executorService.awaitTermination(10, SECONDS)) { + fail("Executor service did not shut down cleanly"); + } + } + + @Test + public void shouldPipeTasksFromSupplierToCompleter() throws Exception { + final List output = new ArrayList<>(); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .andFinishWith("end", output::add); + final CompletableFuture result = pipeline.start(executorService); + result.get(10, SECONDS); + assertThat(output).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15); + } + + @Test + public void shouldPassInputThroughIntermediateStage() throws Exception { + final List output = new ArrayList<>(); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .thenProcess("toString", Object::toString) + .andFinishWith("end", output::add); + + final CompletableFuture result = pipeline.start(executorService); + result.get(10, SECONDS); + assertThat(output) + .containsExactly( + "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15"); + } + + @Test + public void shouldCombineIntoBatches() throws Exception { + final Pipe input = new Pipe<>(20, NO_OP_COUNTER); + tasks.forEachRemaining(input::put); + final BlockingQueue> output = new ArrayBlockingQueue<>(10); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom(input, NO_OP_LABELLED_COUNTER) + .inBatches("batch", 6) + .andFinishWith("end", output::offer); + + final CompletableFuture result = pipeline.start(executorService); + + assertThat(output.poll(10, SECONDS)).containsExactly(1, 2, 3, 4, 5, 6); + assertThat(output.poll(10, SECONDS)).containsExactly(7, 8, 9, 10, 11, 12); + assertThat(output.poll(10, SECONDS)).containsExactly(13, 14, 15); + + assertThat(output).isEmpty(); + assertThat(result).isNotDone(); + + // Should not wait to fill the batch. + input.put(16); + assertThat(output.poll(10, SECONDS)).containsExactly(16); + input.put(17); + assertThat(output.poll(10, SECONDS)).containsExactly(17); + + input.close(); + result.get(10, SECONDS); + assertThat(output).isEmpty(); + } + + @Test + public void shouldProcessAsync() throws Exception { + final List output = new ArrayList<>(); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .thenProcessAsync("toString", value -> completedFuture(Integer.toString(value)), 3) + .andFinishWith("end", output::add); + final CompletableFuture result = pipeline.start(executorService); + result.get(10, SECONDS); + assertThat(output) + .containsExactlyInAnyOrder( + "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15"); + } + + @Test + public void shouldLimitInFlightProcessesWhenProcessingAsync() throws Exception { + final List output = new ArrayList<>(); + final List> futures = new CopyOnWriteArrayList<>(); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom( + "input", asList(1, 2, 3, 4, 5, 6, 7).iterator(), 10, NO_OP_LABELLED_COUNTER) + .thenProcessAsync( + "createFuture", + value -> { + final CompletableFuture future = new CompletableFuture<>(); + futures.add(future); + return future; + }, + 3) + .andFinishWith("end", output::add); + final CompletableFuture result = pipeline.start(executorService); + + waitForSize(futures, 3); + + assertThat(result).isNotDone(); + + // Completing one task should cause another to be started. + futures.get(1).complete("2"); + waitForSize(futures, 4); + + futures.get(0).complete("1"); + futures.get(2).complete("3"); + futures.get(3).complete("4"); + + waitForSize(futures, 7); + futures.get(4).complete("5"); + futures.get(5).complete("6"); + futures.get(6).complete("7"); + + result.get(10, SECONDS); + assertThat(output).containsExactly("2", "1", "3", "4", "5", "6", "7"); + } + + @Test + public void shouldFlatMapItems() throws Exception { + final List output = new ArrayList<>(); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .thenFlatMap("flatMap", input -> Stream.of(input, input * 2), 20) + .andFinishWith("end", output::add); + + pipeline.start(executorService).get(10, SECONDS); + + assertThat(output) + .containsExactly( + 1, 2, 2, 4, 3, 6, 4, 8, 5, 10, 6, 12, 7, 14, 8, 16, 9, 18, 10, 20, 11, 22, 12, 24, 13, + 26, 14, 28, 15, 30); + } + + @Test + public void shouldProcessInParallel() throws Exception { + final List output = synchronizedList(new ArrayList<>()); + final CountDownLatch latch = new CountDownLatch(1); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .thenProcessInParallel( + "stageName", + value -> { + if (value == 2) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return value.toString(); + }, + 2) + .andFinishWith("end", output::add); + final CompletableFuture result = pipeline.start(executorService); + + // One thread will block but the other should process the remaining entries. + waitForSize(output, 14); + assertThat(result).isNotDone(); + + latch.countDown(); + + result.get(10, SECONDS); + + assertThat(output) + .containsExactly( + "1", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "2"); + } + + @Test + public void shouldAbortPipeline() throws Exception { + final int allowProcessingUpTo = 5; + final AtomicBoolean processorInterrupted = new AtomicBoolean(false); + final List output = synchronizedList(new ArrayList<>()); + final CountDownLatch startedProcessingValueSix = new CountDownLatch(1); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .thenProcess( + "stageName", + value -> { + if (value > allowProcessingUpTo) { + try { + startedProcessingValueSix.countDown(); + Thread.sleep(TimeUnit.MINUTES.toNanos(2)); + } catch (final InterruptedException e) { + processorInterrupted.set(true); + } + } + return value; + }) + .andFinishWith("end", output::add); + + final CompletableFuture result = pipeline.start(executorService); + + startedProcessingValueSix.await(10, SECONDS); + waitForSize(output, allowProcessingUpTo); + + pipeline.abort(); + + assertThatThrownBy(() -> result.get(10, SECONDS)).isInstanceOf(CancellationException.class); + assertThat(output).containsExactly(1, 2, 3, 4, 5); + + waitAtMost(10, SECONDS).untilAsserted(() -> assertThat(processorInterrupted).isTrue()); + } + + @Test + public void shouldAbortPipelineWhenFutureIsCancelled() throws Exception { + final int allowProcessingUpTo = 5; + final AtomicBoolean processorInterrupted = new AtomicBoolean(false); + final List output = synchronizedList(new ArrayList<>()); + final CountDownLatch startedProcessingValueSix = new CountDownLatch(1); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .thenProcess( + "stageName", + value -> { + if (value > allowProcessingUpTo) { + try { + startedProcessingValueSix.countDown(); + Thread.sleep(TimeUnit.MINUTES.toNanos(2)); + } catch (final InterruptedException e) { + processorInterrupted.set(true); + } + } + return value; + }) + .andFinishWith("end", output::add); + + final CompletableFuture result = pipeline.start(executorService); + + startedProcessingValueSix.await(10, SECONDS); + waitForSize(output, allowProcessingUpTo); + + result.cancel(false); + + assertThatThrownBy(() -> result.get(10, SECONDS)).isInstanceOf(CancellationException.class); + assertThat(output).containsExactly(1, 2, 3, 4, 5); + + waitAtMost(10, SECONDS).untilAsserted(() -> assertThat(processorInterrupted).isTrue()); + } + + @Test + public void shouldAbortPipelineWhenProcessorThrowsException() { + final RuntimeException expectedError = new RuntimeException("Oops"); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) + .thenProcess( + "stageName", + (Function) + value -> { + throw expectedError; + }) + .andFinishWith("end", new ArrayList()::add); + + final CompletableFuture result = pipeline.start(executorService); + + assertThatThrownBy(() -> result.get(10, SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasRootCauseExactlyInstanceOf(RuntimeException.class) + .extracting(Throwable::getCause) + .isSameAs(expectedError); + } + + @Test + public void shouldTrackTaskCountMetric() throws Exception { + final Map counters = new ConcurrentHashMap<>(); + final LabelledMetric labelledCounter = + labels -> counters.computeIfAbsent(labels[0], label -> new SimpleCounter()); + final Pipeline pipeline = + PipelineBuilder.createPipelineFrom("input", tasks, 10, labelledCounter) + .thenProcess("map", Function.identity()) + .thenProcessInParallel("parallel", Function.identity(), 3) + .thenProcessAsync("async", CompletableFuture::completedFuture, 3) + .inBatches("batch", 4) + .thenFlatMap("flatMap", List::stream, 10) + .andFinishWith("finish", new ArrayList<>()::add); + + pipeline.start(executorService).get(10, SECONDS); + + assertThat(counters) + .containsOnlyKeys("input", "map", "parallel", "async", "batch", "flatMap", "finish"); + assertThat(counters.get("input").count).hasValue(15); + assertThat(counters.get("map").count).hasValue(15); + assertThat(counters.get("parallel").count).hasValue(15); + assertThat(counters.get("async").count).hasValue(15); + assertThat(counters.get("flatMap").count).hasValue(15); + assertThat(counters.get("finish").count).hasValue(15); + // We don't know how many batches will be produced because it's timing dependent but it must + // be at least 4 to fit all the items and shouldn't be more than the items we put in. + assertThat(counters.get("batch").count).hasValueBetween(4, 15); + } + + private void waitForSize(final Collection collection, final int targetSize) { + waitAtMost(10, SECONDS).untilAsserted(() -> assertThat(collection).hasSize(targetSize)); + } + + private static class SimpleCounter implements Counter { + private final AtomicLong count = new AtomicLong(0); + + @Override + public void inc() { + count.incrementAndGet(); + } + + @Override + public void inc(final long amount) { + count.addAndGet(amount); + } + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java new file mode 100644 index 0000000000..01e1d838e6 --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; + +import java.util.Locale; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ProcessingStageTest { + + private final Pipe inputPipe = new Pipe<>(10, NO_OP_COUNTER); + private final Pipe outputPipe = new Pipe<>(10, NO_OP_COUNTER); + @Mock private Processor singleStep; + private ProcessingStage stage; + + @Before + public void setUp() { + stage = new ProcessingStage<>(inputPipe, outputPipe, singleStep); + doAnswer( + invocation -> { + outputPipe.put(inputPipe.get().toLowerCase(Locale.UK)); + return 1; + }) + .when(singleStep) + .processNextInput(inputPipe, outputPipe); + } + + @Test + public void shouldCallSingleStepStageForEachInput() { + inputPipe.put("A"); + inputPipe.put("B"); + inputPipe.put("C"); + inputPipe.close(); + + stage.run(); + + assertThat(outputPipe.poll()).isEqualTo("a"); + assertThat(outputPipe.poll()).isEqualTo("b"); + assertThat(outputPipe.poll()).isEqualTo("c"); + assertThat(outputPipe.poll()).isNull(); + + verify(singleStep, times(3)).processNextInput(inputPipe, outputPipe); + } + + @Test + public void shouldFinalizeSingleStepStageAndCloseOutputPipeWhenInputCloses() { + inputPipe.close(); + + stage.run(); + + verify(singleStep).finalize(outputPipe); + verifyNoMoreInteractions(singleStep); + assertThat(outputPipe.isOpen()).isFalse(); + } +} diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/SharedWritePipeTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/SharedWritePipeTest.java new file mode 100644 index 0000000000..eebd1d4756 --- /dev/null +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/SharedWritePipeTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.pipeline; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +import org.junit.Test; + +public class SharedWritePipeTest { + + private static final int CLOSES_REQUIRED = 3; + + @SuppressWarnings("unchecked") + private final WritePipe delegate = mock(WritePipe.class); + + private final SharedWritePipe pipe = new SharedWritePipe<>(delegate, CLOSES_REQUIRED); + + @Test + public void shouldOnlyCloseDelegatePipeWhenCloseCalledSpecifiedNumberOfTimes() { + for (int i = 0; i < CLOSES_REQUIRED - 1; i++) { + pipe.close(); + verifyZeroInteractions(delegate); + } + + pipe.close(); + verify(delegate).close(); + } +} diff --git a/services/pipeline/src/test/resources/log4j2.xml b/services/pipeline/src/test/resources/log4j2.xml new file mode 100644 index 0000000000..f6fd6801df --- /dev/null +++ b/services/pipeline/src/test/resources/log4j2.xml @@ -0,0 +1,16 @@ + + + + INFO + + + + + + + + + + + + diff --git a/settings.gradle b/settings.gradle index d9818d185d..c690988d7a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -35,6 +35,7 @@ include 'ethereum:trie' include 'metrics' include 'pantheon' include 'services:kvstore' +include 'services:pipeline' include 'services:queue' include 'services:util' include 'testutil'