7311: Add partialSuccessCounter and inflightRequestGauge in PeerTaskExecutor

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7628/head
Matilda Clerke 2 months ago
parent b1c47ae39a
commit d66dd3a4cd
  1. 40
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  2. 9
      metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java
  3. 6
      metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java
  4. 8
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java
  5. 18
      metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java
  6. 2
      plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java

@ -20,14 +20,18 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter; import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledGauge;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */ /** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */
public class PeerTaskExecutor { public class PeerTaskExecutor {
@ -36,9 +40,12 @@ public class PeerTaskExecutor {
private final PeerTaskRequestSender requestSender; private final PeerTaskRequestSender requestSender;
private final LabelledMetric<OperationTimer> requestTimer; private final LabelledMetric<OperationTimer> requestTimer;
private final LabelledMetric<Counter> partialSuccessCounter;
private final LabelledMetric<Counter> timeoutCounter; private final LabelledMetric<Counter> timeoutCounter;
private final LabelledMetric<Counter> invalidResponseCounter; private final LabelledMetric<Counter> invalidResponseCounter;
private final LabelledMetric<Counter> internalExceptionCounter; private final LabelledMetric<Counter> internalExceptionCounter;
private final LabelledGauge inflightRequestGauge;
private final Map<String, AtomicInteger> inflightRequestCountByClassName;
public PeerTaskExecutor( public PeerTaskExecutor(
final PeerSelector peerSelector, final PeerSelector peerSelector,
@ -52,6 +59,12 @@ public class PeerTaskExecutor {
"PeerTaskExecutor:RequestTime", "PeerTaskExecutor:RequestTime",
"Time taken to send a request and receive a response", "Time taken to send a request and receive a response",
"className"); "className");
partialSuccessCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"PeerTaskExecutor:PartialSuccessCounter",
"Counter of the number of partial success occurred",
"className");
timeoutCounter = timeoutCounter =
metricsSystem.createLabelledCounter( metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS, BesuMetricCategory.PEERS,
@ -70,6 +83,13 @@ public class PeerTaskExecutor {
"PeerTaskExecutor:InternalExceptionCounter", "PeerTaskExecutor:InternalExceptionCounter",
"Counter of the number of internal exceptions occurred", "Counter of the number of internal exceptions occurred",
"className"); "className");
inflightRequestGauge =
metricsSystem.createLabelledGauge(
BesuMetricCategory.PEERS,
"PeerTaskExecutor:InflightRequestGauge",
"Gauge of the number of inflight requests",
"className");
inflightRequestCountByClassName = new ConcurrentHashMap<>();
} }
public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) { public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
@ -98,22 +118,32 @@ public class PeerTaskExecutor {
public <T> PeerTaskExecutorResult<T> executeAgainstPeer( public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
final PeerTask<T> peerTask, final EthPeer peer) { final PeerTask<T> peerTask, final EthPeer peer) {
String taskClassName = peerTask.getClass().getSimpleName();
AtomicInteger inflightRequestCountForThisTaskClass =
inflightRequestCountByClassName.getOrDefault(taskClassName, new AtomicInteger(0));
if (!inflightRequestGauge.isLabelsObserved(taskClassName)) {
inflightRequestGauge.labels(inflightRequestCountForThisTaskClass::get, taskClassName);
}
MessageData requestMessageData = peerTask.getRequestMessage(); MessageData requestMessageData = peerTask.getRequestMessage();
PeerTaskExecutorResult<T> executorResult; PeerTaskExecutorResult<T> executorResult;
int retriesRemaining = peerTask.getRetriesWithSamePeer(); int retriesRemaining = peerTask.getRetriesWithSamePeer();
do { do {
try { try {
T result; T result;
try (final OperationTimer.TimingContext ignored = try (final OperationTimer.TimingContext ignored =
requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) { requestTimer.labels(taskClassName).startTimer()) {
inflightRequestCountForThisTaskClass.incrementAndGet();
MessageData responseMessageData = MessageData responseMessageData =
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer); requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);
result = peerTask.parseResponse(responseMessageData); result = peerTask.parseResponse(responseMessageData);
} finally {
inflightRequestCountForThisTaskClass.decrementAndGet();
} }
if (peerTask.isPartialSuccess(result)) { if (peerTask.isPartialSuccess(result)) {
partialSuccessCounter.labels(taskClassName).inc();
executorResult = executorResult =
new PeerTaskExecutorResult<>( new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.PARTIAL_SUCCESS); Optional.ofNullable(result), PeerTaskExecutorResponseCode.PARTIAL_SUCCESS);
@ -131,19 +161,19 @@ public class PeerTaskExecutor {
} catch (InterruptedException | TimeoutException e) { } catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode()); peer.recordRequestTimeout(requestMessageData.getCode());
timeoutCounter.labels(peerTask.getClass().getSimpleName()).inc(); timeoutCounter.labels(taskClassName).inc();
executorResult = executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT); new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT);
} catch (InvalidPeerTaskResponseException e) { } catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage()); peer.recordUselessResponse(e.getMessage());
invalidResponseCounter.labels(peerTask.getClass().getSimpleName()).inc(); invalidResponseCounter.labels(taskClassName).inc();
executorResult = executorResult =
new PeerTaskExecutorResult<>( new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE); Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
} catch (ExecutionException e) { } catch (ExecutionException e) {
internalExceptionCounter.labels(peerTask.getClass().getSimpleName()).inc(); internalExceptionCounter.labels(taskClassName).inc();
executorResult = executorResult =
new PeerTaskExecutorResult<>( new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);

@ -253,5 +253,14 @@ public class NoOpMetricsSystem implements ObservableMetricsSystem {
"The count of labels used must match the count of labels expected."); "The count of labels used must match the count of labels expected.");
Preconditions.checkNotNull(valueSupplier, "No valueSupplier specified"); Preconditions.checkNotNull(valueSupplier, "No valueSupplier specified");
} }
@Override
public boolean isLabelsObserved(final String... labelValues) {
Preconditions.checkArgument(
labelValues.length == labelCount,
"The count of labels used must match the count of labels expected.");
final String labelValuesString = String.join(",", labelValues);
return labelValuesCache.contains(labelValuesString);
}
} }
} }

@ -36,4 +36,10 @@ public class NoOpValueCollector implements LabelledGauge {
} }
labelValuesCreated.add(labelValuesString); labelValuesCreated.add(labelValuesString);
} }
@Override
public boolean isLabelsObserved(final String... labelValues) {
final String labelValuesString = String.join(",", labelValues);
return labelValuesCreated.contains(labelValuesString);
}
} }

@ -63,6 +63,14 @@ public class OpenTelemetryGauge implements LabelledGauge {
} }
} }
@Override
public boolean isLabelsObserved(final String... labelValues) {
Preconditions.checkArgument(
labelValues.length == labelNames.size(),
"label values and label names need the same number of elements");
return observationsMap.containsKey(getLabels(labelValues));
}
private Attributes getLabels(final String... labelValues) { private Attributes getLabels(final String... labelValues) {
final AttributesBuilder labelsBuilder = Attributes.builder(); final AttributesBuilder labelsBuilder = Attributes.builder();
for (int i = 0; i < labelNames.size(); i++) { for (int i = 0; i < labelNames.size(); i++) {

@ -47,10 +47,7 @@ public class PrometheusGauge extends Collector implements LabelledGauge {
@Override @Override
public synchronized void labels(final DoubleSupplier valueSupplier, final String... labelValues) { public synchronized void labels(final DoubleSupplier valueSupplier, final String... labelValues) {
if (labelValues.length != labelNames.size()) { validateLabelsCardinality(labelValues);
throw new IllegalArgumentException(
"Label values and label names must be the same cardinality");
}
if (observationsMap.putIfAbsent(List.of(labelValues), valueSupplier) != null) { if (observationsMap.putIfAbsent(List.of(labelValues), valueSupplier) != null) {
final String labelValuesString = String.join(",", labelValues); final String labelValuesString = String.join(",", labelValues);
throw new IllegalArgumentException( throw new IllegalArgumentException(
@ -58,6 +55,12 @@ public class PrometheusGauge extends Collector implements LabelledGauge {
} }
} }
@Override
public boolean isLabelsObserved(final String... labelValues) {
validateLabelsCardinality(labelValues);
return observationsMap.containsKey(List.of(labelValues));
}
@Override @Override
public List<MetricFamilySamples> collect() { public List<MetricFamilySamples> collect() {
final List<MetricFamilySamples.Sample> samples = new ArrayList<>(); final List<MetricFamilySamples.Sample> samples = new ArrayList<>();
@ -68,4 +71,11 @@ public class PrometheusGauge extends Collector implements LabelledGauge {
metricName, labelNames, labels, valueSupplier.getAsDouble()))); metricName, labelNames, labels, valueSupplier.getAsDouble())));
return List.of(new MetricFamilySamples(metricName, Type.GAUGE, help, samples)); return List.of(new MetricFamilySamples(metricName, Type.GAUGE, help, samples));
} }
private void validateLabelsCardinality(final String... labelValues) {
if (labelValues.length != labelNames.size()) {
throw new IllegalArgumentException(
"Label values and label names must be the same cardinality");
}
}
} }

@ -25,4 +25,6 @@ public interface LabelledGauge {
* @param labelValues the label values * @param labelValues the label values
*/ */
void labels(final DoubleSupplier valueSupplier, final String... labelValues); void labels(final DoubleSupplier valueSupplier, final String... labelValues);
boolean isLabelsObserved(final String... labelValues);
} }

Loading…
Cancel
Save