3619 unify rpc ports (#3834)

* Introduces new JsonRPCService that handles both http and websockets
* Removes websocket specific configs
* renames WebSocketRequestHandler to WebSocketMessageHandler to be more in line with websocket semantics.

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

Signed-off-by: Justin Florentine <justin+github@florentine.us>

Co-authored-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/3852/head
Justin Florentine 3 years ago committed by GitHub
parent 8bbcea682e
commit e9884b2130
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/BesuNode.java
  2. 6
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java
  3. 7
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/configuration/BesuNodeConfiguration.java
  4. 15
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/configuration/BesuNodeConfigurationBuilder.java
  5. 9
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/configuration/BesuNodeFactory.java
  6. 2
      acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/configuration/NodeConfiguration.java
  7. 2
      acceptance-tests/tests/src/test/java/org/hyperledger/besu/tests/acceptance/jsonrpc/ExecutionEngineAcceptanceTest.java
  8. 16
      besu/src/main/java/org/hyperledger/besu/Runner.java
  9. 177
      besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
  10. 51
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  11. 30
      besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java
  12. 12
      besu/src/test/java/org/hyperledger/besu/RunnerBuilderTest.java
  13. 7
      besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
  14. 1
      besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java
  15. 4
      besu/src/test/resources/everything_config.toml
  16. 5
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcConfiguration.java
  17. 652
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcService.java
  18. 2
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/authentication/EngineAuthService.java
  19. 6
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketMessageHandler.java
  20. 14
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketService.java
  21. 10
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketHostAllowlistTest.java
  22. 22
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketMessageHandlerTest.java
  23. 10
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketServiceLoginTest.java
  24. 12
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketServiceTest.java
  25. 14
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/methods/EthSubscribeIntegrationTest.java
  26. 12
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/methods/EthUnsubscribeIntegrationTest.java
  27. 6
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/NetworkingConfiguration.java

@ -298,7 +298,7 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
}
}
public Optional<String> engineHttpUrl() {
public Optional<String> engineRpcUrl() {
if (isEngineRpcEnabled()) {
final Optional<Integer> maybeEngineRpcPort = getEngineJsonRpcPort();
if (maybeEngineRpcPort.isEmpty()) {
@ -369,15 +369,6 @@ public class BesuNode implements NodeConfiguration, RunnableNode, AutoCloseable
}
}
@Override
public Optional<Integer> getEngineJsonRpcWebSocketPort() {
if (isWebSocketsRpcEnabled()) {
return Optional.of(Integer.valueOf(portsProperties.getProperty("engine-ws-rpc")));
} else {
return Optional.empty();
}
}
@Override
public String getHostName() {
return LOCALHOST;

@ -194,7 +194,8 @@ public class ProcessBesuNodeRunner implements BesuNodeRunner {
params.add("--Xmerge-support");
params.add("true");
params.add("--engine-rpc-http-port");
params.add("--engine-rpc-enabled");
params.add("--engine-rpc-port");
params.add(node.jsonEngineListenPort().get().toString());
}
@ -226,9 +227,6 @@ public class ProcessBesuNodeRunner implements BesuNodeRunner {
params.add("--rpc-ws-authentication-jwt-algorithm");
params.add(node.webSocketConfiguration().getAuthenticationAlgorithm().toString());
}
// TODO: properly handle engine rpc, set port to 0 to make tests pass
params.add("--engine-rpc-ws-port");
params.add("0");
}
if (node.isJsonRpcIpcEnabled()) {

@ -41,7 +41,6 @@ public class BesuNodeConfiguration {
private final Optional<JsonRpcConfiguration> engineRpcConfiguration;
private final WebSocketConfiguration webSocketConfiguration;
private final JsonRpcIpcConfiguration jsonRpcIpcConfiguration;
private final Optional<WebSocketConfiguration> engineWebSocketConfiguration;
private final MetricsConfiguration metricsConfiguration;
private final Optional<PermissioningConfiguration> permissioningConfiguration;
private final Optional<String> keyFilePath;
@ -75,7 +74,6 @@ public class BesuNodeConfiguration {
final Optional<JsonRpcConfiguration> engineRpcConfiguration,
final WebSocketConfiguration webSocketConfiguration,
final JsonRpcIpcConfiguration jsonRpcIpcConfiguration,
final Optional<WebSocketConfiguration> engineWebSocketConfiguration,
final MetricsConfiguration metricsConfiguration,
final Optional<PermissioningConfiguration> permissioningConfiguration,
final Optional<String> keyFilePath,
@ -106,7 +104,6 @@ public class BesuNodeConfiguration {
this.engineRpcConfiguration = engineRpcConfiguration;
this.webSocketConfiguration = webSocketConfiguration;
this.jsonRpcIpcConfiguration = jsonRpcIpcConfiguration;
this.engineWebSocketConfiguration = engineWebSocketConfiguration;
this.metricsConfiguration = metricsConfiguration;
this.permissioningConfiguration = permissioningConfiguration;
this.keyFilePath = keyFilePath;
@ -158,10 +155,6 @@ public class BesuNodeConfiguration {
return jsonRpcIpcConfiguration;
}
public Optional<WebSocketConfiguration> getEngineWebSocketConfiguration() {
return engineWebSocketConfiguration;
}
public MetricsConfiguration getMetricsConfiguration() {
return metricsConfiguration;
}

@ -61,8 +61,6 @@ public class BesuNodeConfigurationBuilder {
private JsonRpcConfiguration engineRpcConfiguration = JsonRpcConfiguration.createEngineDefault();
private WebSocketConfiguration webSocketConfiguration = WebSocketConfiguration.createDefault();
private JsonRpcIpcConfiguration jsonRpcIpcConfiguration = new JsonRpcIpcConfiguration();
private WebSocketConfiguration engineWebSocketConfiguration =
WebSocketConfiguration.createDefault();
private MetricsConfiguration metricsConfiguration = MetricsConfiguration.builder().build();
private Optional<PermissioningConfiguration> permissioningConfiguration = Optional.empty();
private String keyFilePath = null;
@ -92,8 +90,6 @@ public class BesuNodeConfigurationBuilder {
// Check connections more frequently during acceptance tests to cut down on
// intermittent failures due to the fact that we're running over a real network
networkingConfiguration.setInitiateConnectionsFrequency(5);
engineRpcConfiguration.setPort(JsonRpcConfiguration.DEFAULT_ENGINE_JSON_RPC_PORT);
engineWebSocketConfiguration.setPort(WebSocketConfiguration.DEFAULT_WEBSOCKET_ENGINE_PORT);
}
public BesuNodeConfigurationBuilder name(final String name) {
@ -139,12 +135,6 @@ public class BesuNodeConfigurationBuilder {
return this;
}
public BesuNodeConfigurationBuilder engineWebSocketConfiguration(
final WebSocketConfiguration engineConfig) {
this.engineWebSocketConfiguration = engineConfig;
return this;
}
public BesuNodeConfigurationBuilder jsonRpcEnabled() {
this.jsonRpcConfiguration.setEnabled(true);
this.jsonRpcConfiguration.setPort(0);
@ -153,8 +143,8 @@ public class BesuNodeConfigurationBuilder {
return this;
}
public BesuNodeConfigurationBuilder engineRpcEnabled() {
this.engineRpcConfiguration.setEnabled(true);
public BesuNodeConfigurationBuilder engineRpcEnabled(final boolean enabled) {
this.engineRpcConfiguration.setEnabled(enabled);
this.engineRpcConfiguration.setPort(0);
this.engineRpcConfiguration.setHostsAllowlist(singletonList("*"));
@ -496,7 +486,6 @@ public class BesuNodeConfigurationBuilder {
Optional.of(engineRpcConfiguration),
webSocketConfiguration,
jsonRpcIpcConfiguration,
Optional.of(engineWebSocketConfiguration),
metricsConfiguration,
permissioningConfiguration,
Optional.ofNullable(keyFilePath),

@ -328,7 +328,12 @@ public class BesuNodeFactory {
}
public BesuNode createNodeWithNoDiscovery(final String name) throws IOException {
return create(new BesuNodeConfigurationBuilder().name(name).discoveryEnabled(false).build());
return create(
new BesuNodeConfigurationBuilder()
.name(name)
.discoveryEnabled(false)
.engineRpcEnabled(false)
.build());
}
public BesuNode createCliqueNode(final String name) throws IOException {
@ -515,7 +520,7 @@ public class BesuNodeFactory {
.bootnodeEligible(false)
.miningEnabled()
.jsonRpcEnabled()
.engineRpcEnabled()
.engineRpcEnabled(true)
.build());
}

@ -36,8 +36,6 @@ public interface NodeConfiguration {
Optional<Integer> getEngineJsonRpcPort();
Optional<Integer> getEngineJsonRpcWebSocketPort();
String getHostName();
boolean isJsonRpcEnabled();

@ -80,7 +80,7 @@ public class ExecutionEngineAcceptanceTest {
final Call preparePayloadRequest =
consensusClient.newCall(
new Request.Builder()
.url(executionEngine.engineHttpUrl().get())
.url(executionEngine.engineRpcUrl().get())
.post(RequestBody.create(testCase.getRequest().toString(), MEDIA_TYPE_JSON))
.build());
final Response response = preparePayloadRequest.execute();

@ -17,6 +17,7 @@ package org.hyperledger.besu;
import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcService;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcService;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService;
import org.hyperledger.besu.ethereum.api.query.cache.AutoTransactionLogBloomCachingService;
@ -63,12 +64,11 @@ public class Runner implements AutoCloseable {
private final Optional<EthStatsService> ethStatsService;
private final Optional<GraphQLHttpService> graphQLHttp;
private final Optional<JsonRpcHttpService> jsonRpc;
private final Optional<JsonRpcHttpService> engineJsonRpc;
private final Optional<JsonRpcService> engineJsonRpc;
private final Optional<MetricsService> metrics;
private final Optional<JsonRpcIpcService> ipcJsonRpc;
private final Optional<Path> pidPath;
private final Optional<WebSocketService> webSocketRpc;
private final Optional<WebSocketService> engineWebSocketRpc;
private final TransactionPoolEvictionService transactionPoolEvictionService;
private final BesuController besuController;
@ -82,10 +82,9 @@ public class Runner implements AutoCloseable {
final NetworkRunner networkRunner,
final NatService natService,
final Optional<JsonRpcHttpService> jsonRpc,
final Optional<JsonRpcHttpService> engineJsonRpc,
final Optional<JsonRpcService> engineJsonRpc,
final Optional<GraphQLHttpService> graphQLHttp,
final Optional<WebSocketService> webSocketRpc,
final Optional<WebSocketService> engineWebSocketRpc,
final Optional<JsonRpcIpcService> ipcJsonRpc,
final Optional<StratumServer> stratumServer,
final Optional<MetricsService> metrics,
@ -103,7 +102,6 @@ public class Runner implements AutoCloseable {
this.jsonRpc = jsonRpc;
this.engineJsonRpc = engineJsonRpc;
this.webSocketRpc = webSocketRpc;
this.engineWebSocketRpc = engineWebSocketRpc;
this.ipcJsonRpc = ipcJsonRpc;
this.metrics = metrics;
this.ethStatsService = ethStatsService;
@ -125,8 +123,6 @@ public class Runner implements AutoCloseable {
engineJsonRpc.ifPresent(service -> waitForServiceToStart("engineJsonRpc", service.start()));
graphQLHttp.ifPresent(service -> waitForServiceToStart("graphQLHttp", service.start()));
webSocketRpc.ifPresent(service -> waitForServiceToStart("websocketRpc", service.start()));
engineWebSocketRpc.ifPresent(
service -> waitForServiceToStart("engineWebsocketRpc", service.start()));
ipcJsonRpc.ifPresent(
service ->
waitForServiceToStart(
@ -164,8 +160,6 @@ public class Runner implements AutoCloseable {
engineJsonRpc.ifPresent(service -> waitForServiceToStop("engineJsonRpc", service.stop()));
graphQLHttp.ifPresent(service -> waitForServiceToStop("graphQLHttp", service.stop()));
webSocketRpc.ifPresent(service -> waitForServiceToStop("websocketRpc", service.stop()));
engineWebSocketRpc.ifPresent(
service -> waitForServiceToStop("engineWebsocketRpc", service.stop()));
ipcJsonRpc.ifPresent(
service ->
waitForServiceToStop(
@ -350,10 +344,6 @@ public class Runner implements AutoCloseable {
return webSocketRpc.map(service -> service.socketAddress().getPort());
}
public Optional<Integer> getEngineWebsocketPort() {
return engineWebSocketRpc.map(service -> service.socketAddress().getPort());
}
public Optional<Integer> getMetricsPort() {
if (metrics.isPresent()) {
return metrics.get().getPort();

@ -35,6 +35,7 @@ import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLProvider;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.DefaultAuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.EngineAuthService;
@ -52,7 +53,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcService;
import org.hyperledger.besu.ethereum.api.jsonrpc.methods.JsonRpcMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketRequestHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketMessageHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.PrivateWebSocketMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
@ -174,7 +175,6 @@ public class RunnerBuilder {
private Optional<JsonRpcConfiguration> engineJsonRpcConfiguration = Optional.empty();
private GraphQLConfiguration graphQLConfiguration;
private WebSocketConfiguration webSocketConfiguration;
private Optional<WebSocketConfiguration> engineWebSocketConfiguration = Optional.empty();
private ApiConfiguration apiConfiguration;
private Path dataDir;
private Optional<Path> pidPath = Optional.empty();
@ -319,12 +319,6 @@ public class RunnerBuilder {
return this;
}
public RunnerBuilder engineWebSocketConfiguration(
final WebSocketConfiguration engineWebSocketConfig) {
this.engineWebSocketConfiguration = Optional.of(engineWebSocketConfig);
return this;
}
public RunnerBuilder apiConfiguration(final ApiConfiguration apiConfiguration) {
this.apiConfiguration = apiConfiguration;
return this;
@ -591,7 +585,7 @@ public class RunnerBuilder {
AccountPermissioningController::getAccountLocalConfigPermissioningController);
Optional<JsonRpcHttpService> jsonRpcHttpService = Optional.empty();
Optional<JsonRpcHttpService> engineJsonRpcHttpService = Optional.empty();
if (jsonRpcConfiguration.isEnabled()) {
final Map<String, JsonRpcMethod> nonEngineMethods =
jsonRpcMethods(
@ -631,55 +625,64 @@ public class RunnerBuilder {
nonEngineMethods,
new HealthService(new LivenessCheck()),
new HealthService(new ReadinessCheck(peerNetwork, synchronizer))));
}
if (engineJsonRpcConfiguration.isPresent() && engineJsonRpcConfiguration.get().isEnabled()) {
final Map<String, JsonRpcMethod> engineMethods =
jsonRpcMethods(
protocolSchedule,
context,
besuController,
peerNetwork,
blockchainQueries,
synchronizer,
transactionPool,
miningCoordinator,
metricsSystem,
supportedCapabilities,
engineJsonRpcConfiguration.get().getRpcApis(),
filterManager,
accountLocalConfigPermissioningController,
nodeLocalConfigPermissioningController,
privacyParameters,
engineJsonRpcConfiguration.get(),
webSocketConfiguration,
metricsConfiguration,
natService,
besuPluginContext.getNamedPlugins(),
dataDir,
rpcEndpointServiceImpl);
Optional<AuthenticationService> authToUse =
engineJsonRpcConfiguration.get().isAuthenticationEnabled()
? Optional.of(
new EngineAuthService(
vertx,
Optional.ofNullable(
engineJsonRpcConfiguration.get().getAuthenticationPublicKeyFile()),
dataDir))
: Optional.empty();
engineJsonRpcHttpService =
Optional.of(
new JsonRpcHttpService(
vertx,
dataDir,
engineJsonRpcConfiguration.orElse(JsonRpcConfiguration.createEngineDefault()),
metricsSystem,
natService,
engineMethods,
authToUse,
new HealthService(new LivenessCheck()),
new HealthService(new ReadinessCheck(peerNetwork, synchronizer))));
}
Optional<JsonRpcService> engineJsonRpcService = Optional.empty();
if (engineJsonRpcConfiguration.isPresent() && engineJsonRpcConfiguration.get().isEnabled()) {
final Map<String, JsonRpcMethod> engineMethods =
jsonRpcMethods(
protocolSchedule,
context,
besuController,
peerNetwork,
blockchainQueries,
synchronizer,
transactionPool,
miningCoordinator,
metricsSystem,
supportedCapabilities,
engineJsonRpcConfiguration.get().getRpcApis(),
filterManager,
accountLocalConfigPermissioningController,
nodeLocalConfigPermissioningController,
privacyParameters,
engineJsonRpcConfiguration.get(),
webSocketConfiguration,
metricsConfiguration,
natService,
besuPluginContext.getNamedPlugins(),
dataDir,
rpcEndpointServiceImpl);
Optional<AuthenticationService> authToUse =
engineJsonRpcConfiguration.get().isAuthenticationEnabled()
? Optional.of(
new EngineAuthService(
vertx,
Optional.ofNullable(
engineJsonRpcConfiguration.get().getAuthenticationPublicKeyFile()),
dataDir))
: Optional.empty();
WebSocketConfiguration engineSocketConfig =
webSocketConfiguration.isEnabled()
? webSocketConfiguration
: WebSocketConfiguration.createEngineDefault();
engineJsonRpcService =
Optional.of(
new JsonRpcService(
vertx,
dataDir,
engineJsonRpcConfiguration.orElse(JsonRpcConfiguration.createEngineDefault()),
metricsSystem,
natService,
engineMethods,
Optional.ofNullable(engineSocketConfig),
besuController.getProtocolManager().ethContext().getScheduler(),
authToUse,
new HealthService(new LivenessCheck()),
new HealthService(new ReadinessCheck(peerNetwork, synchronizer))));
}
Optional<GraphQLHttpService> graphQLHttpService = Optional.empty();
@ -712,7 +715,6 @@ public class RunnerBuilder {
}
Optional<WebSocketService> webSocketService = Optional.empty();
Optional<WebSocketService> engineWebSocketService = Optional.empty();
if (webSocketConfiguration.isEnabled()) {
final Map<String, JsonRpcMethod> nonEngineMethods =
jsonRpcMethods(
@ -768,56 +770,6 @@ public class RunnerBuilder {
DefaultAuthenticationService.create(vertx, webSocketConfiguration)));
createPrivateTransactionObserver(subscriptionManager, privacyParameters);
if (engineWebSocketConfiguration.isPresent()
&& engineWebSocketConfiguration.get().isEnabled()) {
final Map<String, JsonRpcMethod> engineMethods =
jsonRpcMethods(
protocolSchedule,
context,
besuController,
peerNetwork,
blockchainQueries,
synchronizer,
transactionPool,
miningCoordinator,
metricsSystem,
supportedCapabilities,
engineWebSocketConfiguration.get().getRpcApis(),
filterManager,
accountLocalConfigPermissioningController,
nodeLocalConfigPermissioningController,
privacyParameters,
jsonRpcConfiguration,
engineWebSocketConfiguration.get(),
metricsConfiguration,
natService,
besuPluginContext.getNamedPlugins(),
dataDir,
rpcEndpointServiceImpl);
Optional<AuthenticationService> authToUse =
engineWebSocketConfiguration.get().isAuthenticationEnabled()
? Optional.of(
new EngineAuthService(
vertx,
Optional.ofNullable(
engineWebSocketConfiguration.get().getAuthenticationPublicKeyFile()),
dataDir))
: Optional.empty();
engineWebSocketService =
Optional.of(
createWebsocketService(
vertx,
engineWebSocketConfiguration.get(),
subscriptionManager,
engineMethods,
privacyParameters,
protocolSchedule,
blockchainQueries,
authToUse));
}
}
Optional<MetricsService> metricsService = createMetricsService(vertx, metricsConfiguration);
@ -885,10 +837,9 @@ public class RunnerBuilder {
networkRunner,
natService,
jsonRpcHttpService,
engineJsonRpcHttpService,
engineJsonRpcService,
graphQLHttpService,
webSocketService,
engineWebSocketService,
jsonRpcIpcService,
stratumServer,
metricsService,
@ -1184,15 +1135,15 @@ public class RunnerBuilder {
}
final JsonRpcExecutor jsonRpcExecutor =
new JsonRpcExecutor(jsonRpcProcessor, websocketMethodsFactory.methods());
final WebSocketRequestHandler websocketRequestHandler =
new WebSocketRequestHandler(
final WebSocketMessageHandler websocketMessageHandler =
new WebSocketMessageHandler(
vertx,
jsonRpcExecutor,
besuController.getProtocolManager().ethContext().getScheduler(),
webSocketConfiguration.getTimeoutSec());
return new WebSocketService(
vertx, configuration, websocketRequestHandler, authenticationService);
vertx, configuration, websocketMessageHandler, authenticationService);
}
private Optional<MetricsService> createMetricsService(

@ -29,7 +29,6 @@ import static org.hyperledger.besu.ethereum.api.graphql.GraphQLConfiguration.DEF
import static org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration.DEFAULT_ENGINE_JSON_RPC_PORT;
import static org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration.DEFAULT_JSON_RPC_PORT;
import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.DEFAULT_RPC_APIS;
import static org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration.DEFAULT_WEBSOCKET_ENGINE_PORT;
import static org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration.DEFAULT_WEBSOCKET_PORT;
import static org.hyperledger.besu.ethereum.permissioning.GoQuorumPermissioningConfiguration.QIP714_DEFAULT_BLOCK;
import static org.hyperledger.besu.metrics.BesuMetricCategory.DEFAULT_METRIC_CATEGORIES;
@ -569,19 +568,16 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
static class EngineRPCOptionGroup {
@Option(
names = {"--engine-rpc-http-port"},
paramLabel = MANDATORY_PORT_FORMAT_HELP,
description = "Port to provide consensus client APIS on (default: ${DEFAULT-VALUE})",
arity = "1")
private final Integer engineRpcHttpPort = DEFAULT_ENGINE_JSON_RPC_PORT;
names = {"--engine-rpc-enabled"},
description = "Set to start the Engine JSON-RPC service (default: ${DEFAULT-VALUE})")
private final Boolean isEngineRpcEnabled = false;
@Option(
names = {"--engine-rpc-ws-port"},
names = {"--engine-rpc-port"},
paramLabel = MANDATORY_PORT_FORMAT_HELP,
description =
"Port for Execution Engine JSON-RPC WebSocket service to listen on (default: ${DEFAULT-VALUE})",
description = "Port to provide consensus client APIS on (default: ${DEFAULT-VALUE})",
arity = "1")
private final Integer engineRpcWsPort = DEFAULT_WEBSOCKET_ENGINE_PORT;
private final Integer engineRpcPort = DEFAULT_ENGINE_JSON_RPC_PORT;
@Option(
names = {"--engine-jwt-secret"},
@ -1295,7 +1291,6 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
private JsonRpcConfiguration engineJsonRpcConfiguration;
private GraphQLConfiguration graphQLConfiguration;
private WebSocketConfiguration webSocketConfiguration;
private WebSocketConfiguration engineWebSocketConfiguration;
private JsonRpcIpcConfiguration jsonRpcIpcConfiguration;
private ApiConfiguration apiConfiguration;
private MetricsConfiguration metricsConfiguration;
@ -1601,7 +1596,6 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
jsonRpcConfiguration,
engineJsonRpcConfiguration,
webSocketConfiguration,
engineWebSocketConfiguration,
jsonRpcIpcConfiguration,
apiConfiguration,
metricsConfiguration,
@ -1920,7 +1914,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
jsonRPCHttpOptionGroup.rpcHttpPort, jsonRPCHttpOptionGroup.rpcHttpApis, hostsAllowlist);
engineJsonRpcConfiguration =
createEngineJsonRpcConfiguration(
engineRPCOptionGroup.engineRpcHttpPort, engineRPCOptionGroup.engineHostsAllowlist);
engineRPCOptionGroup.engineRpcPort, engineRPCOptionGroup.engineHostsAllowlist);
p2pTLSConfiguration = p2pTLSConfigOptions.p2pTLSConfiguration(commandLine);
graphQLConfiguration = graphQLConfiguration();
webSocketConfiguration =
@ -1928,9 +1922,6 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
jsonRPCWebsocketOptionGroup.rpcWsPort,
jsonRPCWebsocketOptionGroup.rpcWsApis,
hostsAllowlist);
engineWebSocketConfiguration =
engineWebSocketConfiguration(
engineRPCOptionGroup.engineRpcWsPort, engineRPCOptionGroup.engineHostsAllowlist);
jsonRpcIpcConfiguration =
jsonRpcIpcConfiguration(
unstableIpcOptions.isEnabled(),
@ -2117,7 +2108,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
final Integer listenPort, final List<String> allowCallsFrom) {
JsonRpcConfiguration engineConfig =
jsonRpcConfiguration(listenPort, Arrays.asList("ENGINE", "ETH"), allowCallsFrom);
engineConfig.setEnabled(isMergeEnabled());
engineConfig.setEnabled(engineRPCOptionGroup.isEngineRpcEnabled);
if (engineRPCOptionGroup.isEngineAuthEnabled) {
engineConfig.setAuthenticationEnabled(true);
engineConfig.setAuthenticationAlgorithm(JwtAlgorithm.HS256);
@ -2132,24 +2123,6 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
return engineConfig;
}
private WebSocketConfiguration engineWebSocketConfiguration(
final Integer listenPort, final List<String> allowCallsFrom) {
final WebSocketConfiguration webSocketConfiguration =
webSocketConfiguration(listenPort, Arrays.asList("ENGINE", "ETH"), allowCallsFrom);
webSocketConfiguration.setEnabled(isMergeEnabled());
if (Boolean.TRUE.equals(engineRPCOptionGroup.isEngineAuthEnabled)) {
webSocketConfiguration.setAuthenticationEnabled(true);
webSocketConfiguration.setAuthenticationAlgorithm(JwtAlgorithm.HS256);
if (engineRPCOptionGroup.engineJwtKeyFile != null
&& java.nio.file.Files.exists(engineRPCOptionGroup.engineJwtKeyFile)) { // NOSONAR
webSocketConfiguration.setAuthenticationPublicKeyFile(
engineRPCOptionGroup.engineJwtKeyFile.toFile());
}
}
return webSocketConfiguration;
}
private JsonRpcConfiguration jsonRpcConfiguration(
final Integer listenPort, final List<String> apiGroups, final List<String> allowCallsFrom) {
checkRpcTlsClientAuthOptionsDependencies();
@ -2785,7 +2758,6 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
final JsonRpcConfiguration jsonRpcConfiguration,
final JsonRpcConfiguration engineJsonRpcConfiguration,
final WebSocketConfiguration webSocketConfiguration,
final WebSocketConfiguration engineWebSocketConfiguration,
final JsonRpcIpcConfiguration jsonRpcIpcConfiguration,
final ApiConfiguration apiConfiguration,
final MetricsConfiguration metricsConfiguration,
@ -2824,7 +2796,6 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
.jsonRpcConfiguration(jsonRpcConfiguration)
.engineJsonRpcConfiguration(engineJsonRpcConfiguration)
.webSocketConfiguration(webSocketConfiguration)
.engineWebSocketConfiguration(engineWebSocketConfiguration)
.jsonRpcIpcConfiguration(jsonRpcIpcConfiguration)
.apiConfiguration(apiConfiguration)
.pidPath(pidPath)
@ -3092,8 +3063,10 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
effectivePorts,
jsonRPCWebsocketOptionGroup.rpcWsPort,
jsonRPCWebsocketOptionGroup.isRpcWsEnabled);
addPortIfEnabled(effectivePorts, engineRPCOptionGroup.engineRpcHttpPort, isMergeEnabled());
addPortIfEnabled(effectivePorts, engineRPCOptionGroup.engineRpcWsPort, isMergeEnabled());
addPortIfEnabled(
effectivePorts,
engineRPCOptionGroup.engineRpcPort,
engineRPCOptionGroup.isEngineRpcEnabled);
addPortIfEnabled(
effectivePorts, metricsOptionGroup.metricsPort, metricsOptionGroup.isMetricsEnabled);
addPortIfEnabled(

@ -54,10 +54,9 @@ public class NetworkingOptions implements CLIOptions<NetworkingConfiguration> {
@CommandLine.Option(
names = DNS_DISCOVERY_SERVER_OVERRIDE_FLAG,
hidden = true,
defaultValue = "",
description =
"DNS server host to use for doing DNS Discovery of peers, rather than the machine's configured DNS server")
private String dnsDiscoveryServerOverride = null;
private Optional<String> dnsDiscoveryServerOverride = Optional.empty();
private NetworkingOptions() {}
@ -71,8 +70,8 @@ public class NetworkingOptions implements CLIOptions<NetworkingConfiguration> {
networkingConfig.getCheckMaintainedConnectionsFrequencySec();
cliOptions.initiateConnectionsFrequencySec =
networkingConfig.getInitiateConnectionsFrequencySec();
cliOptions.dnsDiscoveryServerOverride =
networkingConfig.getDnsDiscoveryServerOverride().orElse("");
cliOptions.dnsDiscoveryServerOverride = networkingConfig.getDnsDiscoveryServerOverride();
return cliOptions;
}
@ -81,19 +80,24 @@ public class NetworkingOptions implements CLIOptions<NetworkingConfiguration> {
NetworkingConfiguration config = NetworkingConfiguration.create();
config.setCheckMaintainedConnectionsFrequency(checkMaintainedConnectionsFrequencySec);
config.setInitiateConnectionsFrequency(initiateConnectionsFrequencySec);
config.setDnsDiscoveryServerOverride(
Optional.of(dnsDiscoveryServerOverride).filter(z -> !z.isBlank()).orElse(null));
config.setDnsDiscoveryServerOverride(dnsDiscoveryServerOverride);
return config;
}
@Override
public List<String> getCLIOptions() {
return Arrays.asList(
CHECK_MAINTAINED_CONNECTIONS_FREQUENCY_FLAG,
OptionParser.format(checkMaintainedConnectionsFrequencySec),
INITIATE_CONNECTIONS_FREQUENCY_FLAG,
OptionParser.format(initiateConnectionsFrequencySec),
DNS_DISCOVERY_SERVER_OVERRIDE_FLAG,
dnsDiscoveryServerOverride);
List<String> retval =
Arrays.asList(
CHECK_MAINTAINED_CONNECTIONS_FREQUENCY_FLAG,
OptionParser.format(checkMaintainedConnectionsFrequencySec),
INITIATE_CONNECTIONS_FREQUENCY_FLAG,
OptionParser.format(initiateConnectionsFrequencySec));
if (dnsDiscoveryServerOverride.isPresent()) {
retval.add(DNS_DISCOVERY_SERVER_OVERRIDE_FLAG);
retval.add(dnsDiscoveryServerOverride.get());
}
return retval;
}
}

@ -265,15 +265,15 @@ public final class RunnerBuilderTest {
}
@Test
public void whenEngineApiAddedWebSocketReadyOnDefaultPort() {
public void whenEngineApiAddedWebSocketReadyOnSamePort() {
WebSocketConfiguration wsRpc = WebSocketConfiguration.createDefault();
wsRpc.setEnabled(true);
WebSocketConfiguration engineWsRpc = WebSocketConfiguration.createEngineDefault();
engineWsRpc.setEnabled(true);
EthNetworkConfig mockMainnet = mock(EthNetworkConfig.class);
when(mockMainnet.getNetworkId()).thenReturn(BigInteger.ONE);
MergeConfigOptions.setMergeEnabled(true);
when(besuController.getMiningCoordinator()).thenReturn(mock(MergeMiningCoordinator.class));
JsonRpcConfiguration engineConf = JsonRpcConfiguration.createEngineDefault();
engineConf.setEnabled(true);
final Runner runner =
new RunnerBuilder()
@ -288,9 +288,9 @@ public final class RunnerBuilderTest {
.metricsSystem(mock(ObservableMetricsSystem.class))
.permissioningService(mock(PermissioningServiceImpl.class))
.jsonRpcConfiguration(JsonRpcConfiguration.createDefault())
.engineJsonRpcConfiguration(engineConf)
.webSocketConfiguration(wsRpc)
.jsonRpcIpcConfiguration(mock(JsonRpcIpcConfiguration.class))
.engineWebSocketConfiguration(engineWsRpc)
.graphQLConfiguration(mock(GraphQLConfiguration.class))
.metricsConfiguration(mock(MetricsConfiguration.class))
.vertx(Vertx.vertx())
@ -301,8 +301,7 @@ public final class RunnerBuilderTest {
.besuPluginContext(mock(BesuPluginContextImpl.class))
.build();
assertThat(runner.getWebSocketPort()).isPresent();
assertThat(runner.getEngineWebsocketPort()).isPresent();
assertThat(runner.getEngineJsonRpcPort()).isPresent();
}
@Test
@ -343,7 +342,6 @@ public final class RunnerBuilderTest {
assertThat(runner.getJsonRpcPort()).isPresent();
assertThat(runner.getEngineJsonRpcPort()).isEmpty();
assertThat(runner.getEngineWebsocketPort()).isEmpty();
}
@Test

@ -4767,18 +4767,19 @@ public class BesuCommandTest extends CommandTestAbstract {
@Test
public void assertThatCheckPortClashRejectsAsExpectedForEngineApi() throws Exception {
// use WS port for HTTP
final int port = 8551;
final int port = 8545;
parseCommand(
"--Xmerge-support",
"true",
"--rpc-http-enabled",
"--engine-rpc-http-port",
"--engine-rpc-enabled",
"--engine-rpc-port",
String.valueOf(port),
"--rpc-ws-enabled");
assertThat(commandOutput.toString(UTF_8)).isEmpty();
assertThat(commandErrorOutput.toString(UTF_8))
.contains(
"Port number '8551' has been specified multiple times. Please review the supplied configuration.");
"Port number '8545' has been specified multiple times. Please review the supplied configuration.");
}
@Test

@ -265,7 +265,6 @@ public abstract class CommandTestAbstract {
when(mockRunnerBuilder.natMethodFallbackEnabled(anyBoolean())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.jsonRpcConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.engineJsonRpcConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.engineWebSocketConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.graphQLConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.webSocketConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.jsonRpcIpcConfiguration(any())).thenReturn(mockRunnerBuilder);

@ -60,7 +60,8 @@ network-id=303
rpc-http-enabled=false
rpc-http-host="5.6.7.8"
rpc-http-port=5678
engine-rpc-http-port=5679
engine-rpc-port=5679
engine-rpc-enabled=true
rpc-http-max-active-connections=100
rpc-http-api=["DEBUG","ETH"]
rpc-http-apis=["DEBUG","ETH"]
@ -101,7 +102,6 @@ rpc-ws-api=["DEBUG","ETH"]
rpc-ws-apis=["DEBUG","ETH"]
rpc-ws-host="9.10.11.12"
rpc-ws-port=9101
engine-rpc-ws-port=9102
rpc-ws-max-active-connections=101
rpc-ws-max-frame-size=65535
rpc-ws-authentication-enabled=false

@ -34,7 +34,7 @@ import com.google.common.base.MoreObjects;
public class JsonRpcConfiguration {
private static final String DEFAULT_JSON_RPC_HOST = "127.0.0.1";
public static final int DEFAULT_JSON_RPC_PORT = 8545;
public static final int DEFAULT_ENGINE_JSON_RPC_PORT = 8550;
public static final int DEFAULT_ENGINE_JSON_RPC_PORT = 8551;
public static final int DEFAULT_MAX_ACTIVE_CONNECTIONS = 80;
private boolean enabled;
@ -65,8 +65,9 @@ public class JsonRpcConfiguration {
public static JsonRpcConfiguration createEngineDefault() {
final JsonRpcConfiguration config = createDefault();
config.setEnabled(false);
config.setPort(DEFAULT_ENGINE_JSON_RPC_PORT);
List<String> engineMethodGroup = new ArrayList<>();
List<String> engineMethodGroup = new ArrayList<>(2);
engineMethodGroup.add(RpcApis.ENGINE.name());
engineMethodGroup.add(RpcApis.ETH.name());
config.setRpcApis(engineMethodGroup);

@ -0,0 +1,652 @@
/*
* Copyright Hyperledger Besu contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Streams.stream;
import static org.apache.tuweni.net.tls.VertxTrustOptions.allowlistClients;
import org.hyperledger.besu.ethereum.api.handlers.HandlerFactory;
import org.hyperledger.besu.ethereum.api.handlers.TimeoutOptions;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationUtils;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.DefaultAuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.AuthenticatedJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.TimedJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.TracedJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.health.HealthService;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketMessageHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.tls.TlsClientAuthConfiguration;
import org.hyperledger.besu.ethereum.api.tls.TlsConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
import org.hyperledger.besu.nat.core.domain.NetworkProtocol;
import org.hyperledger.besu.nat.upnp.UpnpNatManager;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.util.ExceptionUtils;
import org.hyperledger.besu.util.NetworkUtility;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.extension.trace.propagation.B3Propagator;
import io.opentelemetry.extension.trace.propagation.JaegerPropagator;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ClientAuth;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JsonRpcService {
private static final Logger LOG = LoggerFactory.getLogger(JsonRpcService.class);
private static final String SPAN_CONTEXT = "span_context";
private static final InetSocketAddress EMPTY_SOCKET_ADDRESS = new InetSocketAddress("0.0.0.0", 0);
private static final String APPLICATION_JSON = "application/json";
private static final TextMapPropagator traceFormats =
TextMapPropagator.composite(
JaegerPropagator.getInstance(),
B3Propagator.injectingSingleHeader(),
W3CBaggagePropagator.getInstance());
private static final TextMapGetter<HttpServerRequest> requestAttributesGetter =
new TextMapGetter<>() {
@Override
public Iterable<String> keys(final HttpServerRequest carrier) {
return carrier.headers().names();
}
@Nullable
@Override
public String get(final @Nullable HttpServerRequest carrier, final String key) {
if (carrier == null) {
return null;
}
return carrier.headers().get(key);
}
};
private final Vertx vertx;
private final JsonRpcConfiguration config;
private final Map<String, JsonRpcMethod> rpcMethods;
private final NatService natService;
private final Path dataDir;
private final LabelledMetric<OperationTimer> requestTimer;
private Tracer tracer;
private final int maxActiveConnections;
private final AtomicInteger activeConnectionsCount = new AtomicInteger();
private final WebSocketConfiguration socketConfiguration;
private final Optional<WebSocketMessageHandler> webSocketMessageHandler;
@VisibleForTesting public final Optional<AuthenticationService> authenticationService;
private HttpServer httpServer;
private final HealthService livenessService;
private final HealthService readinessService;
/**
* Construct a JsonRpcService to handle either http or websocket clients
*
* @param vertx The vertx process that will be running this service
* @param dataDir The data directory where requests can be buffered
* @param config Configuration for the rpc methods being loaded
* @param metricsSystem The metrics service that activities should be reported to
* @param natService The NAT environment manager.
* @param methods The json rpc methods that should be enabled
* @param maybeSockets websocket configuration to use
* @param scheduler for managing ETH tasks
* @param authenticationService used to grant or deny access to methods served
* @param livenessService A service responsible for reporting whether this node is live
* @param readinessService A service responsible for reporting whether this node has fully started
*/
public JsonRpcService(
final Vertx vertx,
final Path dataDir,
final JsonRpcConfiguration config,
final MetricsSystem metricsSystem,
final NatService natService,
final Map<String, JsonRpcMethod> methods,
final Optional<WebSocketConfiguration> maybeSockets,
final EthScheduler scheduler,
final Optional<AuthenticationService> authenticationService,
final HealthService livenessService,
final HealthService readinessService) {
this.dataDir = dataDir;
this.requestTimer =
metricsSystem.createLabelledTimer(
BesuMetricCategory.RPC,
"request_time",
"Time taken to process a JSON-RPC request",
"methodName");
JsonRpcProcessor jsonRpcProcessor = new BaseJsonRpcProcessor();
this.socketConfiguration =
maybeSockets.isPresent() ? maybeSockets.get() : WebSocketConfiguration.createDefault();
if (authenticationService.isPresent()) {
jsonRpcProcessor =
new AuthenticatedJsonRpcProcessor(
jsonRpcProcessor,
authenticationService.get(),
this.socketConfiguration.getRpcApisNoAuth());
}
final JsonRpcExecutor jsonRpcExecutor = new JsonRpcExecutor(jsonRpcProcessor, methods);
this.webSocketMessageHandler =
Optional.of(
new WebSocketMessageHandler(
vertx, jsonRpcExecutor, scheduler, this.socketConfiguration.getTimeoutSec()));
validateConfig(config);
this.config = config;
this.vertx = vertx;
this.natService = natService;
this.rpcMethods = methods;
this.authenticationService = authenticationService;
this.livenessService = livenessService;
this.readinessService = readinessService;
this.maxActiveConnections = config.getMaxActiveConnections();
}
public CompletableFuture<Void> start() {
LOG.info("Starting JSON-RPC service on {}:{}", config.getHost(), config.getPort());
LOG.debug("max number of active connections {}", maxActiveConnections);
this.tracer = GlobalOpenTelemetry.getTracer("org.hyperledger.besu.jsonrpc", "1.0.0");
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
try {
// Create the HTTP server and a router object.
httpServer = vertx.createHttpServer(getHttpServerOptions());
httpServer.webSocketHandler(webSocketHandler());
httpServer.connectionHandler(connectionHandler());
httpServer
.requestHandler(buildRouter())
.listen(
res -> {
if (!res.failed()) {
resultFuture.complete(null);
config.setPort(httpServer.actualPort());
LOG.info(
"JSON-RPC service started and listening on {}:{}{}",
config.getHost(),
config.getPort(),
tlsLogMessage());
natService.ifNatEnvironment(
NatMethod.UPNP,
natManager ->
((UpnpNatManager) natManager)
.requestPortForward(
config.getPort(), NetworkProtocol.TCP, NatServiceType.JSON_RPC));
return;
}
httpServer = null;
resultFuture.completeExceptionally(getFailureException(res.cause()));
});
} catch (final JsonRpcServiceException tlsException) {
httpServer = null;
resultFuture.completeExceptionally(tlsException);
} catch (final VertxException listenException) {
httpServer = null;
resultFuture.completeExceptionally(
new JsonRpcServiceException(
String.format(
"Ethereum JSON-RPC listener failed to start: %s",
ExceptionUtils.rootCause(listenException).getMessage())));
}
return resultFuture;
}
public CompletableFuture<Void> stop() {
if (httpServer == null) {
return CompletableFuture.completedFuture(null);
}
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
httpServer.close(
res -> {
if (res.failed()) {
resultFuture.completeExceptionally(res.cause());
} else {
httpServer = null;
resultFuture.complete(null);
}
});
return resultFuture;
}
private Handler<HttpConnection> connectionHandler() {
return connection -> {
if (activeConnectionsCount.get() >= maxActiveConnections) {
// disallow new connections to prevent DoS
LOG.warn(
"Rejecting new connection from {}. Max {} active connections limit reached.",
connection.remoteAddress(),
activeConnectionsCount.getAndIncrement());
connection.close();
} else {
LOG.debug(
"Opened connection from {}. Total of active connections: {}/{}",
connection.remoteAddress(),
activeConnectionsCount.incrementAndGet(),
maxActiveConnections);
}
connection.closeHandler(
c ->
LOG.debug(
"Connection closed from {}. Total of active connections: {}/{}",
connection.remoteAddress(),
activeConnectionsCount.decrementAndGet(),
maxActiveConnections));
};
}
private Handler<ServerWebSocket> webSocketHandler() {
return websocket -> {
final SocketAddress socketAddress = websocket.remoteAddress();
final String connectionId = websocket.textHandlerID();
final String token =
AuthenticationUtils.getJwtTokenFromAuthorizationHeaderValue(
websocket.headers().get("Authorization"));
if (token != null) {
LOG.trace("Websocket authentication token {}", token);
}
if (!hostIsInAllowlist(
Optional.ofNullable(websocket.headers().get("Host")).orElse("NOHOST"))) {
websocket.reject(403);
}
if (authenticationService.isPresent()) {
authenticationService
.get()
.authenticate(
token,
user -> {
if (user.isEmpty()) {
websocket.reject(403);
}
});
}
LOG.debug("Websocket Connected ({})", socketAddressAsString(socketAddress));
final Handler<Buffer> socketHandler =
buffer -> {
LOG.debug(
"Received Websocket request (binary frame) {} ({})",
buffer.toString(),
socketAddressAsString(socketAddress));
if (webSocketMessageHandler.isPresent()) {
webSocketMessageHandler.get().handle(websocket, buffer, Optional.empty());
} else {
LOG.error("No socket request handler configured");
}
};
websocket.textMessageHandler(text -> socketHandler.handle(Buffer.buffer(text)));
websocket.binaryMessageHandler(socketHandler);
websocket.closeHandler(
v -> {
LOG.debug("Websocket Disconnected ({})", socketAddressAsString(socketAddress));
vertx
.eventBus()
.publish(SubscriptionManager.EVENTBUS_REMOVE_SUBSCRIPTIONS_ADDRESS, connectionId);
});
websocket.exceptionHandler(
t -> {
LOG.debug(
"Unrecoverable error on Websocket: {} ({})",
t.getMessage(),
socketAddressAsString(socketAddress));
websocket.close();
});
};
}
private void validateConfig(final JsonRpcConfiguration config) {
checkArgument(
config.getPort() == 0 || NetworkUtility.isValidPort(config.getPort()),
"Invalid port configuration.");
checkArgument(config.getHost() != null, "Required host is not configured.");
checkArgument(
config.getMaxActiveConnections() > 0, "Invalid max active connections configuration.");
}
private Router buildRouter() {
// Handle json rpc requests
final Router router = Router.router(vertx);
router.route().handler(this::createSpan);
// Verify Host header to avoid rebind attack.
router.route().handler(denyRouteToBlockedHost());
router
.route()
.handler(
CorsHandler.create(buildCorsRegexFromConfig())
.allowedHeader("*")
.allowedHeader("content-type"));
router
.route()
.handler(
BodyHandler.create()
.setUploadsDirectory(dataDir.resolve("uploads").toString())
.setDeleteUploadedFilesOnEnd(true));
router.route("/").method(HttpMethod.GET).handler(this::handleEmptyRequest);
router
.route(HealthService.LIVENESS_PATH)
.method(HttpMethod.GET)
.handler(livenessService::handleRequest);
router
.route(HealthService.READINESS_PATH)
.method(HttpMethod.GET)
.handler(readinessService::handleRequest);
Route mainRoute = router.route("/").method(HttpMethod.POST).produces(APPLICATION_JSON);
if (authenticationService.isPresent()) {
mainRoute.handler(
HandlerFactory.authentication(authenticationService.get(), config.getNoAuthRpcApis()));
}
mainRoute
.handler(HandlerFactory.jsonRpcParser())
.handler(
HandlerFactory.timeout(new TimeoutOptions(config.getHttpTimeoutSec()), rpcMethods));
if (authenticationService.isPresent()) {
mainRoute.blockingHandler(
HandlerFactory.jsonRpcExecutor(
new JsonRpcExecutor(
new AuthenticatedJsonRpcProcessor(
new TimedJsonRpcProcessor(
new TracedJsonRpcProcessor(new BaseJsonRpcProcessor()), requestTimer),
authenticationService.get(),
config.getNoAuthRpcApis()),
rpcMethods),
tracer));
} else {
mainRoute.blockingHandler(
HandlerFactory.jsonRpcExecutor(
new JsonRpcExecutor(
new TimedJsonRpcProcessor(
new TracedJsonRpcProcessor(new BaseJsonRpcProcessor()), requestTimer),
rpcMethods),
tracer));
}
if (authenticationService.isPresent()) {
router
.route("/login")
.method(HttpMethod.POST)
.produces(APPLICATION_JSON)
.handler(authenticationService.get()::handleLogin);
} else {
router
.route("/login")
.method(HttpMethod.POST)
.produces(APPLICATION_JSON)
.handler(DefaultAuthenticationService::handleDisabledLogin);
}
return router;
}
private void createSpan(final RoutingContext routingContext) {
final SocketAddress address = routingContext.request().connection().remoteAddress();
Context parent =
traceFormats.extract(Context.current(), routingContext.request(), requestAttributesGetter);
final Span serverSpan =
tracer
.spanBuilder(address.host() + ":" + address.port())
.setParent(parent)
.setSpanKind(SpanKind.SERVER)
.startSpan();
routingContext.put(SPAN_CONTEXT, Context.current().with(serverSpan));
routingContext.addEndHandler(
event -> {
if (event.failed()) {
serverSpan.recordException(event.cause());
serverSpan.setStatus(StatusCode.ERROR);
}
serverSpan.end();
});
routingContext.next();
}
private HttpServerOptions getHttpServerOptions() {
final HttpServerOptions httpServerOptions =
new HttpServerOptions()
.setHost(config.getHost())
.setPort(config.getPort())
.setHandle100ContinueAutomatically(true)
.setCompressionSupported(true);
httpServerOptions.setMaxWebSocketFrameSize(socketConfiguration.getMaxFrameSize());
httpServerOptions.setMaxWebSocketMessageSize(socketConfiguration.getMaxFrameSize() * 4);
applyTlsConfig(httpServerOptions);
return httpServerOptions;
}
private void applyTlsConfig(final HttpServerOptions httpServerOptions) {
final Optional<TlsConfiguration> maybeTlsConfig = config.getTlsConfiguration();
if (maybeTlsConfig.isPresent()) {
final TlsConfiguration tlsConfiguration = maybeTlsConfig.get();
try {
httpServerOptions
.setSsl(true)
.setPfxKeyCertOptions(
new PfxOptions()
.setPath(tlsConfiguration.getKeyStorePath().toString())
.setPassword(tlsConfiguration.getKeyStorePassword()))
.setUseAlpn(true);
tlsConfiguration
.getSecureTransportProtocols()
.ifPresent(httpServerOptions::setEnabledSecureTransportProtocols);
tlsConfiguration
.getCipherSuites()
.ifPresent(
cipherSuites -> {
for (String cs : cipherSuites) {
httpServerOptions.addEnabledCipherSuite(cs);
}
});
tlsConfiguration
.getClientAuthConfiguration()
.ifPresent(
clientAuthConfiguration ->
applyTlsClientAuth(clientAuthConfiguration, httpServerOptions));
} catch (final RuntimeException re) {
throw new JsonRpcServiceException(
String.format(
"TLS options failed to initialize for Ethereum JSON-RPC listener: %s",
re.getMessage()));
}
}
}
private void applyTlsClientAuth(
final TlsClientAuthConfiguration clientAuthConfiguration,
final HttpServerOptions httpServerOptions) {
httpServerOptions.setClientAuth(ClientAuth.REQUIRED);
clientAuthConfiguration
.getKnownClientsFile()
.ifPresent(
knownClientsFile ->
httpServerOptions.setTrustOptions(
allowlistClients(
knownClientsFile, clientAuthConfiguration.isCaClientsEnabled())));
}
private String tlsLogMessage() {
return config.getTlsConfiguration().isPresent() ? " with TLS enabled." : "";
}
private Throwable getFailureException(final Throwable listenFailure) {
JsonRpcServiceException servFail =
new JsonRpcServiceException(
String.format(
"Failed to bind Ethereum JSON-RPC listener to %s:%s: %s",
config.getHost(), config.getPort(), listenFailure.getMessage()));
servFail.initCause(listenFailure);
return servFail;
}
private Handler<RoutingContext> denyRouteToBlockedHost() {
return event -> {
final Optional<String> hostHeader = getAndValidateHostHeader(event);
if (config.getHostsAllowlist().contains("*")
|| (hostHeader.isPresent() && hostIsInAllowlist(hostHeader.get()))) {
event.next();
} else {
final HttpServerResponse response = event.response();
if (!response.closed()) {
response
.setStatusCode(403)
.putHeader("Content-Type", "application/json; charset=utf-8")
.end("{\"message\":\"Host not authorized.\"}");
}
}
};
}
private Optional<String> getAndValidateHostHeader(final RoutingContext event) {
String hostname =
event.request().getHeader(HttpHeaders.HOST) != null
? event.request().getHeader(HttpHeaders.HOST)
: event.request().host();
final Iterable<String> splitHostHeader = Splitter.on(':').split(hostname);
final long hostPieces = stream(splitHostHeader).count();
if (hostPieces > 1) {
// If the host contains a colon, verify the host is correctly formed - host [ ":" port ]
if (hostPieces > 2 || !Iterables.get(splitHostHeader, 1).matches("\\d{1,5}+")) {
return Optional.empty();
}
}
return Optional.ofNullable(Iterables.get(splitHostHeader, 0));
}
private boolean hostIsInAllowlist(final String hostHeader) {
if (config.getHostsAllowlist().contains("*")
|| config.getHostsAllowlist().stream()
.anyMatch(
allowlistEntry -> allowlistEntry.toLowerCase().equals(hostHeader.toLowerCase()))) {
return true;
} else {
LOG.trace("Host not in allowlist: '{}'", hostHeader);
return false;
}
}
public InetSocketAddress socketAddress() {
if (httpServer == null) {
return EMPTY_SOCKET_ADDRESS;
}
return new InetSocketAddress(config.getHost(), httpServer.actualPort());
}
private String socketAddressAsString(final SocketAddress socketAddress) {
return String.format("host=%s, port=%d", socketAddress.host(), socketAddress.port());
}
@VisibleForTesting
public String url() {
if (httpServer == null) {
return "";
}
return NetworkUtility.urlForSocketAddress(getScheme(), socketAddress());
}
private String getScheme() {
return config.getTlsConfiguration().isPresent() ? "https" : "http";
}
// Facilitate remote health-checks in AWS, inter alia.
private void handleEmptyRequest(final RoutingContext routingContext) {
routingContext.response().setStatusCode(201).end();
}
private String buildCorsRegexFromConfig() {
if (config.getCorsAllowedDomains().isEmpty()) {
return "";
}
if (config.getCorsAllowedDomains().contains("*")) {
return ".*";
} else {
final StringJoiner stringJoiner = new StringJoiner("|");
config.getCorsAllowedDomains().stream().filter(s -> !s.isEmpty()).forEach(stringJoiner::add);
return stringJoiner.toString();
}
}
}

@ -155,7 +155,7 @@ public class EngineAuthService implements AuthenticationService {
final Optional<User> optionalUser,
final JsonRpcMethod jsonRpcMethod,
final Collection<String> noAuthMethods) {
return true; // no AuthZ for engine APIs
return noAuthMethods.contains(jsonRpcMethod.getName()) || optionalUser.isPresent();
}
private boolean issuedRecently(final long iat) {

@ -44,9 +44,9 @@ import io.vertx.ext.auth.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebSocketRequestHandler {
public class WebSocketMessageHandler {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketRequestHandler.class);
private static final Logger LOG = LoggerFactory.getLogger(WebSocketMessageHandler.class);
private static final ObjectWriter JSON_OBJECT_WRITER =
new ObjectMapper()
.registerModule(new Jdk8Module()) // Handle JDK8 Optionals (de)serialization
@ -59,7 +59,7 @@ public class WebSocketRequestHandler {
final EthScheduler ethScheduler;
private final long timeoutSec;
public WebSocketRequestHandler(
public WebSocketMessageHandler(
final Vertx vertx,
final JsonRpcExecutor jsonRpcExecutor,
final EthScheduler ethScheduler,

@ -58,7 +58,7 @@ public class WebSocketService {
private final Vertx vertx;
private final WebSocketConfiguration configuration;
private final WebSocketRequestHandler websocketRequestHandler;
private final WebSocketMessageHandler websocketMessageHandler;
private HttpServer httpServer;
@ -67,22 +67,22 @@ public class WebSocketService {
public WebSocketService(
final Vertx vertx,
final WebSocketConfiguration configuration,
final WebSocketRequestHandler websocketRequestHandler) {
final WebSocketMessageHandler websocketMessageHandler) {
this(
vertx,
configuration,
websocketRequestHandler,
websocketMessageHandler,
DefaultAuthenticationService.create(vertx, configuration));
}
public WebSocketService(
final Vertx vertx,
final WebSocketConfiguration configuration,
final WebSocketRequestHandler websocketRequestHandler,
final WebSocketMessageHandler websocketMessageHandler,
final Optional<AuthenticationService> authenticationService) {
this.vertx = vertx;
this.configuration = configuration;
this.websocketRequestHandler = websocketRequestHandler;
this.websocketMessageHandler = websocketMessageHandler;
this.authenticationService = authenticationService;
this.maxActiveConnections = configuration.getMaxActiveConnections();
}
@ -138,9 +138,9 @@ public class WebSocketService {
authenticationService
.get()
.authenticate(
token, user -> websocketRequestHandler.handle(websocket, buffer, user));
token, user -> websocketMessageHandler.handle(websocket, buffer, user));
} else {
websocketRequestHandler.handle(websocket, buffer, Optional.empty());
websocketMessageHandler.handle(websocket, buffer, Optional.empty());
}
};
websocket.textMessageHandler(text -> socketHandler.handle(Buffer.buffer(text)));

@ -61,7 +61,7 @@ public class WebSocketHostAllowlistTest {
private final WebSocketConfiguration webSocketConfiguration =
WebSocketConfiguration.createDefault();
private static WebSocketRequestHandler webSocketRequestHandlerSpy;
private static WebSocketMessageHandler webSocketMessageHandlerSpy;
private WebSocketService websocketService;
private HttpClient httpClient;
private static final int VERTX_AWAIT_TIMEOUT_MILLIS = 10000;
@ -76,16 +76,16 @@ public class WebSocketHostAllowlistTest {
new WebSocketMethodsFactory(
new SubscriptionManager(new NoOpMetricsSystem()), new HashMap<>())
.methods();
webSocketRequestHandlerSpy =
webSocketMessageHandlerSpy =
spy(
new WebSocketRequestHandler(
new WebSocketMessageHandler(
vertx,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), websocketMethods),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds()));
websocketService =
new WebSocketService(vertx, webSocketConfiguration, webSocketRequestHandlerSpy);
new WebSocketService(vertx, webSocketConfiguration, webSocketMessageHandlerSpy);
websocketService.start().join();
final InetSocketAddress inetSocketAddress = websocketService.socketAddress();
@ -100,7 +100,7 @@ public class WebSocketHostAllowlistTest {
@After
public void after() {
reset(webSocketRequestHandlerSpy);
reset(webSocketMessageHandlerSpy);
websocketService.stop();
}

@ -60,12 +60,12 @@ import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
@RunWith(VertxUnitRunner.class)
public class WebSocketRequestHandlerTest {
public class WebSocketMessageHandlerTest {
private static final int VERTX_AWAIT_TIMEOUT_MILLIS = 10000;
private Vertx vertx;
private WebSocketRequestHandler handler;
private WebSocketMessageHandler handler;
private JsonRpcMethod jsonRpcMethodMock;
private ServerWebSocket websocketMock;
private final Map<String, JsonRpcMethod> methods = new HashMap<>();
@ -81,7 +81,7 @@ public class WebSocketRequestHandlerTest {
methods.put("eth_x", jsonRpcMethodMock);
handler =
new WebSocketRequestHandler(
new WebSocketMessageHandler(
vertx,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
mock(EthScheduler.class),
@ -112,7 +112,7 @@ public class WebSocketRequestHandlerTest {
handler.handle(websocketMock, requestJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
async.awaitSuccess(WebSocketMessageHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
// can verify only after async not before
verify(websocketMock).writeFrame(argThat(isFrameWithText(Json.encode(expectedResponse))));
@ -140,7 +140,7 @@ public class WebSocketRequestHandlerTest {
handler.handle(websocketMock, arrayJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
async.awaitSuccess(WebSocketMessageHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
// can verify only after async not before
verify(websocketMock).writeFrame(argThat(isFrameWithText(Json.encode(expectedBatchResponse))));
verify(websocketMock).writeFrame(argThat(this::isFinalFrame));
@ -154,8 +154,8 @@ public class WebSocketRequestHandlerTest {
when(websocketMock.textHandlerID()).thenReturn(UUID.randomUUID().toString());
WebSocketRequestHandler handleBadCalls =
new WebSocketRequestHandler(
WebSocketMessageHandler handleBadCalls =
new WebSocketMessageHandler(
vertx,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
mock(EthScheduler.class),
@ -177,7 +177,7 @@ public class WebSocketRequestHandlerTest {
handleBadCalls.handle(websocketMock, arrayJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
async.awaitSuccess(WebSocketMessageHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
// can verify only after async not before
verify(websocketMock).writeFrame(argThat(isFrameWithText(Json.encode(expectedBatchResponse))));
@ -236,7 +236,7 @@ public class WebSocketRequestHandlerTest {
handler.handle(websocketMock, requestJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
async.awaitSuccess(WebSocketMessageHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
verify(websocketMock).writeFrame(argThat(isFrameWithText(Json.encode(expectedResponse))));
verify(websocketMock).writeFrame(argThat(this::isFinalFrame));
@ -260,7 +260,7 @@ public class WebSocketRequestHandlerTest {
handler.handle(websocketMock, requestJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
async.awaitSuccess(WebSocketMessageHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
// can verify only after async not before
verify(websocketMock).writeFrame(argThat(isFrameWithText(Json.encode(expectedResponse))));
@ -282,7 +282,7 @@ public class WebSocketRequestHandlerTest {
handler.handle(websocketMock, requestJson.toBuffer(), Optional.empty());
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
async.awaitSuccess(WebSocketMessageHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
// can verify only after async not before
verify(websocketMock).writeFrame(argThat(isFrameWithText(Json.encode(expectedResponse))));

@ -123,7 +123,7 @@ public class WebSocketServiceLoginTest {
protected static JWTAuth jwtAuth;
protected static final NatService natService = new NatService(Optional.empty());
private WebSocketConfiguration websocketConfiguration;
private WebSocketRequestHandler webSocketRequestHandlerSpy;
private WebSocketMessageHandler webSocketMessageHandlerSpy;
private WebSocketService websocketService;
private HttpClient httpClient;
@ -187,9 +187,9 @@ public class WebSocketServiceLoginTest {
mock(EthPeers.class)));
websocketMethods.putAll(rpcMethods);
webSocketRequestHandlerSpy =
webSocketMessageHandlerSpy =
spy(
new WebSocketRequestHandler(
new WebSocketMessageHandler(
vertx,
new JsonRpcExecutor(
new AuthenticatedJsonRpcProcessor(
@ -201,7 +201,7 @@ public class WebSocketServiceLoginTest {
TimeoutOptions.defaultOptions().getTimeoutSeconds()));
websocketService =
new WebSocketService(vertx, websocketConfiguration, webSocketRequestHandlerSpy);
new WebSocketService(vertx, websocketConfiguration, webSocketMessageHandlerSpy);
websocketService.start().join();
jwtAuth = websocketService.authenticationService.get().getJwtAuthProvider();
@ -217,7 +217,7 @@ public class WebSocketServiceLoginTest {
@After
public void after() {
reset(webSocketRequestHandlerSpy);
reset(webSocketMessageHandlerSpy);
websocketService.stop();
}

@ -58,7 +58,7 @@ public class WebSocketServiceTest {
private Vertx vertx;
private WebSocketConfiguration websocketConfiguration;
private WebSocketRequestHandler webSocketRequestHandlerSpy;
private WebSocketMessageHandler webSocketMessageHandlerSpy;
private Map<String, JsonRpcMethod> websocketMethods;
private WebSocketService websocketService;
private HttpClient httpClient;
@ -79,16 +79,16 @@ public class WebSocketServiceTest {
new WebSocketMethodsFactory(
new SubscriptionManager(new NoOpMetricsSystem()), new HashMap<>())
.methods();
webSocketRequestHandlerSpy =
webSocketMessageHandlerSpy =
spy(
new WebSocketRequestHandler(
new WebSocketMessageHandler(
vertx,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), websocketMethods),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds()));
websocketService =
new WebSocketService(vertx, websocketConfiguration, webSocketRequestHandlerSpy);
new WebSocketService(vertx, websocketConfiguration, webSocketMessageHandlerSpy);
websocketService.start().join();
websocketConfiguration.setPort(websocketService.socketAddress().getPort());
@ -103,7 +103,7 @@ public class WebSocketServiceTest {
@After
public void after() {
reset(webSocketRequestHandlerSpy);
reset(webSocketMessageHandlerSpy);
websocketService.stop();
}
@ -312,7 +312,7 @@ public class WebSocketServiceTest {
websocket.closeHandler(
h -> {
verifyNoInteractions(webSocketRequestHandlerSpy);
verifyNoInteractions(webSocketMessageHandlerSpy);
async.complete();
});

@ -25,7 +25,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketRequestHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketMessageHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.Subscription;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
@ -58,7 +58,7 @@ import org.mockito.stubbing.Answer;
public class EthSubscribeIntegrationTest {
private Vertx vertx;
private WebSocketRequestHandler webSocketRequestHandler;
private WebSocketMessageHandler webSocketMessageHandler;
private SubscriptionManager subscriptionManager;
private WebSocketMethodsFactory webSocketMethodsFactory;
private final int ASYNC_TIMEOUT = 5000;
@ -70,8 +70,8 @@ public class EthSubscribeIntegrationTest {
vertx = Vertx.vertx();
subscriptionManager = new SubscriptionManager(new NoOpMetricsSystem());
webSocketMethodsFactory = new WebSocketMethodsFactory(subscriptionManager, new HashMap<>());
webSocketRequestHandler =
new WebSocketRequestHandler(
webSocketMessageHandler =
new WebSocketMessageHandler(
vertx,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), webSocketMethodsFactory.methods()),
Mockito.mock(EthScheduler.class),
@ -92,7 +92,7 @@ public class EthSubscribeIntegrationTest {
when(websocketMock.writeFrame(argThat(this::isFinalFrame)))
.then(completeOnLastFrame(async, websocketMock));
webSocketRequestHandler.handle(
webSocketMessageHandler.handle(
websocketMock, Json.encodeToBuffer(subscribeRequestBody), Optional.empty());
async.awaitSuccess(ASYNC_TIMEOUT);
@ -124,9 +124,9 @@ public class EthSubscribeIntegrationTest {
when(websocketMock2.textHandlerID()).thenReturn(CONNECTION_ID_2);
when(websocketMock2.writeFrame(argThat(this::isFinalFrame))).then(countDownOnLastFrame(async));
webSocketRequestHandler.handle(
webSocketMessageHandler.handle(
websocketMock1, Json.encodeToBuffer(subscribeRequestBody1), Optional.empty());
webSocketRequestHandler.handle(
webSocketMessageHandler.handle(
websocketMock2, Json.encodeToBuffer(subscribeRequestBody2), Optional.empty());
async.awaitSuccess(ASYNC_TIMEOUT);

@ -25,7 +25,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketRequestHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketMessageHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscribeRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
@ -53,7 +53,7 @@ import org.mockito.stubbing.Answer;
public class EthUnsubscribeIntegrationTest {
private Vertx vertx;
private WebSocketRequestHandler webSocketRequestHandler;
private WebSocketMessageHandler webSocketMessageHandler;
private SubscriptionManager subscriptionManager;
private WebSocketMethodsFactory webSocketMethodsFactory;
private final int ASYNC_TIMEOUT = 5000;
@ -64,8 +64,8 @@ public class EthUnsubscribeIntegrationTest {
vertx = Vertx.vertx();
subscriptionManager = new SubscriptionManager(new NoOpMetricsSystem());
webSocketMethodsFactory = new WebSocketMethodsFactory(subscriptionManager, new HashMap<>());
webSocketRequestHandler =
new WebSocketRequestHandler(
webSocketMessageHandler =
new WebSocketMessageHandler(
vertx,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), webSocketMethodsFactory.methods()),
mock(EthScheduler.class),
@ -92,7 +92,7 @@ public class EthUnsubscribeIntegrationTest {
when(websocketMock.textHandlerID()).thenReturn(CONNECTION_ID);
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
webSocketRequestHandler.handle(
webSocketMessageHandler.handle(
websocketMock, Json.encodeToBuffer(unsubscribeRequestBody), Optional.empty());
async.awaitSuccess(ASYNC_TIMEOUT);
@ -124,7 +124,7 @@ public class EthUnsubscribeIntegrationTest {
when(websocketMock.textHandlerID()).thenReturn(CONNECTION_ID);
when(websocketMock.writeFrame(argThat(this::isFinalFrame))).then(completeOnLastFrame(async));
webSocketRequestHandler.handle(
webSocketMessageHandler.handle(
websocketMock, Json.encodeToBuffer(unsubscribeRequestBody), Optional.empty());
async.awaitSuccess(ASYNC_TIMEOUT);

@ -28,7 +28,7 @@ public class NetworkingConfiguration {
private int initiateConnectionsFrequencySec = DEFAULT_INITIATE_CONNECTIONS_FREQUENCY_SEC;
private int checkMaintainedConnectionsFrequencySec =
DEFAULT_CHECK_MAINTAINED_CONNECTIONS_FREQUENCY_SEC;
private String dnsDiscoveryServerOverride = null;
private Optional<String> dnsDiscoveryServerOverride = Optional.empty();
public static NetworkingConfiguration create() {
return new NetworkingConfiguration();
@ -68,13 +68,13 @@ public class NetworkingConfiguration {
}
public NetworkingConfiguration setDnsDiscoveryServerOverride(
final String dnsDiscoveryServerOverride) {
final Optional<String> dnsDiscoveryServerOverride) {
this.dnsDiscoveryServerOverride = dnsDiscoveryServerOverride;
return this;
}
public Optional<String> getDnsDiscoveryServerOverride() {
return Optional.ofNullable(dnsDiscoveryServerOverride);
return dnsDiscoveryServerOverride;
}
public NetworkingConfiguration setCheckMaintainedConnectionsFrequency(

Loading…
Cancel
Save