Add pipe stage name to thread while executing (#1097)

Makes it easier to identify when profiling and debugging.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 8b5c197883
commit 723f96fdbc
  1. 14
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/CompleterStage.java
  2. 11
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStage.java
  3. 12
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipeline.java
  4. 20
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java
  5. 14
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/ProcessingStage.java
  6. 17
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Stage.java
  7. 2
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java
  8. 2
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java
  9. 2
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java

@ -17,17 +17,22 @@ import tech.pegasys.pantheon.metrics.Counter;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
class CompleterStage<T> implements Runnable {
class CompleterStage<T> implements Stage {
private final ReadPipe<T> input;
private final Consumer<T> completer;
private final Counter outputCounter;
private final String name;
private final CompletableFuture<?> future = new CompletableFuture<>();
CompleterStage(
final ReadPipe<T> input, final Consumer<T> completer, final Counter outputCounter) {
final String name,
final ReadPipe<T> input,
final Consumer<T> completer,
final Counter outputCounter) {
this.input = input;
this.completer = completer;
this.outputCounter = outputCounter;
this.name = name;
}
@Override
@ -45,4 +50,9 @@ class CompleterStage<T> implements Runnable {
public CompletableFuture<?> getFuture() {
return future;
}
@Override
public String getName() {
return name;
}
}

@ -14,13 +14,15 @@ package tech.pegasys.pantheon.services.pipeline;
import java.util.Iterator;
class IteratorSourceStage<T> implements Runnable {
class IteratorSourceStage<T> implements Stage {
private final Iterator<T> source;
private final Pipe<T> pipe;
private final String name;
IteratorSourceStage(final Iterator<T> source, final Pipe<T> pipe) {
IteratorSourceStage(final String name, final Iterator<T> source, final Pipe<T> pipe) {
this.source = source;
this.pipe = pipe;
this.name = name;
}
@Override
@ -33,4 +35,9 @@ class IteratorSourceStage<T> implements Runnable {
}
pipe.close();
}
@Override
public String getName() {
return name;
}
}

@ -31,7 +31,7 @@ import org.apache.logging.log4j.Logger;
public class Pipeline {
private static final Logger LOG = LogManager.getLogger();
private final Pipe<?> inputPipe;
private final Collection<Runnable> stages;
private final Collection<Stage> stages;
private final Collection<Pipe<?>> pipes;
private final CompleterStage<?> completerStage;
private final AtomicBoolean started = new AtomicBoolean(false);
@ -49,7 +49,7 @@ public class Pipeline {
Pipeline(
final Pipe<?> inputPipe,
final Collection<Runnable> stages,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final CompleterStage<?> completerStage) {
this.inputPipe = inputPipe;
@ -109,11 +109,13 @@ public class Pipeline {
abort(exception);
}
private Future<?> runWithErrorHandling(
final ExecutorService executorService, final Runnable task) {
private Future<?> runWithErrorHandling(final ExecutorService executorService, final Stage task) {
return executorService.submit(
() -> {
final Thread thread = Thread.currentThread();
final String originalName = thread.getName();
try {
thread.setName(originalName + " (" + task.getName() + ")");
task.run();
} catch (final Throwable t) {
LOG.debug("Unhandled exception in pipeline. Aborting.", t);
@ -125,6 +127,8 @@ public class Pipeline {
// need to call get on it which would normally expose the error.
LOG.error("Failed to abort pipeline after error", t2);
}
} finally {
thread.setName(originalName);
}
});
}

@ -41,7 +41,7 @@ import java.util.stream.Stream;
public class PipelineBuilder<T> {
private final Pipe<?> inputPipe;
private final Collection<Runnable> stages;
private final Collection<Stage> stages;
private final Collection<Pipe<?>> pipes;
private final ReadPipe<T> pipeEnd;
private final int bufferSize;
@ -49,7 +49,7 @@ public class PipelineBuilder<T> {
public PipelineBuilder(
final Pipe<?> inputPipe,
final Collection<Runnable> stages,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final ReadPipe<T> pipeEnd,
final int bufferSize,
@ -82,7 +82,7 @@ public class PipelineBuilder<T> {
final int bufferSize,
final LabelledMetric<Counter> outputCounter) {
final Pipe<T> pipe = new Pipe<>(bufferSize, outputCounter.labels(sourceName));
final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(source, pipe);
final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(sourceName, source, pipe);
return new PipelineBuilder<>(
pipe, singleton(sourceStage), singleton(pipe), pipe, bufferSize, outputCounter);
}
@ -134,10 +134,10 @@ public class PipelineBuilder<T> {
final String stageName, final Function<T, O> processor, final int numberOfThreads) {
final Pipe<O> newPipeEnd = new Pipe<>(bufferSize, outputCounter.labels(stageName));
final WritePipe<O> outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads);
final ArrayList<Runnable> newStages = new ArrayList<>(stages);
final ArrayList<Stage> newStages = new ArrayList<>(stages);
for (int i = 0; i < numberOfThreads; i++) {
final Runnable processStage =
new ProcessingStage<>(pipeEnd, outputPipe, new MapProcessor<>(processor));
final Stage processStage =
new ProcessingStage<>(stageName, pipeEnd, outputPipe, new MapProcessor<>(processor));
newStages.add(processStage);
}
return new PipelineBuilder<>(
@ -218,7 +218,7 @@ public class PipelineBuilder<T> {
inputPipe,
stages,
pipes,
new CompleterStage<>(pipeEnd, completer, outputCounter.labels(stageName)));
new CompleterStage<>(stageName, pipeEnd, completer, outputCounter.labels(stageName)));
}
private <O> PipelineBuilder<O> addStage(final Processor<T, O> processor, final String stageName) {
@ -228,12 +228,12 @@ public class PipelineBuilder<T> {
private <O> PipelineBuilder<O> addStage(
final Processor<T, O> processor, final int newBufferSize, final String stageName) {
final Pipe<O> outputPipe = new Pipe<>(newBufferSize, outputCounter.labels(stageName));
final Runnable processStage = new ProcessingStage<>(pipeEnd, outputPipe, processor);
final Stage processStage = new ProcessingStage<>(stageName, pipeEnd, outputPipe, processor);
return addStage(processStage, outputPipe);
}
private <O> PipelineBuilder<O> addStage(final Runnable stage, final Pipe<O> outputPipe) {
final List<Runnable> newStages = concat(stages, stage);
private <O> PipelineBuilder<O> addStage(final Stage stage, final Pipe<O> outputPipe) {
final List<Stage> newStages = concat(stages, stage);
return new PipelineBuilder<>(
inputPipe, newStages, concat(pipes, outputPipe), outputPipe, bufferSize, outputCounter);
}

@ -12,14 +12,19 @@
*/
package tech.pegasys.pantheon.services.pipeline;
class ProcessingStage<I, O> implements Runnable {
class ProcessingStage<I, O> implements Stage {
private final String name;
private final ReadPipe<I> inputPipe;
private final WritePipe<O> outputPipe;
private final Processor<I, O> processor;
public ProcessingStage(
final ReadPipe<I> inputPipe, final WritePipe<O> outputPipe, final Processor<I, O> processor) {
final String name,
final ReadPipe<I> inputPipe,
final WritePipe<O> outputPipe,
final Processor<I, O> processor) {
this.name = name;
this.inputPipe = inputPipe;
this.outputPipe = outputPipe;
this.processor = processor;
@ -33,4 +38,9 @@ class ProcessingStage<I, O> implements Runnable {
processor.finalize(outputPipe);
outputPipe.close();
}
@Override
public String getName() {
return name;
}
}

@ -0,0 +1,17 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.pipeline;
public interface Stage extends Runnable {
String getName();
}

@ -31,7 +31,7 @@ public class CompleterStageTest {
private final List<String> output = new ArrayList<>();
private final Counter outputCounter = mock(Counter.class);
private final CompleterStage<String> stage =
new CompleterStage<>(pipe, output::add, outputCounter);
new CompleterStage<>("name", pipe, output::add, outputCounter);
@Test
public void shouldAddItemsToOutputUntilPipeHasNoMore() {

@ -23,7 +23,7 @@ public class IteratorSourceStageTest {
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER);
private final IteratorSourceStage<String> stage =
new IteratorSourceStage<>(Iterators.forArray("a", "b", "c", "d"), output);
new IteratorSourceStage<>("name", Iterators.forArray("a", "b", "c", "d"), output);
@Test
public void shouldOutputEntriesThenClosePipe() {

@ -37,7 +37,7 @@ public class ProcessingStageTest {
@Before
public void setUp() {
stage = new ProcessingStage<>(inputPipe, outputPipe, singleStep);
stage = new ProcessingStage<>("name", inputPipe, outputPipe, singleStep);
doAnswer(
invocation -> {
outputPipe.put(inputPipe.get().toLowerCase(Locale.UK));

Loading…
Cancel
Save