Upgrade OpenTelemetry to 1.2.0 (#2313)

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

Co-authored-by: Sally MacFarlane <sally.macfarlane@consensys.net>
pull/2333/head
Antoine Toulme 3 years ago committed by GitHub
parent 81d4fac8c9
commit 5670264f12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 2
      besu/src/main/java/org/hyperledger/besu/Runner.java
  3. 19
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpService.java
  4. 108
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java
  5. 17
      gradle/versions.gradle
  6. 2
      metrics/core/build.gradle
  7. 23
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/MetricsOtelGrpcPushService.java
  8. 9
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryCounter.java
  9. 49
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetrySystem.java
  10. 2
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryTimer.java
  11. 7
      services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipeline.java

@ -3,6 +3,7 @@
## 21.1.7
### Additions and Improvements
* Upgrade OpenTelemetry to 1.2.0. [\#2313](https://github.com/hyperledger/besu/pull/2313)
* Ethereum Classic Magneto Hard Fork [\#2315](https://github.com/hyperledger/besu/pull/2315)

@ -106,6 +106,7 @@ public class Runner implements AutoCloseable {
public void start() {
try {
LOG.info("Starting Ethereum main loop ... ");
metrics.ifPresent(service -> waitForServiceToStart("metrics", service.start()));
natService.start();
networkRunner.start();
if (networkRunner.getNetwork().isP2pEnabled()) {
@ -120,7 +121,6 @@ public class Runner implements AutoCloseable {
jsonRpc.ifPresent(service -> waitForServiceToStart("jsonRpc", service.start()));
graphQLHttp.ifPresent(service -> waitForServiceToStart("graphQLHttp", service.start()));
websocketRpc.ifPresent(service -> waitForServiceToStart("websocketRpc", service.start()));
metrics.ifPresent(service -> waitForServiceToStart("metrics", service.start()));
ethStatsService.ifPresent(EthStatsService::start);
LOG.info("Ethereum main loop is up.");
writeBesuPortsToFile();

@ -70,13 +70,14 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.extension.trace.propagation.B3Propagator;
import io.opentelemetry.extension.trace.propagation.JaegerPropagator;
import io.opentelemetry.extension.trace.propagation.TraceMultiPropagator;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
@ -114,13 +115,13 @@ public class JsonRpcHttpService {
private static final String EMPTY_RESPONSE = "";
private static final TextMapPropagator traceFormats =
TraceMultiPropagator.create(
TextMapPropagator.composite(
JaegerPropagator.getInstance(),
B3Propagator.getInstance(),
B3Propagator.injectingSingleHeader(),
W3CBaggagePropagator.getInstance());
private static final TextMapPropagator.Getter<HttpServerRequest> requestAttributesGetter =
new TextMapPropagator.Getter<>() {
private static final TextMapGetter<HttpServerRequest> requestAttributesGetter =
new TextMapGetter<>() {
@Override
public Iterable<String> keys(final HttpServerRequest carrier) {
return carrier.headers().names();
@ -142,7 +143,7 @@ public class JsonRpcHttpService {
private final NatService natService;
private final Path dataDir;
private final LabelledMetric<OperationTimer> requestTimer;
private final Tracer tracer;
private Tracer tracer;
private final int maxActiveConnections;
private final AtomicInteger activeConnectionsCount = new AtomicInteger();
@ -210,7 +211,6 @@ public class JsonRpcHttpService {
this.authenticationService = authenticationService;
this.livenessService = livenessService;
this.readinessService = readinessService;
this.tracer = GlobalOpenTelemetry.getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
this.maxActiveConnections = config.getMaxActiveConnections();
}
@ -226,6 +226,7 @@ public class JsonRpcHttpService {
public CompletableFuture<?> start() {
LOG.info("Starting JSON-RPC service on {}:{}", config.getHost(), config.getPort());
LOG.debug("max number of active connections {}", maxActiveConnections);
this.tracer = GlobalOpenTelemetry.getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
final CompletableFuture<?> resultFuture = new CompletableFuture<>();
try {
@ -365,7 +366,7 @@ public class JsonRpcHttpService {
tracer
.spanBuilder(address.host() + ":" + address.port())
.setParent(parent)
.setSpanKind(Span.Kind.SERVER)
.setSpanKind(SpanKind.SERVER)
.startSpan();
routingContext.put(SPAN_CONTEXT, Context.current().with(serverSpan));
@ -695,7 +696,7 @@ public class JsonRpcHttpService {
Span span =
tracer
.spanBuilder(requestBody.getMethod())
.setSpanKind(Span.Kind.INTERNAL)
.setSpanKind(SpanKind.INTERNAL)
.setParent(ctx.get(SPAN_CONTEXT))
.startSpan();
try {

@ -37,9 +37,6 @@ import java.util.Collections;
import java.util.List;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -57,9 +54,6 @@ public abstract class AbstractBlockProcessor implements BlockProcessor {
private static final Logger LOG = LogManager.getLogger();
private static final Tracer tracer =
OpenTelemetry.getGlobalTracer("org.hyperledger.besu.block", "1.0.0");
static final int MAX_GENERATION = 6;
public static class Result implements BlockProcessor.Result {
@ -156,68 +150,62 @@ public abstract class AbstractBlockProcessor implements BlockProcessor {
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final PrivateMetadataUpdater privateMetadataUpdater) {
final Span globalProcessBlock =
tracer.spanBuilder("processBlock").setSpanKind(Span.Kind.INTERNAL).startSpan();
try {
final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
for (final Transaction transaction : transactions) {
if (!hasAvailableBlockBudget(blockHeader, transaction, currentGasUsed)) {
return AbstractBlockProcessor.Result.failed();
}
final WorldUpdater worldStateUpdater = worldState.updater();
final BlockHashLookup blockHashLookup = new BlockHashLookup(blockHeader, blockchain);
final Address miningBeneficiary =
miningBeneficiaryCalculator.calculateBeneficiary(blockHeader);
final TransactionProcessingResult result =
transactionProcessor.processTransaction(
blockchain,
worldStateUpdater,
blockHeader,
transaction,
miningBeneficiary,
OperationTracer.NO_TRACING,
blockHashLookup,
true,
TransactionValidationParams.processingBlock(),
privateMetadataUpdater);
if (result.isInvalid()) {
LOG.info(
"Block processing error: transaction invalid '{}'. Block {} Transaction {}",
result.getValidationResult().getInvalidReason(),
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
if (worldState instanceof BonsaiPersistedWorldState) {
((BonsaiWorldStateUpdater) worldStateUpdater).reset();
}
return AbstractBlockProcessor.Result.failed();
}
worldStateUpdater.commit();
currentGasUsed += transaction.getGasLimit() - result.getGasRemaining();
final TransactionReceipt transactionReceipt =
transactionReceiptFactory.create(
transaction.getType(), result, worldState, currentGasUsed);
receipts.add(transactionReceipt);
final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
for (final Transaction transaction : transactions) {
if (!hasAvailableBlockBudget(blockHeader, transaction, currentGasUsed)) {
return AbstractBlockProcessor.Result.failed();
}
if (!rewardCoinbase(worldState, blockHeader, ommers, skipZeroBlockRewards)) {
// no need to log, rewardCoinbase logs the error.
final WorldUpdater worldStateUpdater = worldState.updater();
final BlockHashLookup blockHashLookup = new BlockHashLookup(blockHeader, blockchain);
final Address miningBeneficiary =
miningBeneficiaryCalculator.calculateBeneficiary(blockHeader);
final TransactionProcessingResult result =
transactionProcessor.processTransaction(
blockchain,
worldStateUpdater,
blockHeader,
transaction,
miningBeneficiary,
OperationTracer.NO_TRACING,
blockHashLookup,
true,
TransactionValidationParams.processingBlock(),
privateMetadataUpdater);
if (result.isInvalid()) {
LOG.info(
"Block processing error: transaction invalid '{}'. Block {} Transaction {}",
result.getValidationResult().getInvalidReason(),
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
if (worldState instanceof BonsaiPersistedWorldState) {
((BonsaiWorldStateUpdater) worldState.updater()).reset();
((BonsaiWorldStateUpdater) worldStateUpdater).reset();
}
return AbstractBlockProcessor.Result.failed();
}
worldState.persist(blockHeader);
return AbstractBlockProcessor.Result.successful(receipts);
} finally {
globalProcessBlock.end();
worldStateUpdater.commit();
currentGasUsed += transaction.getGasLimit() - result.getGasRemaining();
final TransactionReceipt transactionReceipt =
transactionReceiptFactory.create(
transaction.getType(), result, worldState, currentGasUsed);
receipts.add(transactionReceipt);
}
if (!rewardCoinbase(worldState, blockHeader, ommers, skipZeroBlockRewards)) {
// no need to log, rewardCoinbase logs the error.
if (worldState instanceof BonsaiPersistedWorldState) {
((BonsaiWorldStateUpdater) worldState.updater()).reset();
}
return AbstractBlockProcessor.Result.failed();
}
worldState.persist(blockHeader);
return AbstractBlockProcessor.Result.successful(receipts);
}
protected boolean hasAvailableBlockBudget(

@ -57,18 +57,19 @@ dependencyManagement {
dependency 'io.netty:netty-tcnative-boringssl-static:2.0.35.Final'
dependency group: 'io.netty', name: 'netty-transport-native-epoll', version:'4.1.56.Final', classifier: 'linux-x86_64'
dependency 'io.opentelemetry:opentelemetry-api:0.13.1'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp-metrics:0.13.1-alpha'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp:0.13.1'
dependency 'io.opentelemetry:opentelemetry-extension-trace-propagators:0.13.1'
dependency 'io.opentelemetry:opentelemetry-proto:0.13.1'
dependency 'io.opentelemetry:opentelemetry-sdk-trace:0.13.1'
dependency 'io.opentelemetry:opentelemetry-sdk:0.13.1'
dependency 'io.opentelemetry:opentelemetry-api:1.2.0'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.2.0-alpha'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp:1.2.0'
dependency 'io.opentelemetry:opentelemetry-extension-trace-propagators:1.2.0'
dependency 'io.opentelemetry:opentelemetry-proto:1.2.0-alpha'
dependency 'io.opentelemetry:opentelemetry-sdk-trace:1.2.0'
dependency 'io.opentelemetry:opentelemetry-sdk:1.2.0'
dependency 'io.opentelemetry:opentelemetry-semconv:1.2.0-alpha'
dependency 'io.opentracing.contrib:opentracing-okhttp3:3.0.0'
dependency 'io.opentracing:opentracing-api:0.33.0'
dependency 'io.opentracing:opentracing-util:0.33.0'
dependency 'io.jaegertracing:jaeger-client:1.5.0'
dependency 'io.jaegertracing:jaeger-client:1.6.0'
dependency 'io.pkts:pkts-core:3.0.7'

@ -45,9 +45,11 @@ dependencies {
implementation 'io.netty:netty-all'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'io.opentelemetry:opentelemetry-sdk'
implementation 'io.opentelemetry:opentelemetry-semconv'
implementation 'io.opentelemetry:opentelemetry-sdk-trace'
implementation 'io.opentelemetry:opentelemetry-exporter-otlp'
implementation 'io.opentelemetry:opentelemetry-exporter-otlp-metrics'
implementation 'io.opentelemetry:opentelemetry-proto'
implementation 'io.prometheus:simpleclient'
implementation 'io.prometheus:simpleclient_common'

@ -27,6 +27,8 @@ import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.export.IntervalMetricReader;
import io.opentelemetry.sdk.metrics.export.IntervalMetricReaderBuilder;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import org.apache.logging.log4j.LogManager;
@ -52,23 +54,16 @@ public class MetricsOtelGrpcPushService implements MetricsService {
public CompletableFuture<?> start() {
LOG.info("Starting OpenTelemetry push service");
OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.getDefault();
IntervalMetricReader.Builder builder =
IntervalMetricReaderBuilder builder =
IntervalMetricReader.builder()
.setExportIntervalMillis(configuration.getPushInterval() * 1000L)
.readEnvironmentVariables()
.readSystemProperties()
.setMetricProducers(
Collections.singleton(metricsSystem.getMeterSdkProvider().getMetricProducer()))
.setMetricProducers(Collections.singleton(metricsSystem.getMeterSdkProvider()))
.setMetricExporter(exporter);
this.periodicReader = builder.build();
this.spanProcessor =
BatchSpanProcessor.builder(
OtlpGrpcSpanExporter.builder()
.readSystemProperties()
.readEnvironmentVariables()
.build())
.build();
OpenTelemetrySdk.get().getTracerManagement().addSpanProcessor(spanProcessor);
this.periodicReader = builder.buildAndStart();
this.spanProcessor = BatchSpanProcessor.builder(OtlpGrpcSpanExporter.builder().build()).build();
OpenTelemetrySdk.builder()
.setTracerProvider(SdkTracerProvider.builder().addSpanProcessor(spanProcessor).build())
.buildAndRegisterGlobal();
return CompletableFuture.completedFuture(null);
}

@ -20,8 +20,9 @@ import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import java.util.ArrayList;
import java.util.List;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.api.metrics.BoundLongCounter;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.common.Labels;
public class OpenTelemetryCounter implements LabelledMetric<Counter> {
@ -41,14 +42,14 @@ public class OpenTelemetryCounter implements LabelledMetric<Counter> {
labelKeysAndValues.add(labelValues[i]);
}
final Labels labels = Labels.of(labelKeysAndValues.toArray(new String[] {}));
LongCounter.BoundLongCounter boundLongCounter = counter.bind(labels);
BoundLongCounter boundLongCounter = counter.bind(labels);
return new OpenTelemetryCounter.UnlabelledCounter(boundLongCounter);
}
private static class UnlabelledCounter implements Counter {
private final LongCounter.BoundLongCounter counter;
private final BoundLongCounter counter;
private UnlabelledCounter(final LongCounter.BoundLongCounter counter) {
private UnlabelledCounter(final BoundLongCounter counter) {
this.counter = counter;
}

@ -41,14 +41,20 @@ import java.util.stream.Stream;
import com.google.common.collect.ImmutableSet;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.api.metrics.DoubleValueRecorder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.resources.ResourceAttributes;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -99,7 +105,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
@Override
public Stream<Observation> streamObservations() {
Collection<MetricData> metricsList = meterSdkProvider.getMetricProducer().collectAllMetrics();
Collection<MetricData> metricsList = meterSdkProvider.collectAllMetrics();
return metricsList.stream().map(this::convertToObservations).flatMap(stream -> stream);
}
@ -107,7 +113,32 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
List<Observation> observations = new ArrayList<>();
MetricCategory category =
categoryNameToMetricCategory(metricData.getInstrumentationLibraryInfo().getName());
for (MetricData.Point point : metricData.getPoints()) {
Collection<?> points;
switch (metricData.getType()) {
case DOUBLE_GAUGE:
points = metricData.getDoubleGaugeData().getPoints();
break;
case DOUBLE_SUM:
points = metricData.getDoubleSumData().getPoints();
break;
case SUMMARY:
points = metricData.getDoubleSummaryData().getPoints();
break;
case LONG_SUM:
points = metricData.getLongSumData().getPoints();
break;
case HISTOGRAM:
points = metricData.getDoubleHistogramData().getPoints();
break;
case LONG_GAUGE:
points = metricData.getLongGaugeData().getPoints();
break;
default:
throw new UnsupportedOperationException("Unsupported type " + metricData.getType().name());
}
for (Object ptObj : points) {
PointData point = (PointData) ptObj;
List<String> labels = new ArrayList<>();
point.getLabels().forEach((k, v) -> labels.add(v));
observations.add(
@ -131,15 +162,17 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
throw new IllegalArgumentException("Invalid metric category: " + name);
}
private Object extractValue(final MetricData.Type type, final MetricData.Point point) {
private Object extractValue(final MetricDataType type, final PointData point) {
switch (type) {
case LONG_GAUGE:
case LONG_SUM:
return ((MetricData.LongPoint) point).getValue();
return ((LongPointData) point).getValue();
case DOUBLE_GAUGE:
return ((MetricData.DoublePoint) point).getValue();
return ((DoublePointData) point).getValue();
case SUMMARY:
return ((MetricData.DoubleSummaryPoint) point).getPercentileValues();
return ((DoubleSummaryPointData) point).getPercentileValues();
case HISTOGRAM:
return ((DoubleHistogramPointData) point).getCounts();
default:
throw new UnsupportedOperationException("Unsupported type " + type);
}

@ -20,8 +20,8 @@ import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.ArrayList;
import java.util.List;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.api.metrics.DoubleValueRecorder;
import io.opentelemetry.api.metrics.common.Labels;
public class OpenTelemetryTimer implements LabelledMetric<OperationTimer> {

@ -27,8 +27,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import org.apache.logging.log4j.LogManager;
@ -42,7 +43,7 @@ public class Pipeline<I> {
private final CompleterStage<?> completerStage;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Tracer tracer =
OpenTelemetry.getGlobalTracer("org.hyperledger.besu.services.pipeline", "1.0.0");
GlobalOpenTelemetry.getTracer("org.hyperledger.besu.services.pipeline", "1.0.0");
/**
* Flags that the pipeline is being completed so that when we abort we can close the streams
@ -141,7 +142,7 @@ public class Pipeline<I> {
tracer
.spanBuilder(task.getName())
.setAttribute("pipeline", name)
.setSpanKind(Span.Kind.INTERNAL)
.setSpanKind(SpanKind.INTERNAL)
.startSpan();
}
final Thread thread = Thread.currentThread();

Loading…
Cancel
Save