Count the number of items discarded when a pipe is aborted (#1208)

Allows monitoring to accurately calculate the number of items in a pipe even after it has aborted.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 26a851e436
commit 56347981c6
  1. 2
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java
  2. 12
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java
  3. 3
      services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java
  4. 2
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java
  5. 2
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java
  6. 4
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java
  7. 2
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java
  8. 4
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java
  9. 12
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java
  10. 8
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java
  11. 6
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java

@ -41,7 +41,7 @@ public class LoadLocalDataStepTest {
private final Task<NodeDataRequest> task = new StubTask(request);
private final Pipe<Task<NodeDataRequest>> completedTasks =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final LoadLocalDataStep loadLocalDataStep =
new LoadLocalDataStep(worldStateStorage, new NoOpMetricsSystem());

@ -40,14 +40,20 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
private final int capacity;
private final Counter inputCounter;
private final Counter outputCounter;
private final Counter abortedItemCounter;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
public Pipe(final int capacity, final Counter inputCounter, final Counter outputCounter) {
public Pipe(
final int capacity,
final Counter inputCounter,
final Counter outputCounter,
final Counter abortedItemCounter) {
queue = new ArrayBlockingQueue<>(capacity);
this.capacity = capacity;
this.inputCounter = inputCounter;
this.outputCounter = outputCounter;
this.abortedItemCounter = abortedItemCounter;
}
@Override
@ -81,7 +87,9 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
@Override
public void abort() {
aborted.set(true);
if (aborted.compareAndSet(false, true)) {
abortedItemCounter.inc(queue.size());
}
}
@Override

@ -312,6 +312,7 @@ public class PipelineBuilder<I, T> {
return new Pipe<>(
newBufferSize,
outputCounter.labels(labelName, "added"),
outputCounter.labels(labelName, "removed"));
outputCounter.labels(labelName, "removed"),
outputCounter.labels(labelName, "aborted"));
}
}

@ -29,7 +29,7 @@ import org.junit.Test;
public class BatchingReadPipeTest {
private final Pipe<String> source = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> source = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Counter batchCounter = mock(Counter.class);
private final BatchingReadPipe<String> batchingPipe =
new BatchingReadPipe<>(source, 3, batchCounter);

@ -22,7 +22,7 @@ import org.junit.Test;
public class CompleterStageTest {
private final Pipe<String> pipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> pipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final List<String> output = new ArrayList<>();
private final CompleterStage<String> stage = new CompleterStage<>("name", pipe, output::add);

@ -26,8 +26,8 @@ import org.junit.Test;
public class FlatMapProcessorTest {
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
@SuppressWarnings("unchecked")
private final Function<String, Stream<String>> mapper = mock(Function.class);

@ -20,7 +20,7 @@ import org.junit.Test;
public class IteratorSourceStageTest {
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final IteratorSourceStage<String> stage =
new IteratorSourceStage<>("name", Iterators.forArray("a", "b", "c", "d"), output);

@ -25,8 +25,8 @@ import org.junit.Test;
public class MapProcessorTest {
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
@SuppressWarnings("unchecked")
private final Function<String, String> processor = mock(Function.class);

@ -27,7 +27,8 @@ import org.junit.Test;
public class PipeTest {
private final Counter inputCounter = mock(Counter.class);
private final Counter outputCounter = mock(Counter.class);
private final Pipe<String> pipe = new Pipe<>(5, inputCounter, outputCounter);
private final Counter abortedItemCounter = mock(Counter.class);
private final Pipe<String> pipe = new Pipe<>(5, inputCounter, outputCounter, abortedItemCounter);
@Test
public void shouldNotHaveMoreWhenEmptyAndClosed() {
@ -120,6 +121,15 @@ public class PipeTest {
verify(outputCounter, times(2)).inc();
}
@Test
public void shouldIncrementAbortedItemCounterForItemsDiscardedDueToAborting() {
pipe.put("A");
pipe.put("B");
pipe.abort();
verify(abortedItemCounter).inc(2);
}
@Test
public void shouldReturnNullFromGetImmediatelyIfThreadIsInterrupted() {
Thread.currentThread().interrupt();

@ -389,12 +389,16 @@ public class PipelineBuilderTest {
stepNames.stream()
.map(stageName -> stageName + "_outputPipe")
.flatMap(
metricName -> Stream.of(metricName + "-added", metricName + "-removed")))
metricName ->
Stream.of(
metricName + "-added",
metricName + "-removed",
metricName + "-aborted")))
.collect(toList());
assertThat(counters).containsOnlyKeys(expectedMetricNames);
expectedMetricNames.stream()
.filter(name -> !name.endsWith("-batches"))
.filter(name -> !name.endsWith("-batches") && !name.endsWith("-aborted"))
.forEach(metric -> assertThat(counters.get(metric).count).hasValue(15));
assertThat(counters.get("async_outputPipe-batches").count).hasValueBetween(4, 15);

@ -30,8 +30,10 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class ProcessingStageTest {
private final Pipe<String> inputPipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> outputPipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> inputPipe =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> outputPipe =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
@Mock private Processor<String, String> singleStep;
private ProcessingStage<String, String> stage;

Loading…
Cancel
Save