limit number of active WS connections (#2006)

* add config option for limiting max number of active WS connections

* update test websocket method to not deprecated version

Signed-off-by: Sally MacFarlane <sally.macfarlane@consensys.net>
pull/2015/head
Sally MacFarlane 4 years ago committed by GitHub
parent 4f3868c45d
commit 52c17e3bdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CHANGELOG.md
  2. 58
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  3. 1
      besu/src/main/java/org/hyperledger/besu/cli/DefaultCommandValues.java
  4. 31
      besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
  5. 1
      besu/src/test/resources/everything_config.toml
  6. 11
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketConfiguration.java
  7. 35
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketService.java
  8. 2
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketConfigurationTest.java
  9. 79
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketServiceTest.java

@ -3,7 +3,9 @@
## 21.1.2
### Additions and Improvements
- Added option to set a limit for JSON-RPC HTTP connections `--rpc-http-max-active-connections` [\#1996](https://github.com/hyperledger/besu/pull/1996)
- Added option to set a limit for JSON-RPC connections
* HTTP connections `--rpc-http-max-active-connections` [\#1996](https://github.com/hyperledger/besu/pull/1996)
* WS connections `--rpc-ws-max-active-connections` [\#2006](https://github.com/hyperledger/besu/pull/2006)
### Bug Fixes

@ -516,7 +516,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
@Option(
names = {"--rpc-http-max-active-connections"},
description =
"Maximum number of connections allowed for JSON-RPC HTTP (default: ${DEFAULT-VALUE}). Once this limit is reached, incoming connections will be rejected.",
"Maximum number of HTTP connections allowed for JSON-RPC (default: ${DEFAULT-VALUE}). Once this limit is reached, incoming connections will be rejected.",
arity = "1")
private final Integer rpcHttpMaxConnections = DEFAULT_HTTP_MAX_CONNECTIONS;
@ -617,6 +617,13 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
arity = "1")
private final Integer rpcWsPort = DEFAULT_WEBSOCKET_PORT;
@Option(
names = {"--rpc-ws-max-active-connections"},
description =
"Maximum number of WebSocket connections allowed for JSON-RPC (default: ${DEFAULT-VALUE}). Once this limit is reached, incoming connections will be rejected.",
arity = "1")
private final Integer rpcWsMaxConnections = DEFAULT_WS_MAX_CONNECTIONS;
@Option(
names = {"--rpc-ws-api", "--rpc-ws-apis"},
paramLabel = "<api name>",
@ -1697,28 +1704,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
private JsonRpcConfiguration jsonRpcConfiguration() {
checkRpcTlsClientAuthOptionsDependencies();
checkRpcTlsOptionsDependencies();
CommandLineUtils.checkOptionDependencies(
logger,
commandLine,
"--rpc-http-enabled",
!isRpcHttpEnabled,
asList(
"--rpc-http-api",
"--rpc-http-apis",
"--rpc-http-cors-origins",
"--rpc-http-host",
"--rpc-http-port",
"--rpc-http-max-active-connections",
"--rpc-http-authentication-enabled",
"--rpc-http-authentication-credentials-file",
"--rpc-http-authentication-public-key-file",
"--rpc-http-tls-enabled",
"--rpc-http-tls-keystore-file",
"--rpc-http-tls-keystore-password-file",
"--rpc-http-tls-client-auth-enabled",
"--rpc-http-tls-known-clients-file",
"--rpc-http-tls-ca-clients-enabled"));
checkRpcHttpOptionsDependencies();
if (isRpcHttpAuthenticationEnabled
&& rpcHttpAuthenticationCredentialsFile() == null
@ -1744,6 +1730,30 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
return jsonRpcConfiguration;
}
private void checkRpcHttpOptionsDependencies() {
CommandLineUtils.checkOptionDependencies(
logger,
commandLine,
"--rpc-http-enabled",
!isRpcHttpEnabled,
asList(
"--rpc-http-api",
"--rpc-http-apis",
"--rpc-http-cors-origins",
"--rpc-http-host",
"--rpc-http-port",
"--rpc-http-max-active-connections",
"--rpc-http-authentication-enabled",
"--rpc-http-authentication-credentials-file",
"--rpc-http-authentication-public-key-file",
"--rpc-http-tls-enabled",
"--rpc-http-tls-keystore-file",
"--rpc-http-tls-keystore-password-file",
"--rpc-http-tls-client-auth-enabled",
"--rpc-http-tls-known-clients-file",
"--rpc-http-tls-ca-clients-enabled"));
}
private void checkRpcTlsOptionsDependencies() {
CommandLineUtils.checkOptionDependencies(
logger,
@ -1839,6 +1849,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
"--rpc-ws-apis",
"--rpc-ws-host",
"--rpc-ws-port",
"--rpc-ws-max-active-connections",
"--rpc-ws-authentication-enabled",
"--rpc-ws-authentication-credentials-file",
"--rpc-ws-authentication-public-key-file"));
@ -1855,6 +1866,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
webSocketConfiguration.setEnabled(isRpcWsEnabled);
webSocketConfiguration.setHost(rpcWsHost);
webSocketConfiguration.setPort(rpcWsPort);
webSocketConfiguration.setMaxActiveConnections(rpcWsMaxConnections);
webSocketConfiguration.setRpcApis(rpcWsApis);
webSocketConfiguration.setAuthenticationEnabled(isRpcWsAuthenticationEnabled);
webSocketConfiguration.setAuthenticationCredentialsFile(rpcWsAuthenticationCredentialsFile());

@ -56,6 +56,7 @@ public interface DefaultCommandValues {
int FAST_SYNC_MIN_PEER_COUNT = 5;
int DEFAULT_MAX_PEERS = 25;
int DEFAULT_HTTP_MAX_CONNECTIONS = 80;
int DEFAULT_WS_MAX_CONNECTIONS = 80;
float DEFAULT_FRACTION_REMOTE_WIRE_CONNECTIONS_ALLOWED =
RlpxConfiguration.DEFAULT_FRACTION_REMOTE_CONNECTIONS_ALLOWED;
String DEFAULT_KEY_VALUE_STORAGE_NAME = "rocksdb";

@ -1895,6 +1895,21 @@ public class BesuCommandTest extends CommandTestAbstract {
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void rpcWsMaxActiveConnectionsPropertyMustBeUsed() {
int maxConnections = 99;
parseCommand("--rpc-ws-max-active-connections", String.valueOf(maxConnections));
verify(mockRunnerBuilder).webSocketConfiguration(wsRpcConfigArgumentCaptor.capture());
verify(mockRunnerBuilder).build();
assertThat(wsRpcConfigArgumentCaptor.getValue().getMaxActiveConnections())
.isEqualTo(maxConnections);
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void rpcHttpTlsRequiresRpcHttpEnabled() {
parseCommand("--rpc-http-tls-enabled");
@ -2543,10 +2558,22 @@ public class BesuCommandTest extends CommandTestAbstract {
@Test
public void rpcWsOptionsRequiresServiceToBeEnabled() {
parseCommand("--rpc-ws-api", "ETH,NET", "--rpc-ws-host", "0.0.0.0", "--rpc-ws-port", "1234");
parseCommand(
"--rpc-ws-api",
"ETH,NET",
"--rpc-ws-host",
"0.0.0.0",
"--rpc-ws-port",
"1234",
"--rpc-ws-max-active-connections",
"77");
verifyOptionsConstraintLoggerCall(
"--rpc-ws-enabled", "--rpc-ws-host", "--rpc-ws-port", "--rpc-ws-api");
"--rpc-ws-enabled",
"--rpc-ws-host",
"--rpc-ws-port",
"--rpc-ws-api",
"--rpc-ws-max-active-connections");
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();

@ -88,6 +88,7 @@ rpc-ws-api=["DEBUG","ETH"]
rpc-ws-apis=["DEBUG","ETH"]
rpc-ws-host="9.10.11.12"
rpc-ws-port=9101
rpc-ws-max-active-connections=101
rpc-ws-authentication-enabled=false
rpc-ws-authentication-credentials-file="none"
rpc-ws-authentication-jwt-public-key-file="none"

@ -32,6 +32,7 @@ public class WebSocketConfiguration {
public static final int DEFAULT_WEBSOCKET_PORT = 8546;
public static final List<RpcApi> DEFAULT_WEBSOCKET_APIS =
Arrays.asList(RpcApis.ETH, RpcApis.NET, RpcApis.WEB3);
public static final int DEFAULT_MAX_ACTIVE_CONNECTIONS = 80;
private boolean enabled;
private int port;
@ -42,6 +43,7 @@ public class WebSocketConfiguration {
private List<String> hostsAllowlist = Arrays.asList("localhost", "127.0.0.1");
private File authenticationPublicKeyFile;
private long timeoutSec;
private int maxActiveConnections;
public static WebSocketConfiguration createDefault() {
final WebSocketConfiguration config = new WebSocketConfiguration();
@ -50,6 +52,7 @@ public class WebSocketConfiguration {
config.setPort(DEFAULT_WEBSOCKET_PORT);
config.setRpcApis(DEFAULT_WEBSOCKET_APIS);
config.setTimeoutSec(TimeoutOptions.defaultOptions().getTimeoutSeconds());
config.setMaxActiveConnections(DEFAULT_MAX_ACTIVE_CONNECTIONS);
return config;
}
@ -175,4 +178,12 @@ public class WebSocketConfiguration {
.add("timeoutSec", timeoutSec)
.toString();
}
public int getMaxActiveConnections() {
return maxActiveConnections;
}
public void setMaxActiveConnections(final int maxActiveConnections) {
this.maxActiveConnections = maxActiveConnections;
}
}

@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.Subscrip
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
@ -30,6 +31,7 @@ import com.google.common.collect.Iterables;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
@ -49,6 +51,9 @@ public class WebSocketService {
private static final InetSocketAddress EMPTY_SOCKET_ADDRESS = new InetSocketAddress("0.0.0.0", 0);
private static final String APPLICATION_JSON = "application/json";
private final int maxActiveConnections;
private final AtomicInteger activeConnectionsCount = new AtomicInteger();
private final Vertx vertx;
private final WebSocketConfiguration configuration;
private final WebSocketRequestHandler websocketRequestHandler;
@ -77,6 +82,7 @@ public class WebSocketService {
this.configuration = configuration;
this.websocketRequestHandler = websocketRequestHandler;
this.authenticationService = authenticationService;
this.maxActiveConnections = configuration.getMaxActiveConnections();
}
public CompletableFuture<?> start() {
@ -95,6 +101,7 @@ public class WebSocketService {
.setCompressionSupported(true)
.addWebSocketSubProtocol("undefined"))
.webSocketHandler(websocketHandler())
.connectionHandler(connectionHandler())
.requestHandler(httpHandler())
.listen(startHandler(resultFuture));
@ -150,6 +157,34 @@ public class WebSocketService {
};
}
private Handler<HttpConnection> connectionHandler() {
return connection -> {
if (activeConnectionsCount.get() >= maxActiveConnections) {
// disallow new connections to prevent DoS
LOG.warn(
"Rejecting new connection from {}. {}/{} max active connections limit reached.",
connection.remoteAddress(),
activeConnectionsCount.getAndIncrement(),
maxActiveConnections);
connection.close();
} else {
LOG.debug(
"Opened connection from {}. Total of active connections: {}/{}",
connection.remoteAddress(),
activeConnectionsCount.incrementAndGet(),
maxActiveConnections);
}
connection.closeHandler(
c ->
LOG.debug(
"Connection closed from {}. Total of active connections: {}/{}",
connection.remoteAddress(),
activeConnectionsCount.decrementAndGet(),
maxActiveConnections));
};
}
private Handler<HttpServerRequest> httpHandler() {
final Router router = Router.router(vertx);

@ -31,5 +31,7 @@ public class WebSocketConfigurationTest {
assertThat(configuration.getPort()).isEqualTo(8546);
assertThat(configuration.getRpcApis())
.containsExactlyInAnyOrder(RpcApis.ETH, RpcApis.NET, RpcApis.WEB3);
assertThat(configuration.getMaxActiveConnections())
.isEqualTo(WebSocketConfiguration.DEFAULT_MAX_ACTIVE_CONNECTIONS);
}
}

@ -59,6 +59,7 @@ public class WebSocketServiceTest {
private WebSocketRequestHandler webSocketRequestHandlerSpy;
private WebSocketService websocketService;
private HttpClient httpClient;
private final int maxConnections = 3;
@Before
public void before() {
@ -67,6 +68,7 @@ public class WebSocketServiceTest {
websocketConfiguration = WebSocketConfiguration.createDefault();
websocketConfiguration.setPort(0);
websocketConfiguration.setHostsAllowlist(Collections.singletonList("*"));
websocketConfiguration.setMaxActiveConnections(maxConnections);
final Map<String, JsonRpcMethod> websocketMethods =
new WebSocketMethodsFactory(
@ -100,6 +102,45 @@ public class WebSocketServiceTest {
websocketService.stop();
}
@Test
public void limitActiveConnections(final TestContext context) {
// expecting maxConnections successful responses
final Async asyncResponse = context.async(maxConnections);
// and a number of rejections
final int countRejections = 2;
final Async asyncRejected = context.async(countRejections);
final String request = "{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"syncing\"]}";
// the number in the response is the subscription ID, so in successive responses this increments
final String expectedResponse1 = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0x1\"}";
// attempt to exceed max connections - but only maxConnections should succeed
for (int i = 0; i < maxConnections + countRejections; i++) {
httpClient.webSocket(
"/",
future -> {
if (future.succeeded()) {
WebSocket ws = future.result();
ws.handler(
buffer -> {
context.assertNotNull(buffer.toString());
// assert a successful response
context.assertTrue(
buffer.toString().startsWith(expectedResponse1.substring(0, 36)));
asyncResponse.countDown();
});
ws.writeTextMessage(request);
} else {
// count down the rejected WS connections
asyncRejected.countDown();
}
});
}
// wait for successful responses AND rejected connections
asyncResponse.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
asyncRejected.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
}
@Test
public void websocketServiceExecutesHandlerOnMessage(final TestContext context) {
final Async async = context.async();
@ -107,16 +148,21 @@ public class WebSocketServiceTest {
final String request = "{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"syncing\"]}";
final String expectedResponse = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0x1\"}";
httpClient.websocket(
httpClient.webSocket(
"/",
webSocket -> {
webSocket.handler(
buffer -> {
context.assertEquals(expectedResponse, buffer.toString());
async.complete();
});
webSocket.writeTextMessage(request);
future -> {
if (future.succeeded()) {
WebSocket ws = future.result();
ws.handler(
buffer -> {
context.assertEquals(expectedResponse, buffer.toString());
async.complete();
});
ws.writeTextMessage(request);
} else {
context.fail("websocket connection failed");
}
});
async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
@ -146,11 +192,16 @@ public class WebSocketServiceTest {
final byte[] bigMessage = new byte[HttpServerOptions.DEFAULT_MAX_WEBSOCKET_MESSAGE_SIZE + 1];
Arrays.fill(bigMessage, (byte) 1);
httpClient.websocket(
httpClient.webSocket(
"/",
webSocket -> {
webSocket.write(Buffer.buffer(bigMessage));
webSocket.closeHandler(v -> async.complete());
future -> {
if (future.succeeded()) {
WebSocket ws = future.result();
ws.write(Buffer.buffer(bigMessage));
ws.closeHandler(v -> async.complete());
} else {
context.fail("websocket connection failed");
}
});
async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
@ -196,7 +247,7 @@ public class WebSocketServiceTest {
}
@Test
public void webSocketDoesNotToHandlePingPayloadAsJsonRpcRequest(final TestContext context) {
public void webSocketDoesNotHandlePingPayloadAsJsonRpcRequest(final TestContext context) {
final Async async = context.async();
httpClient.webSocket(

Loading…
Cancel
Save