From d66dd3a4cd0ab6ec267ea87a740905ab495cafff Mon Sep 17 00:00:00 2001 From: Matilda Clerke Date: Fri, 11 Oct 2024 11:10:30 +1100 Subject: [PATCH] 7311: Add partialSuccessCounter and inflightRequestGauge in PeerTaskExecutor Signed-off-by: Matilda Clerke --- .../manager/peertask/PeerTaskExecutor.java | 40 ++++++++++++++++--- .../besu/metrics/noop/NoOpMetricsSystem.java | 9 +++++ .../besu/metrics/noop/NoOpValueCollector.java | 6 +++ .../opentelemetry/OpenTelemetryGauge.java | 8 ++++ .../metrics/prometheus/PrometheusGauge.java | 18 +++++++-- .../services/metrics/LabelledGauge.java | 2 + 6 files changed, 74 insertions(+), 9 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index 9b06297746..6fee0a0577 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -20,14 +20,18 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; +import org.hyperledger.besu.plugin.services.metrics.LabelledGauge; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.plugin.services.metrics.OperationTimer; import java.util.Collection; import java.util.HashSet; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; /** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */ public class PeerTaskExecutor { @@ -36,9 +40,12 @@ public class PeerTaskExecutor { private final PeerTaskRequestSender requestSender; private final LabelledMetric requestTimer; + private final LabelledMetric partialSuccessCounter; private final LabelledMetric timeoutCounter; private final LabelledMetric invalidResponseCounter; private final LabelledMetric internalExceptionCounter; + private final LabelledGauge inflightRequestGauge; + private final Map inflightRequestCountByClassName; public PeerTaskExecutor( final PeerSelector peerSelector, @@ -52,6 +59,12 @@ public class PeerTaskExecutor { "PeerTaskExecutor:RequestTime", "Time taken to send a request and receive a response", "className"); + partialSuccessCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.PEERS, + "PeerTaskExecutor:PartialSuccessCounter", + "Counter of the number of partial success occurred", + "className"); timeoutCounter = metricsSystem.createLabelledCounter( BesuMetricCategory.PEERS, @@ -70,6 +83,13 @@ public class PeerTaskExecutor { "PeerTaskExecutor:InternalExceptionCounter", "Counter of the number of internal exceptions occurred", "className"); + inflightRequestGauge = + metricsSystem.createLabelledGauge( + BesuMetricCategory.PEERS, + "PeerTaskExecutor:InflightRequestGauge", + "Gauge of the number of inflight requests", + "className"); + inflightRequestCountByClassName = new ConcurrentHashMap<>(); } public PeerTaskExecutorResult execute(final PeerTask peerTask) { @@ -98,22 +118,32 @@ public class PeerTaskExecutor { public PeerTaskExecutorResult executeAgainstPeer( final PeerTask peerTask, final EthPeer peer) { + String taskClassName = peerTask.getClass().getSimpleName(); + AtomicInteger inflightRequestCountForThisTaskClass = + inflightRequestCountByClassName.getOrDefault(taskClassName, new AtomicInteger(0)); + if (!inflightRequestGauge.isLabelsObserved(taskClassName)) { + inflightRequestGauge.labels(inflightRequestCountForThisTaskClass::get, taskClassName); + } MessageData requestMessageData = peerTask.getRequestMessage(); PeerTaskExecutorResult executorResult; int retriesRemaining = peerTask.getRetriesWithSamePeer(); do { - try { T result; try (final OperationTimer.TimingContext ignored = - requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) { + requestTimer.labels(taskClassName).startTimer()) { + inflightRequestCountForThisTaskClass.incrementAndGet(); + MessageData responseMessageData = requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer); result = peerTask.parseResponse(responseMessageData); + } finally { + inflightRequestCountForThisTaskClass.decrementAndGet(); } if (peerTask.isPartialSuccess(result)) { + partialSuccessCounter.labels(taskClassName).inc(); executorResult = new PeerTaskExecutorResult<>( Optional.ofNullable(result), PeerTaskExecutorResponseCode.PARTIAL_SUCCESS); @@ -131,19 +161,19 @@ public class PeerTaskExecutor { } catch (InterruptedException | TimeoutException e) { peer.recordRequestTimeout(requestMessageData.getCode()); - timeoutCounter.labels(peerTask.getClass().getSimpleName()).inc(); + timeoutCounter.labels(taskClassName).inc(); executorResult = new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT); } catch (InvalidPeerTaskResponseException e) { peer.recordUselessResponse(e.getMessage()); - invalidResponseCounter.labels(peerTask.getClass().getSimpleName()).inc(); + invalidResponseCounter.labels(taskClassName).inc(); executorResult = new PeerTaskExecutorResult<>( Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE); } catch (ExecutionException e) { - internalExceptionCounter.labels(peerTask.getClass().getSimpleName()).inc(); + internalExceptionCounter.labels(taskClassName).inc(); executorResult = new PeerTaskExecutorResult<>( Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java index 2d1ee26cfd..5f876fa4d8 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java @@ -253,5 +253,14 @@ public class NoOpMetricsSystem implements ObservableMetricsSystem { "The count of labels used must match the count of labels expected."); Preconditions.checkNotNull(valueSupplier, "No valueSupplier specified"); } + + @Override + public boolean isLabelsObserved(final String... labelValues) { + Preconditions.checkArgument( + labelValues.length == labelCount, + "The count of labels used must match the count of labels expected."); + final String labelValuesString = String.join(",", labelValues); + return labelValuesCache.contains(labelValuesString); + } } } diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java index 144e7f3187..6f36f10d2c 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java @@ -36,4 +36,10 @@ public class NoOpValueCollector implements LabelledGauge { } labelValuesCreated.add(labelValuesString); } + + @Override + public boolean isLabelsObserved(final String... labelValues) { + final String labelValuesString = String.join(",", labelValues); + return labelValuesCreated.contains(labelValuesString); + } } diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java index 6aea56586a..e1d785c68e 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java @@ -63,6 +63,14 @@ public class OpenTelemetryGauge implements LabelledGauge { } } + @Override + public boolean isLabelsObserved(final String... labelValues) { + Preconditions.checkArgument( + labelValues.length == labelNames.size(), + "label values and label names need the same number of elements"); + return observationsMap.containsKey(getLabels(labelValues)); + } + private Attributes getLabels(final String... labelValues) { final AttributesBuilder labelsBuilder = Attributes.builder(); for (int i = 0; i < labelNames.size(); i++) { diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java index 83000dfac3..b69e3f9062 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java @@ -47,10 +47,7 @@ public class PrometheusGauge extends Collector implements LabelledGauge { @Override public synchronized void labels(final DoubleSupplier valueSupplier, final String... labelValues) { - if (labelValues.length != labelNames.size()) { - throw new IllegalArgumentException( - "Label values and label names must be the same cardinality"); - } + validateLabelsCardinality(labelValues); if (observationsMap.putIfAbsent(List.of(labelValues), valueSupplier) != null) { final String labelValuesString = String.join(",", labelValues); throw new IllegalArgumentException( @@ -58,6 +55,12 @@ public class PrometheusGauge extends Collector implements LabelledGauge { } } + @Override + public boolean isLabelsObserved(final String... labelValues) { + validateLabelsCardinality(labelValues); + return observationsMap.containsKey(List.of(labelValues)); + } + @Override public List collect() { final List samples = new ArrayList<>(); @@ -68,4 +71,11 @@ public class PrometheusGauge extends Collector implements LabelledGauge { metricName, labelNames, labels, valueSupplier.getAsDouble()))); return List.of(new MetricFamilySamples(metricName, Type.GAUGE, help, samples)); } + + private void validateLabelsCardinality(final String... labelValues) { + if (labelValues.length != labelNames.size()) { + throw new IllegalArgumentException( + "Label values and label names must be the same cardinality"); + } + } } diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java index 724e31c58d..c2f64c1113 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java @@ -25,4 +25,6 @@ public interface LabelledGauge { * @param labelValues the label values */ void labels(final DoubleSupplier valueSupplier, final String... labelValues); + + boolean isLabelsObserved(final String... labelValues); }