mirror of https://github.com/hyperledger/besu
Add OTLP metrics support (#1492)
* Add OTLP metrics support Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Don't automatically push to grpc. Only allow if pushEnabled is set to true Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Code review Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Missed refactoring Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Add missing header Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Don't make otel depend on the push enabled flag Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Expose JUL logs to log4j2 Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Code review fixes and make sure not to start push gateway if not set to prometheus Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * spotless Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Fix unit test Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * code review feedback Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Add changelog entry Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>pull/1559/head
parent
94aec764ff
commit
45fd9f861e
@ -0,0 +1,59 @@ |
|||||||
|
/* |
||||||
|
* Copyright ConsenSys AG. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||||
|
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||||
|
* specific language governing permissions and limitations under the License. |
||||||
|
* |
||||||
|
* SPDX-License-Identifier: Apache-2.0 |
||||||
|
*/ |
||||||
|
package org.hyperledger.besu.metrics; |
||||||
|
|
||||||
|
import static org.hyperledger.besu.metrics.MetricsProtocol.OPENTELEMETRY; |
||||||
|
import static org.hyperledger.besu.metrics.MetricsProtocol.PROMETHEUS; |
||||||
|
|
||||||
|
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; |
||||||
|
import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem; |
||||||
|
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration; |
||||||
|
import org.hyperledger.besu.metrics.prometheus.PrometheusMetricsSystem; |
||||||
|
|
||||||
|
/** Creates a new metric system based on configuration. */ |
||||||
|
public class MetricsSystemFactory { |
||||||
|
|
||||||
|
private MetricsSystemFactory() {} |
||||||
|
|
||||||
|
/** |
||||||
|
* Creates and starts a new metric system to observe the behavior of the client |
||||||
|
* |
||||||
|
* @param metricsConfiguration the configuration of the metric system |
||||||
|
* @return a new metric system |
||||||
|
*/ |
||||||
|
public static ObservableMetricsSystem create(final MetricsConfiguration metricsConfiguration) { |
||||||
|
if (!metricsConfiguration.isEnabled() && !metricsConfiguration.isPushEnabled()) { |
||||||
|
return new NoOpMetricsSystem(); |
||||||
|
} |
||||||
|
if (PROMETHEUS.equals(metricsConfiguration.getProtocol())) { |
||||||
|
final PrometheusMetricsSystem metricsSystem = |
||||||
|
new PrometheusMetricsSystem( |
||||||
|
metricsConfiguration.getMetricCategories(), metricsConfiguration.isTimersEnabled()); |
||||||
|
metricsSystem.init(); |
||||||
|
return metricsSystem; |
||||||
|
} else if (OPENTELEMETRY.equals(metricsConfiguration.getProtocol())) { |
||||||
|
final OpenTelemetrySystem metricsSystem = |
||||||
|
new OpenTelemetrySystem( |
||||||
|
metricsConfiguration.getMetricCategories(), |
||||||
|
metricsConfiguration.isTimersEnabled(), |
||||||
|
metricsConfiguration.getPrometheusJob()); |
||||||
|
metricsSystem.initDefaults(); |
||||||
|
return metricsSystem; |
||||||
|
} else { |
||||||
|
throw new IllegalArgumentException( |
||||||
|
"Invalid metrics protocol " + metricsConfiguration.getProtocol()); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,67 @@ |
|||||||
|
/* |
||||||
|
* Copyright ConsenSys AG. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||||
|
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||||
|
* specific language governing permissions and limitations under the License. |
||||||
|
* |
||||||
|
* SPDX-License-Identifier: Apache-2.0 |
||||||
|
* |
||||||
|
*/ |
||||||
|
package org.hyperledger.besu.metrics.opentelemetry; |
||||||
|
|
||||||
|
import org.hyperledger.besu.metrics.MetricsService; |
||||||
|
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration; |
||||||
|
|
||||||
|
import java.util.Collections; |
||||||
|
import java.util.Optional; |
||||||
|
import java.util.concurrent.CompletableFuture; |
||||||
|
|
||||||
|
import io.opentelemetry.exporters.otlp.OtlpGrpcMetricExporter; |
||||||
|
import io.opentelemetry.sdk.metrics.export.IntervalMetricReader; |
||||||
|
|
||||||
|
public class MetricsOtelGrpcPushService implements MetricsService { |
||||||
|
|
||||||
|
private final MetricsConfiguration configuration; |
||||||
|
private final OpenTelemetrySystem metricsSystem; |
||||||
|
private IntervalMetricReader periodicReader; |
||||||
|
|
||||||
|
public MetricsOtelGrpcPushService( |
||||||
|
final MetricsConfiguration configuration, final OpenTelemetrySystem metricsSystem) { |
||||||
|
this.configuration = configuration; |
||||||
|
this.metricsSystem = metricsSystem; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public CompletableFuture<?> start() { |
||||||
|
OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.getDefault(); |
||||||
|
IntervalMetricReader.Builder builder = |
||||||
|
IntervalMetricReader.builder() |
||||||
|
.setExportIntervalMillis(configuration.getPushInterval() * 1000L) |
||||||
|
.readEnvironmentVariables() |
||||||
|
.readSystemProperties() |
||||||
|
.setMetricProducers( |
||||||
|
Collections.singleton(metricsSystem.getMeterSdkProvider().getMetricProducer())) |
||||||
|
.setMetricExporter(exporter); |
||||||
|
this.periodicReader = builder.build(); |
||||||
|
return CompletableFuture.completedFuture(null); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public CompletableFuture<?> stop() { |
||||||
|
if (periodicReader != null) { |
||||||
|
periodicReader.shutdown(); |
||||||
|
} |
||||||
|
return CompletableFuture.completedFuture(null); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Optional<Integer> getPort() { |
||||||
|
return Optional.empty(); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,65 @@ |
|||||||
|
/* |
||||||
|
* Copyright ConsenSys AG. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||||
|
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||||
|
* specific language governing permissions and limitations under the License. |
||||||
|
* |
||||||
|
* SPDX-License-Identifier: Apache-2.0 |
||||||
|
*/ |
||||||
|
package org.hyperledger.besu.metrics.opentelemetry; |
||||||
|
|
||||||
|
import org.hyperledger.besu.plugin.services.metrics.Counter; |
||||||
|
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import io.opentelemetry.common.Labels; |
||||||
|
import io.opentelemetry.metrics.LongCounter; |
||||||
|
|
||||||
|
public class OpenTelemetryCounter implements LabelledMetric<Counter> { |
||||||
|
|
||||||
|
private final LongCounter counter; |
||||||
|
private final String[] labelNames; |
||||||
|
|
||||||
|
public OpenTelemetryCounter(final LongCounter counter, final String... labelNames) { |
||||||
|
this.counter = counter; |
||||||
|
this.labelNames = labelNames; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Counter labels(final String... labelValues) { |
||||||
|
List<String> labelKeysAndValues = new ArrayList<>(); |
||||||
|
for (int i = 0; i < labelNames.length; i++) { |
||||||
|
labelKeysAndValues.add(labelNames[i]); |
||||||
|
labelKeysAndValues.add(labelValues[i]); |
||||||
|
} |
||||||
|
final Labels labels = Labels.of(labelKeysAndValues.toArray(new String[] {})); |
||||||
|
LongCounter.BoundLongCounter boundLongCounter = counter.bind(labels); |
||||||
|
return new OpenTelemetryCounter.UnlabelledCounter(boundLongCounter); |
||||||
|
} |
||||||
|
|
||||||
|
private static class UnlabelledCounter implements Counter { |
||||||
|
private final LongCounter.BoundLongCounter counter; |
||||||
|
|
||||||
|
private UnlabelledCounter(final LongCounter.BoundLongCounter counter) { |
||||||
|
this.counter = counter; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void inc() { |
||||||
|
counter.add(1); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void inc(final long amount) { |
||||||
|
counter.add(amount); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,290 @@ |
|||||||
|
/* |
||||||
|
* Copyright ConsenSys AG. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||||
|
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||||
|
* specific language governing permissions and limitations under the License. |
||||||
|
* |
||||||
|
* SPDX-License-Identifier: Apache-2.0 |
||||||
|
*/ |
||||||
|
package org.hyperledger.besu.metrics.opentelemetry; |
||||||
|
|
||||||
|
import org.hyperledger.besu.metrics.BesuMetricCategory; |
||||||
|
import org.hyperledger.besu.metrics.ObservableMetricsSystem; |
||||||
|
import org.hyperledger.besu.metrics.Observation; |
||||||
|
import org.hyperledger.besu.metrics.StandardMetricCategory; |
||||||
|
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; |
||||||
|
import org.hyperledger.besu.plugin.services.metrics.Counter; |
||||||
|
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; |
||||||
|
import org.hyperledger.besu.plugin.services.metrics.MetricCategory; |
||||||
|
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; |
||||||
|
|
||||||
|
import java.lang.management.GarbageCollectorMXBean; |
||||||
|
import java.lang.management.ManagementFactory; |
||||||
|
import java.lang.management.MemoryMXBean; |
||||||
|
import java.lang.management.MemoryPoolMXBean; |
||||||
|
import java.lang.management.MemoryUsage; |
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.Collection; |
||||||
|
import java.util.EnumSet; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Map; |
||||||
|
import java.util.Set; |
||||||
|
import java.util.concurrent.ConcurrentHashMap; |
||||||
|
import java.util.function.DoubleSupplier; |
||||||
|
import java.util.stream.Stream; |
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableSet; |
||||||
|
import io.opentelemetry.common.Attributes; |
||||||
|
import io.opentelemetry.common.Labels; |
||||||
|
import io.opentelemetry.metrics.DoubleValueObserver; |
||||||
|
import io.opentelemetry.metrics.DoubleValueRecorder; |
||||||
|
import io.opentelemetry.metrics.LongCounter; |
||||||
|
import io.opentelemetry.metrics.LongSumObserver; |
||||||
|
import io.opentelemetry.metrics.LongUpDownSumObserver; |
||||||
|
import io.opentelemetry.metrics.Meter; |
||||||
|
import io.opentelemetry.sdk.metrics.MeterSdkProvider; |
||||||
|
import io.opentelemetry.sdk.metrics.data.MetricData; |
||||||
|
import io.opentelemetry.sdk.resources.Resource; |
||||||
|
import io.opentelemetry.sdk.resources.ResourceAttributes; |
||||||
|
|
||||||
|
/** Metrics system relying on the native OpenTelemetry format. */ |
||||||
|
public class OpenTelemetrySystem implements ObservableMetricsSystem { |
||||||
|
private static final String TYPE_LABEL_KEY = "type"; |
||||||
|
private static final String AREA_LABEL_KEY = "area"; |
||||||
|
private static final String POOL_LABEL_KEY = "pool"; |
||||||
|
private static final String USED = "used"; |
||||||
|
private static final String COMMITTED = "committed"; |
||||||
|
private static final String MAX = "max"; |
||||||
|
private static final String HEAP = "heap"; |
||||||
|
private static final String NON_HEAP = "non_heap"; |
||||||
|
|
||||||
|
private final Set<MetricCategory> enabledCategories; |
||||||
|
private final boolean timersEnabled; |
||||||
|
private final Map<String, LabelledMetric<Counter>> cachedCounters = new ConcurrentHashMap<>(); |
||||||
|
private final Map<String, LabelledMetric<OperationTimer>> cachedTimers = |
||||||
|
new ConcurrentHashMap<>(); |
||||||
|
private final MeterSdkProvider meterSdkProvider; |
||||||
|
|
||||||
|
public OpenTelemetrySystem( |
||||||
|
final Set<MetricCategory> enabledCategories, |
||||||
|
final boolean timersEnabled, |
||||||
|
final String jobName) { |
||||||
|
this.enabledCategories = ImmutableSet.copyOf(enabledCategories); |
||||||
|
this.timersEnabled = timersEnabled; |
||||||
|
Resource resource = |
||||||
|
Resource.getDefault() |
||||||
|
.merge( |
||||||
|
Resource.create( |
||||||
|
Attributes.newBuilder() |
||||||
|
.setAttribute(ResourceAttributes.SERVICE_NAME, jobName) |
||||||
|
.build())); |
||||||
|
this.meterSdkProvider = MeterSdkProvider.builder().setResource(resource).build(); |
||||||
|
} |
||||||
|
|
||||||
|
MeterSdkProvider getMeterSdkProvider() { |
||||||
|
return meterSdkProvider; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Stream<Observation> streamObservations(final MetricCategory category) { |
||||||
|
return streamObservations().filter(metricData -> metricData.getCategory().equals(category)); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Stream<Observation> streamObservations() { |
||||||
|
Collection<MetricData> metricsList = meterSdkProvider.getMetricProducer().collectAllMetrics(); |
||||||
|
return metricsList.stream().map(this::convertToObservations).flatMap(stream -> stream); |
||||||
|
} |
||||||
|
|
||||||
|
private Stream<Observation> convertToObservations(final MetricData metricData) { |
||||||
|
List<Observation> observations = new ArrayList<>(); |
||||||
|
MetricCategory category = |
||||||
|
categoryNameToMetricCategory(metricData.getInstrumentationLibraryInfo().getName()); |
||||||
|
for (MetricData.Point point : metricData.getPoints()) { |
||||||
|
List<String> labels = new ArrayList<>(); |
||||||
|
point.getLabels().forEach((k, v) -> labels.add(v)); |
||||||
|
observations.add( |
||||||
|
new Observation( |
||||||
|
category, metricData.getName(), extractValue(metricData.getType(), point), labels)); |
||||||
|
} |
||||||
|
return observations.stream(); |
||||||
|
} |
||||||
|
|
||||||
|
private MetricCategory categoryNameToMetricCategory(final String name) { |
||||||
|
Set<MetricCategory> categories = |
||||||
|
ImmutableSet.<MetricCategory>builder() |
||||||
|
.addAll(EnumSet.allOf(BesuMetricCategory.class)) |
||||||
|
.addAll(EnumSet.allOf(StandardMetricCategory.class)) |
||||||
|
.build(); |
||||||
|
for (MetricCategory category : categories) { |
||||||
|
if (category.getName().equals(name)) { |
||||||
|
return category; |
||||||
|
} |
||||||
|
} |
||||||
|
throw new IllegalArgumentException("Invalid metric category: " + name); |
||||||
|
} |
||||||
|
|
||||||
|
private Object extractValue(final MetricData.Type type, final MetricData.Point point) { |
||||||
|
switch (type) { |
||||||
|
case NON_MONOTONIC_LONG: |
||||||
|
case MONOTONIC_LONG: |
||||||
|
return ((MetricData.LongPoint) point).getValue(); |
||||||
|
case NON_MONOTONIC_DOUBLE: |
||||||
|
case MONOTONIC_DOUBLE: |
||||||
|
return ((MetricData.DoublePoint) point).getValue(); |
||||||
|
case SUMMARY: |
||||||
|
return ((MetricData.SummaryPoint) point).getPercentileValues(); |
||||||
|
default: |
||||||
|
throw new UnsupportedOperationException("Unsupported type " + type); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public LabelledMetric<Counter> createLabelledCounter( |
||||||
|
final MetricCategory category, |
||||||
|
final String name, |
||||||
|
final String help, |
||||||
|
final String... labelNames) { |
||||||
|
return cachedCounters.computeIfAbsent( |
||||||
|
name, |
||||||
|
(k) -> { |
||||||
|
if (isCategoryEnabled(category)) { |
||||||
|
final Meter meter = meterSdkProvider.get(category.getName()); |
||||||
|
|
||||||
|
final LongCounter counter = meter.longCounterBuilder(name).setDescription(help).build(); |
||||||
|
return new OpenTelemetryCounter(counter, labelNames); |
||||||
|
} else { |
||||||
|
return NoOpMetricsSystem.getCounterLabelledMetric(labelNames.length); |
||||||
|
} |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public LabelledMetric<OperationTimer> createLabelledTimer( |
||||||
|
final MetricCategory category, |
||||||
|
final String name, |
||||||
|
final String help, |
||||||
|
final String... labelNames) { |
||||||
|
return cachedTimers.computeIfAbsent( |
||||||
|
name, |
||||||
|
(k) -> { |
||||||
|
if (timersEnabled && isCategoryEnabled(category)) { |
||||||
|
final Meter meter = meterSdkProvider.get(category.getName()); |
||||||
|
|
||||||
|
final DoubleValueRecorder recorder = |
||||||
|
meter.doubleValueRecorderBuilder(name).setDescription(help).build(); |
||||||
|
return new OpenTelemetryTimer(recorder, labelNames); |
||||||
|
} else { |
||||||
|
return NoOpMetricsSystem.getOperationTimerLabelledMetric(labelNames.length); |
||||||
|
} |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void createGauge( |
||||||
|
final MetricCategory category, |
||||||
|
final String name, |
||||||
|
final String help, |
||||||
|
final DoubleSupplier valueSupplier) { |
||||||
|
if (isCategoryEnabled(category)) { |
||||||
|
final Meter meter = meterSdkProvider.get(category.getName()); |
||||||
|
DoubleValueObserver observer = |
||||||
|
meter.doubleValueObserverBuilder(name).setDescription(help).build(); |
||||||
|
observer.setCallback(result -> result.observe(valueSupplier.getAsDouble(), Labels.empty())); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Set<MetricCategory> getEnabledCategories() { |
||||||
|
return enabledCategories; |
||||||
|
} |
||||||
|
|
||||||
|
public void initDefaults() { |
||||||
|
if (isCategoryEnabled(StandardMetricCategory.JVM)) { |
||||||
|
collectGC(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private void collectGC() { |
||||||
|
final List<GarbageCollectorMXBean> garbageCollectors = |
||||||
|
ManagementFactory.getGarbageCollectorMXBeans(); |
||||||
|
final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); |
||||||
|
final List<MemoryPoolMXBean> poolBeans = ManagementFactory.getMemoryPoolMXBeans(); |
||||||
|
final Meter meter = meterSdkProvider.get(StandardMetricCategory.JVM.getName()); |
||||||
|
final LongSumObserver gcMetric = |
||||||
|
meter |
||||||
|
.longSumObserverBuilder("jvm.gc.collection") |
||||||
|
.setDescription("Time spent in a given JVM garbage collector in milliseconds.") |
||||||
|
.setUnit("ms") |
||||||
|
.build(); |
||||||
|
final List<Labels> labelSets = new ArrayList<>(garbageCollectors.size()); |
||||||
|
for (final GarbageCollectorMXBean gc : garbageCollectors) { |
||||||
|
labelSets.add(Labels.of("gc", gc.getName())); |
||||||
|
} |
||||||
|
|
||||||
|
gcMetric.setCallback( |
||||||
|
resultLongObserver -> { |
||||||
|
for (int i = 0; i < garbageCollectors.size(); i++) { |
||||||
|
resultLongObserver.observe( |
||||||
|
garbageCollectors.get(i).getCollectionTime(), labelSets.get(i)); |
||||||
|
} |
||||||
|
}); |
||||||
|
|
||||||
|
final LongUpDownSumObserver areaMetric = |
||||||
|
meter |
||||||
|
.longUpDownSumObserverBuilder("jvm.memory.area") |
||||||
|
.setDescription("Bytes of a given JVM memory area.") |
||||||
|
.setUnit("By") |
||||||
|
.build(); |
||||||
|
final Labels usedHeap = Labels.of(TYPE_LABEL_KEY, USED, AREA_LABEL_KEY, HEAP); |
||||||
|
final Labels usedNonHeap = Labels.of(TYPE_LABEL_KEY, USED, AREA_LABEL_KEY, NON_HEAP); |
||||||
|
final Labels committedHeap = Labels.of(TYPE_LABEL_KEY, COMMITTED, AREA_LABEL_KEY, HEAP); |
||||||
|
final Labels committedNonHeap = Labels.of(TYPE_LABEL_KEY, COMMITTED, AREA_LABEL_KEY, NON_HEAP); |
||||||
|
// TODO: Decide if max is needed or not. May be derived with some approximation from max(used).
|
||||||
|
final Labels maxHeap = Labels.of(TYPE_LABEL_KEY, MAX, AREA_LABEL_KEY, HEAP); |
||||||
|
final Labels maxNonHeap = Labels.of(TYPE_LABEL_KEY, MAX, AREA_LABEL_KEY, NON_HEAP); |
||||||
|
areaMetric.setCallback( |
||||||
|
resultLongObserver -> { |
||||||
|
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage(); |
||||||
|
MemoryUsage nonHeapUsage = memoryBean.getNonHeapMemoryUsage(); |
||||||
|
resultLongObserver.observe(heapUsage.getUsed(), usedHeap); |
||||||
|
resultLongObserver.observe(nonHeapUsage.getUsed(), usedNonHeap); |
||||||
|
resultLongObserver.observe(heapUsage.getUsed(), committedHeap); |
||||||
|
resultLongObserver.observe(nonHeapUsage.getUsed(), committedNonHeap); |
||||||
|
resultLongObserver.observe(heapUsage.getUsed(), maxHeap); |
||||||
|
resultLongObserver.observe(nonHeapUsage.getUsed(), maxNonHeap); |
||||||
|
}); |
||||||
|
|
||||||
|
final LongUpDownSumObserver poolMetric = |
||||||
|
meter |
||||||
|
.longUpDownSumObserverBuilder("jvm.memory.pool") |
||||||
|
.setDescription("Bytes of a given JVM memory pool.") |
||||||
|
.setUnit("By") |
||||||
|
.build(); |
||||||
|
final List<Labels> usedLabelSets = new ArrayList<>(poolBeans.size()); |
||||||
|
final List<Labels> committedLabelSets = new ArrayList<>(poolBeans.size()); |
||||||
|
final List<Labels> maxLabelSets = new ArrayList<>(poolBeans.size()); |
||||||
|
for (final MemoryPoolMXBean pool : poolBeans) { |
||||||
|
usedLabelSets.add(Labels.of(TYPE_LABEL_KEY, USED, POOL_LABEL_KEY, pool.getName())); |
||||||
|
committedLabelSets.add(Labels.of(TYPE_LABEL_KEY, COMMITTED, POOL_LABEL_KEY, pool.getName())); |
||||||
|
maxLabelSets.add(Labels.of(TYPE_LABEL_KEY, MAX, POOL_LABEL_KEY, pool.getName())); |
||||||
|
} |
||||||
|
poolMetric.setCallback( |
||||||
|
resultLongObserver -> { |
||||||
|
for (int i = 0; i < poolBeans.size(); i++) { |
||||||
|
MemoryUsage poolUsage = poolBeans.get(i).getUsage(); |
||||||
|
resultLongObserver.observe(poolUsage.getUsed(), usedLabelSets.get(i)); |
||||||
|
resultLongObserver.observe(poolUsage.getCommitted(), committedLabelSets.get(i)); |
||||||
|
// TODO: Decide if max is needed or not. May be derived with some approximation from
|
||||||
|
// max(used).
|
||||||
|
resultLongObserver.observe(poolUsage.getMax(), maxLabelSets.get(i)); |
||||||
|
} |
||||||
|
}); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,53 @@ |
|||||||
|
/* |
||||||
|
* Copyright ConsenSys AG. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||||
|
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||||
|
* specific language governing permissions and limitations under the License. |
||||||
|
* |
||||||
|
* SPDX-License-Identifier: Apache-2.0 |
||||||
|
*/ |
||||||
|
package org.hyperledger.besu.metrics.opentelemetry; |
||||||
|
|
||||||
|
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; |
||||||
|
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import io.opentelemetry.common.Labels; |
||||||
|
import io.opentelemetry.metrics.DoubleValueRecorder; |
||||||
|
|
||||||
|
public class OpenTelemetryTimer implements LabelledMetric<OperationTimer> { |
||||||
|
|
||||||
|
private final DoubleValueRecorder recorder; |
||||||
|
private final String[] labelNames; |
||||||
|
|
||||||
|
public OpenTelemetryTimer(final DoubleValueRecorder recorder, final String... labelNames) { |
||||||
|
this.recorder = recorder; |
||||||
|
this.labelNames = labelNames; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public OperationTimer labels(final String... labelValues) { |
||||||
|
List<String> labelKeysAndValues = new ArrayList<>(); |
||||||
|
for (int i = 0; i < labelNames.length; i++) { |
||||||
|
labelKeysAndValues.add(labelNames[i]); |
||||||
|
labelKeysAndValues.add(labelValues[i]); |
||||||
|
} |
||||||
|
final Labels labels = Labels.of(labelKeysAndValues.toArray(new String[] {})); |
||||||
|
return () -> { |
||||||
|
final long startTime = System.nanoTime(); |
||||||
|
return () -> { |
||||||
|
long elapsed = System.nanoTime() - startTime; |
||||||
|
recorder.record(elapsed, labels); |
||||||
|
return elapsed / 1e9; |
||||||
|
}; |
||||||
|
}; |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,253 @@ |
|||||||
|
/* |
||||||
|
* Copyright ConsenSys AG. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||||
|
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||||
|
* specific language governing permissions and limitations under the License. |
||||||
|
* |
||||||
|
* SPDX-License-Identifier: Apache-2.0 |
||||||
|
*/ |
||||||
|
package org.hyperledger.besu.metrics.opentelemetry; |
||||||
|
|
||||||
|
import static java.util.Collections.emptyList; |
||||||
|
import static java.util.Collections.singletonList; |
||||||
|
import static org.assertj.core.api.Assertions.assertThat; |
||||||
|
import static org.hyperledger.besu.metrics.BesuMetricCategory.DEFAULT_METRIC_CATEGORIES; |
||||||
|
import static org.hyperledger.besu.metrics.BesuMetricCategory.NETWORK; |
||||||
|
import static org.hyperledger.besu.metrics.BesuMetricCategory.PEERS; |
||||||
|
import static org.hyperledger.besu.metrics.BesuMetricCategory.RPC; |
||||||
|
import static org.hyperledger.besu.metrics.MetricsProtocol.OPENTELEMETRY; |
||||||
|
|
||||||
|
import org.hyperledger.besu.metrics.BesuMetricCategory; |
||||||
|
import org.hyperledger.besu.metrics.MetricsSystemFactory; |
||||||
|
import org.hyperledger.besu.metrics.ObservableMetricsSystem; |
||||||
|
import org.hyperledger.besu.metrics.Observation; |
||||||
|
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; |
||||||
|
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration; |
||||||
|
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||||
|
import org.hyperledger.besu.plugin.services.metrics.Counter; |
||||||
|
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; |
||||||
|
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.Collections; |
||||||
|
import java.util.Comparator; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableSet; |
||||||
|
import io.opentelemetry.sdk.metrics.data.MetricData; |
||||||
|
import org.junit.Test; |
||||||
|
|
||||||
|
public class OpenTelemetryMetricsSystemTest { |
||||||
|
|
||||||
|
private static final Comparator<Observation> IGNORE_VALUES = |
||||||
|
Comparator.<Observation, String>comparing(observation -> observation.getCategory().getName()) |
||||||
|
.thenComparing(Observation::getMetricName) |
||||||
|
.thenComparing((o1, o2) -> o1.getLabels().equals(o2.getLabels()) ? 0 : 1); |
||||||
|
|
||||||
|
private final ObservableMetricsSystem metricsSystem = |
||||||
|
new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, true, "job"); |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldCreateObservationFromCounter() { |
||||||
|
final Counter counter = metricsSystem.createCounter(PEERS, "connected", "Some help string"); |
||||||
|
|
||||||
|
counter.inc(); |
||||||
|
assertThat(metricsSystem.streamObservations()) |
||||||
|
.containsExactly(new Observation(PEERS, "connected", 1L, emptyList())); |
||||||
|
|
||||||
|
counter.inc(); |
||||||
|
assertThat(metricsSystem.streamObservations()) |
||||||
|
.containsExactly(new Observation(PEERS, "connected", 2L, emptyList())); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldHandleDuplicateCounterCreation() { |
||||||
|
final LabelledMetric<Counter> counter1 = |
||||||
|
metricsSystem.createLabelledCounter(PEERS, "connected", "Some help string"); |
||||||
|
final LabelledMetric<Counter> counter2 = |
||||||
|
metricsSystem.createLabelledCounter(PEERS, "connected", "Some help string"); |
||||||
|
assertThat(counter1).isEqualTo(counter2); |
||||||
|
|
||||||
|
counter1.labels().inc(); |
||||||
|
assertThat(metricsSystem.streamObservations()) |
||||||
|
.containsExactly(new Observation(PEERS, "connected", 1L, emptyList())); |
||||||
|
|
||||||
|
counter2.labels().inc(); |
||||||
|
assertThat(metricsSystem.streamObservations()) |
||||||
|
.containsExactly(new Observation(PEERS, "connected", 2L, emptyList())); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldCreateSeparateObservationsForEachCounterLabelValue() { |
||||||
|
final LabelledMetric<Counter> counter = |
||||||
|
metricsSystem.createLabelledCounter(PEERS, "connected", "Some help string", "labelName"); |
||||||
|
|
||||||
|
counter.labels("value1").inc(); |
||||||
|
counter.labels("value2").inc(); |
||||||
|
counter.labels("value1").inc(); |
||||||
|
|
||||||
|
assertThat(metricsSystem.streamObservations()) |
||||||
|
.containsExactlyInAnyOrder( |
||||||
|
new Observation(PEERS, "connected", 2L, singletonList("value1")), |
||||||
|
new Observation(PEERS, "connected", 1L, singletonList("value2"))); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldIncrementCounterBySpecifiedAmount() { |
||||||
|
final Counter counter = metricsSystem.createCounter(PEERS, "connected", "Some help string"); |
||||||
|
|
||||||
|
counter.inc(5); |
||||||
|
assertThat(metricsSystem.streamObservations()) |
||||||
|
.containsExactly(new Observation(PEERS, "connected", 5L, emptyList())); |
||||||
|
|
||||||
|
counter.inc(6); |
||||||
|
assertThat(metricsSystem.streamObservations()) |
||||||
|
.containsExactly(new Observation(PEERS, "connected", 11L, emptyList())); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldCreateObservationsFromTimer() { |
||||||
|
final OperationTimer timer = metricsSystem.createTimer(RPC, "request", "Some help"); |
||||||
|
|
||||||
|
final OperationTimer.TimingContext context = timer.startTimer(); |
||||||
|
context.stopTimer(); |
||||||
|
|
||||||
|
assertThat(metricsSystem.streamObservations()) |
||||||
|
.usingElementComparator(IGNORE_VALUES) |
||||||
|
.containsExactlyInAnyOrder(new Observation(RPC, "request", null, Collections.emptyList())); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldHandleDuplicateTimerCreation() { |
||||||
|
final LabelledMetric<OperationTimer> timer1 = |
||||||
|
metricsSystem.createLabelledTimer(RPC, "request", "Some help"); |
||||||
|
final LabelledMetric<OperationTimer> timer2 = |
||||||
|
metricsSystem.createLabelledTimer(RPC, "request", "Some help"); |
||||||
|
assertThat(timer1).isEqualTo(timer2); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldCreateObservationsFromTimerWithLabels() { |
||||||
|
final LabelledMetric<OperationTimer> timer = |
||||||
|
metricsSystem.createLabelledTimer(RPC, "request", "Some help", "methodName"); |
||||||
|
|
||||||
|
//noinspection EmptyTryBlock
|
||||||
|
try (final OperationTimer.TimingContext ignored = timer.labels("method").startTimer()) {} |
||||||
|
//noinspection EmptyTryBlock
|
||||||
|
try (final OperationTimer.TimingContext ignored = timer.labels("method").startTimer()) {} |
||||||
|
//noinspection EmptyTryBlock
|
||||||
|
try (final OperationTimer.TimingContext ignored = timer.labels("method").startTimer()) {} |
||||||
|
//noinspection EmptyTryBlock
|
||||||
|
try (final OperationTimer.TimingContext ignored = timer.labels("method").startTimer()) {} |
||||||
|
|
||||||
|
assertThat(metricsSystem.streamObservations()) |
||||||
|
.usingElementComparator(IGNORE_VALUES) // We don't know how long it will actually take.
|
||||||
|
.containsExactlyInAnyOrder(new Observation(RPC, "request", null, singletonList("method"))); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldNotCreateObservationsFromTimerWhenTimersDisabled() { |
||||||
|
final ObservableMetricsSystem metricsSystem = |
||||||
|
new OpenTelemetrySystem(DEFAULT_METRIC_CATEGORIES, false, "job"); |
||||||
|
final LabelledMetric<OperationTimer> timer = |
||||||
|
metricsSystem.createLabelledTimer(RPC, "request", "Some help", "methodName"); |
||||||
|
|
||||||
|
//noinspection EmptyTryBlock
|
||||||
|
try (final OperationTimer.TimingContext ignored = timer.labels("method").startTimer()) {} |
||||||
|
|
||||||
|
assertThat(metricsSystem.streamObservations()).isEmpty(); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldCreateObservationFromGauge() { |
||||||
|
final MetricsConfiguration metricsConfiguration = |
||||||
|
MetricsConfiguration.builder() |
||||||
|
.metricCategories(ImmutableSet.of(BesuMetricCategory.RPC)) |
||||||
|
.enabled(true) |
||||||
|
.protocol(OPENTELEMETRY) |
||||||
|
.build(); |
||||||
|
final ObservableMetricsSystem localMetricSystem = |
||||||
|
MetricsSystemFactory.create(metricsConfiguration); |
||||||
|
localMetricSystem.createGauge(RPC, "myValue", "Help", () -> 7d); |
||||||
|
List<MetricData.ValueAtPercentile> values = new ArrayList<>(); |
||||||
|
values.add(MetricData.ValueAtPercentile.create(0, 7d)); |
||||||
|
values.add(MetricData.ValueAtPercentile.create(100, 7d)); |
||||||
|
|
||||||
|
assertThat(localMetricSystem.streamObservations()) |
||||||
|
.containsExactlyInAnyOrder(new Observation(RPC, "myValue", values, emptyList())); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void shouldOnlyObserveEnabledMetrics() { |
||||||
|
final MetricsConfiguration metricsConfiguration = |
||||||
|
MetricsConfiguration.builder() |
||||||
|
.metricCategories(ImmutableSet.of(BesuMetricCategory.RPC)) |
||||||
|
.enabled(true) |
||||||
|
.protocol(OPENTELEMETRY) |
||||||
|
.build(); |
||||||
|
final ObservableMetricsSystem localMetricSystem = |
||||||
|
MetricsSystemFactory.create(metricsConfiguration); |
||||||
|
|
||||||
|
// do a category we are not watching
|
||||||
|
final LabelledMetric<Counter> counterN = |
||||||
|
localMetricSystem.createLabelledCounter(NETWORK, "ABC", "Not that kind of network", "show"); |
||||||
|
assertThat(counterN).isSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER); |
||||||
|
|
||||||
|
counterN.labels("show").inc(); |
||||||
|
assertThat(localMetricSystem.streamObservations()).isEmpty(); |
||||||
|
|
||||||
|
// do a category we are watching
|
||||||
|
final LabelledMetric<Counter> counterR = |
||||||
|
localMetricSystem.createLabelledCounter(RPC, "name", "Not useful", "method"); |
||||||
|
assertThat(counterR).isNotSameAs(NoOpMetricsSystem.NO_OP_LABELLED_1_COUNTER); |
||||||
|
|
||||||
|
counterR.labels("op").inc(); |
||||||
|
assertThat(localMetricSystem.streamObservations()) |
||||||
|
.containsExactly(new Observation(RPC, "name", (long) 1, singletonList("op"))); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void returnsNoOpMetricsWhenAllDisabled() { |
||||||
|
final MetricsConfiguration metricsConfiguration = |
||||||
|
MetricsConfiguration.builder() |
||||||
|
.enabled(false) |
||||||
|
.pushEnabled(false) |
||||||
|
.protocol(OPENTELEMETRY) |
||||||
|
.build(); |
||||||
|
final MetricsSystem localMetricSystem = MetricsSystemFactory.create(metricsConfiguration); |
||||||
|
|
||||||
|
assertThat(localMetricSystem).isInstanceOf(NoOpMetricsSystem.class); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void returnsPrometheusMetricsWhenEnabled() { |
||||||
|
final MetricsConfiguration metricsConfiguration = |
||||||
|
MetricsConfiguration.builder() |
||||||
|
.enabled(true) |
||||||
|
.pushEnabled(false) |
||||||
|
.protocol(OPENTELEMETRY) |
||||||
|
.build(); |
||||||
|
final MetricsSystem localMetricSystem = MetricsSystemFactory.create(metricsConfiguration); |
||||||
|
|
||||||
|
assertThat(localMetricSystem).isInstanceOf(OpenTelemetrySystem.class); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void returnsNoOpMetricsWhenPushEnabled() { |
||||||
|
final MetricsConfiguration metricsConfiguration = |
||||||
|
MetricsConfiguration.builder() |
||||||
|
.enabled(false) |
||||||
|
.pushEnabled(true) |
||||||
|
.protocol(OPENTELEMETRY) |
||||||
|
.build(); |
||||||
|
final MetricsSystem localMetricSystem = MetricsSystemFactory.create(metricsConfiguration); |
||||||
|
|
||||||
|
assertThat(localMetricSystem).isInstanceOf(OpenTelemetrySystem.class); |
||||||
|
} |
||||||
|
} |
Loading…
Reference in new issue