Fix metrics breakages (#1185)

* Number of metrics labels need to match up with constructor
* Number of labels must be consistant, so I split it into two metrics
* Also, naming best practices say that sum() and avg() of a metric
  should be meaningful, separating into two metrics fixes that.
* fix style issues (finals, intellij warnings)

* Change NoOpMetrics to check label count.

* Cascading changes to support this in many support classes.  Mostly places
we presumed all NoOpMetrics were equals.

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Danno Ferrin 6 years ago committed by GitHub
parent cdefb330be
commit 74e2b5a632
  1. 38
      ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/PendingTransactions.java
  2. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTask.java
  3. 8
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncValidationPolicyTest.java
  4. 2
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java
  5. 2
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnectionTest.java
  6. 69
      metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java
  7. 4
      metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystem.java
  8. 71
      metrics/src/test/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystemTest.java
  9. 4
      metrics/src/test/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystemTest.java
  10. 24
      services/pipeline/src/test/java/tech/pegasys/pantheon/services/pipeline/PipelineBuilderTest.java

@ -24,6 +24,7 @@ import java.time.Clock;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -34,7 +35,6 @@ import java.util.SortedSet;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/** /**
* Holds the current set of pending transactions with the ability to iterate them based on priority * Holds the current set of pending transactions with the ability to iterate them based on priority
@ -62,7 +62,7 @@ public class PendingTransactions {
private final int maxPendingTransactions; private final int maxPendingTransactions;
private final Clock clock; private final Clock clock;
private final LabelledMetric<Counter> transactionCounter; private final LabelledMetric<Counter> transactionRemovedCounter;
private final Counter localTransactionAddedCounter; private final Counter localTransactionAddedCounter;
private final Counter remoteTransactionAddedCounter; private final Counter remoteTransactionAddedCounter;
@ -70,37 +70,45 @@ public class PendingTransactions {
final int maxPendingTransactions, final Clock clock, final MetricsSystem metricsSystem) { final int maxPendingTransactions, final Clock clock, final MetricsSystem metricsSystem) {
this.maxPendingTransactions = maxPendingTransactions; this.maxPendingTransactions = maxPendingTransactions;
this.clock = clock; this.clock = clock;
transactionCounter = final LabelledMetric<Counter> transactionAddedCounter =
metricsSystem.createLabelledCounter( metricsSystem.createLabelledCounter(
MetricCategory.TRANSACTION_POOL, MetricCategory.TRANSACTION_POOL,
"transactions_total", "transactions_added_total",
"Count of transactions changed in the transaction pool", "Count of transactions added to the transaction pool",
"source"); "source");
localTransactionAddedCounter = transactionCounter.labels("local", "added"); localTransactionAddedCounter = transactionAddedCounter.labels("local");
remoteTransactionAddedCounter = transactionCounter.labels("remote", "added"); remoteTransactionAddedCounter = transactionAddedCounter.labels("remote");
transactionRemovedCounter =
metricsSystem.createLabelledCounter(
MetricCategory.TRANSACTION_POOL,
"transactions_removed_total",
"Count of transactions removed from the transaction pool",
"source",
"operation");
} }
public boolean addRemoteTransaction(final Transaction transaction) { public boolean addRemoteTransaction(final Transaction transaction) {
final TransactionInfo transactionInfo = final TransactionInfo transactionInfo =
new TransactionInfo(transaction, false, clock.instant()); new TransactionInfo(transaction, false, clock.instant());
boolean addTransaction = addTransaction(transactionInfo); final boolean addTransaction = addTransaction(transactionInfo);
remoteTransactionAddedCounter.inc(); remoteTransactionAddedCounter.inc();
return addTransaction; return addTransaction;
} }
boolean addLocalTransaction(final Transaction transaction) { boolean addLocalTransaction(final Transaction transaction) {
boolean addTransaction = final boolean addTransaction =
addTransaction(new TransactionInfo(transaction, true, clock.instant())); addTransaction(new TransactionInfo(transaction, true, clock.instant()));
localTransactionAddedCounter.inc(); localTransactionAddedCounter.inc();
return addTransaction; return addTransaction;
} }
public void removeTransaction(final Transaction transaction) { void removeTransaction(final Transaction transaction) {
doRemoveTransaction(transaction, false); doRemoveTransaction(transaction, false);
notifyTransactionDropped(transaction); notifyTransactionDropped(transaction);
} }
public void transactionAddedToBlock(final Transaction transaction) { void transactionAddedToBlock(final Transaction transaction) {
doRemoveTransaction(transaction, true); doRemoveTransaction(transaction, true);
} }
@ -127,7 +135,7 @@ public class PendingTransactions {
final boolean receivedFromLocalSource, final boolean addedToBlock) { final boolean receivedFromLocalSource, final boolean addedToBlock) {
final String location = receivedFromLocalSource ? "local" : "remote"; final String location = receivedFromLocalSource ? "local" : "remote";
final String operation = addedToBlock ? "addedToBlock" : "dropped"; final String operation = addedToBlock ? "addedToBlock" : "dropped";
transactionCounter.labels(location, "removed", operation).inc(); transactionRemovedCounter.labels(location, operation).inc();
} }
/* /*
@ -239,15 +247,15 @@ public class PendingTransactions {
public Set<TransactionInfo> getTransactionInfo() { public Set<TransactionInfo> getTransactionInfo() {
synchronized (pendingTransactions) { synchronized (pendingTransactions) {
return pendingTransactions.values().stream().collect(Collectors.toSet()); return new HashSet<>(pendingTransactions.values());
} }
} }
public void addTransactionListener(final PendingTransactionListener listener) { void addTransactionListener(final PendingTransactionListener listener) {
listeners.subscribe(listener); listeners.subscribe(listener);
} }
public void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) { void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) {
transactionDroppedListeners.subscribe(listener); transactionDroppedListeners.subscribe(listener);
} }

@ -44,7 +44,7 @@ public abstract class AbstractEthTask<T> implements EthTask<T> {
final LabelledMetric<OperationTimer> ethTasksTimer = final LabelledMetric<OperationTimer> ethTasksTimer =
metricsSystem.createLabelledTimer( metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"); MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName");
if (ethTasksTimer == NoOpMetricsSystem.NO_OP_LABELLED_TIMER) { if (ethTasksTimer == NoOpMetricsSystem.NO_OP_LABELLED_1_OPERATION_TIMER) {
taskTimer = taskTimer =
() -> () ->
new OperationTimer.TimingContext() { new OperationTimer.TimingContext() {

@ -16,7 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail; import static org.assertj.core.api.Fail.fail;
import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.FULL; import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.FULL;
import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.LIGHT; import static tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode.LIGHT;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_COUNTER; import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode; import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
@ -26,21 +26,21 @@ public class FastSyncValidationPolicyTest {
@Test @Test
public void shouldAlwaysUseFastValidationWhenFullValidationRateIsZero() { public void shouldAlwaysUseFastValidationWhenFullValidationRateIsZero() {
final FastSyncValidationPolicy policy = final FastSyncValidationPolicy policy =
new FastSyncValidationPolicy(0, LIGHT, FULL, NO_OP_LABELLED_COUNTER); new FastSyncValidationPolicy(0, LIGHT, FULL, NO_OP_LABELLED_1_COUNTER);
assertThat(policy.getValidationModeForNextBlock()).isEqualTo(LIGHT); assertThat(policy.getValidationModeForNextBlock()).isEqualTo(LIGHT);
} }
@Test @Test
public void shouldAlwaysUseFullValidationWhenFullValidationRateIsOne() { public void shouldAlwaysUseFullValidationWhenFullValidationRateIsOne() {
final FastSyncValidationPolicy policy = final FastSyncValidationPolicy policy =
new FastSyncValidationPolicy(1, LIGHT, FULL, NO_OP_LABELLED_COUNTER); new FastSyncValidationPolicy(1, LIGHT, FULL, NO_OP_LABELLED_1_COUNTER);
assertThat(policy.getValidationModeForNextBlock()).isEqualTo(FULL); assertThat(policy.getValidationModeForNextBlock()).isEqualTo(FULL);
} }
@Test @Test
public void shouldEventuallyUseBothModesWhenValidationPolicyIsHalf() { public void shouldEventuallyUseBothModesWhenValidationPolicyIsHalf() {
final FastSyncValidationPolicy policy = final FastSyncValidationPolicy policy =
new FastSyncValidationPolicy(0.5f, LIGHT, FULL, NO_OP_LABELLED_COUNTER); new FastSyncValidationPolicy(0.5f, LIGHT, FULL, NO_OP_LABELLED_1_COUNTER);
boolean seenLight = false; boolean seenLight = false;
boolean seenFull = false; boolean seenFull = false;
// It's theoretically possible to flip a coin 2^31-1 times and only ever get heads but // It's theoretically possible to flip a coin 2^31-1 times and only ever get heads but

@ -44,7 +44,7 @@ public class DeFramerTest {
new PeerInfo(5, "abc", Collections.emptyList(), 0, BytesValue.fromHexString("0x01")), new PeerInfo(5, "abc", Collections.emptyList(), 0, BytesValue.fromHexString("0x01")),
callbacks, callbacks,
connectFuture, connectFuture,
NoOpMetricsSystem.NO_OP_LABELLED_COUNTER); NoOpMetricsSystem.NO_OP_LABELLED_3_COUNTER);
@Test @Test
public void shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown() throws Exception { public void shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown() throws Exception {

@ -50,7 +50,7 @@ public class NettyPeerConnectionTest {
when(channel.eventLoop()).thenReturn(eventLoop); when(channel.eventLoop()).thenReturn(eventLoop);
connection = connection =
new NettyPeerConnection( new NettyPeerConnection(
context, peerInfo, multiplexer, callbacks, NoOpMetricsSystem.NO_OP_LABELLED_COUNTER); context, peerInfo, multiplexer, callbacks, NoOpMetricsSystem.NO_OP_LABELLED_3_COUNTER);
} }
@Test @Test

@ -20,27 +20,25 @@ import tech.pegasys.pantheon.metrics.Observation;
import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.OperationTimer.TimingContext; import tech.pegasys.pantheon.metrics.OperationTimer.TimingContext;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Stream; import java.util.stream.Stream;
import io.prometheus.client.Collector; import com.google.common.base.Preconditions;
public class NoOpMetricsSystem implements MetricsSystem { public class NoOpMetricsSystem implements MetricsSystem {
public static final Counter NO_OP_COUNTER = new NoOpCounter(); public static final Counter NO_OP_COUNTER = new NoOpCounter();
private static final TimingContext NO_OP_TIMING_CONTEXT = () -> 0; private static final TimingContext NO_OP_TIMING_CONTEXT = () -> 0;
private static final OperationTimer NO_OP_TIMER = () -> NO_OP_TIMING_CONTEXT; public static final OperationTimer NO_OP_OPERATION_TIMER = () -> NO_OP_TIMING_CONTEXT;
public static final LabelledMetric<OperationTimer> NO_OP_LABELLED_TIMER = label -> NO_OP_TIMER;
public static final LabelledMetric<Counter> NO_OP_LABELLED_COUNTER = label -> NO_OP_COUNTER; public static final LabelledMetric<Counter> NO_OP_LABELLED_1_COUNTER =
public static final Collector NO_OP_COLLECTOR = new LabelCountingNoOpMetric<>(1, NO_OP_COUNTER);
new Collector() { public static final LabelledMetric<Counter> NO_OP_LABELLED_2_COUNTER =
@Override new LabelCountingNoOpMetric<>(2, NO_OP_COUNTER);
public List<MetricFamilySamples> collect() { public static final LabelledMetric<Counter> NO_OP_LABELLED_3_COUNTER =
return Collections.emptyList(); new LabelCountingNoOpMetric<>(3, NO_OP_COUNTER);
} public static final LabelledMetric<OperationTimer> NO_OP_LABELLED_1_OPERATION_TIMER =
}; new LabelCountingNoOpMetric<>(1, NO_OP_OPERATION_TIMER);
@Override @Override
public LabelledMetric<Counter> createLabelledCounter( public LabelledMetric<Counter> createLabelledCounter(
@ -48,7 +46,20 @@ public class NoOpMetricsSystem implements MetricsSystem {
final String name, final String name,
final String help, final String help,
final String... labelNames) { final String... labelNames) {
return NO_OP_LABELLED_COUNTER; return getCounterLabelledMetric(labelNames.length);
}
public static LabelledMetric<Counter> getCounterLabelledMetric(final int labelCount) {
switch (labelCount) {
case 1:
return NO_OP_LABELLED_1_COUNTER;
case 2:
return NO_OP_LABELLED_2_COUNTER;
case 3:
return NO_OP_LABELLED_3_COUNTER;
default:
return new LabelCountingNoOpMetric<>(labelCount, NO_OP_COUNTER);
}
} }
@Override @Override
@ -57,7 +68,16 @@ public class NoOpMetricsSystem implements MetricsSystem {
final String name, final String name,
final String help, final String help,
final String... labelNames) { final String... labelNames) {
return NO_OP_LABELLED_TIMER; return getOperationTimerLabelledMetric(labelNames.length);
}
public static LabelledMetric<OperationTimer> getOperationTimerLabelledMetric(
final int labelCount) {
if (labelCount == 1) {
return NO_OP_LABELLED_1_OPERATION_TIMER;
} else {
return new LabelCountingNoOpMetric<>(labelCount, NO_OP_OPERATION_TIMER);
}
} }
@Override @Override
@ -76,4 +96,23 @@ public class NoOpMetricsSystem implements MetricsSystem {
public Stream<Observation> getMetrics() { public Stream<Observation> getMetrics() {
return Stream.empty(); return Stream.empty();
} }
public static class LabelCountingNoOpMetric<T> implements LabelledMetric<T> {
final int labelCount;
final T fakeMetric;
LabelCountingNoOpMetric(final int labelCount, final T fakeMetric) {
this.labelCount = labelCount;
this.fakeMetric = fakeMetric;
}
@Override
public T labels(final String... labels) {
Preconditions.checkArgument(
labels.length == labelCount,
"The count of labels used must match the count of labels expected.");
return fakeMetric;
}
}
} }

@ -99,7 +99,7 @@ public class PrometheusMetricsSystem implements MetricsSystem {
addCollector(category, counter); addCollector(category, counter);
return new PrometheusCounter(counter); return new PrometheusCounter(counter);
} else { } else {
return NoOpMetricsSystem.NO_OP_LABELLED_COUNTER; return NoOpMetricsSystem.getCounterLabelledMetric(labelNames.length);
} }
}); });
} }
@ -128,7 +128,7 @@ public class PrometheusMetricsSystem implements MetricsSystem {
addCollector(category, summary); addCollector(category, summary);
return new PrometheusTimer(summary); return new PrometheusTimer(summary);
} else { } else {
return NoOpMetricsSystem.NO_OP_LABELLED_TIMER; return NoOpMetricsSystem.getOperationTimerLabelledMetric(labelNames.length);
} }
}); });
} }

@ -0,0 +1,71 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.metrics.noop;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import org.junit.Test;
public class NoOpMetricsSystemTest {
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Test
public void labelCountsMatchOnCounter() {
final LabelledMetric<Counter> labeledCounter =
metricsSystem.createLabelledCounter(MetricCategory.PROCESS, "name", "help", "label1");
assertThat(labeledCounter.labels("one")).isSameAs(NoOpMetricsSystem.NO_OP_COUNTER);
}
@Test
public void failsWheLabelCountsDoNotMatchOnCounter() {
final LabelledMetric<Counter> labeledCounter =
metricsSystem.createLabelledCounter(
MetricCategory.PROCESS, "name", "help", "label1", "label2");
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> labeledCounter.labels("one"))
.withMessage("The count of labels used must match the count of labels expected.");
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> labeledCounter.labels("one", "two", "three"))
.withMessage("The count of labels used must match the count of labels expected.");
}
@Test
public void labelCountsMatchOnTimer() {
final LabelledMetric<OperationTimer> labeledTimer =
metricsSystem.createLabelledTimer(MetricCategory.PROCESS, "name", "help", "label1");
assertThat(labeledTimer.labels("one")).isSameAs(NoOpMetricsSystem.NO_OP_OPERATION_TIMER);
}
@Test
public void failsWheLabelCountsDoNotMatchOnTimer() {
final LabelledMetric<OperationTimer> labeledTimer =
metricsSystem.createLabelledTimer(
MetricCategory.PROCESS, "name", "help", "label1", "label2");
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> labeledTimer.labels("one"))
.withMessage("The count of labels used must match the count of labels expected.");
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> labeledTimer.labels("one", "two", "three"))
.withMessage("The count of labels used must match the count of labels expected.");
}
}

@ -181,7 +181,7 @@ public class PrometheusMetricsSystemTest {
// do a category we are not watching // do a category we are not watching
final LabelledMetric<Counter> counterN = final LabelledMetric<Counter> counterN =
localMetricSystem.createLabelledCounter(NETWORK, "ABC", "Not that kind of network", "show"); localMetricSystem.createLabelledCounter(NETWORK, "ABC", "Not that kind of network", "show");
assertThat(counterN).isSameAs(NoOpMetricsSystem.NO_OP_LABELLED_COUNTER); assertThat(counterN).isSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);
counterN.labels("show").inc(); counterN.labels("show").inc();
assertThat(localMetricSystem.getMetrics()).isEmpty(); assertThat(localMetricSystem.getMetrics()).isEmpty();
@ -189,7 +189,7 @@ public class PrometheusMetricsSystemTest {
// do a category we are watching // do a category we are watching
final LabelledMetric<Counter> counterR = final LabelledMetric<Counter> counterR =
localMetricSystem.createLabelledCounter(RPC, "name", "Not useful", "method"); localMetricSystem.createLabelledCounter(RPC, "name", "Not useful", "method");
assertThat(counterR).isNotSameAs(NoOpMetricsSystem.NO_OP_LABELLED_COUNTER); assertThat(counterR).isNotSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER);
counterR.labels("op").inc(); counterR.labels("op").inc();
assertThat(localMetricSystem.getMetrics()) assertThat(localMetricSystem.getMetrics())

@ -21,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.waitAtMost; import static org.awaitility.Awaitility.waitAtMost;
import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_COUNTER; import static tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem.NO_OP_LABELLED_2_COUNTER;
import tech.pegasys.pantheon.metrics.Counter; import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.LabelledMetric;
@ -76,7 +76,7 @@ public class PipelineBuilderTest {
public void shouldPipeTasksFromSupplierToCompleter() throws Exception { public void shouldPipeTasksFromSupplierToCompleter() throws Exception {
final List<Integer> output = new ArrayList<>(); final List<Integer> output = new ArrayList<>();
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
.andFinishWith("end", output::add); .andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService); final CompletableFuture<?> result = pipeline.start(executorService);
result.get(10, SECONDS); result.get(10, SECONDS);
@ -87,7 +87,7 @@ public class PipelineBuilderTest {
public void shouldPassInputThroughIntermediateStage() throws Exception { public void shouldPassInputThroughIntermediateStage() throws Exception {
final List<String> output = new ArrayList<>(); final List<String> output = new ArrayList<>();
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
.thenProcess("toString", Object::toString) .thenProcess("toString", Object::toString)
.andFinishWith("end", output::add); .andFinishWith("end", output::add);
@ -102,7 +102,7 @@ public class PipelineBuilderTest {
public void shouldCombineIntoBatches() throws Exception { public void shouldCombineIntoBatches() throws Exception {
final BlockingQueue<List<Integer>> output = new ArrayBlockingQueue<>(10); final BlockingQueue<List<Integer>> output = new ArrayBlockingQueue<>(10);
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.<Integer>createPipeline("source", 20, NO_OP_LABELLED_COUNTER) PipelineBuilder.<Integer>createPipeline("source", 20, NO_OP_LABELLED_2_COUNTER)
.inBatches(6) .inBatches(6)
.andFinishWith("end", output::offer); .andFinishWith("end", output::offer);
@ -133,7 +133,7 @@ public class PipelineBuilderTest {
public void shouldProcessAsync() throws Exception { public void shouldProcessAsync() throws Exception {
final List<String> output = new ArrayList<>(); final List<String> output = new ArrayList<>();
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
.thenProcessAsync("toString", value -> completedFuture(Integer.toString(value)), 3) .thenProcessAsync("toString", value -> completedFuture(Integer.toString(value)), 3)
.andFinishWith("end", output::add); .andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService); final CompletableFuture<?> result = pipeline.start(executorService);
@ -149,7 +149,7 @@ public class PipelineBuilderTest {
final List<CompletableFuture<String>> futures = new CopyOnWriteArrayList<>(); final List<CompletableFuture<String>> futures = new CopyOnWriteArrayList<>();
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom( PipelineBuilder.createPipelineFrom(
"input", asList(1, 2, 3, 4, 5, 6, 7).iterator(), 10, NO_OP_LABELLED_COUNTER) "input", asList(1, 2, 3, 4, 5, 6, 7).iterator(), 10, NO_OP_LABELLED_2_COUNTER)
.thenProcessAsync( .thenProcessAsync(
"createFuture", "createFuture",
value -> { value -> {
@ -186,7 +186,7 @@ public class PipelineBuilderTest {
public void shouldFlatMapItems() throws Exception { public void shouldFlatMapItems() throws Exception {
final List<Integer> output = new ArrayList<>(); final List<Integer> output = new ArrayList<>();
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
.thenFlatMap("flatMap", input -> Stream.of(input, input * 2), 20) .thenFlatMap("flatMap", input -> Stream.of(input, input * 2), 20)
.andFinishWith("end", output::add); .andFinishWith("end", output::add);
@ -203,7 +203,7 @@ public class PipelineBuilderTest {
final List<String> output = synchronizedList(new ArrayList<>()); final List<String> output = synchronizedList(new ArrayList<>());
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
.thenProcessInParallel( .thenProcessInParallel(
"stageName", "stageName",
value -> { value -> {
@ -238,7 +238,7 @@ public class PipelineBuilderTest {
final List<String> output = synchronizedList(new ArrayList<>()); final List<String> output = synchronizedList(new ArrayList<>());
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
.thenFlatMapInParallel( .thenFlatMapInParallel(
"stageName", "stageName",
value -> { value -> {
@ -278,7 +278,7 @@ public class PipelineBuilderTest {
final List<Integer> output = synchronizedList(new ArrayList<>()); final List<Integer> output = synchronizedList(new ArrayList<>());
final CountDownLatch startedProcessingValueSix = new CountDownLatch(1); final CountDownLatch startedProcessingValueSix = new CountDownLatch(1);
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
.thenProcess( .thenProcess(
"stageName", "stageName",
value -> { value -> {
@ -314,7 +314,7 @@ public class PipelineBuilderTest {
final List<Integer> output = synchronizedList(new ArrayList<>()); final List<Integer> output = synchronizedList(new ArrayList<>());
final CountDownLatch startedProcessingValueSix = new CountDownLatch(1); final CountDownLatch startedProcessingValueSix = new CountDownLatch(1);
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
.thenProcess( .thenProcess(
"stageName", "stageName",
value -> { value -> {
@ -347,7 +347,7 @@ public class PipelineBuilderTest {
public void shouldAbortPipelineWhenProcessorThrowsException() { public void shouldAbortPipelineWhenProcessorThrowsException() {
final RuntimeException expectedError = new RuntimeException("Oops"); final RuntimeException expectedError = new RuntimeException("Oops");
final Pipeline<Integer> pipeline = final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_COUNTER) PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
.thenProcess( .thenProcess(
"stageName", "stageName",
(Function<Integer, Integer>) (Function<Integer, Integer>)

Loading…
Cancel
Save