Add tracing support for internals and JSON-RPC (#1557)

* Add tracing support for internals and JSON-RPC

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Remove rocksdb tracing as it slows down execution too much

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Add B3 headers extraction on JSON-RPC requests

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Remove traces around trie tree as they slow down syncing significantly

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Add tracing to fast sync pipeline

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Add tracing for all pipelines

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Address code review

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Add acceptance tests and break out the shaded dependency

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Fix tracer id

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Revert changes to trie

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Upgrade otel to latest, remove old tracing

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>

* Code review comments

Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com>
pull/1745/head
Antoine Toulme 4 years ago committed by GitHub
parent 0905d1b233
commit cd66968b6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java
  2. 10
      acceptance-tests/tests/build.gradle
  3. 171
      acceptance-tests/tests/src/test/java/org/hyperledger/besu/tests/acceptance/OpenTelemetryAcceptanceTest.java
  4. 21
      build.gradle
  5. 6
      docker/graalvm/Dockerfile
  6. 6
      docker/openjdk-11/Dockerfile
  7. 6
      docker/openjdk-latest/Dockerfile
  8. 5
      docs/tracing/README.md
  9. 16
      docs/tracing/docker-compose.yml
  10. 12
      docs/tracing/otel-collector-config.yml
  11. 2
      ethereum/api/build.gradle
  12. 126
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpService.java
  13. 1
      ethereum/core/build.gradle
  14. 119
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java
  15. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  16. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java
  17. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java
  18. 1
      ethereum/trie/build.gradle
  19. 21
      gradle/versions.gradle
  20. 9
      metrics/core/build.gradle
  21. 2
      metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsService.java
  22. 6
      metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsSystemFactory.java
  23. 28
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/MetricsOtelGrpcPushService.java
  24. 4
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryCounter.java
  25. 149
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetrySystem.java
  26. 4
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryTimer.java
  27. 10
      metrics/core/src/test/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryMetricsSystemTest.java
  28. 24
      metrics/core/src/test/java/org/hyperledger/besu/metrics/prometheus/PrometheusMetricsSystemTest.java
  29. 1
      plugins/rocksdb/build.gradle
  30. 1
      services/pipeline/build.gradle
  31. 27
      services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipeline.java
  32. 61
      services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java
  33. 42
      services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java

@ -204,6 +204,10 @@ public class ProcessBesuNodeRunner implements BesuNodeRunner {
params.add("--metrics-category");
params.add(((Enum<?>) category).name());
}
if (node.isMetricsEnabled() || metricsConfiguration.isPushEnabled()) {
params.add("--metrics-protocol");
params.add(metricsConfiguration.getProtocol().name());
}
if (metricsConfiguration.isPushEnabled()) {
params.add("--metrics-push-enabled");
params.add("--metrics-push-host");

@ -24,6 +24,7 @@ dependencies {
testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
testImplementation project(':ethereum:permissioning')
testImplementation project(':ethereum:rlp')
testImplementation project(':metrics:core')
testImplementation project(':plugin-api')
testImplementation project(':privacy-contracts')
testImplementation project(':testutil')
@ -31,6 +32,15 @@ dependencies {
testImplementation 'com.github.tomakehurst:wiremock-jre8-standalone'
testImplementation 'commons-io:commons-io'
testImplementation 'io.grpc:grpc-core'
testImplementation 'io.grpc:grpc-netty'
testImplementation 'io.grpc:grpc-stub'
testImplementation 'io.netty:netty-all'
testImplementation 'io.opentelemetry:opentelemetry-api'
testImplementation 'io.opentelemetry:opentelemetry-proto'
testImplementation 'io.opentelemetry:opentelemetry-sdk'
testImplementation 'io.opentelemetry:opentelemetry-sdk-trace'
testImplementation 'io.opentelemetry:opentelemetry-exporter-otlp'
testImplementation 'junit:junit'
testImplementation 'net.consensys:orion'
testImplementation 'org.apache.commons:commons-compress'

@ -0,0 +1,171 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.tests.acceptance;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.metrics.MetricsProtocol;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
import org.hyperledger.besu.tests.acceptance.dsl.AcceptanceTestBase;
import org.hyperledger.besu.tests.acceptance.dsl.WaitUtils;
import org.hyperledger.besu.tests.acceptance.dsl.node.BesuNode;
import org.hyperledger.besu.tests.acceptance.dsl.node.configuration.BesuNodeConfigurationBuilder;
import java.util.ArrayList;
import java.util.List;
import com.google.common.io.Closer;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class OpenTelemetryAcceptanceTest extends AcceptanceTestBase {
private static final class FakeCollector extends TraceServiceGrpc.TraceServiceImplBase {
private final List<ResourceSpans> receivedSpans = new ArrayList<>();
private Status returnedStatus = Status.OK;
@Override
public void export(
final ExportTraceServiceRequest request,
final StreamObserver<ExportTraceServiceResponse> responseObserver) {
receivedSpans.addAll(request.getResourceSpansList());
responseObserver.onNext(ExportTraceServiceResponse.newBuilder().build());
if (!returnedStatus.isOk()) {
if (returnedStatus.getCode() == Status.Code.DEADLINE_EXCEEDED) {
// Do not call onCompleted to simulate a deadline exceeded.
return;
}
responseObserver.onError(returnedStatus.asRuntimeException());
return;
}
responseObserver.onCompleted();
}
List<ResourceSpans> getReceivedSpans() {
return receivedSpans;
}
void setReturnedStatus(final Status returnedStatus) {
this.returnedStatus = returnedStatus;
}
}
private static final class FakeMetricsCollector
extends MetricsServiceGrpc.MetricsServiceImplBase {
private final List<ResourceMetrics> receivedMetrics = new ArrayList<>();
private Status returnedStatus = Status.OK;
@Override
public void export(
final ExportMetricsServiceRequest request,
final StreamObserver<ExportMetricsServiceResponse> responseObserver) {
receivedMetrics.addAll(request.getResourceMetricsList());
responseObserver.onNext(ExportMetricsServiceResponse.newBuilder().build());
if (!returnedStatus.isOk()) {
if (returnedStatus.getCode() == Status.Code.DEADLINE_EXCEEDED) {
// Do not call onCompleted to simulate a deadline exceeded.
return;
}
responseObserver.onError(returnedStatus.asRuntimeException());
return;
}
responseObserver.onCompleted();
}
List<ResourceMetrics> getReceivedMetrics() {
return receivedMetrics;
}
void setReturnedStatus(final Status returnedStatus) {
this.returnedStatus = returnedStatus;
}
}
private final FakeMetricsCollector fakeMetricsCollector = new FakeMetricsCollector();
private final FakeCollector fakeTracesCollector = new FakeCollector();
private final Closer closer = Closer.create();
private BesuNode metricsNode;
@Before
public void setUp() throws Exception {
Server server =
NettyServerBuilder.forPort(4317)
.addService(fakeTracesCollector)
.addService(fakeMetricsCollector)
.build()
.start();
closer.register(server::shutdownNow);
MetricsConfiguration configuration =
MetricsConfiguration.builder()
.protocol(MetricsProtocol.OPENTELEMETRY)
.enabled(true)
.port(0)
.hostsAllowlist(singletonList("*"))
.build();
metricsNode =
besu.create(
new BesuNodeConfigurationBuilder()
.name("metrics-node")
.jsonRpcEnabled()
.metricsConfiguration(configuration)
.build());
cluster.start(metricsNode);
}
@After
public void tearDown() throws Exception {
closer.close();
}
@Test
public void metricsReporting() {
WaitUtils.waitFor(
30,
() -> {
List<ResourceMetrics> resourceMetrics = fakeMetricsCollector.getReceivedMetrics();
assertThat(resourceMetrics.isEmpty()).isFalse();
});
}
@Test
public void traceReporting() {
WaitUtils.waitFor(
30,
() -> {
// call the json RPC endpoint to generate a trace.
net.netVersion().verify(metricsNode);
List<ResourceSpans> spans = fakeTracesCollector.getReceivedSpans();
assertThat(spans.isEmpty()).isFalse();
});
}
}

@ -489,8 +489,6 @@ applicationDefaultJvmArgs = [
run {
args project.hasProperty("besu.run.args") ? project.property("besu.run.args").toString().split("\\s+") : []
doFirst {
applicationDefaultJvmArgs.add(0, "-Dotel.resource.attributes=service.name=besu-dev")
applicationDefaultJvmArgs.add(0, "-javaagent:"+ configurations.javaAgent.singleFile.toPath() + "")
applicationDefaultJvmArgs = applicationDefaultJvmArgs.collect {
it.replace('BESU_HOME', "$buildDir/besu")
}
@ -504,19 +502,6 @@ def tweakStartScript(createScriptTask) {
createScriptTask.unixScript.text = createScriptTask.unixScript.text.replace('BESU_HOME', '\$APP_HOME')
createScriptTask.windowsScript.text = createScriptTask.windowsScript.text.replace('BESU_HOME', '%~dp0..')
// OpenTelemetry Wiring for unix scripts
def agentFileName = configurations.javaAgent.singleFile.toPath().getFileName()
def unixRegex = $/exec "$$JAVACMD" /$
def forwardSlash = "/"
def unixReplacement = $/if [ -n "$$TRACING" ];then
TRACING_AGENT="-javaagent:$$APP_HOME/agent${forwardSlash}${agentFileName}"
fi
exec "$$JAVACMD" $$TRACING_AGENT /$
createScriptTask.unixScript.text = createScriptTask.unixScript.text.replace(unixRegex, unixReplacement)
// OpenTelemetry Wiring for windows scripts
def windowsRegex = $/"%JAVA_EXE%" %DEFAULT_JVM_OPTS%/$
def windowsReplacement = $/if Defined TRACING (TRACING_AGENT="-javaagent:" "%APP_HOME%\agent\/$ + agentFileName + '")\r\n"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %TRACING_AGENT%'
createScriptTask.windowsScript.text = createScriptTask.windowsScript.text.replace(windowsRegex, windowsReplacement)
// Prevent the error originating from the 8191 chars limit on Windows
createScriptTask.windowsScript.text =
@ -790,14 +775,9 @@ tasks.register("verifyDistributions") {
}
}
configurations {
javaAgent
}
dependencies {
implementation project(':besu')
implementation project(':ethereum:evmtool')
javaAgent group: 'io.opentelemetry.instrumentation.auto', name: 'opentelemetry-javaagent', classifier: 'all'
errorprone 'com.google.errorprone:error_prone_core'
}
@ -808,7 +788,6 @@ distributions {
from("build/reports/license/license-dependency.html") { into "." }
from("./docs/GettingStartedBinaries.md") { into "." }
from("./docs/DocsArchive0.8.0.html") { into "." }
from(configurations.javaAgent.singleFile.toPath()) { into "agent"}
}
}
}

@ -21,11 +21,7 @@ ENV BESU_RPC_HTTP_HOST 0.0.0.0
ENV BESU_RPC_WS_HOST 0.0.0.0
ENV BESU_GRAPHQL_HTTP_HOST 0.0.0.0
ENV BESU_PID_PATH "/tmp/pid"
# Tracing defaults
# To enable tracing, uncomment next line
#ENV TRACING=ENABLED
ENV OTEL_EXPORTER=otlp
ENV OTEL_OTLP_ENDPOINT="0.0.0.0:55680"
ENV OTEL_RESOURCE_ATTRIBUTES="service.name=besu-$VERSION"
ENV PATH="/opt/besu/bin:${PATH}"

@ -21,11 +21,7 @@ ENV BESU_RPC_HTTP_HOST 0.0.0.0
ENV BESU_RPC_WS_HOST 0.0.0.0
ENV BESU_GRAPHQL_HTTP_HOST 0.0.0.0
ENV BESU_PID_PATH "/tmp/pid"
# Tracing defaults
# To enable tracing, uncomment next line
#ENV TRACING=ENABLED
ENV OTEL_EXPORTER=otlp
ENV OTEL_OTLP_ENDPOINT="0.0.0.0:55680"
ENV OTEL_RESOURCE_ATTRIBUTES="service.name=besu-$VERSION"
ENV PATH="/opt/besu/bin:${PATH}"

@ -21,11 +21,7 @@ ENV BESU_RPC_HTTP_HOST 0.0.0.0
ENV BESU_RPC_WS_HOST 0.0.0.0
ENV BESU_GRAPHQL_HTTP_HOST 0.0.0.0
ENV BESU_PID_PATH "/tmp/pid"
# Tracing defaults
# To enable tracing, uncomment next line
#ENV TRACING=ENABLED
ENV OTEL_EXPORTER=otlp
ENV OTEL_OTLP_ENDPOINT="0.0.0.0:55680"
ENV OTEL_RESOURCE_ATTRIBUTES="service.name=besu-$VERSION"
ENV PATH="/opt/besu/bin:${PATH}"

@ -10,7 +10,7 @@ To try out this example, start the Open Telemetry Collector and the Zipkin servi
Start besu with:
`$> ./gradlew run --args="--network=dev --rpc-http-enabled"`
`$> OTEL_RESOURCE_ATTRIBUTES="service.name=besu-dev" OTEL_EXPORTER_OTLP_METRIC_INSECURE=true OTEL_EXPORTER_OTLP_SPAN_INSECURE=true ./gradlew run --args="--network=dev --rpc-http-enabled --metrics-enabled --metrics-protocol=opentelemetry"`
Try interacting with the JSON-RPC API. Here is a simple example using cURL:
@ -19,3 +19,6 @@ Try interacting with the JSON-RPC API. Here is a simple example using cURL:
Open the Zipkin UI by browsing to http://localhost:9411/
You will be able to see the detail of your traces.
References:
* [OpenTelemetry Environment Variable Specification](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md)

@ -1,7 +1,8 @@
version: "3"
services:
otel-collector:
image: otel/opentelemetry-collector-contrib:0.11.0
otelcollector:
image: otel/opentelemetry-collector-contrib:0.17.0
container_name: otelcollector
command: ["--config=/etc/otel-collector-config.yml"]
volumes:
- ./otel-collector-config.yml:/etc/otel-collector-config.yml
@ -10,13 +11,20 @@ services:
- "8888:8888" # Prometheus metrics exposed by the collector
- "8889:8889" # Prometheus exporter metrics
- "13133:13133" # health_check extension
- "55680:55680" # zpages extension
- "4317:4317" # OTLP GRPC receiver
depends_on:
- zipkin
- metricsviewer
#Zipkin
# Zipkin
zipkin:
image: openzipkin/zipkin
container_name: zipkin
ports:
- 9411:9411
metricsviewer:
image: docker.io/tmio/metrics-ui
container_name: metricsviewer
ports:
- 8080:8080

@ -2,11 +2,17 @@ receivers:
otlp:
protocols:
grpc:
http:
exporters:
zipkin:
endpoint: "http://zipkin:9411/api/v2/spans"
otlp:
endpoint: "metricsviewer:4317"
insecure: true
logging:
loglevel: debug
sampling_initial: 5
sampling_thereafter: 200
processors:
batch:
@ -23,3 +29,7 @@ service:
receivers: [otlp]
exporters: [zipkin]
processors: [batch]
metrics:
receivers: [otlp]
exporters: [otlp, logging]
processors: [batch]

@ -49,6 +49,8 @@ dependencies {
implementation 'com.google.guava:guava'
implementation 'com.graphql-java:graphql-java'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'io.opentelemetry:opentelemetry-extension-trace-propagators'
implementation 'io.vertx:vertx-auth-jwt'
implementation 'io.vertx:vertx-core'
implementation 'io.vertx:vertx-unit'

@ -61,11 +61,19 @@ import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.extension.trace.propagation.B3Propagator;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
@ -75,12 +83,14 @@ import io.vertx.core.http.ClientAuth;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
@ -93,17 +103,36 @@ public class JsonRpcHttpService {
private static final Logger LOG = LogManager.getLogger();
private static final String SPAN_CONTEXT = "span_context";
private static final InetSocketAddress EMPTY_SOCKET_ADDRESS = new InetSocketAddress("0.0.0.0", 0);
private static final String APPLICATION_JSON = "application/json";
private static final JsonRpcResponse NO_RESPONSE = new JsonRpcNoResponse();
private static final String EMPTY_RESPONSE = "";
private static final TextMapPropagator.Getter<HttpServerRequest> requestAttributesGetter =
new TextMapPropagator.Getter<>() {
@Override
public Iterable<String> keys(final HttpServerRequest carrier) {
return carrier.headers().names();
}
@Nullable
@Override
public String get(final @Nullable HttpServerRequest carrier, final String key) {
if (carrier == null) {
return null;
}
return carrier.headers().get(key);
}
};
private final Vertx vertx;
private final JsonRpcConfiguration config;
private final Map<String, JsonRpcMethod> rpcMethods;
private final NatService natService;
private final Path dataDir;
private final LabelledMetric<OperationTimer> requestTimer;
private final Tracer tracer;
@VisibleForTesting public final Optional<AuthenticationService> authenticationService;
@ -169,6 +198,7 @@ public class JsonRpcHttpService {
this.authenticationService = authenticationService;
this.livenessService = livenessService;
this.readinessService = readinessService;
this.tracer = GlobalOpenTelemetry.getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
}
private void validateConfig(final JsonRpcConfiguration config) {
@ -229,6 +259,7 @@ public class JsonRpcHttpService {
private Router buildRouter() {
// Handle json rpc requests
final Router router = Router.router(vertx);
router.route().handler(this::createSpan);
// Verify Host header to avoid rebind attack.
router.route().handler(checkAllowlistHostHeader());
@ -279,6 +310,32 @@ public class JsonRpcHttpService {
return router;
}
private void createSpan(final RoutingContext routingContext) {
final SocketAddress address = routingContext.request().connection().remoteAddress();
Context parent =
B3Propagator.getInstance()
.extract(Context.current(), routingContext.request(), requestAttributesGetter);
final Span serverSpan =
tracer
.spanBuilder(address.host() + ":" + address.port())
.setParent(parent)
.setSpanKind(Span.Kind.SERVER)
.startSpan();
routingContext.put(SPAN_CONTEXT, Context.root().with(serverSpan));
routingContext.addBodyEndHandler(event -> serverSpan.end());
routingContext.addEndHandler(
event -> {
if (event.failed()) {
serverSpan.recordException(event.cause());
serverSpan.setStatus(StatusCode.ERROR);
}
serverSpan.end();
});
routingContext.next();
}
private HttpServerOptions getHttpServerOptions() {
final HttpServerOptions httpServerOptions =
new HttpServerOptions()
@ -590,40 +647,55 @@ public class JsonRpcHttpService {
} catch (final IllegalArgumentException exception) {
return errorResponse(id, INVALID_REQUEST);
}
// Handle notifications
if (requestBody.isNotification()) {
// Notifications aren't handled so create empty result for now.
return NO_RESPONSE;
}
Span span =
tracer
.spanBuilder(requestBody.getMethod())
.setSpanKind(Span.Kind.INTERNAL)
.setParent(ctx.get(SPAN_CONTEXT))
.startSpan();
try {
// Handle notifications
if (requestBody.isNotification()) {
// Notifications aren't handled so create empty result for now.
return NO_RESPONSE;
}
final Optional<JsonRpcError> unavailableMethod = validateMethodAvailability(requestBody);
if (unavailableMethod.isPresent()) {
return errorResponse(id, unavailableMethod.get());
}
final Optional<JsonRpcError> unavailableMethod = validateMethodAvailability(requestBody);
if (unavailableMethod.isPresent()) {
span.setStatus(StatusCode.ERROR, "method unavailable");
return errorResponse(id, unavailableMethod.get());
}
final JsonRpcMethod method = rpcMethods.get(requestBody.getMethod());
final JsonRpcMethod method = rpcMethods.get(requestBody.getMethod());
if (AuthenticationUtils.isPermitted(authenticationService, user, method)) {
// Generate response
try (final OperationTimer.TimingContext ignored =
requestTimer.labels(requestBody.getMethod()).startTimer()) {
if (user.isPresent()) {
if (AuthenticationUtils.isPermitted(authenticationService, user, method)) {
// Generate response
try (final OperationTimer.TimingContext ignored =
requestTimer.labels(requestBody.getMethod()).startTimer()) {
if (user.isPresent()) {
return method.response(
new JsonRpcRequestContext(requestBody, user.get(), () -> !ctx.response().closed()));
}
return method.response(
new JsonRpcRequestContext(requestBody, user.get(), () -> !ctx.response().closed()));
new JsonRpcRequestContext(requestBody, () -> !ctx.response().closed()));
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
span.setStatus(StatusCode.ERROR, "Invalid Params");
return errorResponse(id, JsonRpcError.INVALID_PARAMS);
} catch (final MultiTenancyValidationException e) {
span.setStatus(StatusCode.ERROR, "Unauthorized");
return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
} catch (final RuntimeException e) {
LOG.error("Error processing JSON-RPC requestBody", e);
span.setStatus(StatusCode.ERROR, "Error processing JSON-RPC requestBody");
return errorResponse(id, JsonRpcError.INTERNAL_ERROR);
}
return method.response(
new JsonRpcRequestContext(requestBody, () -> !ctx.response().closed()));
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
return errorResponse(id, JsonRpcError.INVALID_PARAMS);
} catch (final MultiTenancyValidationException e) {
} else {
span.setStatus(StatusCode.ERROR, "Unauthorized");
return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
} catch (final RuntimeException e) {
LOG.error("Error processing JSON-RPC requestBody", e);
return errorResponse(id, JsonRpcError.INTERNAL_ERROR);
}
} else {
return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
} finally {
span.end();
}
}

@ -41,6 +41,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.google.guava:guava'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'io.vertx:vertx-core'
implementation 'net.java.dev.jna:jna'
implementation 'org.apache.logging.log4j:log4j-api'

@ -34,6 +34,9 @@ import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -50,6 +53,9 @@ public abstract class AbstractBlockProcessor implements BlockProcessor {
private static final Logger LOG = LogManager.getLogger();
private static final Tracer tracer =
OpenTelemetry.getGlobalTracer("org.hyperledger.besu.block", "1.0.0");
static final int MAX_GENERATION = 6;
public static class Result implements BlockProcessor.Result {
@ -121,66 +127,71 @@ public abstract class AbstractBlockProcessor implements BlockProcessor {
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final PrivateMetadataUpdater privateMetadataUpdater) {
final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
for (final Transaction transaction : transactions) {
final long remainingGasBudget = blockHeader.getGasLimit() - currentGasUsed;
if (!gasBudgetCalculator.hasBudget(
transaction, blockHeader.getNumber(), blockHeader.getGasLimit(), currentGasUsed)) {
LOG.info(
"Block processing error: transaction gas limit {} exceeds available block budget"
+ " remaining {}. Block {} Transaction {}",
transaction.getGasLimit(),
remainingGasBudget,
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
return AbstractBlockProcessor.Result.failed();
Span globalProcessBlock =
tracer.spanBuilder("processBlock").setSpanKind(Span.Kind.INTERNAL).startSpan();
try {
final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
for (final Transaction transaction : transactions) {
final long remainingGasBudget = blockHeader.getGasLimit() - currentGasUsed;
if (!gasBudgetCalculator.hasBudget(
transaction, blockHeader.getNumber(), blockHeader.getGasLimit(), currentGasUsed)) {
LOG.info(
"Block processing error: transaction gas limit {} exceeds available block budget"
+ " remaining {}. Block {} Transaction {}",
transaction.getGasLimit(),
remainingGasBudget,
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
return AbstractBlockProcessor.Result.failed();
}
final WorldUpdater worldStateUpdater = worldState.updater();
final BlockHashLookup blockHashLookup = new BlockHashLookup(blockHeader, blockchain);
final Address miningBeneficiary =
miningBeneficiaryCalculator.calculateBeneficiary(blockHeader);
final TransactionProcessingResult result =
transactionProcessor.processTransaction(
blockchain,
worldStateUpdater,
blockHeader,
transaction,
miningBeneficiary,
OperationTracer.NO_TRACING,
blockHashLookup,
true,
TransactionValidationParams.processingBlock(),
privateMetadataUpdater);
if (result.isInvalid()) {
LOG.info(
"Block processing error: transaction invalid '{}'. Block {} Transaction {}",
result.getValidationResult().getInvalidReason(),
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
return AbstractBlockProcessor.Result.failed();
}
worldStateUpdater.commit();
currentGasUsed += transaction.getGasLimit() - result.getGasRemaining();
final TransactionReceipt transactionReceipt =
transactionReceiptFactory.create(
transaction.getType(), result, worldState, currentGasUsed);
receipts.add(transactionReceipt);
}
final WorldUpdater worldStateUpdater = worldState.updater();
final BlockHashLookup blockHashLookup = new BlockHashLookup(blockHeader, blockchain);
final Address miningBeneficiary =
miningBeneficiaryCalculator.calculateBeneficiary(blockHeader);
final TransactionProcessingResult result =
transactionProcessor.processTransaction(
blockchain,
worldStateUpdater,
blockHeader,
transaction,
miningBeneficiary,
OperationTracer.NO_TRACING,
blockHashLookup,
true,
TransactionValidationParams.processingBlock(),
privateMetadataUpdater);
if (result.isInvalid()) {
LOG.info(
"Block processing error: transaction invalid '{}'. Block {} Transaction {}",
result.getValidationResult().getInvalidReason(),
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
if (!rewardCoinbase(worldState, blockHeader, ommers, skipZeroBlockRewards)) {
// no need to log, rewardCoinbase logs the error.
return AbstractBlockProcessor.Result.failed();
}
worldStateUpdater.commit();
currentGasUsed += transaction.getGasLimit() - result.getGasRemaining();
final TransactionReceipt transactionReceipt =
transactionReceiptFactory.create(
transaction.getType(), result, worldState, currentGasUsed);
receipts.add(transactionReceipt);
}
if (!rewardCoinbase(worldState, blockHeader, ommers, skipZeroBlockRewards)) {
// no need to log, rewardCoinbase logs the error.
return AbstractBlockProcessor.Result.failed();
worldState.persist(blockHeader.getHash());
return AbstractBlockProcessor.Result.successful(receipts);
} finally {
globalProcessBlock.end();
}
worldState.persist(blockHeader.getHash());
return AbstractBlockProcessor.Result.successful(receipts);
}
protected MiningBeneficiaryCalculator getMiningBeneficiaryCalculator() {

@ -144,7 +144,9 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
"chain_download_pipeline_processed_total",
"Number of entries process by each chain download pipeline stage",
"step",
"action"))
"action"),
true,
"fastSync")
.thenProcessAsyncOrdered("downloadHeaders", downloadHeadersStep, downloaderParallelism)
.thenFlatMap("validateHeadersJoin", validateHeadersJoinUpStep, singleHeaderBufferSize)
.inBatches(headerRequestSize)

@ -101,7 +101,9 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
"chain_download_pipeline_processed_total",
"Number of entries process by each chain download pipeline stage",
"step",
"action"))
"action"),
true,
"fullSync")
.thenProcessAsyncOrdered("downloadHeaders", downloadHeadersStep, downloaderParallelism)
.thenFlatMap("validateHeadersJoin", validateHeadersJoinUpStep, singleHeaderBufferSize)
.inBatches(headerRequestSize)

@ -206,7 +206,7 @@ public class WorldStateDownloadProcess {
final Pipeline<Task<NodeDataRequest>> completionPipeline =
PipelineBuilder.<Task<NodeDataRequest>>createPipeline(
"requestDataAvailable", bufferCapacity, outputCounter)
"requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request")
.andFinishWith(
"requestCompleteTask",
task ->
@ -219,7 +219,9 @@ public class WorldStateDownloadProcess {
"requestDequeued",
new TaskQueueIterator(downloadState),
bufferCapacity,
outputCounter)
outputCounter,
true,
"world_state_download")
.thenFlatMapInParallel(
"requestLoadLocalData",
task -> loadLocalDataStep.loadLocalData(task, requestsToComplete),

@ -33,6 +33,7 @@ dependencies {
implementation project(':services:kvstore')
implementation 'com.google.guava:guava'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'org.apache.tuweni:bytes'
implementation 'org.bouncycastle:bcprov-jdk15on'

@ -47,18 +47,25 @@ dependencyManagement {
dependency 'info.picocli:picocli:4.5.0'
dependency 'io.grpc:grpc-netty:1.33.0'
dependency 'io.grpc:grpc-core:1.33.1'
dependency 'io.grpc:grpc-netty:1.33.1'
dependency 'io.grpc:grpc-stub:1.33.1'
dependency 'io.netty:netty-tcnative-boringssl-static:2.0.31.Final'
dependency group: 'io.netty', name: 'netty-transport-native-epoll', version:'4.1.54.Final', classifier: 'linux-x86_64'
dependency 'io.netty:netty-all:4.1.54.Final'
dependency 'io.kubernetes:client-java:5.0.0'
dependency 'io.pkts:pkts-core:3.0.7'
dependency group: 'io.opentelemetry.instrumentation.auto', name: 'opentelemetry-javaagent', version: '0.8.0', classifier: 'all'
dependency 'io.opentelemetry:opentelemetry-api:0.9.1'
dependency 'io.opentelemetry:opentelemetry-sdk:0.9.1'
dependency 'io.opentelemetry:opentelemetry-exporters-otlp:0.9.1'
dependency 'io.opentelemetry:opentelemetry-extension-runtime-metrics:0.9.1'
dependency 'io.opentelemetry:opentelemetry-api:0.13.1'
dependency 'io.opentelemetry:opentelemetry-proto:0.13.1'
dependency 'io.opentelemetry:opentelemetry-sdk:0.13.1'
dependency 'io.opentelemetry:opentelemetry-sdk-trace:0.13.1'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp:0.13.1'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp-metrics:0.13.1-alpha'
dependency 'io.opentelemetry:opentelemetry-extension-trace-propagators:0.13.1'
dependency 'io.prometheus:simpleclient:0.9.0'
dependency 'io.prometheus:simpleclient_common:0.9.0'

@ -39,10 +39,15 @@ dependencies {
implementation 'com.google.guava:guava'
implementation 'io.grpc:grpc-netty'
implementation 'io.grpc:grpc-core'
implementation 'io.netty:netty-tcnative-boringssl-static'
implementation 'io.netty:netty-transport-native-epoll'
implementation 'io.netty:netty-all'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'io.opentelemetry:opentelemetry-sdk'
implementation 'io.opentelemetry:opentelemetry-extension-runtime-metrics'
implementation 'io.opentelemetry:opentelemetry-exporters-otlp'
implementation 'io.opentelemetry:opentelemetry-sdk-trace'
implementation 'io.opentelemetry:opentelemetry-exporter-otlp'
implementation 'io.opentelemetry:opentelemetry-exporter-otlp-metrics'
implementation 'io.prometheus:simpleclient'
implementation 'io.prometheus:simpleclient_common'

@ -25,6 +25,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import io.vertx.core.Vertx;
import org.apache.logging.log4j.LogManager;
/**
* Service responsible for exposing metrics to the outside, either through a port and network
@ -36,6 +37,7 @@ public interface MetricsService {
final Vertx vertx,
final MetricsConfiguration configuration,
final MetricsSystem metricsSystem) {
LogManager.getLogger().trace("Creating metrics service {}", configuration.getProtocol());
if (configuration.getProtocol() == MetricsProtocol.PROMETHEUS) {
if (configuration.isEnabled()) {
return Optional.of(new MetricsHttpService(vertx, configuration, metricsSystem));

@ -22,9 +22,14 @@ import org.hyperledger.besu.metrics.opentelemetry.OpenTelemetrySystem;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
import org.hyperledger.besu.metrics.prometheus.PrometheusMetricsSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/** Creates a new metric system based on configuration. */
public class MetricsSystemFactory {
private static final Logger LOG = LogManager.getLogger();
private MetricsSystemFactory() {}
/**
@ -34,6 +39,7 @@ public class MetricsSystemFactory {
* @return a new metric system
*/
public static ObservableMetricsSystem create(final MetricsConfiguration metricsConfiguration) {
LOG.trace("Creating a metric system with {}", metricsConfiguration.getProtocol());
if (!metricsConfiguration.isEnabled() && !metricsConfiguration.isPushEnabled()) {
return new NoOpMetricsSystem();
}

@ -22,23 +22,35 @@ import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import io.opentelemetry.exporters.otlp.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.export.IntervalMetricReader;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MetricsOtelGrpcPushService implements MetricsService {
private static final Logger LOG = LogManager.getLogger();
private final MetricsConfiguration configuration;
private final OpenTelemetrySystem metricsSystem;
private IntervalMetricReader periodicReader;
private SpanProcessor spanProcessor;
public MetricsOtelGrpcPushService(
final MetricsConfiguration configuration, final OpenTelemetrySystem metricsSystem) {
this.configuration = configuration;
this.metricsSystem = metricsSystem;
}
@Override
public CompletableFuture<?> start() {
LOG.info("Starting OpenTelemetry push service");
OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.getDefault();
IntervalMetricReader.Builder builder =
IntervalMetricReader.builder()
@ -49,6 +61,14 @@ public class MetricsOtelGrpcPushService implements MetricsService {
Collections.singleton(metricsSystem.getMeterSdkProvider().getMetricProducer()))
.setMetricExporter(exporter);
this.periodicReader = builder.build();
this.spanProcessor =
BatchSpanProcessor.builder(
OtlpGrpcSpanExporter.builder()
.readSystemProperties()
.readEnvironmentVariables()
.build())
.build();
OpenTelemetrySdk.get().getTracerManagement().addSpanProcessor(spanProcessor);
return CompletableFuture.completedFuture(null);
}
@ -57,6 +77,12 @@ public class MetricsOtelGrpcPushService implements MetricsService {
if (periodicReader != null) {
periodicReader.shutdown();
}
if (spanProcessor != null) {
CompletableResultCode result = spanProcessor.shutdown();
CompletableFuture<?> future = new CompletableFuture<>();
result.whenComplete(() -> future.complete(null));
return future;
}
return CompletableFuture.completedFuture(null);
}

@ -20,8 +20,8 @@ import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import java.util.ArrayList;
import java.util.List;
import io.opentelemetry.common.Labels;
import io.opentelemetry.metrics.LongCounter;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.api.metrics.LongCounter;
public class OpenTelemetryCounter implements LabelledMetric<Counter> {

@ -40,21 +40,23 @@ import java.util.function.DoubleSupplier;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableSet;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.common.Labels;
import io.opentelemetry.metrics.DoubleValueObserver;
import io.opentelemetry.metrics.DoubleValueRecorder;
import io.opentelemetry.metrics.LongCounter;
import io.opentelemetry.metrics.LongSumObserver;
import io.opentelemetry.metrics.LongUpDownSumObserver;
import io.opentelemetry.metrics.Meter;
import io.opentelemetry.sdk.metrics.MeterSdkProvider;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.api.metrics.DoubleValueRecorder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.resources.ResourceAttributes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/** Metrics system relying on the native OpenTelemetry format. */
public class OpenTelemetrySystem implements ObservableMetricsSystem {
private static final Logger LOG = LogManager.getLogger();
private static final String TYPE_LABEL_KEY = "type";
private static final String AREA_LABEL_KEY = "area";
private static final String POOL_LABEL_KEY = "pool";
@ -69,25 +71,24 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
private final Map<String, LabelledMetric<Counter>> cachedCounters = new ConcurrentHashMap<>();
private final Map<String, LabelledMetric<OperationTimer>> cachedTimers =
new ConcurrentHashMap<>();
private final MeterSdkProvider meterSdkProvider;
private final SdkMeterProvider meterSdkProvider;
public OpenTelemetrySystem(
final Set<MetricCategory> enabledCategories,
final boolean timersEnabled,
final String jobName) {
LOG.info("Starting OpenTelemetry metrics system");
this.enabledCategories = ImmutableSet.copyOf(enabledCategories);
this.timersEnabled = timersEnabled;
Resource resource =
Resource.getDefault()
.merge(
Resource.create(
Attributes.newBuilder()
.setAttribute(ResourceAttributes.SERVICE_NAME, jobName)
.build()));
this.meterSdkProvider = MeterSdkProvider.builder().setResource(resource).build();
Attributes.builder().put(ResourceAttributes.SERVICE_NAME, jobName).build()));
this.meterSdkProvider = SdkMeterProvider.builder().setResource(resource).build();
}
MeterSdkProvider getMeterSdkProvider() {
SdkMeterProvider getMeterSdkProvider() {
return meterSdkProvider;
}
@ -132,14 +133,13 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
private Object extractValue(final MetricData.Type type, final MetricData.Point point) {
switch (type) {
case NON_MONOTONIC_LONG:
case MONOTONIC_LONG:
case LONG_GAUGE:
case LONG_SUM:
return ((MetricData.LongPoint) point).getValue();
case NON_MONOTONIC_DOUBLE:
case MONOTONIC_DOUBLE:
case DOUBLE_GAUGE:
return ((MetricData.DoublePoint) point).getValue();
case SUMMARY:
return ((MetricData.SummaryPoint) point).getPercentileValues();
return ((MetricData.DoubleSummaryPoint) point).getPercentileValues();
default:
throw new UnsupportedOperationException("Unsupported type " + type);
}
@ -151,6 +151,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
final String name,
final String help,
final String... labelNames) {
LOG.trace("Creating a counter {}", name);
return cachedCounters.computeIfAbsent(
name,
(k) -> {
@ -171,6 +172,7 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
final String name,
final String help,
final String... labelNames) {
LOG.trace("Creating a timer {}", name);
return cachedTimers.computeIfAbsent(
name,
(k) -> {
@ -192,11 +194,17 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
final String name,
final String help,
final DoubleSupplier valueSupplier) {
LOG.trace("Creating a gauge {}", name);
if (isCategoryEnabled(category)) {
final Meter meter = meterSdkProvider.get(category.getName());
DoubleValueObserver observer =
meter.doubleValueObserverBuilder(name).setDescription(help).build();
observer.setCallback(result -> result.observe(valueSupplier.getAsDouble(), Labels.empty()));
meter
.doubleValueObserverBuilder(name)
.setDescription(help)
.setUpdater(
res -> {
res.observe(valueSupplier.getAsDouble(), Labels.empty());
})
.build();
}
}
@ -217,31 +225,22 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
final List<MemoryPoolMXBean> poolBeans = ManagementFactory.getMemoryPoolMXBeans();
final Meter meter = meterSdkProvider.get(StandardMetricCategory.JVM.getName());
final LongSumObserver gcMetric =
meter
.longSumObserverBuilder("jvm.gc.collection")
.setDescription("Time spent in a given JVM garbage collector in milliseconds.")
.setUnit("ms")
.build();
final List<Labels> labelSets = new ArrayList<>(garbageCollectors.size());
for (final GarbageCollectorMXBean gc : garbageCollectors) {
labelSets.add(Labels.of("gc", gc.getName()));
}
gcMetric.setCallback(
resultLongObserver -> {
for (int i = 0; i < garbageCollectors.size(); i++) {
resultLongObserver.observe(
garbageCollectors.get(i).getCollectionTime(), labelSets.get(i));
}
});
final LongUpDownSumObserver areaMetric =
meter
.longUpDownSumObserverBuilder("jvm.memory.area")
.setDescription("Bytes of a given JVM memory area.")
.setUnit("By")
.build();
meter
.longSumObserverBuilder("jvm.gc.collection")
.setDescription("Time spent in a given JVM garbage collector in milliseconds.")
.setUnit("ms")
.setUpdater(
resultLongObserver -> {
for (int i = 0; i < garbageCollectors.size(); i++) {
resultLongObserver.observe(
garbageCollectors.get(i).getCollectionTime(), labelSets.get(i));
}
})
.build();
final Labels usedHeap = Labels.of(TYPE_LABEL_KEY, USED, AREA_LABEL_KEY, HEAP);
final Labels usedNonHeap = Labels.of(TYPE_LABEL_KEY, USED, AREA_LABEL_KEY, NON_HEAP);
final Labels committedHeap = Labels.of(TYPE_LABEL_KEY, COMMITTED, AREA_LABEL_KEY, HEAP);
@ -249,24 +248,22 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
// TODO: Decide if max is needed or not. May be derived with some approximation from max(used).
final Labels maxHeap = Labels.of(TYPE_LABEL_KEY, MAX, AREA_LABEL_KEY, HEAP);
final Labels maxNonHeap = Labels.of(TYPE_LABEL_KEY, MAX, AREA_LABEL_KEY, NON_HEAP);
areaMetric.setCallback(
resultLongObserver -> {
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
MemoryUsage nonHeapUsage = memoryBean.getNonHeapMemoryUsage();
resultLongObserver.observe(heapUsage.getUsed(), usedHeap);
resultLongObserver.observe(nonHeapUsage.getUsed(), usedNonHeap);
resultLongObserver.observe(heapUsage.getUsed(), committedHeap);
resultLongObserver.observe(nonHeapUsage.getUsed(), committedNonHeap);
resultLongObserver.observe(heapUsage.getUsed(), maxHeap);
resultLongObserver.observe(nonHeapUsage.getUsed(), maxNonHeap);
});
final LongUpDownSumObserver poolMetric =
meter
.longUpDownSumObserverBuilder("jvm.memory.pool")
.setDescription("Bytes of a given JVM memory pool.")
.setUnit("By")
.build();
meter
.longUpDownSumObserverBuilder("jvm.memory.area")
.setDescription("Bytes of a given JVM memory area.")
.setUnit("By")
.setUpdater(
resultLongObserver -> {
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
MemoryUsage nonHeapUsage = memoryBean.getNonHeapMemoryUsage();
resultLongObserver.observe(heapUsage.getUsed(), usedHeap);
resultLongObserver.observe(nonHeapUsage.getUsed(), usedNonHeap);
resultLongObserver.observe(heapUsage.getUsed(), committedHeap);
resultLongObserver.observe(nonHeapUsage.getUsed(), committedNonHeap);
resultLongObserver.observe(heapUsage.getUsed(), maxHeap);
resultLongObserver.observe(nonHeapUsage.getUsed(), maxNonHeap);
})
.build();
final List<Labels> usedLabelSets = new ArrayList<>(poolBeans.size());
final List<Labels> committedLabelSets = new ArrayList<>(poolBeans.size());
final List<Labels> maxLabelSets = new ArrayList<>(poolBeans.size());
@ -275,16 +272,22 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem {
committedLabelSets.add(Labels.of(TYPE_LABEL_KEY, COMMITTED, POOL_LABEL_KEY, pool.getName()));
maxLabelSets.add(Labels.of(TYPE_LABEL_KEY, MAX, POOL_LABEL_KEY, pool.getName()));
}
poolMetric.setCallback(
resultLongObserver -> {
for (int i = 0; i < poolBeans.size(); i++) {
MemoryUsage poolUsage = poolBeans.get(i).getUsage();
resultLongObserver.observe(poolUsage.getUsed(), usedLabelSets.get(i));
resultLongObserver.observe(poolUsage.getCommitted(), committedLabelSets.get(i));
// TODO: Decide if max is needed or not. May be derived with some approximation from
// max(used).
resultLongObserver.observe(poolUsage.getMax(), maxLabelSets.get(i));
}
});
meter
.longUpDownSumObserverBuilder("jvm.memory.pool")
.setDescription("Bytes of a given JVM memory pool.")
.setUnit("By")
.setUpdater(
resultLongObserver -> {
for (int i = 0; i < poolBeans.size(); i++) {
MemoryUsage poolUsage = poolBeans.get(i).getUsage();
resultLongObserver.observe(poolUsage.getUsed(), usedLabelSets.get(i));
resultLongObserver.observe(poolUsage.getCommitted(), committedLabelSets.get(i));
// TODO: Decide if max is needed or not. May be derived with some approximation from
// max(used).
resultLongObserver.observe(poolUsage.getMax(), maxLabelSets.get(i));
}
})
.build();
}
}

@ -20,8 +20,8 @@ import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.ArrayList;
import java.util.List;
import io.opentelemetry.common.Labels;
import io.opentelemetry.metrics.DoubleValueRecorder;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.api.metrics.DoubleValueRecorder;
public class OpenTelemetryTimer implements LabelledMetric<OperationTimer> {

@ -34,13 +34,10 @@ import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import com.google.common.collect.ImmutableSet;
import io.opentelemetry.sdk.metrics.data.MetricData;
import org.junit.Test;
public class OpenTelemetryMetricsSystemTest {
@ -174,13 +171,10 @@ public class OpenTelemetryMetricsSystemTest {
.build();
final ObservableMetricsSystem localMetricSystem =
MetricsSystemFactory.create(metricsConfiguration);
localMetricSystem.createGauge(RPC, "myValue", "Help", () -> 7d);
List<MetricData.ValueAtPercentile> values = new ArrayList<>();
values.add(MetricData.ValueAtPercentile.create(0, 7d));
values.add(MetricData.ValueAtPercentile.create(100, 7d));
localMetricSystem.createGauge(RPC, "myValue", "Help", () -> 7.0);
assertThat(localMetricSystem.streamObservations())
.containsExactlyInAnyOrder(new Observation(RPC, "myValue", values, emptyList()));
.containsExactlyInAnyOrder(new Observation(RPC, "myValue", 7.0, emptyList()));
}
@Test

@ -57,11 +57,11 @@ public class PrometheusMetricsSystemTest {
counter.inc();
assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 1d, emptyList()));
.containsExactly(new Observation(PEERS, "connected", 1.0, emptyList()));
counter.inc();
assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 2d, emptyList()));
.containsExactly(new Observation(PEERS, "connected", 2.0, emptyList()));
}
@Test
@ -74,11 +74,11 @@ public class PrometheusMetricsSystemTest {
counter1.labels().inc();
assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 1d, emptyList()));
.containsExactly(new Observation(PEERS, "connected", 1.0, emptyList()));
counter2.labels().inc();
assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 2d, emptyList()));
.containsExactly(new Observation(PEERS, "connected", 2.0, emptyList()));
}
@Test
@ -92,8 +92,8 @@ public class PrometheusMetricsSystemTest {
assertThat(metricsSystem.streamObservations())
.containsExactlyInAnyOrder(
new Observation(PEERS, "connected", 2d, singletonList("value1")),
new Observation(PEERS, "connected", 1d, singletonList("value2")));
new Observation(PEERS, "connected", 2.0, singletonList("value1")),
new Observation(PEERS, "connected", 1.0, singletonList("value2")));
}
@Test
@ -102,11 +102,11 @@ public class PrometheusMetricsSystemTest {
counter.inc(5);
assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 5d, emptyList()));
.containsExactly(new Observation(PEERS, "connected", 5.0, emptyList()));
counter.inc(6);
assertThat(metricsSystem.streamObservations())
.containsExactly(new Observation(PEERS, "connected", 11d, emptyList()));
.containsExactly(new Observation(PEERS, "connected", 11.0, emptyList()));
}
@Test
@ -174,10 +174,10 @@ public class PrometheusMetricsSystemTest {
@Test
public void shouldCreateObservationFromGauge() {
metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7d);
metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7.0);
assertThat(metricsSystem.streamObservations())
.containsExactlyInAnyOrder(new Observation(JVM, "myValue", 7d, emptyList()));
.containsExactlyInAnyOrder(new Observation(JVM, "myValue", 7.0, emptyList()));
}
@Test
@ -185,8 +185,8 @@ public class PrometheusMetricsSystemTest {
// Gauges have a reference to the source of their data so creating it twice will still only
// pull data from the first instance, possibly leaking memory and likely returning the wrong
// results.
metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7d);
assertThatThrownBy(() -> metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7d))
metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7.0);
assertThatThrownBy(() -> metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7.0))
.isInstanceOf(IllegalArgumentException.class);
}

@ -41,6 +41,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.google.guava:guava'
implementation 'info.picocli:picocli'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'io.prometheus:simpleclient'
implementation 'org.apache.logging.log4j:log4j-api'
implementation 'org.apache.tuweni:bytes'

@ -33,6 +33,7 @@ dependencies {
implementation project(':metrics:core')
implementation project(':plugin-api')
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'com.google.guava:guava'
implementation 'org.apache.logging.log4j:log4j-api'

@ -27,6 +27,10 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -37,6 +41,8 @@ public class Pipeline<I> {
private final Collection<Pipe<?>> pipes;
private final CompleterStage<?> completerStage;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Tracer tracer =
OpenTelemetry.getGlobalTracer("org.hyperledger.besu.services.pipeline", "1.0.0");
/**
* Flags that the pipeline is being completed so that when we abort we can close the streams
@ -47,14 +53,20 @@ public class Pipeline<I> {
private final AtomicBoolean completing = new AtomicBoolean(false);
private final CompletableFuture<Void> overallFuture = new CompletableFuture<>();
private final String name;
private final boolean tracingEnabled;
private volatile List<Future<?>> futures;
Pipeline(
final Pipe<I> inputPipe,
final String name,
final boolean tracingEnabled,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final CompleterStage<?> completerStage) {
this.inputPipe = inputPipe;
this.tracingEnabled = tracingEnabled;
this.name = name;
this.stages = stages;
this.pipes = pipes;
this.completerStage = completerStage;
@ -123,12 +135,24 @@ public class Pipeline<I> {
private Future<?> runWithErrorHandling(final ExecutorService executorService, final Stage task) {
return executorService.submit(
() -> {
Span taskSpan = null;
if (tracingEnabled) {
taskSpan =
tracer
.spanBuilder(task.getName())
.setAttribute("pipeline", name)
.setSpanKind(Span.Kind.INTERNAL)
.startSpan();
}
final Thread thread = Thread.currentThread();
final String originalName = thread.getName();
try {
thread.setName(originalName + " (" + task.getName() + ")");
task.run();
} catch (final Throwable t) {
if (tracingEnabled) {
taskSpan.setStatus(StatusCode.ERROR);
}
LOG.debug("Unhandled exception in pipeline. Aborting.", t);
try {
abort(t);
@ -139,6 +163,9 @@ public class Pipeline<I> {
LOG.error("Failed to abort pipeline after error", t2);
}
} finally {
if (tracingEnabled) {
taskSpan.end();
}
thread.setName(originalName);
}
});

@ -51,6 +51,8 @@ public class PipelineBuilder<I, T> {
private final ReadPipe<T> pipeEnd;
private final int bufferSize;
private final LabelledMetric<Counter> outputCounter;
private final boolean tracingEnabled;
private final String pipelineName;
public PipelineBuilder(
final Pipe<I> inputPipe,
@ -59,7 +61,9 @@ public class PipelineBuilder<I, T> {
final String lastStageName,
final ReadPipe<T> pipeEnd,
final int bufferSize,
final LabelledMetric<Counter> outputCounter) {
final LabelledMetric<Counter> outputCounter,
final boolean tracingEnabled,
final String pipelineName) {
checkArgument(!pipes.isEmpty(), "Must have at least one pipe in a pipeline");
this.lastStageName = lastStageName;
this.outputCounter = outputCounter;
@ -68,6 +72,8 @@ public class PipelineBuilder<I, T> {
this.pipes = pipes;
this.pipeEnd = pipeEnd;
this.bufferSize = bufferSize;
this.tracingEnabled = tracingEnabled;
this.pipelineName = pipelineName;
}
/**
@ -81,17 +87,29 @@ public class PipelineBuilder<I, T> {
* @param itemCounter the counter to increment for each output of a stage. Must accept two labels,
* the stage name and action (output or drained).
* @param <T> the type of items input into the pipeline.
* @param tracingEnabled whether this pipeline should be traced
* @param pipelineName the name of the pipeline for tracing purposes
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public static <T> PipelineBuilder<T, T> createPipelineFrom(
final String sourceName,
final Iterator<T> source,
final int bufferSize,
final LabelledMetric<Counter> itemCounter) {
final LabelledMetric<Counter> itemCounter,
final boolean tracingEnabled,
final String pipelineName) {
final Pipe<T> pipe = createPipe(bufferSize, sourceName, itemCounter);
final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(sourceName, source, pipe);
return new PipelineBuilder<>(
pipe, singleton(sourceStage), singleton(pipe), sourceName, pipe, bufferSize, itemCounter);
pipe,
singleton(sourceStage),
singleton(pipe),
sourceName,
pipe,
bufferSize,
itemCounter,
tracingEnabled,
pipelineName);
}
/**
@ -103,13 +121,27 @@ public class PipelineBuilder<I, T> {
* @param outputCounter the counter to increment for each output of a stage. Must have a single
* label which will be filled with the stage name.
* @param <T> the type of items input into the pipeline.
* @param tracingEnabled whether this pipeline should be traced
* @param pipelineName the name of the pipeline for tracing purposes
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public static <T> PipelineBuilder<T, T> createPipeline(
final String sourceName, final int bufferSize, final LabelledMetric<Counter> outputCounter) {
final String sourceName,
final int bufferSize,
final LabelledMetric<Counter> outputCounter,
final boolean tracingEnabled,
final String pipelineName) {
final Pipe<T> pipe = createPipe(bufferSize, sourceName, outputCounter);
return new PipelineBuilder<>(
pipe, emptyList(), singleton(pipe), sourceName, pipe, bufferSize, outputCounter);
pipe,
emptyList(),
singleton(pipe),
sourceName,
pipe,
bufferSize,
outputCounter,
tracingEnabled,
pipelineName);
}
/**
@ -216,7 +248,9 @@ public class PipelineBuilder<I, T> {
maximumBatchSize,
outputCounter.labels(lastStageName + "_outputPipe", "batches")),
(int) Math.ceil(((double) bufferSize) / maximumBatchSize),
outputCounter);
outputCounter,
tracingEnabled,
pipelineName);
}
/**
@ -274,7 +308,12 @@ public class PipelineBuilder<I, T> {
*/
public Pipeline<I> andFinishWith(final String stageName, final Consumer<T> completer) {
return new Pipeline<>(
inputPipe, stages, pipes, new CompleterStage<>(stageName, pipeEnd, completer));
inputPipe,
pipelineName,
tracingEnabled,
stages,
pipes,
new CompleterStage<>(stageName, pipeEnd, completer));
}
private <O> PipelineBuilder<I, O> thenProcessInParallel(
@ -297,7 +336,9 @@ public class PipelineBuilder<I, T> {
stageName,
newPipeEnd,
newBufferSize,
outputCounter);
outputCounter,
tracingEnabled,
pipelineName);
}
private <O> PipelineBuilder<I, O> addStage(
@ -317,7 +358,9 @@ public class PipelineBuilder<I, T> {
processStage.getName(),
outputPipe,
newBufferSize,
outputCounter);
outputCounter,
tracingEnabled,
pipelineName);
}
private <X> List<X> concat(final Collection<X> existing, final X newItem) {

@ -78,7 +78,8 @@ public class PipelineBuilderTest {
public void shouldPipeTasksFromSupplierToCompleter() throws Exception {
final List<Integer> output = new ArrayList<>();
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test")
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
result.get(10, SECONDS);
@ -89,7 +90,8 @@ public class PipelineBuilderTest {
public void shouldPassInputThroughIntermediateStage() throws Exception {
final List<String> output = new ArrayList<>();
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test")
.thenProcess("toString", Object::toString)
.andFinishWith("end", output::add);
@ -104,7 +106,8 @@ public class PipelineBuilderTest {
public void shouldCombineIntoBatches() throws Exception {
final BlockingQueue<List<Integer>> output = new ArrayBlockingQueue<>(10);
final Pipeline<Integer> pipeline =
PipelineBuilder.<Integer>createPipeline("source", 20, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.<Integer>createPipeline(
"source", 20, NO_OP_LABELLED_2_COUNTER, false, "test")
.inBatches(6)
.andFinishWith("end", output::offer);
@ -135,7 +138,8 @@ public class PipelineBuilderTest {
public void shouldProcessAsync() throws Exception {
final List<String> output = new ArrayList<>();
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test")
.thenProcessAsync("toString", value -> completedFuture(Integer.toString(value)), 3)
.andFinishWith("end", output::add);
final CompletableFuture<?> result = pipeline.start(executorService);
@ -150,7 +154,8 @@ public class PipelineBuilderTest {
final Map<Integer, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
final List<String> output = new CopyOnWriteArrayList<>();
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 15, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 15, NO_OP_LABELLED_2_COUNTER, false, "test")
.thenProcessAsyncOrdered(
"toString",
value -> {
@ -198,7 +203,12 @@ public class PipelineBuilderTest {
final List<CompletableFuture<String>> futures = new CopyOnWriteArrayList<>();
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom(
"input", asList(1, 2, 3, 4, 5, 6, 7).iterator(), 10, NO_OP_LABELLED_2_COUNTER)
"input",
asList(1, 2, 3, 4, 5, 6, 7).iterator(),
10,
NO_OP_LABELLED_2_COUNTER,
false,
"test")
.thenProcessAsync(
"createFuture",
value -> {
@ -235,7 +245,8 @@ public class PipelineBuilderTest {
public void shouldFlatMapItems() throws Exception {
final List<Integer> output = new ArrayList<>();
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test")
.thenFlatMap("flatMap", input -> Stream.of(input, input * 2), 20)
.andFinishWith("end", output::add);
@ -252,7 +263,8 @@ public class PipelineBuilderTest {
final List<String> output = synchronizedList(new ArrayList<>());
final CountDownLatch latch = new CountDownLatch(1);
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test")
.thenProcessInParallel(
"stageName",
value -> {
@ -287,7 +299,8 @@ public class PipelineBuilderTest {
final List<String> output = synchronizedList(new ArrayList<>());
final CountDownLatch latch = new CountDownLatch(1);
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test")
.thenFlatMapInParallel(
"stageName",
value -> {
@ -327,7 +340,8 @@ public class PipelineBuilderTest {
final List<Integer> output = synchronizedList(new ArrayList<>());
final CountDownLatch startedProcessingValueSix = new CountDownLatch(1);
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test")
.thenProcess(
"stageName",
value -> {
@ -363,7 +377,8 @@ public class PipelineBuilderTest {
final List<Integer> output = synchronizedList(new ArrayList<>());
final CountDownLatch startedProcessingValueSix = new CountDownLatch(1);
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test")
.thenProcess(
"stageName",
value -> {
@ -396,7 +411,8 @@ public class PipelineBuilderTest {
public void shouldAbortPipelineWhenProcessorThrowsException() {
final RuntimeException expectedError = new RuntimeException("Oops");
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER)
PipelineBuilder.createPipelineFrom(
"input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test")
.thenProcess(
"stageName",
(Function<Integer, Integer>)
@ -421,7 +437,7 @@ public class PipelineBuilderTest {
labels ->
counters.computeIfAbsent(labels[0] + "-" + labels[1], label -> new SimpleCounter());
final Pipeline<Integer> pipeline =
PipelineBuilder.createPipelineFrom("input", tasks, 10, labelledCounter)
PipelineBuilder.createPipelineFrom("input", tasks, 10, labelledCounter, false, "test")
.thenProcess("map", Function.identity())
.thenProcessInParallel("parallel", Function.identity(), 3)
.thenProcessAsync("async", CompletableFuture::completedFuture, 3)

Loading…
Cancel
Save