|
|
|
@ -52,9 +52,11 @@ public class JsonRpcExecutorHandler { |
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(JsonRpcExecutorHandler.class); |
|
|
|
|
private static final String SPAN_CONTEXT = "span_context"; |
|
|
|
|
private static final String APPLICATION_JSON = "application/json"; |
|
|
|
|
private static final ObjectWriter JSON_OBJECT_WRITER = |
|
|
|
|
private static final ObjectMapper JSON_OBJECT_MAPPER = |
|
|
|
|
new ObjectMapper() |
|
|
|
|
.registerModule(new Jdk8Module()) // Handle JDK8 Optionals (de)serialization
|
|
|
|
|
.registerModule(new Jdk8Module()); // Handle JDK8 Optionals (de)serialization
|
|
|
|
|
private static final ObjectWriter JSON_OBJECT_WRITER = |
|
|
|
|
JSON_OBJECT_MAPPER |
|
|
|
|
.writerWithDefaultPrettyPrinter() |
|
|
|
|
.without(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM) |
|
|
|
|
.with(JsonGenerator.Feature.AUTO_CLOSE_TARGET); |
|
|
|
@ -72,6 +74,7 @@ public class JsonRpcExecutorHandler { |
|
|
|
|
|
|
|
|
|
if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name())) { |
|
|
|
|
JsonObject jsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name()); |
|
|
|
|
lazyTraceLogger(jsonRequest::toString); |
|
|
|
|
JsonRpcResponse jsonRpcResponse = |
|
|
|
|
jsonRpcExecutor.execute( |
|
|
|
|
user, |
|
|
|
@ -87,23 +90,24 @@ public class JsonRpcExecutorHandler { |
|
|
|
|
try (final JsonResponseStreamer streamer = |
|
|
|
|
new JsonResponseStreamer(response, ctx.request().remoteAddress())) { |
|
|
|
|
// underlying output stream lifecycle is managed by the json object writer
|
|
|
|
|
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(jsonRpcResponse)); |
|
|
|
|
JSON_OBJECT_WRITER.writeValue(streamer, jsonRpcResponse); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name())) { |
|
|
|
|
JsonArray batchJsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name()); |
|
|
|
|
List<JsonRpcResponse> jsonRpcBatchResponse; |
|
|
|
|
lazyTraceLogger(batchJsonRequest::toString); |
|
|
|
|
List<JsonRpcResponse> jsonRpcBatchResponses = new ArrayList<>(); |
|
|
|
|
try { |
|
|
|
|
List<JsonRpcResponse> responses = new ArrayList<>(); |
|
|
|
|
for (int i = 0; i < batchJsonRequest.size(); i++) { |
|
|
|
|
final JsonObject jsonRequest; |
|
|
|
|
try { |
|
|
|
|
jsonRequest = batchJsonRequest.getJsonObject(i); |
|
|
|
|
} catch (ClassCastException e) { |
|
|
|
|
responses.add(new JsonRpcErrorResponse(null, INVALID_REQUEST)); |
|
|
|
|
jsonRpcBatchResponses.add(new JsonRpcErrorResponse(null, INVALID_REQUEST)); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
responses.add( |
|
|
|
|
jsonRpcBatchResponses.add( |
|
|
|
|
jsonRpcExecutor.execute( |
|
|
|
|
user, |
|
|
|
|
tracer, |
|
|
|
@ -112,18 +116,18 @@ public class JsonRpcExecutorHandler { |
|
|
|
|
jsonRequest, |
|
|
|
|
req -> req.mapTo(JsonRpcRequest.class))); |
|
|
|
|
} |
|
|
|
|
jsonRpcBatchResponse = responses; |
|
|
|
|
} catch (RuntimeException e) { |
|
|
|
|
response.setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
final JsonRpcResponse[] completed = |
|
|
|
|
jsonRpcBatchResponse.stream() |
|
|
|
|
jsonRpcBatchResponses.stream() |
|
|
|
|
.filter(jsonRpcResponse -> jsonRpcResponse.getType() != JsonRpcResponseType.NONE) |
|
|
|
|
.toArray(JsonRpcResponse[]::new); |
|
|
|
|
try (final JsonResponseStreamer streamer = |
|
|
|
|
new JsonResponseStreamer(response, ctx.request().remoteAddress())) { |
|
|
|
|
// underlying output stream lifecycle is managed by the json object writer
|
|
|
|
|
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(completed)); |
|
|
|
|
JSON_OBJECT_WRITER.writeValue(streamer, completed); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
@ -170,4 +174,19 @@ public class JsonRpcExecutorHandler { |
|
|
|
|
return HttpResponseStatus.OK; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@FunctionalInterface |
|
|
|
|
private interface ExceptionThrowingSupplier<T> { |
|
|
|
|
T get() throws Exception; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private static void lazyTraceLogger(final ExceptionThrowingSupplier<String> logMessageSupplier) { |
|
|
|
|
if (LOG.isTraceEnabled()) { |
|
|
|
|
try { |
|
|
|
|
LOG.trace(logMessageSupplier.get()); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
throw new RuntimeException(e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|