diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java index 2fc4f17505..36034cb6e5 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java @@ -41,7 +41,7 @@ public class LoadLocalDataStepTest { private final Task task = new StubTask(request); private final Pipe> completedTasks = - new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); private final LoadLocalDataStep loadLocalDataStep = new LoadLocalDataStep(worldStateStorage, new NoOpMetricsSystem()); diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java index 28caa7cf14..857c3302ea 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/Pipe.java @@ -40,14 +40,20 @@ public class Pipe implements ReadPipe, WritePipe { private final int capacity; private final Counter inputCounter; private final Counter outputCounter; + private final Counter abortedItemCounter; private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean aborted = new AtomicBoolean(); - public Pipe(final int capacity, final Counter inputCounter, final Counter outputCounter) { + public Pipe( + final int capacity, + final Counter inputCounter, + final Counter outputCounter, + final Counter abortedItemCounter) { queue = new ArrayBlockingQueue<>(capacity); this.capacity = capacity; this.inputCounter = inputCounter; this.outputCounter = outputCounter; + this.abortedItemCounter = abortedItemCounter; } @Override @@ -81,7 +87,9 @@ public class Pipe implements ReadPipe, WritePipe { @Override public void abort() { - aborted.set(true); + if (aborted.compareAndSet(false, true)) { + abortedItemCounter.inc(queue.size()); + } } @Override diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java index c33f239a7d..5ca3ac8d7f 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilder.java @@ -312,6 +312,7 @@ public class PipelineBuilder { return new Pipe<>( newBufferSize, outputCounter.labels(labelName, "added"), - outputCounter.labels(labelName, "removed")); + outputCounter.labels(labelName, "removed"), + outputCounter.labels(labelName, "aborted")); } } diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java index f4b6a7402c..42f28a0bea 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/BatchingReadPipeTest.java @@ -29,7 +29,7 @@ import org.junit.Test; public class BatchingReadPipeTest { - private final Pipe source = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe source = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); private final Counter batchCounter = mock(Counter.class); private final BatchingReadPipe batchingPipe = new BatchingReadPipe<>(source, 3, batchCounter); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java index 01007c14c8..7dbc6fc2d3 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/CompleterStageTest.java @@ -22,7 +22,7 @@ import org.junit.Test; public class CompleterStageTest { - private final Pipe pipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe pipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); private final List output = new ArrayList<>(); private final CompleterStage stage = new CompleterStage<>("name", pipe, output::add); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java index ab1de8f8dd..4001485dff 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/FlatMapProcessorTest.java @@ -26,8 +26,8 @@ import org.junit.Test; public class FlatMapProcessorTest { - private final Pipe input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); - private final Pipe output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); @SuppressWarnings("unchecked") private final Function> mapper = mock(Function.class); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java index c9a28a5edf..6177e2a18f 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/IteratorSourceStageTest.java @@ -20,7 +20,7 @@ import org.junit.Test; public class IteratorSourceStageTest { - private final Pipe output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); private final IteratorSourceStage stage = new IteratorSourceStage<>("name", Iterators.forArray("a", "b", "c", "d"), output); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java index a8cedb1821..1f4ca84abf 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/MapProcessorTest.java @@ -25,8 +25,8 @@ import org.junit.Test; public class MapProcessorTest { - private final Pipe input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); - private final Pipe output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); @SuppressWarnings("unchecked") private final Function processor = mock(Function.class); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java index 48b24fa57d..ab4aa0e89b 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipeTest.java @@ -27,7 +27,8 @@ import org.junit.Test; public class PipeTest { private final Counter inputCounter = mock(Counter.class); private final Counter outputCounter = mock(Counter.class); - private final Pipe pipe = new Pipe<>(5, inputCounter, outputCounter); + private final Counter abortedItemCounter = mock(Counter.class); + private final Pipe pipe = new Pipe<>(5, inputCounter, outputCounter, abortedItemCounter); @Test public void shouldNotHaveMoreWhenEmptyAndClosed() { @@ -120,6 +121,15 @@ public class PipeTest { verify(outputCounter, times(2)).inc(); } + @Test + public void shouldIncrementAbortedItemCounterForItemsDiscardedDueToAborting() { + pipe.put("A"); + pipe.put("B"); + pipe.abort(); + + verify(abortedItemCounter).inc(2); + } + @Test public void shouldReturnNullFromGetImmediatelyIfThreadIsInterrupted() { Thread.currentThread().interrupt(); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java index fc7e49e9d1..1f3ff4c5d9 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java @@ -389,12 +389,16 @@ public class PipelineBuilderTest { stepNames.stream() .map(stageName -> stageName + "_outputPipe") .flatMap( - metricName -> Stream.of(metricName + "-added", metricName + "-removed"))) + metricName -> + Stream.of( + metricName + "-added", + metricName + "-removed", + metricName + "-aborted"))) .collect(toList()); assertThat(counters).containsOnlyKeys(expectedMetricNames); expectedMetricNames.stream() - .filter(name -> !name.endsWith("-batches")) + .filter(name -> !name.endsWith("-batches") && !name.endsWith("-aborted")) .forEach(metric -> assertThat(counters.get(metric).count).hasValue(15)); assertThat(counters.get("async_outputPipe-batches").count).hasValueBetween(4, 15); diff --git a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java index d89148db16..5ec880b3da 100644 --- a/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java +++ b/services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/ProcessingStageTest.java @@ -30,8 +30,10 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class ProcessingStageTest { - private final Pipe inputPipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); - private final Pipe outputPipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe inputPipe = + new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); + private final Pipe outputPipe = + new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER); @Mock private Processor singleStep; private ProcessingStage stage;