handle batch requests in WebSockets (#1583)

* handle batch requests in WebSockets

Signed-off-by: Sally MacFarlane <sally.macfarlane@consensys.net>
pull/1592/head
Sally MacFarlane 4 years ago committed by GitHub
parent a8e6af0b55
commit 0dd90d016d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      CHANGELOG.md
  2. 164
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketRequestHandler.java
  3. 70
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketRequestHandlerTest.java

@ -1,5 +1,10 @@
# Changelog
## 20.10.2
### Additions and Improvements
* Added support for batched requests in WebSockets. [#1583](https://github.com/hyperledger/besu/pull/1583)
## 20.10.1
### Additions and Improvements

@ -14,6 +14,9 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket;
import static java.util.stream.Collectors.toList;
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError.INVALID_REQUEST;
import org.hyperledger.besu.ethereum.api.handlers.IsAliveHandler;
import org.hyperledger.besu.ethereum.api.handlers.RpcMethodTimeoutException;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
@ -23,20 +26,27 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonR
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcUnauthorizedResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketRpcRequest;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.User;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -80,46 +90,124 @@ public class WebSocketRequestHandler {
final String payload,
final Optional<User> user) {
return future -> {
final WebSocketRpcRequest request;
try {
request = Json.decodeValue(payload, WebSocketRpcRequest.class);
} catch (final IllegalArgumentException | DecodeException e) {
LOG.debug("Error mapping json to WebSocketRpcRequest", e);
future.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
return;
}
if (!methods.containsKey(request.getMethod())) {
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.METHOD_NOT_FOUND));
LOG.debug("Can't find method {}", request.getMethod());
final String json = payload.trim();
if (!json.isEmpty() && json.charAt(0) == '{') {
try {
handleSingleRequest(authenticationService, id, user, future, getRequest(payload));
} catch (final IllegalArgumentException | DecodeException e) {
LOG.debug("Error mapping json to WebSocketRpcRequest", e);
future.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
return;
}
} else if (json.length() == 0) {
future.complete(errorResponse(null, INVALID_REQUEST));
return;
}
final JsonRpcMethod method = methods.get(request.getMethod());
try {
LOG.debug("WS-RPC request -> {}", request.getMethod());
request.setConnectionId(id);
if (AuthenticationUtils.isPermitted(authenticationService, user, method)) {
final JsonRpcRequestContext requestContext =
new JsonRpcRequestContext(
request, user, new IsAliveHandler(ethScheduler, timeoutSec));
future.complete(method.response(requestContext));
} else {
future.complete(
new JsonRpcUnauthorizedResponse(request.getId(), JsonRpcError.UNAUTHORIZED));
} else {
final JsonArray jsonArray = new JsonArray(json);
if (jsonArray.size() < 1) {
future.complete(errorResponse(null, INVALID_REQUEST));
return;
}
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INVALID_PARAMS));
} catch (final RpcMethodTimeoutException e) {
LOG.error(JsonRpcError.TIMEOUT_ERROR.getMessage(), e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.TIMEOUT_ERROR));
} catch (final Exception e) {
LOG.error(JsonRpcError.INTERNAL_ERROR.getMessage(), e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INTERNAL_ERROR));
// handle batch request
LOG.debug("batch request size {}", jsonArray.size());
handleJsonBatchRequest(authenticationService, id, jsonArray, user);
}
};
}
private JsonRpcResponse process(
final Optional<AuthenticationService> authenticationService,
final String id,
final Optional<User> user,
final WebSocketRpcRequest requestBody) {
if (!methods.containsKey(requestBody.getMethod())) {
LOG.debug("Can't find method {}", requestBody.getMethod());
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.METHOD_NOT_FOUND);
}
final JsonRpcMethod method = methods.get(requestBody.getMethod());
try {
LOG.debug("WS-RPC request -> {}", requestBody.getMethod());
requestBody.setConnectionId(id);
if (AuthenticationUtils.isPermitted(authenticationService, user, method)) {
final JsonRpcRequestContext requestContext =
new JsonRpcRequestContext(
requestBody, user, new IsAliveHandler(ethScheduler, timeoutSec));
return method.response(requestContext);
} else {
return new JsonRpcUnauthorizedResponse(requestBody.getId(), JsonRpcError.UNAUTHORIZED);
}
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.INVALID_PARAMS);
} catch (final RpcMethodTimeoutException e) {
LOG.error(JsonRpcError.TIMEOUT_ERROR.getMessage(), e);
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.TIMEOUT_ERROR);
} catch (final Exception e) {
LOG.error(JsonRpcError.INTERNAL_ERROR.getMessage(), e);
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.INTERNAL_ERROR);
}
}
private void handleSingleRequest(
final Optional<AuthenticationService> authenticationService,
final String id,
final Optional<User> user,
final Promise<Object> future,
final WebSocketRpcRequest requestBody) {
future.complete(process(authenticationService, id, user, requestBody));
}
@SuppressWarnings("rawtypes")
private void handleJsonBatchRequest(
final Optional<AuthenticationService> authenticationService,
final String id,
final JsonArray jsonArray,
final Optional<User> user) {
// Interpret json as rpc request
final List<Future> responses =
jsonArray.stream()
.map(
obj -> {
if (!(obj instanceof JsonObject)) {
return Future.succeededFuture(errorResponse(null, INVALID_REQUEST));
}
final JsonObject req = (JsonObject) obj;
final Future<JsonRpcResponse> fut = Future.future();
vertx.executeBlocking(
future ->
future.complete(
process(authenticationService, id, user, getRequest(req.toString()))),
false,
ar -> {
if (ar.failed()) {
fut.fail(ar.cause());
} else {
fut.complete((JsonRpcResponse) ar.result());
}
});
return fut;
})
.collect(toList());
CompositeFuture.all(responses)
.setHandler(
(res) -> {
final JsonRpcResponse[] completed =
res.result().list().stream()
.map(JsonRpcResponse.class::cast)
.filter(this::isNonEmptyResponses)
.toArray(JsonRpcResponse[]::new);
vertx.eventBus().send(id, Json.encode(completed));
});
}
private WebSocketRpcRequest getRequest(final String payload) {
return Json.decodeValue(payload, WebSocketRpcRequest.class);
}
private Handler<AsyncResult<Object>> resultHandler(final String id) {
return result -> {
if (result.succeeded()) {
@ -134,4 +222,12 @@ public class WebSocketRequestHandler {
private void replyToClient(final String id, final Buffer request) {
vertx.eventBus().send(id, request.toString());
}
private JsonRpcResponse errorResponse(final Object id, final JsonRpcError error) {
return new JsonRpcErrorResponse(id, error);
}
private boolean isNonEmptyResponses(final JsonRpcResponse result) {
return result.getType() != JsonRpcResponseType.NONE;
}
}

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.api.jsonrpc.websocket;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
@ -31,11 +32,13 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketRpcR
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
@ -103,6 +106,73 @@ public class WebSocketRequestHandlerTest {
.completionHandler(v -> handler.handle(websocketId, requestJson.toString()));
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
// can verify only after async not before
verify(jsonRpcMethodMock).response(eq(expectedRequest));
}
@Test
public void handlerBatchRequestDeliversResponseSuccessfully(final TestContext context) {
final Async async = context.async();
final JsonObject requestJson = new JsonObject().put("id", 1).put("method", "eth_x");
final JsonArray arrayJson = new JsonArray(List.of(requestJson, requestJson));
final JsonRpcRequest requestBody = requestJson.mapTo(WebSocketRpcRequest.class);
final JsonRpcRequestContext expectedRequest = new JsonRpcRequestContext(requestBody);
final JsonRpcSuccessResponse expectedSingleResponse =
new JsonRpcSuccessResponse(requestBody.getId(), null);
final JsonArray expectedBatchResponse =
new JsonArray(List.of(expectedSingleResponse, expectedSingleResponse));
when(jsonRpcMethodMock.response(eq(expectedRequest))).thenReturn(expectedSingleResponse);
final String websocketId = UUID.randomUUID().toString();
vertx
.eventBus()
.consumer(websocketId)
.handler(
msg -> {
context.assertEquals(Json.encode(expectedBatchResponse), msg.body());
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, arrayJson.toString()));
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
// can verify only after async not before
verify(jsonRpcMethodMock, Mockito.times(2)).response(eq(expectedRequest));
}
@Test
public void handlerBatchRequestContainingErrorsShouldRespondWithBatchErrors(
final TestContext context) {
final Async async = context.async();
final JsonObject requestJson =
new JsonObject().put("id", 1).put("method", "eth_nonexistentMethod");
final JsonRpcErrorResponse expectedErrorResponse1 =
new JsonRpcErrorResponse(1, JsonRpcError.METHOD_NOT_FOUND);
final JsonArray arrayJson = new JsonArray(List.of(requestJson, ""));
final JsonRpcErrorResponse expectedErrorResponse2 =
new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST);
final JsonArray expectedBatchResponse =
new JsonArray(List.of(expectedErrorResponse1, expectedErrorResponse2));
final String websocketId = UUID.randomUUID().toString();
vertx
.eventBus()
.consumer(websocketId)
.handler(
msg -> {
context.assertEquals(Json.encode(expectedBatchResponse), msg.body());
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, arrayJson.toString()));
async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
}
@Test

Loading…
Cancel
Save