Add IPC transport for JSON-RPC APIs (#3695)

* Fix json-rpc HTTP tests [#535]

The `o.h.b.e.a.j.JsonRpcHttpServiceTest#exceptionallyHandleJsonSingleRequest` and `o.h.b.e.a.j.JsonRpcHttpServiceTest#exceptionallyHandleJsonBatchRequest` tests were throwing ClassCastException in `o.h.b.e.a.j.JsonRpcHttpService#validateMethodAvailability` which wasn't ever catched, returning status 500 by default, but that wasn't the use case aimed to test. Another test running an exceptional method is `o.h.b.t.a.p.EnclaveErrorAcceptanceTest#whenEnclaveIsDisconnectedGetReceiptReturnsInternalError` which validates an "Internal Error" proper json-rpc response. I changed the first two tests to be consistent with the later one.

* Extract json-rpc HTTP authentication to a handler [#535]

* Replace TimeoutHandler in GraphQLHttpService with Vert.x's impl [#535]

* Extract json-rpc HTTP parser to a handler [#535]

* Refactor json-rpc WS handler [#535]

* Add json-rpc IPC support [#535]

Signed-off-by: Diego López León <dieguitoll@gmail.com>
Signed-off-by: Diego López León <dieguitoll@gmail.com>
pull/3768/head
Diego López León 3 years ago committed by GitHub
parent bf349a4520
commit 28845823a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 12
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/BesuNode.java
  3. 10
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java
  4. 1
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java
  5. 8
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/configuration/BesuNodeConfiguration.java
  6. 9
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/configuration/BesuNodeConfigurationBuilder.java
  7. 1
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/configuration/BesuNodeFactory.java
  8. 1
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/privacy/PrivacyNode.java
  9. 74
      acceptance-tests/tests/src/test/java/org/hyperledger/besu/tests/acceptance/jsonrpc/ipc/Web3JSupportAcceptanceTest.java
  10. 12
      besu/src/main/java/org/hyperledger/besu/Runner.java
  11. 67
      besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
  12. 26
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  13. 71
      besu/src/main/java/org/hyperledger/besu/cli/options/unstable/IpcOptions.java
  14. 7
      besu/src/test/java/org/hyperledger/besu/RunnerBuilderTest.java
  15. 3
      besu/src/test/java/org/hyperledger/besu/RunnerTest.java
  16. 1
      besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java
  17. 3
      ethereum/api/build.gradle
  18. 8
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLHttpService.java
  19. 64
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/AuthenticationHandler.java
  20. 36
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/HandlerFactory.java
  21. 19
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/HandlerName.java
  22. 173
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/JsonRpcExecutorHandler.java
  23. 68
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/JsonRpcParserHandler.java
  24. 59
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/TimeoutHandler.java
  25. 4
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/handlers/TimeoutOptions.java
  26. 2
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonResponseStreamer.java
  27. 352
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpService.java
  28. 4
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/context/ContextKey.java
  29. 55
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/AuthenticatedJsonRpcProcessor.java
  30. 52
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/BaseJsonRpcProcessor.java
  31. 120
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/JsonRpcExecutor.java
  32. 30
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/JsonRpcProcessor.java
  33. 48
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/TimedJsonRpcProcessor.java
  34. 61
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/execution/TracedJsonRpcProcessor.java
  35. 51
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/ipc/JsonRpcIpcConfiguration.java
  36. 210
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/ipc/JsonRpcIpcService.java
  37. 263
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketRequestHandler.java
  38. 52
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketService.java
  39. 18
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpServiceTest.java
  40. 195
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/ipc/JsonRpcIpcServiceTest.java
  41. 56
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/timeout/TimeoutHandlerTest.java
  42. 4
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketHostAllowlistTest.java
  43. 29
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketRequestHandlerTest.java
  44. 11
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketServiceLoginTest.java
  45. 4
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketServiceTest.java
  46. 26
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/methods/EthSubscribeIntegrationTest.java
  47. 23
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/methods/EthUnsubscribeIntegrationTest.java
  48. 8
      gradle/check-licenses.gradle
  49. 6
      gradle/versions.gradle

@ -14,6 +14,7 @@
- Onchain node permissioning - log the enodeURL that was previously only throwing an IllegalStateException during the isPermitted check [#3697](https://github.com/hyperledger/besu/pull/3697)
- \[EXPERIMENTAL\] Add snapsync `--sync-mode="X_SNAP"` (only as client) [#3710](https://github.com/hyperledger/besu/pull/3710)
- Adapt Fast sync, and Snap sync, to use finalized block, from consensus layer, as pivot after the Merge [#3506](https://github.com/hyperledger/besu/issues/3506)
- Add IPC JSON-RPC interface (BSD/MacOS and Linux only) [#3695](https://github.com/hyperledger/besu/pull/3695)
### Bug Fixes

@ -22,6 +22,7 @@ import org.hyperledger.besu.crypto.KeyPair;
import org.hyperledger.besu.crypto.KeyPairUtil;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
@ -102,6 +103,7 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
private final JsonRpcConfiguration jsonRpcConfiguration;
private final Optional<JsonRpcConfiguration> engineRpcConfiguration;
private final WebSocketConfiguration webSocketConfiguration;
private final JsonRpcIpcConfiguration jsonRpcIpcConfiguration;
private final MetricsConfiguration metricsConfiguration;
private Optional<PermissioningConfiguration> permissioningConfiguration;
private final GenesisConfigurationProvider genesisConfigProvider;
@ -132,6 +134,7 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
final JsonRpcConfiguration jsonRpcConfiguration,
final Optional<JsonRpcConfiguration> engineRpcConfiguration,
final WebSocketConfiguration webSocketConfiguration,
final JsonRpcIpcConfiguration jsonRpcIpcConfiguration,
final MetricsConfiguration metricsConfiguration,
final Optional<PermissioningConfiguration> permissioningConfiguration,
final Optional<String> keyfilePath,
@ -178,6 +181,7 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
this.jsonRpcConfiguration = jsonRpcConfiguration;
this.engineRpcConfiguration = engineRpcConfiguration;
this.webSocketConfiguration = webSocketConfiguration;
this.jsonRpcIpcConfiguration = jsonRpcIpcConfiguration;
this.metricsConfiguration = metricsConfiguration;
this.permissioningConfiguration = permissioningConfiguration;
this.genesisConfigProvider = genesisConfigProvider;
@ -234,6 +238,10 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
return webSocketConfiguration().isEnabled();
}
public boolean isJsonRpcIpcEnabled() {
return jsonRpcIpcConfiguration().isEnabled();
}
boolean isMetricsEnabled() {
return metricsConfiguration.isEnabled();
}
@ -593,6 +601,10 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
return webSocketConfiguration;
}
JsonRpcIpcConfiguration jsonRpcIpcConfiguration() {
return jsonRpcIpcConfiguration;
}
Optional<String> wsRpcListenHost() {
return Optional.of(webSocketConfiguration().getHost());
}

@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.hyperledger.besu.cli.options.unstable.NetworkingOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty.TLSConfiguration;
import org.hyperledger.besu.ethereum.permissioning.PermissioningConfiguration;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
@ -230,6 +231,15 @@ public class ProcessBesuNodeRunner implements BesuNodeRunner {
params.add("0");
}
if (node.isJsonRpcIpcEnabled()) {
final JsonRpcIpcConfiguration ipcConfiguration = node.jsonRpcIpcConfiguration();
params.add("--Xrpc-ipc-enabled");
params.add("--Xrpc-ipc-path");
params.add(ipcConfiguration.getPath().toString());
params.add("--Xrpc-ipc-apis");
params.add(String.join(",", ipcConfiguration.getEnabledApis()));
}
if (node.isMetricsEnabled()) {
final MetricsConfiguration metricsConfiguration = node.getMetricsConfiguration();
params.add("--metrics-enabled");

@ -202,6 +202,7 @@ public class ThreadBesuNodeRunner implements BesuNodeRunner {
.networkingConfiguration(node.getNetworkingConfiguration())
.jsonRpcConfiguration(node.jsonRpcConfiguration())
.webSocketConfiguration(node.webSocketConfiguration())
.jsonRpcIpcConfiguration(node.jsonRpcIpcConfiguration())
.dataDir(node.homeDirectory())
.metricsSystem(metricsSystem)
.permissioningService(new PermissioningServiceImpl())

@ -17,6 +17,7 @@ package org.hyperledger.besu.tests.acceptance.dsl.node.configuration;
import org.hyperledger.besu.cli.config.NetworkName;
import org.hyperledger.besu.crypto.KeyPair;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
@ -39,6 +40,7 @@ public class BesuNodeConfiguration {
private final JsonRpcConfiguration jsonRpcConfiguration;
private final Optional<JsonRpcConfiguration> engineRpcConfiguration;
private final WebSocketConfiguration webSocketConfiguration;
private final JsonRpcIpcConfiguration jsonRpcIpcConfiguration;
private final Optional<WebSocketConfiguration> engineWebSocketConfiguration;
private final MetricsConfiguration metricsConfiguration;
private final Optional<PermissioningConfiguration> permissioningConfiguration;
@ -72,6 +74,7 @@ public class BesuNodeConfiguration {
final JsonRpcConfiguration jsonRpcConfiguration,
final Optional<JsonRpcConfiguration> engineRpcConfiguration,
final WebSocketConfiguration webSocketConfiguration,
final JsonRpcIpcConfiguration jsonRpcIpcConfiguration,
final Optional<WebSocketConfiguration> engineWebSocketConfiguration,
final MetricsConfiguration metricsConfiguration,
final Optional<PermissioningConfiguration> permissioningConfiguration,
@ -102,6 +105,7 @@ public class BesuNodeConfiguration {
this.jsonRpcConfiguration = jsonRpcConfiguration;
this.engineRpcConfiguration = engineRpcConfiguration;
this.webSocketConfiguration = webSocketConfiguration;
this.jsonRpcIpcConfiguration = jsonRpcIpcConfiguration;
this.engineWebSocketConfiguration = engineWebSocketConfiguration;
this.metricsConfiguration = metricsConfiguration;
this.permissioningConfiguration = permissioningConfiguration;
@ -150,6 +154,10 @@ public class BesuNodeConfiguration {
return webSocketConfiguration;
}
public JsonRpcIpcConfiguration getJsonRpcIpcConfiguration() {
return jsonRpcIpcConfiguration;
}
public Optional<WebSocketConfiguration> getEngineWebSocketConfiguration() {
return engineWebSocketConfiguration;
}

@ -23,6 +23,7 @@ import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.JwtAlgorithm;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.api.tls.FileBasedPasswordProvider;
import org.hyperledger.besu.ethereum.core.AddressHelpers;
@ -59,6 +60,7 @@ public class BesuNodeConfigurationBuilder {
private JsonRpcConfiguration jsonRpcConfiguration = JsonRpcConfiguration.createDefault();
private JsonRpcConfiguration engineRpcConfiguration = JsonRpcConfiguration.createEngineDefault();
private WebSocketConfiguration webSocketConfiguration = WebSocketConfiguration.createDefault();
private JsonRpcIpcConfiguration jsonRpcIpcConfiguration = new JsonRpcIpcConfiguration();
private WebSocketConfiguration engineWebSocketConfiguration =
WebSocketConfiguration.createDefault();
private MetricsConfiguration metricsConfiguration = MetricsConfiguration.builder().build();
@ -240,6 +242,12 @@ public class BesuNodeConfigurationBuilder {
return this;
}
public BesuNodeConfigurationBuilder jsonRpcIpcConfiguration(
final JsonRpcIpcConfiguration jsonRpcIpcConfiguration) {
this.jsonRpcIpcConfiguration = jsonRpcIpcConfiguration;
return this;
}
public BesuNodeConfigurationBuilder metricsConfiguration(
final MetricsConfiguration metricsConfiguration) {
this.metricsConfiguration = metricsConfiguration;
@ -487,6 +495,7 @@ public class BesuNodeConfigurationBuilder {
jsonRpcConfiguration,
Optional.of(engineRpcConfiguration),
webSocketConfiguration,
jsonRpcIpcConfiguration,
Optional.of(engineWebSocketConfiguration),
metricsConfiguration,
permissioningConfiguration,

@ -64,6 +64,7 @@ public class BesuNodeFactory {
config.getJsonRpcConfiguration(),
config.getEngineRpcConfiguration(),
config.getWebSocketConfiguration(),
config.getJsonRpcIpcConfiguration(),
config.getMetricsConfiguration(),
config.getPermissioningConfiguration(),
config.getKeyFilePath(),

@ -103,6 +103,7 @@ public class PrivacyNode implements AutoCloseable {
besuConfig.getJsonRpcConfiguration(),
besuConfig.getEngineRpcConfiguration(),
besuConfig.getWebSocketConfiguration(),
besuConfig.getJsonRpcIpcConfiguration(),
besuConfig.getMetricsConfiguration(),
besuConfig.getPermissioningConfiguration(),
besuConfig.getKeyFilePath(),

@ -0,0 +1,74 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.tests.acceptance.jsonrpc.ipc;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assume.assumeTrue;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.tests.acceptance.dsl.AcceptanceTestBase;
import org.hyperledger.besu.tests.acceptance.dsl.node.Node;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Locale;
import org.junit.Before;
import org.junit.Test;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.Request;
import org.web3j.protocol.core.methods.response.NetVersion;
import org.web3j.protocol.ipc.UnixIpcService;
public class Web3JSupportAcceptanceTest extends AcceptanceTestBase {
private Node node;
private Path socketPath;
@Before
public void setUp() throws Exception {
socketPath = Files.createTempFile("besu-test-", ".ipc");
node =
besu.createNode(
"node1",
(configurationBuilder) ->
configurationBuilder.jsonRpcIpcConfiguration(
new JsonRpcIpcConfiguration(
true, socketPath, Collections.singletonList(RpcApis.NET.name()))));
cluster.start(node);
}
@Test
public void netVersionCall() {
final String osName = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);
assumeTrue(osName.contains("mac") || osName.contains("linux"));
final Web3j web3 = Web3j.build(new UnixIpcService(socketPath.toString()));
final Request<?, NetVersion> ethBlockNumberRequest = web3.netVersion();
node.verify(
node -> {
try {
assertThat(ethBlockNumberRequest.send().getNetVersion())
.isEqualTo(String.valueOf(2018));
} catch (IOException e) {
fail("Web3J net_version call failed", e);
}
});
}
}

@ -17,6 +17,7 @@ package org.hyperledger.besu;
import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcService;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService;
import org.hyperledger.besu.ethereum.api.query.cache.AutoTransactionLogBloomCachingService;
import org.hyperledger.besu.ethereum.api.query.cache.TransactionLogBloomCacher;
@ -64,6 +65,7 @@ public class Runner implements AutoCloseable {
private final Optional<JsonRpcHttpService> jsonRpc;
private final Optional<JsonRpcHttpService> engineJsonRpc;
private final Optional<MetricsService> metrics;
private final Optional<JsonRpcIpcService> ipcJsonRpc;
private final Optional<Path> pidPath;
private final Optional<WebSocketService> webSocketRpc;
private final Optional<WebSocketService> engineWebSocketRpc;
@ -84,6 +86,7 @@ public class Runner implements AutoCloseable {
final Optional<GraphQLHttpService> graphQLHttp,
final Optional<WebSocketService> webSocketRpc,
final Optional<WebSocketService> engineWebSocketRpc,
final Optional<JsonRpcIpcService> ipcJsonRpc,
final Optional<StratumServer> stratumServer,
final Optional<MetricsService> metrics,
final Optional<EthStatsService> ethStatsService,
@ -101,6 +104,7 @@ public class Runner implements AutoCloseable {
this.engineJsonRpc = engineJsonRpc;
this.webSocketRpc = webSocketRpc;
this.engineWebSocketRpc = engineWebSocketRpc;
this.ipcJsonRpc = ipcJsonRpc;
this.metrics = metrics;
this.ethStatsService = ethStatsService;
this.besuController = besuController;
@ -123,6 +127,10 @@ public class Runner implements AutoCloseable {
webSocketRpc.ifPresent(service -> waitForServiceToStart("websocketRpc", service.start()));
engineWebSocketRpc.ifPresent(
service -> waitForServiceToStart("engineWebsocketRpc", service.start()));
ipcJsonRpc.ifPresent(
service ->
waitForServiceToStart(
"ipcJsonRpc", service.start().toCompletionStage().toCompletableFuture()));
stratumServer.ifPresent(server -> waitForServiceToStart("stratum", server.start()));
autoTransactionLogBloomCachingService.ifPresent(AutoTransactionLogBloomCachingService::start);
ethStatsService.ifPresent(EthStatsService::start);
@ -158,6 +166,10 @@ public class Runner implements AutoCloseable {
webSocketRpc.ifPresent(service -> waitForServiceToStop("websocketRpc", service.stop()));
engineWebSocketRpc.ifPresent(
service -> waitForServiceToStop("engineWebsocketRpc", service.stop()));
ipcJsonRpc.ifPresent(
service ->
waitForServiceToStop(
"ipcJsonRpc", service.stop().toCompletionStage().toCompletableFuture()));
metrics.ifPresent(service -> waitForServiceToStop("metrics", service.stop()));
ethStatsService.ifPresent(EthStatsService::stop);
besuController.getMiningCoordinator().stop();

@ -38,12 +38,18 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.DefaultAuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.EngineAuthService;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.AuthenticatedJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.health.HealthService;
import org.hyperledger.besu.ethereum.api.jsonrpc.health.LivenessCheck;
import org.hyperledger.besu.ethereum.api.jsonrpc.health.ReadinessCheck;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.filter.FilterManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.filter.FilterManagerBuilder;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcService;
import org.hyperledger.besu.ethereum.api.jsonrpc.methods.JsonRpcMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketRequestHandler;
@ -184,6 +190,7 @@ public class RunnerBuilder {
private StorageProvider storageProvider;
private Supplier<List<Bytes>> forkIdSupplier;
private RpcEndpointServiceImpl rpcEndpointServiceImpl;
private JsonRpcIpcConfiguration jsonRpcIpcConfiguration;
public RunnerBuilder vertx(final Vertx vertx) {
this.vertx = vertx;
@ -394,6 +401,12 @@ public class RunnerBuilder {
return this;
}
public RunnerBuilder jsonRpcIpcConfiguration(
final JsonRpcIpcConfiguration jsonRpcIpcConfiguration) {
this.jsonRpcIpcConfiguration = jsonRpcIpcConfiguration;
return this;
}
public Runner build() {
Preconditions.checkNotNull(besuController);
@ -828,6 +841,45 @@ public class RunnerBuilder {
ethStatsService = Optional.empty();
}
final Optional<JsonRpcIpcService> jsonRpcIpcService;
if (jsonRpcIpcConfiguration.isEnabled()) {
Map<String, JsonRpcMethod> ipcMethods =
jsonRpcMethods(
protocolSchedule,
context,
besuController,
peerNetwork,
blockchainQueries,
synchronizer,
transactionPool,
miningCoordinator,
metricsSystem,
supportedCapabilities,
jsonRpcIpcConfiguration.getEnabledApis().stream()
.filter(apiGroup -> !apiGroup.toLowerCase().startsWith("engine"))
.collect(Collectors.toList()),
filterManager,
accountLocalConfigPermissioningController,
nodeLocalConfigPermissioningController,
privacyParameters,
jsonRpcConfiguration,
webSocketConfiguration,
metricsConfiguration,
natService,
besuPluginContext.getNamedPlugins(),
dataDir,
rpcEndpointServiceImpl);
jsonRpcIpcService =
Optional.of(
new JsonRpcIpcService(
vertx,
jsonRpcIpcConfiguration.getPath(),
new JsonRpcExecutor(new BaseJsonRpcProcessor(), ipcMethods)));
} else {
jsonRpcIpcService = Optional.empty();
}
return new Runner(
vertx,
networkRunner,
@ -837,6 +889,7 @@ public class RunnerBuilder {
graphQLHttpService,
webSocketService,
engineWebSocketService,
jsonRpcIpcService,
stratumServer,
metricsService,
ethStatsService,
@ -1119,10 +1172,22 @@ public class RunnerBuilder {
.values()
.forEach(websocketMethodsFactory::addMethods);
final JsonRpcProcessor jsonRpcProcessor;
if (authenticationService.isPresent()) {
jsonRpcProcessor =
new AuthenticatedJsonRpcProcessor(
new BaseJsonRpcProcessor(),
authenticationService.get(),
configuration.getRpcApisNoAuth());
} else {
jsonRpcProcessor = new BaseJsonRpcProcessor();
}
final JsonRpcExecutor jsonRpcExecutor =
new JsonRpcExecutor(jsonRpcProcessor, websocketMethodsFactory.methods());
final WebSocketRequestHandler websocketRequestHandler =
new WebSocketRequestHandler(
vertx,
websocketMethodsFactory.methods(),
jsonRpcExecutor,
besuController.getProtocolManager().ethContext().getScheduler(),
webSocketConfiguration.getTimeoutSec());

@ -60,6 +60,7 @@ import org.hyperledger.besu.cli.options.stable.P2PTLSConfigOptions;
import org.hyperledger.besu.cli.options.unstable.DnsOptions;
import org.hyperledger.besu.cli.options.unstable.EthProtocolOptions;
import org.hyperledger.besu.cli.options.unstable.EvmOptions;
import org.hyperledger.besu.cli.options.unstable.IpcOptions;
import org.hyperledger.besu.cli.options.unstable.LauncherOptions;
import org.hyperledger.besu.cli.options.unstable.MergeOptions;
import org.hyperledger.besu.cli.options.unstable.MetricsCLIOptions;
@ -111,6 +112,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.JwtAlgorithm;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.api.tls.FileBasedPasswordProvider;
import org.hyperledger.besu.ethereum.api.tls.TlsClientAuthConfiguration;
@ -284,6 +286,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
final LauncherOptions unstableLauncherOptions = LauncherOptions.create();
private final PrivacyPluginOptions unstablePrivacyPluginOptions = PrivacyPluginOptions.create();
private final EvmOptions unstableEvmOptions = EvmOptions.create();
private final IpcOptions unstableIpcOptions = IpcOptions.create();
// stable CLI options
private final DataStorageOptions dataStorageOptions = DataStorageOptions.create();
@ -1275,6 +1278,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
private GraphQLConfiguration graphQLConfiguration;
private WebSocketConfiguration webSocketConfiguration;
private WebSocketConfiguration engineWebSocketConfiguration;
private JsonRpcIpcConfiguration jsonRpcIpcConfiguration;
private ApiConfiguration apiConfiguration;
private MetricsConfiguration metricsConfiguration;
private Optional<PermissioningConfiguration> permissioningConfiguration;
@ -1486,6 +1490,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
.put("Launcher", unstableLauncherOptions)
.put("Merge", mergeOptions)
.put("EVM Options", unstableEvmOptions)
.put("IPC Options", unstableIpcOptions)
.build();
UnstableOptionsSubCommand.createUnstableOptions(commandLine, unstableOptions);
@ -1579,6 +1584,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
engineJsonRpcConfiguration,
webSocketConfiguration,
engineWebSocketConfiguration,
jsonRpcIpcConfiguration,
apiConfiguration,
metricsConfiguration,
permissioningConfiguration,
@ -1907,6 +1913,11 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
engineWebSocketConfiguration =
engineWebSocketConfiguration(
engineRPCOptionGroup.engineRpcWsPort, engineRPCOptionGroup.engineHostsAllowlist);
jsonRpcIpcConfiguration =
jsonRpcIpcConfiguration(
unstableIpcOptions.isEnabled(),
unstableIpcOptions.getIpcPath(),
unstableIpcOptions.getRpcIpcApis());
apiConfiguration = apiConfiguration();
// hostsWhitelist is a hidden option. If it is specified, add the list to hostAllowlist
if (!hostsWhitelist.isEmpty()) {
@ -1937,6 +1948,18 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
instantiateSignatureAlgorithmFactory();
}
private JsonRpcIpcConfiguration jsonRpcIpcConfiguration(
final Boolean enabled, final Path ipcPath, final List<String> rpcIpcApis) {
final Path actualPath;
if (ipcPath == null) {
actualPath = IpcOptions.getDefaultPath(dataDir());
} else {
actualPath = ipcPath;
}
return new JsonRpcIpcConfiguration(
vertx.isNativeTransportEnabled() && enabled, actualPath, rpcIpcApis);
}
private GoQuorumPrivacyParameters configureGoQuorumPrivacy(
final KeyValueStorageProvider storageProvider) {
return new GoQuorumPrivacyParameters(
@ -2745,6 +2768,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
final JsonRpcConfiguration engineJsonRpcConfiguration,
final WebSocketConfiguration webSocketConfiguration,
final WebSocketConfiguration engineWebSocketConfiguration,
final JsonRpcIpcConfiguration jsonRpcIpcConfiguration,
final ApiConfiguration apiConfiguration,
final MetricsConfiguration metricsConfiguration,
final Optional<PermissioningConfiguration> permissioningConfiguration,
@ -2781,6 +2805,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
.engineJsonRpcConfiguration(engineJsonRpcConfiguration)
.webSocketConfiguration(webSocketConfiguration)
.engineWebSocketConfiguration(engineWebSocketConfiguration)
.jsonRpcIpcConfiguration(jsonRpcIpcConfiguration)
.apiConfiguration(apiConfiguration)
.pidPath(pidPath)
.dataDir(dataDir())
@ -2810,6 +2835,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
private VertxOptions createVertxOptions(final MetricsSystem metricsSystem) {
return new VertxOptions()
.setPreferNativeTransport(true)
.setMetricsOptions(
new MetricsOptions()
.setEnabled(true)

@ -0,0 +1,71 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.cli.options.unstable;
import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.DEFAULT_RPC_APIS;
import java.nio.file.Path;
import java.util.List;
import picocli.CommandLine;
public class IpcOptions {
private static final String DEFAULT_IPC_FILE = "besu.ipc";
public static IpcOptions create() {
return new IpcOptions();
}
public static Path getDefaultPath(final Path dataDir) {
return dataDir.resolve(DEFAULT_IPC_FILE);
}
@CommandLine.Option(
names = {"--Xrpc-ipc-enabled"},
hidden = true,
description = "Set to start the JSON-RPC IPC service (default: ${DEFAULT-VALUE})")
private final Boolean enabled = false;
@CommandLine.Option(
names = {"--Xrpc-ipc-path"},
hidden = true,
description =
"IPC socket/pipe file (default: a file named \""
+ DEFAULT_IPC_FILE
+ "\" in the Besu data directory)")
private Path ipcPath;
@CommandLine.Option(
names = {"--Xrpc-ipc-api", "--Xrpc-ipc-apis"},
hidden = true,
paramLabel = "<api name>",
split = " {0,1}, {0,1}",
arity = "1..*",
description =
"Comma separated list of APIs to enable on JSON-RPC IPC service (default: ${DEFAULT-VALUE})")
private final List<String> rpcIpcApis = DEFAULT_RPC_APIS;
public Boolean isEnabled() {
return enabled;
}
public Path getIpcPath() {
return ipcPath;
}
public List<String> getRpcIpcApis() {
return rpcIpcApis;
}
}

@ -39,6 +39,7 @@ import org.hyperledger.besu.crypto.SECP256K1;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.blockcreation.PoWMiningCoordinator;
@ -148,6 +149,7 @@ public final class RunnerBuilderTest {
.permissioningService(mock(PermissioningServiceImpl.class))
.graphQLConfiguration(mock(GraphQLConfiguration.class))
.webSocketConfiguration(mock(WebSocketConfiguration.class))
.jsonRpcIpcConfiguration(mock(JsonRpcIpcConfiguration.class))
.metricsConfiguration(mock(MetricsConfiguration.class))
.vertx(vertx)
.dataDir(dataDir.getRoot().toPath())
@ -192,6 +194,7 @@ public final class RunnerBuilderTest {
.jsonRpcConfiguration(mock(JsonRpcConfiguration.class))
.graphQLConfiguration(mock(GraphQLConfiguration.class))
.webSocketConfiguration(mock(WebSocketConfiguration.class))
.jsonRpcIpcConfiguration(mock(JsonRpcIpcConfiguration.class))
.metricsConfiguration(mock(MetricsConfiguration.class))
.vertx(Vertx.vertx())
.dataDir(dataDir.getRoot().toPath())
@ -247,6 +250,7 @@ public final class RunnerBuilderTest {
.engineJsonRpcConfiguration(engine)
.graphQLConfiguration(mock(GraphQLConfiguration.class))
.webSocketConfiguration(mock(WebSocketConfiguration.class))
.jsonRpcIpcConfiguration(mock(JsonRpcIpcConfiguration.class))
.metricsConfiguration(mock(MetricsConfiguration.class))
.vertx(Vertx.vertx())
.dataDir(dataDir.getRoot().toPath())
@ -285,6 +289,7 @@ public final class RunnerBuilderTest {
.permissioningService(mock(PermissioningServiceImpl.class))
.jsonRpcConfiguration(JsonRpcConfiguration.createDefault())
.webSocketConfiguration(wsRpc)
.jsonRpcIpcConfiguration(mock(JsonRpcIpcConfiguration.class))
.engineWebSocketConfiguration(engineWsRpc)
.graphQLConfiguration(mock(GraphQLConfiguration.class))
.metricsConfiguration(mock(MetricsConfiguration.class))
@ -326,6 +331,7 @@ public final class RunnerBuilderTest {
.jsonRpcConfiguration(defaultRpcConfig)
.graphQLConfiguration(mock(GraphQLConfiguration.class))
.webSocketConfiguration(defaultWebSockConfig)
.jsonRpcIpcConfiguration(mock(JsonRpcIpcConfiguration.class))
.metricsConfiguration(mock(MetricsConfiguration.class))
.vertx(Vertx.vertx())
.dataDir(dataDir.getRoot().toPath())
@ -372,6 +378,7 @@ public final class RunnerBuilderTest {
.engineJsonRpcConfiguration(engine)
.graphQLConfiguration(mock(GraphQLConfiguration.class))
.webSocketConfiguration(mock(WebSocketConfiguration.class))
.jsonRpcIpcConfiguration(mock(JsonRpcIpcConfiguration.class))
.metricsConfiguration(mock(MetricsConfiguration.class))
.vertx(Vertx.vertx())
.dataDir(dataDir.getRoot().toPath())

@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.GasLimitCalculator;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockImporter;
@ -201,6 +202,7 @@ public final class RunnerTest {
final JsonRpcConfiguration aheadJsonRpcConfiguration = jsonRpcConfiguration();
final GraphQLConfiguration aheadGraphQLConfiguration = graphQLConfiguration();
final WebSocketConfiguration aheadWebSocketConfiguration = wsRpcConfiguration();
final JsonRpcIpcConfiguration aheadJsonRpcIpcConfiguration = new JsonRpcIpcConfiguration();
final MetricsConfiguration aheadMetricsConfiguration = metricsConfiguration();
final Path pidPath = temp.getRoot().toPath().resolve("pid");
final RunnerBuilder runnerBuilder =
@ -225,6 +227,7 @@ public final class RunnerTest {
.jsonRpcConfiguration(aheadJsonRpcConfiguration)
.graphQLConfiguration(aheadGraphQLConfiguration)
.webSocketConfiguration(aheadWebSocketConfiguration)
.jsonRpcIpcConfiguration(aheadJsonRpcIpcConfiguration)
.metricsConfiguration(aheadMetricsConfiguration)
.dataDir(dbAhead)
.pidPath(pidPath)

@ -268,6 +268,7 @@ public abstract class CommandTestAbstract {
when(mockRunnerBuilder.engineWebSocketConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.graphQLConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.webSocketConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.jsonRpcIpcConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.apiConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.dataDir(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.bannedNodeIds(any())).thenReturn(mockRunnerBuilder);

@ -76,6 +76,8 @@ dependencies {
implementation "org.immutables:value-annotations"
runtimeOnly 'org.bouncycastle:bcpkix-jdk15on'
runtimeOnly 'io.netty:netty-transport-native-epoll'
runtimeOnly 'io.netty:netty-transport-native-kqueue'
testImplementation project(':config')
testImplementation project(path: ':config', configuration: 'testSupportArtifacts')
@ -89,6 +91,7 @@ dependencies {
testImplementation 'com.squareup.okhttp3:okhttp'
testImplementation 'io.vertx:vertx-auth-jwt'
testImplementation 'io.vertx:vertx-junit5'
testImplementation 'io.vertx:vertx-unit'
testImplementation 'io.vertx:vertx-web-client'
testImplementation 'junit:junit'

@ -25,7 +25,6 @@ import org.hyperledger.besu.ethereum.api.graphql.internal.response.GraphQLRespon
import org.hyperledger.besu.ethereum.api.graphql.internal.response.GraphQLResponseType;
import org.hyperledger.besu.ethereum.api.graphql.internal.response.GraphQLSuccessResponse;
import org.hyperledger.besu.ethereum.api.handlers.IsAliveHandler;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutHandler;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.util.NetworkUtility;
@ -41,6 +40,7 @@ import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
@ -66,6 +66,7 @@ import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.handler.TimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -164,8 +165,9 @@ public class GraphQLHttpService {
.method(POST)
.produces(APPLICATION_JSON)
.handler(
TimeoutHandler.handler(
Optional.of(new TimeoutOptions(config.getHttpTimeoutSec())), false))
TimeoutHandler.create(
TimeUnit.SECONDS.toMillis(config.getHttpTimeoutSec()),
TimeoutOptions.DEFAULT_ERROR_CODE))
.handler(this::handleGraphQLRequest);
final CompletableFuture<?> resultFuture = new CompletableFuture<>();

@ -0,0 +1,64 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.handlers;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationUtils;
import org.hyperledger.besu.ethereum.api.jsonrpc.context.ContextKey;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import java.util.Collection;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.ext.web.RoutingContext;
public class AuthenticationHandler {
private AuthenticationHandler() {}
public static Handler<RoutingContext> handler(
final AuthenticationService authenticationService, final Collection<String> noAuthRpcApis) {
return ctx -> {
// first check token if authentication is required
final String token = getAuthToken(ctx);
if (token == null && noAuthRpcApis.isEmpty()) {
// no auth token when auth required
handleJsonRpcUnauthorizedError(ctx);
} else {
authenticationService.authenticate(
token, user -> ctx.put(ContextKey.AUTHENTICATED_USER.name(), user));
ctx.next();
}
};
}
private static String getAuthToken(final RoutingContext routingContext) {
return AuthenticationUtils.getJwtTokenFromAuthorizationHeaderValue(
routingContext.request().getHeader("Authorization"));
}
private static void handleJsonRpcUnauthorizedError(final RoutingContext routingContext) {
final HttpServerResponse response = routingContext.response();
if (!response.closed()) {
response
.setStatusCode(HttpResponseStatus.UNAUTHORIZED.code())
.end(Json.encode(new JsonRpcErrorResponse(null, JsonRpcError.UNAUTHORIZED)));
}
}
}

@ -14,32 +14,40 @@
*/
package org.hyperledger.besu.ethereum.api.handlers;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import io.opentelemetry.api.trace.Tracer;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
public class HandlerFactory {
private static final Map<HandlerName, Handler<RoutingContext>> HANDLERS =
new ConcurrentHashMap<>();
public static Handler<RoutingContext> timeout(
final TimeoutOptions globalOptions,
final Map<String, JsonRpcMethod> methods,
final boolean decodeJSON) {
final TimeoutOptions globalOptions, final Map<String, JsonRpcMethod> methods) {
assert methods != null && globalOptions != null;
return HANDLERS.computeIfAbsent(
HandlerName.TIMEOUT,
handlerName ->
TimeoutHandler.handler(
Optional.of(globalOptions),
methods.keySet().stream()
.collect(Collectors.toMap(String::new, ignored -> globalOptions)),
decodeJSON));
return TimeoutHandler.handler(
Optional.of(globalOptions),
methods.keySet().stream().collect(Collectors.toMap(String::new, ignored -> globalOptions)));
}
public static Handler<RoutingContext> authentication(
final AuthenticationService authenticationService, final Collection<String> noAuthRpcApis) {
return AuthenticationHandler.handler(authenticationService, noAuthRpcApis);
}
public static Handler<RoutingContext> jsonRpcParser() {
return JsonRpcParserHandler.handler();
}
public static Handler<RoutingContext> jsonRpcExecutor(
final JsonRpcExecutor jsonRpcExecutor, final Tracer tracer) {
return JsonRpcExecutorHandler.handler(jsonRpcExecutor, tracer);
}
}

@ -1,19 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.handlers;
public enum HandlerName {
TIMEOUT
}

@ -0,0 +1,173 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.handlers;
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError.INVALID_REQUEST;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonResponseStreamer;
import org.hyperledger.besu.ethereum.api.jsonrpc.context.ContextKey;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.RoutingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JsonRpcExecutorHandler {
private static final Logger LOG = LoggerFactory.getLogger(JsonRpcExecutorHandler.class);
private static final String SPAN_CONTEXT = "span_context";
private static final String APPLICATION_JSON = "application/json";
private static final ObjectWriter JSON_OBJECT_WRITER =
new ObjectMapper()
.registerModule(new Jdk8Module()) // Handle JDK8 Optionals (de)serialization
.writerWithDefaultPrettyPrinter()
.without(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM)
.with(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
private JsonRpcExecutorHandler() {}
public static Handler<RoutingContext> handler(
final JsonRpcExecutor jsonRpcExecutor, final Tracer tracer) {
return ctx -> {
HttpServerResponse response = ctx.response();
try {
Optional<User> user = ContextKey.AUTHENTICATED_USER.extractFrom(ctx, Optional::empty);
Context spanContext = ctx.get(SPAN_CONTEXT);
response = response.putHeader("Content-Type", APPLICATION_JSON);
if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name())) {
JsonObject jsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
JsonRpcResponse jsonRpcResponse =
jsonRpcExecutor.execute(
user,
tracer,
spanContext,
() -> !ctx.response().closed(),
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class));
response.setStatusCode(status(jsonRpcResponse).code());
if (jsonRpcResponse.getType() == JsonRpcResponseType.NONE) {
response.end();
} else {
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
JSON_OBJECT_WRITER.writeValue(streamer, jsonRpcResponse);
}
}
} else if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name())) {
JsonArray batchJsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name());
List<JsonRpcResponse> jsonRpcBatchResponse;
try {
List<JsonRpcResponse> responses = new ArrayList<>();
for (int i = 0; i < batchJsonRequest.size(); i++) {
final JsonObject jsonRequest;
try {
jsonRequest = batchJsonRequest.getJsonObject(i);
} catch (ClassCastException e) {
responses.add(new JsonRpcErrorResponse(null, INVALID_REQUEST));
continue;
}
responses.add(
jsonRpcExecutor.execute(
user,
tracer,
spanContext,
() -> !ctx.response().closed(),
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class)));
}
jsonRpcBatchResponse = responses;
} catch (RuntimeException e) {
response.setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
return;
}
final JsonRpcResponse[] completed =
jsonRpcBatchResponse.stream()
.filter(jsonRpcResponse -> jsonRpcResponse.getType() != JsonRpcResponseType.NONE)
.toArray(JsonRpcResponse[]::new);
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
JSON_OBJECT_WRITER.writeValue(streamer, completed);
}
} else {
handleJsonRpcError(ctx, null, JsonRpcError.PARSE_ERROR);
}
} catch (IOException ex) {
LOG.error("Error streaming JSON-RPC response", ex);
} catch (RuntimeException e) {
handleJsonRpcError(ctx, null, JsonRpcError.INTERNAL_ERROR);
}
};
}
private static void handleJsonRpcError(
final RoutingContext routingContext, final Object id, final JsonRpcError error) {
final HttpServerResponse response = routingContext.response();
if (!response.closed()) {
response
.setStatusCode(statusCodeFromError(error).code())
.end(Json.encode(new JsonRpcErrorResponse(id, error)));
}
}
private static HttpResponseStatus status(final JsonRpcResponse response) {
switch (response.getType()) {
case UNAUTHORIZED:
return HttpResponseStatus.UNAUTHORIZED;
case ERROR:
return statusCodeFromError(((JsonRpcErrorResponse) response).getError());
case SUCCESS:
case NONE:
default:
return HttpResponseStatus.OK;
}
}
private static HttpResponseStatus statusCodeFromError(final JsonRpcError error) {
switch (error) {
case INVALID_REQUEST:
case INVALID_PARAMS:
case PARSE_ERROR:
return HttpResponseStatus.BAD_REQUEST;
default:
return HttpResponseStatus.OK;
}
}
}

@ -0,0 +1,68 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.handlers;
import org.hyperledger.besu.ethereum.api.jsonrpc.context.ContextKey;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.ext.web.RoutingContext;
public class JsonRpcParserHandler {
private JsonRpcParserHandler() {}
public static Handler<RoutingContext> handler() {
return ctx -> {
final HttpServerResponse response = ctx.response();
if (ctx.getBody() == null) {
errorResponse(response, JsonRpcError.PARSE_ERROR);
} else {
try {
ctx.put(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name(), ctx.getBodyAsJson());
} catch (DecodeException jsonObjectDecodeException) {
try {
final JsonArray batchRequest = ctx.getBodyAsJsonArray();
if (batchRequest.isEmpty()) {
errorResponse(response, JsonRpcError.INVALID_REQUEST);
return;
} else {
ctx.put(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name(), batchRequest);
}
} catch (DecodeException jsonArrayDecodeException) {
errorResponse(response, JsonRpcError.PARSE_ERROR);
return;
}
}
ctx.next();
}
};
}
private static void errorResponse(
final HttpServerResponse response, final JsonRpcError rpcError) {
if (!response.closed()) {
response
.setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
.end(Json.encode(new JsonRpcErrorResponse(null, rpcError)));
}
}
}

@ -14,8 +14,6 @@
*/
package org.hyperledger.besu.ethereum.api.handlers;
import static java.util.Collections.emptyMap;
import org.hyperledger.besu.ethereum.api.jsonrpc.context.ContextKey;
import java.util.Map;
@ -27,50 +25,39 @@ import io.vertx.ext.web.RoutingContext;
public class TimeoutHandler {
public static Handler<RoutingContext> handler(
final Optional<TimeoutOptions> globalOptions, final boolean decodeJSON) {
return handler(globalOptions, emptyMap(), decodeJSON);
}
public static Handler<RoutingContext> handler(
final Optional<TimeoutOptions> globalOptions,
final Map<String, TimeoutOptions> timeoutOptionsByMethod,
final boolean decodeJSON) {
final Map<String, TimeoutOptions> timeoutOptionsByMethod) {
assert timeoutOptionsByMethod != null;
return ctx -> processHandler(ctx, globalOptions, timeoutOptionsByMethod, decodeJSON);
return ctx -> processHandler(ctx, globalOptions, timeoutOptionsByMethod);
}
private static void processHandler(
final RoutingContext ctx,
final Optional<TimeoutOptions> globalOptions,
final Map<String, TimeoutOptions> timeoutOptionsByMethod,
final boolean decodeJSON) {
final Map<String, TimeoutOptions> timeoutOptionsByMethod) {
try {
final String bodyAsString = ctx.getBodyAsString();
if (bodyAsString != null) {
final String json = ctx.getBodyAsString().trim();
Optional<TimeoutOptions> methodTimeoutOptions = Optional.empty();
if (decodeJSON && !json.isEmpty() && json.charAt(0) == '{') {
final JsonObject requestBodyJsonObject = new JsonObject(json);
ctx.put(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name(), requestBodyJsonObject);
final String method = requestBodyJsonObject.getString("method");
methodTimeoutOptions = Optional.ofNullable(timeoutOptionsByMethod.get(method));
}
methodTimeoutOptions
.or(() -> globalOptions)
.ifPresent(
timeoutOptions -> {
long tid =
ctx.vertx()
.setTimer(
timeoutOptions.getTimeoutMillis(),
t -> {
ctx.fail(timeoutOptions.getErrorCode());
ctx.response().close();
});
ctx.addBodyEndHandler(v -> ctx.vertx().cancelTimer(tid));
});
Optional<TimeoutOptions> methodTimeoutOptions = Optional.empty();
if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name())) {
final JsonObject requestBodyJsonObject =
ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
final String method = requestBodyJsonObject.getString("method");
methodTimeoutOptions = Optional.ofNullable(timeoutOptionsByMethod.get(method));
}
methodTimeoutOptions
.or(() -> globalOptions)
.ifPresent(
timeoutOptions -> {
long tid =
ctx.vertx()
.setTimer(
timeoutOptions.getTimeoutMillis(),
t -> {
ctx.fail(timeoutOptions.getErrorCode());
ctx.response().close();
});
ctx.addBodyEndHandler(v -> ctx.vertx().cancelTimer(tid));
});
} finally {
ctx.next();
}

@ -18,8 +18,10 @@ import java.time.Duration;
import java.util.concurrent.TimeUnit;
public class TimeoutOptions {
public static final int DEFAULT_ERROR_CODE = 504;
private static final long DEFAULT_TIMEOUT_SECONDS = Duration.ofMinutes(5).toSeconds();
private static final int DEFAULT_ERROR_CODE = 504;
private final long timeoutSec;
private final int errorCode;

@ -24,7 +24,7 @@ import io.vertx.core.net.SocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class JsonResponseStreamer extends OutputStream {
public class JsonResponseStreamer extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(JsonResponseStreamer.class);

@ -16,31 +16,21 @@ package org.hyperledger.besu.ethereum.api.jsonrpc;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Streams.stream;
import static java.util.stream.Collectors.toList;
import static org.apache.tuweni.net.tls.VertxTrustOptions.allowlistClients;
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError.INVALID_REQUEST;
import org.hyperledger.besu.ethereum.api.handlers.HandlerFactory;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationUtils;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.DefaultAuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.context.ContextKey;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.AuthenticatedJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.TimedJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.TracedJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.health.HealthService;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestId;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcNoResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcUnauthorizedResponse;
import org.hyperledger.besu.ethereum.api.tls.TlsClientAuthConfiguration;
import org.hyperledger.besu.ethereum.api.tls.TlsConfiguration;
import org.hyperledger.besu.ethereum.privacy.MultiTenancyValidationException;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.NatService;
@ -53,10 +43,8 @@ import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.util.ExceptionUtils;
import org.hyperledger.besu.util.NetworkUtility;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
@ -64,14 +52,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
@ -83,8 +66,6 @@ import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.extension.trace.propagation.B3Propagator;
import io.opentelemetry.extension.trace.propagation.JaegerPropagator;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
@ -96,13 +77,9 @@ import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
@ -117,14 +94,6 @@ public class JsonRpcHttpService {
private static final String SPAN_CONTEXT = "span_context";
private static final InetSocketAddress EMPTY_SOCKET_ADDRESS = new InetSocketAddress("0.0.0.0", 0);
private static final String APPLICATION_JSON = "application/json";
private static final JsonRpcResponse NO_RESPONSE = new JsonRpcNoResponse();
private static final ObjectWriter JSON_OBJECT_WRITER =
new ObjectMapper()
.registerModule(new Jdk8Module()) // Handle JDK8 Optionals (de)serialization
.writerWithDefaultPrettyPrinter()
.without(Feature.FLUSH_PASSED_TO_STREAM)
.with(Feature.AUTO_CLOSE_TARGET);
private static final String EMPTY_RESPONSE = "";
private static final TextMapPropagator traceFormats =
TextMapPropagator.composite(
@ -344,14 +313,35 @@ public class JsonRpcHttpService {
.route(HealthService.READINESS_PATH)
.method(HttpMethod.GET)
.handler(readinessService::handleRequest);
router
.route("/")
.method(HttpMethod.POST)
.produces(APPLICATION_JSON)
Route mainRoute = router.route("/").method(HttpMethod.POST).produces(APPLICATION_JSON);
if (authenticationService.isPresent()) {
mainRoute.handler(
HandlerFactory.authentication(authenticationService.get(), config.getNoAuthRpcApis()));
}
mainRoute
.handler(HandlerFactory.jsonRpcParser())
.handler(
HandlerFactory.timeout(
new TimeoutOptions(config.getHttpTimeoutSec()), rpcMethods, true))
.handler(this::handleJsonRPCRequest);
HandlerFactory.timeout(new TimeoutOptions(config.getHttpTimeoutSec()), rpcMethods));
if (authenticationService.isPresent()) {
mainRoute.blockingHandler(
HandlerFactory.jsonRpcExecutor(
new JsonRpcExecutor(
new AuthenticatedJsonRpcProcessor(
new TimedJsonRpcProcessor(
new TracedJsonRpcProcessor(new BaseJsonRpcProcessor()), requestTimer),
authenticationService.get(),
config.getNoAuthRpcApis()),
rpcMethods),
tracer));
} else {
mainRoute.blockingHandler(
HandlerFactory.jsonRpcExecutor(
new JsonRpcExecutor(
new TimedJsonRpcProcessor(
new TracedJsonRpcProcessor(new BaseJsonRpcProcessor()), requestTimer),
rpcMethods),
tracer));
}
if (authenticationService.isPresent()) {
router
@ -493,11 +483,6 @@ public class JsonRpcHttpService {
};
}
private String getAuthToken(final RoutingContext routingContext) {
return AuthenticationUtils.getJwtTokenFromAuthorizationHeaderValue(
routingContext.request().getHeader("Authorization"));
}
private Optional<String> getAndValidateHostHeader(final RoutingContext event) {
String hostname =
event.request().getHeader(HttpHeaders.HOST) != null
@ -562,280 +547,11 @@ public class JsonRpcHttpService {
return config.getTlsConfiguration().isPresent() ? "https" : "http";
}
private void handleJsonRPCRequest(final RoutingContext routingContext) {
// first check token if authentication is required
final String token = getAuthToken(routingContext);
// we check the no auth api methods actually match what's in the request later on
if (authenticationService.isPresent() && token == null && config.getNoAuthRpcApis().isEmpty()) {
// no auth token when auth required
handleJsonRpcUnauthorizedError(routingContext, null, JsonRpcError.UNAUTHORIZED);
} else {
// Parse json
try {
final String json = routingContext.getBodyAsString().trim();
if (!json.isEmpty() && json.charAt(0) == '{') {
final JsonObject requestBodyJsonObject =
ContextKey.REQUEST_BODY_AS_JSON_OBJECT.extractFrom(
routingContext, () -> new JsonObject(json));
if (authenticationService.isPresent()) {
authenticationService
.get()
.authenticate(
token,
user -> handleJsonSingleRequest(routingContext, requestBodyJsonObject, user));
} else {
handleJsonSingleRequest(routingContext, requestBodyJsonObject, Optional.empty());
}
} else {
final JsonArray array = new JsonArray(json);
if (array.size() < 1) {
handleJsonRpcError(routingContext, null, INVALID_REQUEST);
return;
}
if (authenticationService.isPresent()) {
authenticationService
.get()
.authenticate(token, user -> handleJsonBatchRequest(routingContext, array, user));
} else {
handleJsonBatchRequest(routingContext, array, Optional.empty());
}
}
} catch (final DecodeException | NullPointerException ex) {
handleJsonRpcError(routingContext, null, JsonRpcError.PARSE_ERROR);
}
}
}
// Facilitate remote health-checks in AWS, inter alia.
private void handleEmptyRequest(final RoutingContext routingContext) {
routingContext.response().setStatusCode(201).end();
}
private void handleJsonSingleRequest(
final RoutingContext routingContext, final JsonObject request, final Optional<User> user) {
final HttpServerResponse response = routingContext.response();
vertx.executeBlocking(
future -> {
final JsonRpcResponse jsonRpcResponse = process(routingContext, request, user);
future.complete(jsonRpcResponse);
},
false,
(res) -> {
if (!response.closed() && !response.headWritten()) {
if (res.failed()) {
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
return;
}
final JsonRpcResponse jsonRpcResponse = (JsonRpcResponse) res.result();
response
.setStatusCode(status(jsonRpcResponse).code())
.putHeader("Content-Type", APPLICATION_JSON);
if (jsonRpcResponse.getType() == JsonRpcResponseType.NONE) {
response.end(EMPTY_RESPONSE);
} else {
try {
// underlying output stream lifecycle is managed by the json object writer
JSON_OBJECT_WRITER.writeValue(
new JsonResponseStreamer(response, routingContext.request().remoteAddress()),
jsonRpcResponse);
} catch (IOException ex) {
LOG.error("Error streaming JSON-RPC response", ex);
}
}
}
});
}
private HttpResponseStatus status(final JsonRpcResponse response) {
switch (response.getType()) {
case UNAUTHORIZED:
return HttpResponseStatus.UNAUTHORIZED;
case ERROR:
return statusCodeFromError(((JsonRpcErrorResponse) response).getError());
case SUCCESS:
case NONE:
default:
return HttpResponseStatus.OK;
}
}
private HttpResponseStatus statusCodeFromError(final JsonRpcError error) {
switch (error) {
case INVALID_REQUEST:
case INVALID_PARAMS:
case PARSE_ERROR:
return HttpResponseStatus.BAD_REQUEST;
default:
return HttpResponseStatus.OK;
}
}
@SuppressWarnings("rawtypes")
private void handleJsonBatchRequest(
final RoutingContext routingContext, final JsonArray jsonArray, final Optional<User> user) {
// Interpret json as rpc request
final List<Future> responses =
jsonArray.stream()
.map(
obj -> {
if (!(obj instanceof JsonObject)) {
return Future.succeededFuture(errorResponse(null, INVALID_REQUEST));
}
final JsonObject req = (JsonObject) obj;
return vertx.executeBlocking(
future -> future.complete(process(routingContext, req, user)));
})
.collect(toList());
CompositeFuture.all(responses)
.onComplete(
(res) -> {
final HttpServerResponse response = routingContext.response();
if (response.closed() || response.headWritten()) {
return;
}
if (res.failed()) {
response.setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
return;
}
final JsonRpcResponse[] completed =
res.result().list().stream()
.map(JsonRpcResponse.class::cast)
.filter(this::isNonEmptyResponses)
.toArray(JsonRpcResponse[]::new);
try {
// underlying output stream lifecycle is managed by the json object writer
JSON_OBJECT_WRITER.writeValue(
new JsonResponseStreamer(response, routingContext.request().remoteAddress()),
completed);
} catch (IOException ex) {
LOG.error("Error streaming JSON-RPC response", ex);
}
});
}
private boolean isNonEmptyResponses(final JsonRpcResponse result) {
return result.getType() != JsonRpcResponseType.NONE;
}
private JsonRpcResponse process(
final RoutingContext ctx, final JsonObject requestJson, final Optional<User> user) {
final JsonRpcRequest requestBody;
Object id = null;
try {
id = new JsonRpcRequestId(requestJson.getValue("id")).getValue();
requestBody = requestJson.mapTo(JsonRpcRequest.class);
} catch (final IllegalArgumentException exception) {
return errorResponse(id, INVALID_REQUEST);
}
Span span =
tracer
.spanBuilder(requestBody.getMethod())
.setSpanKind(SpanKind.INTERNAL)
.setParent(ctx.get(SPAN_CONTEXT))
.startSpan();
try {
// Handle notifications
if (requestBody.isNotification()) {
// Notifications aren't handled so create empty result for now.
return NO_RESPONSE;
}
final Optional<JsonRpcError> unavailableMethod = validateMethodAvailability(requestBody);
if (unavailableMethod.isPresent()) {
span.setStatus(StatusCode.ERROR, "method unavailable");
return errorResponse(id, unavailableMethod.get());
}
final JsonRpcMethod method = rpcMethods.get(requestBody.getMethod());
if (!authenticationService.isPresent()
|| (authenticationService.isPresent()
&& authenticationService
.get()
.isPermitted(user, method, config.getNoAuthRpcApis()))) {
// Generate response
try (final OperationTimer.TimingContext ignored =
requestTimer.labels(requestBody.getMethod()).startTimer()) {
if (user.isPresent()) {
return method.response(
new JsonRpcRequestContext(requestBody, user.get(), () -> !ctx.response().closed()));
}
return method.response(
new JsonRpcRequestContext(requestBody, () -> !ctx.response().closed()));
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params for method: {}", method.getName(), e);
span.setStatus(StatusCode.ERROR, "Invalid Params");
return errorResponse(id, JsonRpcError.INVALID_PARAMS);
} catch (final MultiTenancyValidationException e) {
span.setStatus(StatusCode.ERROR, "Unauthorized");
return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
} catch (final RuntimeException e) {
LOG.error("Error processing JSON-RPC requestBody", e);
span.setStatus(StatusCode.ERROR, "Error processing JSON-RPC requestBody");
return errorResponse(id, JsonRpcError.INTERNAL_ERROR);
}
} else {
span.setStatus(StatusCode.ERROR, "Unauthorized");
return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
}
} finally {
span.end();
}
}
private Optional<JsonRpcError> validateMethodAvailability(final JsonRpcRequest request) {
final String name = request.getMethod();
LOG.debug("JSON-RPC request -> {} {}", name, request.getParams());
final JsonRpcMethod method = rpcMethods.get(name);
if (method == null) {
if (!RpcMethod.rpcMethodExists(name)) {
return Optional.of(JsonRpcError.METHOD_NOT_FOUND);
}
if (!rpcMethods.containsKey(name)) {
return Optional.of(JsonRpcError.METHOD_NOT_ENABLED);
}
}
return Optional.empty();
}
private void handleJsonRpcError(
final RoutingContext routingContext, final Object id, final JsonRpcError error) {
final HttpServerResponse response = routingContext.response();
if (!response.closed()) {
response
.setStatusCode(statusCodeFromError(error).code())
.end(Json.encode(new JsonRpcErrorResponse(id, error)));
}
}
private void handleJsonRpcUnauthorizedError(
final RoutingContext routingContext, final Object id, final JsonRpcError error) {
final HttpServerResponse response = routingContext.response();
if (!response.closed()) {
response
.setStatusCode(HttpResponseStatus.UNAUTHORIZED.code())
.end(Json.encode(new JsonRpcErrorResponse(id, error)));
}
}
private JsonRpcResponse errorResponse(final Object id, final JsonRpcError error) {
return new JsonRpcErrorResponse(id, error);
}
private JsonRpcResponse unauthorizedResponse(final Object id, final JsonRpcError error) {
return new JsonRpcUnauthorizedResponse(id, error);
}
private String buildCorsRegexFromConfig() {
if (config.getCorsAllowedDomains().isEmpty()) {
return "";

@ -19,7 +19,9 @@ import java.util.function.Supplier;
import io.vertx.ext.web.RoutingContext;
public enum ContextKey {
REQUEST_BODY_AS_JSON_OBJECT;
REQUEST_BODY_AS_JSON_OBJECT,
REQUEST_BODY_AS_JSON_ARRAY,
AUTHENTICATED_USER;
public <T> T extractFrom(final RoutingContext ctx, final Supplier<T> defaultSupplier) {
final T value = ctx.get(this.name());

@ -0,0 +1,55 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.execution;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestId;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcUnauthorizedResponse;
import java.util.Collection;
import io.opentelemetry.api.trace.Span;
public class AuthenticatedJsonRpcProcessor implements JsonRpcProcessor {
private final JsonRpcProcessor rpcProcessor;
private final AuthenticationService authenticationService;
private final Collection<String> noAuthRpcApis;
public AuthenticatedJsonRpcProcessor(
final JsonRpcProcessor rpcProcessor,
final AuthenticationService authenticationService,
final Collection<String> noAuthRpcApis) {
this.rpcProcessor = rpcProcessor;
this.authenticationService = authenticationService;
this.noAuthRpcApis = noAuthRpcApis;
}
@Override
public JsonRpcResponse process(
final JsonRpcRequestId id,
final JsonRpcMethod method,
final Span metricSpan,
final JsonRpcRequestContext request) {
if (authenticationService.isPermitted(request.getUser(), method, noAuthRpcApis)) {
return rpcProcessor.process(id, method, metricSpan, request);
}
return new JsonRpcUnauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
}
}

@ -0,0 +1,52 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.execution;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestId;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcUnauthorizedResponse;
import org.hyperledger.besu.ethereum.privacy.MultiTenancyValidationException;
import io.opentelemetry.api.trace.Span;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BaseJsonRpcProcessor implements JsonRpcProcessor {
private static final Logger LOG = LoggerFactory.getLogger(BaseJsonRpcProcessor.class);
@Override
public JsonRpcResponse process(
final JsonRpcRequestId id,
final JsonRpcMethod method,
final Span metricSpan,
final JsonRpcRequestContext request) {
try {
return method.response(request);
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params for method: {}", method.getName(), e);
return new JsonRpcErrorResponse(id, JsonRpcError.INVALID_PARAMS);
} catch (final MultiTenancyValidationException e) {
return new JsonRpcUnauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
} catch (final RuntimeException e) {
return new JsonRpcErrorResponse(id, JsonRpcError.INTERNAL_ERROR);
}
}
}

@ -0,0 +1,120 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.execution;
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError.INVALID_REQUEST;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestId;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcNoResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JsonRpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(JsonRpcExecutor.class);
private final JsonRpcProcessor rpcProcessor;
private final Map<String, JsonRpcMethod> rpcMethods;
public JsonRpcExecutor(
final JsonRpcProcessor rpcProcessor, final Map<String, JsonRpcMethod> rpcMethods) {
this.rpcProcessor = rpcProcessor;
this.rpcMethods = rpcMethods;
}
public JsonRpcResponse execute(
final Optional<User> optionalUser,
final Tracer tracer,
final Context spanContext,
final Supplier<Boolean> alive,
final JsonObject jsonRpcRequest,
final Function<JsonObject, JsonRpcRequest> requestBodyProvider) {
try {
final JsonRpcRequest requestBody = requestBodyProvider.apply(jsonRpcRequest);
final JsonRpcRequestId id = new JsonRpcRequestId(requestBody.getId());
// Handle notifications
if (requestBody.isNotification()) {
// Notifications aren't handled so create empty result for now.
return new JsonRpcNoResponse();
}
final Span span;
if (tracer != null) {
span =
tracer
.spanBuilder(requestBody.getMethod())
.setSpanKind(SpanKind.INTERNAL)
.setParent(spanContext)
.startSpan();
} else {
span = Span.getInvalid();
}
final Optional<JsonRpcError> unavailableMethod = validateMethodAvailability(requestBody);
if (unavailableMethod.isPresent()) {
span.setStatus(StatusCode.ERROR, "method unavailable");
return new JsonRpcErrorResponse(id, unavailableMethod.get());
}
final JsonRpcMethod method = rpcMethods.get(requestBody.getMethod());
return rpcProcessor.process(
id, method, span, new JsonRpcRequestContext(requestBody, optionalUser, alive));
} catch (IllegalArgumentException e) {
try {
final Integer id = jsonRpcRequest.getInteger("id", null);
return new JsonRpcErrorResponse(id, INVALID_REQUEST);
} catch (ClassCastException idNotIntegerException) {
return new JsonRpcErrorResponse(null, INVALID_REQUEST);
}
}
}
private Optional<JsonRpcError> validateMethodAvailability(final JsonRpcRequest request) {
final String name = request.getMethod();
LOG.debug("JSON-RPC request -> {} {}", name, request.getParams());
final JsonRpcMethod method = rpcMethods.get(name);
if (method == null) {
if (!RpcMethod.rpcMethodExists(name)) {
return Optional.of(JsonRpcError.METHOD_NOT_FOUND);
}
if (!rpcMethods.containsKey(name)) {
return Optional.of(JsonRpcError.METHOD_NOT_ENABLED);
}
}
return Optional.empty();
}
}

@ -0,0 +1,30 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.execution;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestId;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import io.opentelemetry.api.trace.Span;
public interface JsonRpcProcessor {
JsonRpcResponse process(
final JsonRpcRequestId id,
final JsonRpcMethod method,
final Span metricSpan,
final JsonRpcRequestContext request);
}

@ -0,0 +1,48 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.execution;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestId;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import io.opentelemetry.api.trace.Span;
public class TimedJsonRpcProcessor implements JsonRpcProcessor {
private final JsonRpcProcessor rpcProcessor;
private final LabelledMetric<OperationTimer> requestTimer;
public TimedJsonRpcProcessor(
final JsonRpcProcessor rpcProcessor, final LabelledMetric<OperationTimer> requestTimer) {
this.rpcProcessor = rpcProcessor;
this.requestTimer = requestTimer;
}
@Override
public JsonRpcResponse process(
final JsonRpcRequestId id,
final JsonRpcMethod method,
final Span metricSpan,
final JsonRpcRequestContext request) {
try (final OperationTimer.TimingContext ignored =
requestTimer.labels(request.getRequest().getMethod()).startTimer()) {
return rpcProcessor.process(id, method, metricSpan, request);
}
}
}

@ -0,0 +1,61 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.execution;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestId;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
public class TracedJsonRpcProcessor implements JsonRpcProcessor {
private final JsonRpcProcessor rpcProcessor;
public TracedJsonRpcProcessor(final JsonRpcProcessor rpcProcessor) {
this.rpcProcessor = rpcProcessor;
}
@Override
public JsonRpcResponse process(
final JsonRpcRequestId id,
final JsonRpcMethod method,
final Span metricSpan,
final JsonRpcRequestContext request) {
JsonRpcResponse jsonRpcResponse = rpcProcessor.process(id, method, metricSpan, request);
if (JsonRpcResponseType.ERROR == jsonRpcResponse.getType()) {
JsonRpcErrorResponse errorResponse = (JsonRpcErrorResponse) jsonRpcResponse;
switch (errorResponse.getError()) {
case INVALID_PARAMS:
metricSpan.setStatus(StatusCode.ERROR, "Invalid Params");
break;
case UNAUTHORIZED:
metricSpan.setStatus(StatusCode.ERROR, "Unauthorized");
break;
case INTERNAL_ERROR:
metricSpan.setStatus(StatusCode.ERROR, "Error processing JSON-RPC requestBody");
break;
default:
metricSpan.setStatus(StatusCode.ERROR, "Unexpected error");
}
}
metricSpan.end();
return jsonRpcResponse;
}
}

@ -0,0 +1,51 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.ipc;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
public class JsonRpcIpcConfiguration {
private final boolean enabled;
private final Path ipcPath;
private final Collection<String> enabledApis;
public JsonRpcIpcConfiguration() {
enabled = false;
ipcPath = null;
enabledApis = Collections.emptyList();
}
public JsonRpcIpcConfiguration(
final boolean enabled, final Path ipcPath, final Collection<String> enabledApis) {
this.enabled = enabled;
this.ipcPath = ipcPath;
this.enabledApis = enabledApis;
}
public boolean isEnabled() {
return enabled;
}
public Path getPath() {
return ipcPath;
}
public Collection<String> getEnabledApis() {
return enabledApis;
}
}

@ -0,0 +1,210 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.ipc;
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError.INVALID_REQUEST;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JsonRpcIpcService {
private static final Logger LOG = LoggerFactory.getLogger(JsonRpcIpcService.class);
private static final ObjectWriter JSON_OBJECT_WRITER =
new ObjectMapper()
.registerModule(new Jdk8Module()) // Handle JDK8 Optionals (de)serialization
.writer()
.without(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM)
.with(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
private final Vertx vertx;
private final Path path;
private final JsonRpcExecutor jsonRpcExecutor;
private NetServer netServer;
public JsonRpcIpcService(final Vertx vertx, final Path path, final JsonRpcExecutor rpcExecutor) {
this.vertx = vertx;
this.path = path;
this.jsonRpcExecutor = rpcExecutor;
}
public Future<NetServer> start() {
netServer = vertx.createNetServer(buildNetServerOptions());
netServer.connectHandler(
socket -> {
AtomicBoolean closedSocket = new AtomicBoolean(false);
socket
.closeHandler(unused -> closedSocket.set(true))
.handler(
buffer -> {
if (buffer.length() == 0) {
errorReturn(socket, null, JsonRpcError.INVALID_REQUEST);
} else {
try {
final JsonObject jsonRpcRequest = buffer.toJsonObject();
vertx
.<JsonRpcResponse>executeBlocking(
promise -> {
final JsonRpcResponse jsonRpcResponse =
jsonRpcExecutor.execute(
Optional.empty(),
null,
null,
closedSocket::get,
jsonRpcRequest,
req -> req.mapTo(JsonRpcRequest.class));
promise.complete(jsonRpcResponse);
})
.onSuccess(
jsonRpcResponse -> {
try {
socket.write(
JSON_OBJECT_WRITER.writeValueAsString(jsonRpcResponse)
+ '\n');
} catch (JsonProcessingException e) {
LOG.error("Error streaming JSON-RPC response", e);
}
})
.onFailure(
throwable -> {
try {
final Integer id = jsonRpcRequest.getInteger("id", null);
errorReturn(socket, id, JsonRpcError.INTERNAL_ERROR);
} catch (ClassCastException idNotIntegerException) {
errorReturn(socket, null, JsonRpcError.INTERNAL_ERROR);
}
});
} catch (DecodeException jsonObjectDecodeException) {
try {
final JsonArray batchJsonRpcRequest = buffer.toJsonArray();
if (batchJsonRpcRequest.isEmpty()) {
errorReturn(socket, null, JsonRpcError.INVALID_REQUEST);
} else {
vertx
.<List<JsonRpcResponse>>executeBlocking(
promise -> {
List<JsonRpcResponse> responses = new ArrayList<>();
for (int i = 0; i < batchJsonRpcRequest.size(); i++) {
final JsonObject jsonRequest;
try {
jsonRequest = batchJsonRpcRequest.getJsonObject(i);
} catch (ClassCastException e) {
responses.add(
new JsonRpcErrorResponse(null, INVALID_REQUEST));
continue;
}
responses.add(
jsonRpcExecutor.execute(
Optional.empty(),
null,
null,
closedSocket::get,
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class)));
}
promise.complete(responses);
})
.onSuccess(
jsonRpcBatchResponse -> {
try {
final JsonRpcResponse[] completed =
jsonRpcBatchResponse.stream()
.filter(
jsonRpcResponse ->
jsonRpcResponse.getType()
!= JsonRpcResponseType.NONE)
.toArray(JsonRpcResponse[]::new);
socket.write(
JSON_OBJECT_WRITER.writeValueAsString(completed)
+ '\n');
} catch (JsonProcessingException e) {
LOG.error("Error streaming JSON-RPC response", e);
}
})
.onFailure(
throwable ->
errorReturn(socket, null, JsonRpcError.INTERNAL_ERROR));
}
} catch (DecodeException jsonArrayDecodeException) {
errorReturn(socket, null, JsonRpcError.PARSE_ERROR);
}
}
}
});
});
return netServer
.listen(SocketAddress.domainSocketAddress(path.toString()))
.onSuccess(successServer -> LOG.info("IPC endpoint opened: {}", path))
.onFailure(throwable -> LOG.error("Unable to open IPC endpoint", throwable));
}
public Future<Void> stop() {
if (netServer == null) {
return Future.succeededFuture();
} else {
return netServer
.close()
.onComplete(
closeResult -> {
try {
Files.deleteIfExists(path);
} catch (IOException e) {
LOG.error("Unable to delete IPC file", e);
}
});
}
}
private Future<Void> errorReturn(
final NetSocket socket, final Integer id, final JsonRpcError rpcError) {
return socket.write(Buffer.buffer(Json.encode(new JsonRpcErrorResponse(id, rpcError)) + '\n'));
}
private NetServerOptions buildNetServerOptions() {
return new NetServerOptions();
}
}

@ -14,43 +14,30 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket;
import static java.util.stream.Collectors.toList;
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError.INVALID_REQUEST;
import org.hyperledger.besu.ethereum.api.handlers.IsAliveHandler;
import org.hyperledger.besu.ethereum.api.handlers.RpcMethodTimeoutException;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcUnauthorizedResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketRpcRequest;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.User;
@ -68,177 +55,109 @@ public class WebSocketRequestHandler {
.with(Feature.AUTO_CLOSE_TARGET);
private final Vertx vertx;
private final Map<String, JsonRpcMethod> methods;
private final JsonRpcExecutor jsonRpcExecutor;
final EthScheduler ethScheduler;
private final long timeoutSec;
public WebSocketRequestHandler(
final Vertx vertx,
final Map<String, JsonRpcMethod> methods,
final JsonRpcExecutor jsonRpcExecutor,
final EthScheduler ethScheduler,
final long timeoutSec) {
this.vertx = vertx;
this.methods = methods;
this.jsonRpcExecutor = jsonRpcExecutor;
this.ethScheduler = ethScheduler;
this.timeoutSec = timeoutSec;
}
// Only for testing
public void handle(final ServerWebSocket websocket, final String payload) {
handle(Optional.empty(), websocket, payload, Optional.empty(), Collections.emptyList());
}
public void handle(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final String payload,
final Optional<User> user,
final Collection<String> noAuthApiMethods) {
vertx.executeBlocking(
executeHandler(authenticationService, websocket, payload, user, noAuthApiMethods),
false,
resultHandler(websocket));
}
private Handler<Promise<Object>> executeHandler(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final String payload,
final Optional<User> user,
final Collection<String> noAuthApiMethods) {
return future -> {
final String json = payload.trim();
if (!json.isEmpty() && json.charAt(0) == '{') {
final ServerWebSocket websocket, final Buffer buffer, final Optional<User> user) {
if (buffer.length() == 0) {
replyToClient(websocket, errorResponse(null, JsonRpcError.INVALID_REQUEST));
} else {
try {
final JsonObject jsonRpcRequest = buffer.toJsonObject();
vertx
.<JsonRpcResponse>executeBlocking(
promise -> {
try {
final JsonRpcResponse jsonRpcResponse =
jsonRpcExecutor.execute(
user,
null,
null,
new IsAliveHandler(ethScheduler, timeoutSec),
jsonRpcRequest,
req -> {
final WebSocketRpcRequest websocketRequest =
req.mapTo(WebSocketRpcRequest.class);
websocketRequest.setConnectionId(websocket.textHandlerID());
return websocketRequest;
});
promise.complete(jsonRpcResponse);
} catch (RuntimeException e) {
promise.fail(e);
}
})
.onSuccess(jsonRpcResponse -> replyToClient(websocket, jsonRpcResponse))
.onFailure(
throwable -> {
try {
final Integer id = jsonRpcRequest.getInteger("id", null);
replyToClient(websocket, errorResponse(id, JsonRpcError.INTERNAL_ERROR));
} catch (ClassCastException idNotIntegerException) {
replyToClient(websocket, errorResponse(null, JsonRpcError.INTERNAL_ERROR));
}
});
} catch (DecodeException jsonObjectDecodeException) {
try {
handleSingleRequest(
authenticationService,
websocket,
user,
future,
getRequest(payload),
noAuthApiMethods);
} catch (final IllegalArgumentException | DecodeException e) {
LOG.debug("Error mapping json to WebSocketRpcRequest", e);
future.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
return;
}
} else if (json.length() == 0) {
future.complete(errorResponse(null, INVALID_REQUEST));
return;
} else {
final JsonArray jsonArray = new JsonArray(json);
if (jsonArray.size() < 1) {
future.complete(errorResponse(null, INVALID_REQUEST));
return;
final JsonArray batchJsonRpcRequest = buffer.toJsonArray();
vertx
.<List<JsonRpcResponse>>executeBlocking(
promise -> {
List<JsonRpcResponse> responses = new ArrayList<>();
for (int i = 0; i < batchJsonRpcRequest.size(); i++) {
final JsonObject jsonRequest;
try {
jsonRequest = batchJsonRpcRequest.getJsonObject(i);
} catch (ClassCastException e) {
responses.add(new JsonRpcErrorResponse(null, INVALID_REQUEST));
continue;
}
responses.add(
jsonRpcExecutor.execute(
user,
null,
null,
new IsAliveHandler(ethScheduler, timeoutSec),
jsonRequest,
req -> {
final WebSocketRpcRequest websocketRequest =
req.mapTo(WebSocketRpcRequest.class);
websocketRequest.setConnectionId(websocket.textHandlerID());
return websocketRequest;
}));
}
promise.complete(responses);
})
.onSuccess(
jsonRpcBatchResponse -> {
final JsonRpcResponse[] completed =
jsonRpcBatchResponse.stream()
.filter(
jsonRpcResponse ->
jsonRpcResponse.getType() != JsonRpcResponseType.NONE)
.toArray(JsonRpcResponse[]::new);
replyToClient(websocket, completed);
})
.onFailure(
throwable ->
replyToClient(websocket, errorResponse(null, JsonRpcError.INTERNAL_ERROR)));
} catch (RuntimeException jsonArrayDecodeException) {
replyToClient(websocket, errorResponse(null, JsonRpcError.INTERNAL_ERROR));
}
// handle batch request
LOG.debug("batch request size {}", jsonArray.size());
handleJsonBatchRequest(authenticationService, websocket, jsonArray, user, noAuthApiMethods);
}
};
}
private JsonRpcResponse process(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final Optional<User> user,
final WebSocketRpcRequest requestBody,
final Collection<String> noAuthApiMethods) {
if (!methods.containsKey(requestBody.getMethod())) {
LOG.debug("Can't find method {}", requestBody.getMethod());
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.METHOD_NOT_FOUND);
}
final JsonRpcMethod method = methods.get(requestBody.getMethod());
try {
LOG.debug("WS-RPC request -> {}", requestBody.getMethod());
requestBody.setConnectionId(websocket.textHandlerID());
if (authenticationService.isEmpty()
|| (authenticationService.isPresent()
&& authenticationService.get().isPermitted(user, method, noAuthApiMethods))) {
final JsonRpcRequestContext requestContext =
new JsonRpcRequestContext(
requestBody, user, new IsAliveHandler(ethScheduler, timeoutSec));
return method.response(requestContext);
} else {
return new JsonRpcUnauthorizedResponse(requestBody.getId(), JsonRpcError.UNAUTHORIZED);
}
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.INVALID_PARAMS);
} catch (final RpcMethodTimeoutException e) {
LOG.error(JsonRpcError.TIMEOUT_ERROR.getMessage(), e);
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.TIMEOUT_ERROR);
} catch (final Exception e) {
LOG.error(JsonRpcError.INTERNAL_ERROR.getMessage(), e);
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.INTERNAL_ERROR);
}
}
private void handleSingleRequest(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final Optional<User> user,
final Promise<Object> future,
final WebSocketRpcRequest requestBody,
final Collection<String> noAuthApiMethods) {
future.complete(process(authenticationService, websocket, user, requestBody, noAuthApiMethods));
}
@SuppressWarnings("rawtypes")
private void handleJsonBatchRequest(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final JsonArray jsonArray,
final Optional<User> user,
final Collection<String> noAuthApiMethods) {
// Interpret json as rpc request
final List<Future> responses =
jsonArray.stream()
.map(
obj -> {
if (!(obj instanceof JsonObject)) {
return Future.succeededFuture(errorResponse(null, INVALID_REQUEST));
}
final JsonObject req = (JsonObject) obj;
return vertx.<JsonRpcResponse>executeBlocking(
future ->
future.complete(
process(
authenticationService,
websocket,
user,
getRequest(req.toString()),
noAuthApiMethods)));
})
.collect(toList());
CompositeFuture.all(responses)
.onComplete(
(res) -> {
final JsonRpcResponse[] completed =
res.result().list().stream()
.map(JsonRpcResponse.class::cast)
.filter(this::isNonEmptyResponses)
.toArray(JsonRpcResponse[]::new);
replyToClient(websocket, completed);
});
}
private WebSocketRpcRequest getRequest(final String payload) {
return Json.decodeValue(payload, WebSocketRpcRequest.class);
}
private Handler<AsyncResult<Object>> resultHandler(final ServerWebSocket websocket) {
return result -> {
if (result.succeeded()) {
replyToClient(websocket, result.result());
} else {
replyToClient(websocket, new JsonRpcErrorResponse(null, JsonRpcError.INTERNAL_ERROR));
}
};
}
private void replyToClient(final ServerWebSocket websocket, final Object result) {
@ -253,8 +172,4 @@ public class WebSocketRequestHandler {
private JsonRpcResponse errorResponse(final Object id, final JsonRpcError error) {
return new JsonRpcErrorResponse(id, error);
}
private boolean isNonEmptyResponses(final JsonRpcResponse result) {
return result.getType() != JsonRpcResponseType.NONE;
}
}

@ -32,6 +32,7 @@ import com.google.common.collect.Iterables;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
@ -126,7 +127,7 @@ public class WebSocketService {
LOG.debug("Websocket Connected ({})", socketAddressAsString(socketAddress));
websocket.binaryMessageHandler(
final Handler<Buffer> socketHandler =
buffer -> {
LOG.debug(
"Received Websocket request (binary frame) {} ({})",
@ -137,52 +138,13 @@ public class WebSocketService {
authenticationService
.get()
.authenticate(
token,
user ->
websocketRequestHandler.handle(
authenticationService,
websocket,
buffer.toString(),
user,
configuration.getRpcApisNoAuth()));
token, user -> websocketRequestHandler.handle(websocket, buffer, user));
} else {
websocketRequestHandler.handle(
Optional.empty(),
websocket,
buffer.toString(),
Optional.empty(),
configuration.getRpcApisNoAuth());
websocketRequestHandler.handle(websocket, buffer, Optional.empty());
}
});
websocket.textMessageHandler(
payload -> {
LOG.debug(
"Received Websocket request {} ({})",
payload,
socketAddressAsString(socketAddress));
if (authenticationService.isPresent()) {
authenticationService
.get()
.authenticate(
token,
user ->
websocketRequestHandler.handle(
authenticationService,
websocket,
payload,
user,
configuration.getRpcApisNoAuth()));
} else {
websocketRequestHandler.handle(
Optional.empty(),
websocket,
payload,
Optional.empty(),
configuration.getRpcApisNoAuth());
}
});
};
websocket.textMessageHandler(text -> socketHandler.handle(Buffer.buffer(text)));
websocket.binaryMessageHandler(socketHandler);
websocket.closeHandler(
v -> {

@ -1413,13 +1413,17 @@ public class JsonRpcHttpServiceTest extends JsonRpcHttpServiceTestBase {
when(jsonRpcMethod.getName()).thenReturn("foo");
when(jsonRpcMethod.response(any())).thenThrow(new RuntimeException("test exception"));
doReturn(Optional.of(jsonRpcMethod)).when(rpcMethods).get("foo");
doReturn(jsonRpcMethod).when(rpcMethods).get("foo");
final RequestBody body =
RequestBody.create(JSON, "{\"jsonrpc\":\"2.0\",\"id\":\"666\",\"method\":\"foo\"}");
try (final Response resp = client.newCall(buildPostRequest(body)).execute()) {
assertThat(resp.code()).isEqualTo(500);
assertThat(resp.code()).isEqualTo(200);
final JsonObject json = new JsonObject(resp.body().string());
final JsonRpcError expectedError = JsonRpcError.INTERNAL_ERROR;
testHelper.assertValidJsonRpcError(
json, "666", expectedError.getCode(), expectedError.getMessage());
}
}
@ -1428,7 +1432,7 @@ public class JsonRpcHttpServiceTest extends JsonRpcHttpServiceTestBase {
final JsonRpcMethod jsonRpcMethod = mock(JsonRpcMethod.class);
when(jsonRpcMethod.getName()).thenReturn("foo");
when(jsonRpcMethod.response(any())).thenThrow(new RuntimeException("test exception"));
doReturn(Optional.of(jsonRpcMethod)).when(rpcMethods).get("foo");
doReturn(jsonRpcMethod).when(rpcMethods).get("foo");
final RequestBody body =
RequestBody.create(
@ -1438,7 +1442,13 @@ public class JsonRpcHttpServiceTest extends JsonRpcHttpServiceTestBase {
+ "{\"jsonrpc\":\"2.0\",\"id\":\"222\",\"method\":\"net_version\"}]");
try (final Response resp = client.newCall(buildPostRequest(body)).execute()) {
assertThat(resp.code()).isEqualTo(400);
assertThat(resp.code()).isEqualTo(200);
final JsonArray array = new JsonArray(resp.body().string());
testHelper.assertValidJsonRpcResult(array.getJsonObject(0), "000");
final JsonRpcError expectedError = JsonRpcError.INTERNAL_ERROR;
testHelper.assertValidJsonRpcError(
array.getJsonObject(1), "111", expectedError.getCode(), expectedError.getMessage());
testHelper.assertValidJsonRpcResult(array.getJsonObject(2), "222");
}
}

@ -0,0 +1,195 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.ipc;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@EnabledOnOs({OS.LINUX, OS.MAC})
@ExtendWith(VertxExtension.class)
class JsonRpcIpcServiceTest {
@TempDir private Path tempDir;
private Vertx vertx;
private VertxTestContext testContext;
@BeforeEach
public void setUp() {
vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
testContext = new VertxTestContext();
}
@AfterEach
public void after() throws Throwable {
assertThat(testContext.awaitCompletion(5, TimeUnit.SECONDS))
.describedAs("Test completed on time")
.isTrue();
if (testContext.failed()) {
throw testContext.causeOfFailure();
}
}
@Test
void successfulExecution() {
final Path socketPath = tempDir.resolve("besu-test.ipc");
final JsonRpcMethod testMethod = mock(JsonRpcMethod.class);
when(testMethod.response(any())).thenReturn(new JsonRpcSuccessResponse(1, "TEST OK"));
final JsonRpcIpcService service =
new JsonRpcIpcService(
vertx,
socketPath,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), Map.of("test_method", testMethod)));
final String expectedResponse = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"TEST OK\"}\n";
assertSocketCall(
service,
socketPath,
expectedResponse,
new JsonObject().put("id", 1).put("method", "test_method").toBuffer());
}
@Test
void successfulBatchExecution() {
final Path socketPath = tempDir.resolve("besu-test.ipc");
final JsonRpcMethod fooMethod = mock(JsonRpcMethod.class);
when(fooMethod.response(any())).thenReturn(new JsonRpcSuccessResponse(1, "FOO OK"));
final JsonRpcMethod barMethod = mock(JsonRpcMethod.class);
when(barMethod.response(any())).thenReturn(new JsonRpcSuccessResponse(2, "BAR OK"));
final JsonRpcIpcService service =
new JsonRpcIpcService(
vertx,
socketPath,
new JsonRpcExecutor(
new BaseJsonRpcProcessor(),
Map.of("foo_method", fooMethod, "bar_method", barMethod)));
assertSocketCall(
service,
socketPath,
"[{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"FOO OK\"},{\"jsonrpc\":\"2.0\",\"id\":2,\"result\":\"BAR OK\"}]\n",
new JsonArray(
Arrays.asList(
new JsonObject().put("id", 1).put("method", "foo_method"),
new JsonObject().put("id", 2).put("method", "bar_method")))
.toBuffer());
}
@Test
void validJsonButNotRpcShouldReturnInvalidRequest() {
final Path socketPath = tempDir.resolve("besu-test.ipc");
final JsonRpcIpcService service =
new JsonRpcIpcService(
vertx,
socketPath,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), Collections.emptyMap()));
final String expectedResponse =
"{\"jsonrpc\":\"2.0\",\"id\":null,\"error\":{\"code\":-32600,\"message\":\"Invalid Request\"}}\n";
assertSocketCall(service, socketPath, expectedResponse, Buffer.buffer("{\"foo\":\"bar\"}"));
}
@Test
void nonJsonRequestShouldReturnParseError() {
final Path socketPath = tempDir.resolve("besu-test.ipc");
final JsonRpcIpcService service =
new JsonRpcIpcService(vertx, socketPath, mock(JsonRpcExecutor.class));
final String expectedResponse =
"{\"jsonrpc\":\"2.0\",\"id\":null,\"error\":{\"code\":-32700,\"message\":\"Parse error\"}}\n";
assertSocketCall(service, socketPath, expectedResponse, Buffer.buffer("bad request"));
}
@Test
void shouldDeleteSocketFileOnStop() {
final Path socketPath = tempDir.resolve("besu-test.ipc");
final JsonRpcIpcService service =
new JsonRpcIpcService(vertx, socketPath, mock(JsonRpcExecutor.class));
service
.start()
.onComplete(
testContext.succeeding(
server ->
service
.stop()
.onComplete(
testContext.succeeding(
handler ->
testContext.verify(
() -> {
assertThat(socketPath).doesNotExist();
testContext.completeNow();
})))));
}
private void assertSocketCall(
final JsonRpcIpcService service,
final Path socketPath,
final String expectedResponse,
final Buffer request) {
service
.start()
.onComplete(
testContext.succeeding(
server ->
vertx
.createNetClient()
.connect(SocketAddress.domainSocketAddress(socketPath.toString()))
.onComplete(
testContext.succeeding(
socket ->
socket
.handler(
buffer ->
testContext.verify(
() -> {
assertThat(buffer)
.hasToString(expectedResponse);
service
.stop()
.onComplete(
testContext.succeedingThenComplete());
}))
.write(request)))));
}
}

@ -26,10 +26,10 @@ import org.hyperledger.besu.ethereum.api.handlers.TimeoutHandler;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.context.ContextKey;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -52,33 +52,14 @@ public class TimeoutHandlerTest {
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
{
Optional.empty(),
ETH_GET_LOGS,
body(ETH_GET_LOGS),
DEFAULT_OPTS.getTimeoutSeconds(),
true
},
{
Optional.empty(),
ETH_GET_LOGS,
body(ETH_BLOCK_NUMBER),
DEFAULT_OPTS.getTimeoutSeconds(),
false
},
{
Optional.of(DEFAULT_OPTS),
ETH_GET_LOGS,
body(ETH_BLOCK_NUMBER),
DEFAULT_OPTS.getTimeoutSeconds(),
true
}
{Optional.empty(), ETH_GET_LOGS, DEFAULT_OPTS.getTimeoutSeconds(), true},
{Optional.empty(), ETH_BLOCK_NUMBER, DEFAULT_OPTS.getTimeoutSeconds(), false},
{Optional.of(DEFAULT_OPTS), ETH_BLOCK_NUMBER, DEFAULT_OPTS.getTimeoutSeconds(), true}
});
}
private static final TimeoutOptions DEFAULT_OPTS = TimeoutOptions.defaultOptions();
private final Optional<TimeoutOptions> globalOptions;
private final String body;
private final RpcMethod method;
private final long timeoutSec;
private final boolean timerMustBeSet;
@ -86,11 +67,9 @@ public class TimeoutHandlerTest {
public TimeoutHandlerTest(
final Optional<TimeoutOptions> globalOptions,
final RpcMethod method,
final String body,
final long timeoutSec,
final boolean timerMustBeSet) {
this.globalOptions = globalOptions;
this.body = body;
this.method = method;
this.timeoutSec = timeoutSec;
this.timerMustBeSet = timerMustBeSet;
@ -98,22 +77,27 @@ public class TimeoutHandlerTest {
@Test
public void test() {
final Map<String, TimeoutOptions> options =
ImmutableMap.of(
method.getMethodName(), new TimeoutOptions(timeoutSec, DEFAULT_OPTS.getErrorCode()));
final Handler<RoutingContext> handler = TimeoutHandler.handler(globalOptions, options, true);
final Map<String, TimeoutOptions> options;
if (timerMustBeSet) {
options =
ImmutableMap.of(
method.getMethodName(), new TimeoutOptions(timeoutSec, DEFAULT_OPTS.getErrorCode()));
} else {
options = Collections.emptyMap();
}
final Handler<RoutingContext> handler = TimeoutHandler.handler(globalOptions, options);
final RoutingContext ctx = Mockito.spy(RoutingContext.class);
final Vertx vertx = Mockito.spy(Vertx.class);
when(ctx.getBodyAsString()).thenReturn(body);
final JsonObject requestBody = Mockito.mock(JsonObject.class);
when(requestBody.getString("method")).thenReturn(method.getMethodName());
when(ctx.data()).thenReturn(Map.of(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name(), requestBody));
when(ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name())).thenReturn(requestBody);
when(ctx.vertx()).thenReturn(vertx);
handler.handle(ctx);
verify(ctx).put(eq(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name()), any());
verify(vertx, times(timerMustBeSet ? 1 : 0))
.setTimer(eq(TimeUnit.SECONDS.toMillis(timeoutSec)), any());
verify(ctx, times(timerMustBeSet ? 1 : 0)).addBodyEndHandler(any());
}
private static String body(final RpcMethod method) {
return Json.encodePrettily(new JsonRpcRequest("2.0", method.getMethodName(), null));
}
}

@ -20,6 +20,8 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
@ -78,7 +80,7 @@ public class WebSocketHostAllowlistTest {
spy(
new WebSocketRequestHandler(
vertx,
websocketMethods,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), websocketMethods),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds()));

@ -22,6 +22,8 @@ import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
@ -35,9 +37,12 @@ import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketFrame;
import io.vertx.core.json.Json;
@ -78,7 +83,7 @@ public class WebSocketRequestHandlerTest {
handler =
new WebSocketRequestHandler(
vertx,
methods,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds());
}
@ -105,7 +110,7 @@ public class WebSocketRequestHandlerTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
handler.handle(websocketMock, requestJson.toString());
handler.handle(websocketMock, requestJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
@ -133,7 +138,7 @@ public class WebSocketRequestHandlerTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
handler.handle(websocketMock, arrayJson.toString());
handler.handle(websocketMock, arrayJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
// can verify only after async not before
@ -152,7 +157,7 @@ public class WebSocketRequestHandlerTest {
WebSocketRequestHandler handleBadCalls =
new WebSocketRequestHandler(
vertx,
methods,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds());
@ -170,7 +175,7 @@ public class WebSocketRequestHandlerTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
handleBadCalls.handle(websocketMock, arrayJson.toString());
handleBadCalls.handle(websocketMock, arrayJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
@ -189,7 +194,7 @@ public class WebSocketRequestHandlerTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
handler.handle(websocketMock, "");
handler.handle(websocketMock, Buffer.buffer(), Optional.empty());
async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
@ -208,7 +213,7 @@ public class WebSocketRequestHandlerTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
handler.handle(websocketMock, "{}");
handler.handle(websocketMock, new JsonObject().toBuffer(), Optional.empty());
async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
@ -229,7 +234,7 @@ public class WebSocketRequestHandlerTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
handler.handle(websocketMock, requestJson.toString());
handler.handle(websocketMock, requestJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
@ -253,7 +258,7 @@ public class WebSocketRequestHandlerTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
handler.handle(websocketMock, requestJson.toString());
handler.handle(websocketMock, requestJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
@ -275,7 +280,7 @@ public class WebSocketRequestHandlerTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
handler.handle(websocketMock, requestJson.toString());
handler.handle(websocketMock, requestJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
@ -292,10 +297,10 @@ public class WebSocketRequestHandlerTest {
return frame.isFinal();
}
private Answer<ServerWebSocket> completeOnLastFrame(final Async async) {
private Answer<Future<Void>> completeOnLastFrame(final Async async) {
return invocation -> {
async.complete();
return websocketMock;
return Future.succeededFuture();
};
}
}

@ -29,6 +29,10 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.DefaultAuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.AuthenticatedJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.filter.FilterManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.methods.JsonRpcMethodsFactory;
@ -187,7 +191,12 @@ public class WebSocketServiceLoginTest {
spy(
new WebSocketRequestHandler(
vertx,
websocketMethods,
new JsonRpcExecutor(
new AuthenticatedJsonRpcProcessor(
new BaseJsonRpcProcessor(),
DefaultAuthenticationService.create(vertx, websocketConfiguration).get(),
websocketConfiguration.getRpcApisNoAuth()),
websocketMethods),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds()));

@ -21,6 +21,8 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
@ -81,7 +83,7 @@ public class WebSocketServiceTest {
spy(
new WebSocketRequestHandler(
vertx,
websocketMethods,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), websocketMethods),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds()));

@ -21,6 +21,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketRequestHandler;
@ -33,9 +35,11 @@ import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketFrame;
@ -69,7 +73,7 @@ public class EthSubscribeIntegrationTest {
webSocketRequestHandler =
new WebSocketRequestHandler(
vertx,
webSocketMethodsFactory.methods(),
new JsonRpcExecutor(new BaseJsonRpcProcessor(), webSocketMethodsFactory.methods()),
Mockito.mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds());
}
@ -88,7 +92,8 @@ public class EthSubscribeIntegrationTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame)))
.then(completeOnLastFrame(async, websocketMock));
webSocketRequestHandler.handle(websocketMock, Json.encode(subscribeRequestBody));
webSocketRequestHandler.handle(
websocketMock, Json.encodeToBuffer(subscribeRequestBody), Optional.empty());
async.awaitSuccess(ASYNC_TIMEOUT);
@ -113,16 +118,16 @@ public class EthSubscribeIntegrationTest {
final ServerWebSocket websocketMock1 = mock(ServerWebSocket.class);
when(websocketMock1.textHandlerID()).thenReturn(CONNECTION_ID_1);
when(websocketMock1.writeFrame(argThat(this::isFinalFrame)))
.then(countDownOnLastFrame(async, websocketMock1));
when(websocketMock1.writeFrame(argThat(this::isFinalFrame))).then(countDownOnLastFrame(async));
final ServerWebSocket websocketMock2 = mock(ServerWebSocket.class);
when(websocketMock2.textHandlerID()).thenReturn(CONNECTION_ID_2);
when(websocketMock2.writeFrame(argThat(this::isFinalFrame)))
.then(countDownOnLastFrame(async, websocketMock2));
when(websocketMock2.writeFrame(argThat(this::isFinalFrame))).then(countDownOnLastFrame(async));
webSocketRequestHandler.handle(websocketMock1, Json.encode(subscribeRequestBody1));
webSocketRequestHandler.handle(websocketMock2, Json.encode(subscribeRequestBody2));
webSocketRequestHandler.handle(
websocketMock1, Json.encodeToBuffer(subscribeRequestBody1), Optional.empty());
webSocketRequestHandler.handle(
websocketMock2, Json.encodeToBuffer(subscribeRequestBody2), Optional.empty());
async.awaitSuccess(ASYNC_TIMEOUT);
@ -178,11 +183,10 @@ public class EthSubscribeIntegrationTest {
};
}
private Answer<ServerWebSocket> countDownOnLastFrame(
final Async async, final ServerWebSocket websocket) {
private Answer<Future<Void>> countDownOnLastFrame(final Async async) {
return invocation -> {
async.countDown();
return websocket;
return Future.succeededFuture();
};
}
}

@ -21,6 +21,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketRequestHandler;
@ -31,7 +33,9 @@ import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.HashMap;
import java.util.Optional;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketFrame;
@ -63,7 +67,7 @@ public class EthUnsubscribeIntegrationTest {
webSocketRequestHandler =
new WebSocketRequestHandler(
vertx,
webSocketMethodsFactory.methods(),
new JsonRpcExecutor(new BaseJsonRpcProcessor(), webSocketMethodsFactory.methods()),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds());
}
@ -86,10 +90,10 @@ public class EthUnsubscribeIntegrationTest {
final ServerWebSocket websocketMock = mock(ServerWebSocket.class);
when(websocketMock.textHandlerID()).thenReturn(CONNECTION_ID);
when(websocketMock.writeFrame(argThat(this::isFinalFrame)))
.then(completeOnLastFrame(async, websocketMock));
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
webSocketRequestHandler.handle(websocketMock, Json.encode(unsubscribeRequestBody));
webSocketRequestHandler.handle(
websocketMock, Json.encodeToBuffer(unsubscribeRequestBody), Optional.empty());
async.awaitSuccess(ASYNC_TIMEOUT);
assertThat(subscriptionManager.getSubscriptionById(subscriptionId)).isNull();
@ -118,10 +122,10 @@ public class EthUnsubscribeIntegrationTest {
final ServerWebSocket websocketMock = mock(ServerWebSocket.class);
when(websocketMock.textHandlerID()).thenReturn(CONNECTION_ID);
when(websocketMock.writeFrame(argThat(this::isFinalFrame)))
.then(completeOnLastFrame(async, websocketMock));
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
webSocketRequestHandler.handle(websocketMock, Json.encode(unsubscribeRequestBody));
webSocketRequestHandler.handle(
websocketMock, Json.encodeToBuffer(unsubscribeRequestBody), Optional.empty());
async.awaitSuccess(ASYNC_TIMEOUT);
assertThat(subscriptionManager.getSubscriptionById(subscriptionId1)).isNotNull();
@ -149,11 +153,10 @@ public class EthUnsubscribeIntegrationTest {
return frame.isFinal();
}
private Answer<ServerWebSocket> completeOnLastFrame(
final Async async, final ServerWebSocket websocket) {
private Answer<Future<Void>> completeOnLastFrame(final Async async) {
return invocation -> {
async.complete();
return websocket;
return Future.succeededFuture();
};
}
}

@ -72,6 +72,7 @@ downloadLicenses {
ext.cc0 = license('Public Domain (CC0) License 1.0', 'https://creativecommons.org/publicdomain/zero/1.0')
ext.cddl = license('Common Development and Distribution License 1.0', 'http://opensource.org/licenses/CDDL-1.0')
ext.epl1 = license('Eclipse Public License 1.0', 'https://www.eclipse.org/legal/epl-v10.html')
ext.epl2 = license('Eclipse Public License - v 2.0', 'https://www.eclipse.org/legal/epl-v20.html')
ext.mit = license('MIT License', 'http://www.opensource.org/licenses/mit-license.php')
aliases = [
(apache) : [
@ -123,6 +124,10 @@ downloadLicenses {
'Eclipse Public License - v 1.0',
'Eclipse Public License 1.0 ',
],
(epl2) : [
'Eclipse Public License - v 2.0',
'Eclipse Public License version 2.0'
],
(mit) : [
'MIT',
'The MIT License',
@ -161,6 +166,9 @@ downloadLicenses {
// Explicitly declare that we are using the EPL 1.0 license
(group('javax.persistence')) : epl1,
// jnr-posix is released under a tri EPL v2.0/GPL/LGPL license
'com.github.jnr:jnr-posix:3.0.47' : epl2,
// io.netty:netty-tcnative-boringssl-static license markings are not machine readable.
// io.netty:netty-tcnative-classes license markings are not machine readable.
// Explicitly state Apache 2 License for license scanning.

@ -69,6 +69,7 @@ dependencyManagement {
dependency 'io.netty:netty-all:4.1.74.Final'
dependency 'io.netty:netty-tcnative-boringssl-static:2.0.50.Final'
dependency group: 'io.netty', name: 'netty-transport-native-epoll', version:'4.1.74.Final', classifier: 'linux-x86_64'
dependency group: 'io.netty', name: 'netty-transport-native-kqueue', version:'4.1.74.Final', classifier: 'osx-x86_64'
dependency 'io.opentelemetry:opentelemetry-api:1.6.0'
dependency 'io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.6.0-alpha'
@ -97,6 +98,7 @@ dependencyManagement {
entry 'vertx-auth-jwt'
entry 'vertx-codegen'
entry 'vertx-core'
entry 'vertx-junit5'
entry 'vertx-unit'
entry 'vertx-web'
entry 'vertx-web-client'
@ -172,9 +174,7 @@ dependencyManagement {
dependency 'org.web3j:abi:4.8.9'
dependency 'org.web3j:besu:4.8.9'
dependency('org.web3j:core:4.8.9') {
exclude group: 'com.github.jnr', name: 'jnr-unixsocket'
}
dependency 'org.web3j:core:4.8.9'
dependency 'org.web3j:crypto:4.8.9'
dependency 'org.web3j:quorum:4.8.4'

Loading…
Cancel
Save