Check to see if vertx response is closed before responding (#613)

Occasionally the other side of an HTTP connection will drop the
connection before we are done processing.  This results in a harmless
but annoying exception showing up in the log.  If we check before we
write we won't get that exception.

Signed-off-by: Danno Ferrin <danno.ferrin@gmail.com>
pull/634/head
Danno Ferrin 5 years ago committed by GitHub
parent 4a1b5b0dbd
commit a4143dc410
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      build.gradle
  2. 21
      enclave/src/integration-test/java/org/hyperledger/besu/enclave/TlsEnabledHttpServerFactory.java
  3. 50
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/graphql/GraphQLHttpService.java
  4. 53
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpService.java
  5. 2
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/authentication/AuthenticationService.java
  6. 7
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/health/HealthService.java
  7. 22
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/WebSocketService.java
  8. 16
      metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/MetricsHttpService.java

@ -242,7 +242,9 @@ allprojects {
'--add-opens',
'java.base/java.util=ALL-UNNAMED',
'--add-opens',
'java.base/java.util.concurrent=ALL-UNNAMED'
'java.base/java.util.concurrent=ALL-UNNAMED',
'--add-opens',
'java.base/java.util.concurrent.atomic=ALL-UNNAMED'
]
Set toImport = [
'test.ethereum.include',

@ -34,25 +34,27 @@ import io.vertx.core.http.ClientAuth;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.net.PfxOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import org.apache.tuweni.net.tls.VertxTrustOptions;
public class TlsEnabledHttpServerFactory {
class TlsEnabledHttpServerFactory {
private final Vertx vertx;
private final List<HttpServer> serversCreated = Lists.newArrayList();
public TlsEnabledHttpServerFactory() {
TlsEnabledHttpServerFactory() {
this.vertx = Vertx.vertx();
}
public void shutdown() {
void shutdown() {
serversCreated.forEach(HttpServer::close);
vertx.close();
}
public HttpServer create(
HttpServer create(
final TlsCertificateDefinition serverCert,
final TlsCertificateDefinition acceptedClientCerts,
final Path workDir,
@ -78,7 +80,7 @@ public class TlsEnabledHttpServerFactory {
router
.route(HttpMethod.GET, "/upcheck")
.produces(HttpHeaderValues.APPLICATION_JSON.toString())
.handler(context -> context.response().end("I'm up!"));
.handler(TlsEnabledHttpServerFactory::handleRequest);
final HttpServer mockOrionHttpServer = vertx.createHttpServer(web3HttpServerOptions);
@ -89,7 +91,7 @@ public class TlsEnabledHttpServerFactory {
serversCreated.add(mockOrionHttpServer);
return mockOrionHttpServer;
} catch (KeyStoreException
} catch (final KeyStoreException
| NoSuchAlgorithmException
| CertificateException
| IOException
@ -98,4 +100,11 @@ public class TlsEnabledHttpServerFactory {
throw new RuntimeException("Failed to construct a TLS Enabled Server", e);
}
}
private static void handleRequest(final RoutingContext context) {
final HttpServerResponse response = context.response();
if (!response.closed()) {
response.end("I'm up!");
}
}
}

@ -32,6 +32,7 @@ import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
@ -72,8 +73,7 @@ public class GraphQLHttpService {
private static final MediaType MEDIA_TYPE_JUST_JSON = MediaType.JSON_UTF_8.withoutParameters();
private static final String EMPTY_RESPONSE = "";
private static final TypeReference<Map<String, Object>> MAP_TYPE =
new TypeReference<Map<String, Object>>() {};
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {};
private final Vertx vertx;
private final GraphQLConfiguration config;
@ -185,12 +185,14 @@ public class GraphQLHttpService {
|| (hostHeader.isPresent() && hostIsInWhitelist(hostHeader.get()))) {
event.next();
} else {
event
.response()
final HttpServerResponse response = event.response();
if (!response.closed()) {
response
.setStatusCode(403)
.putHeader("Content-Type", "application/json; charset=utf-8")
.end("{\"message\":\"Host not authorized.\"}");
}
}
};
}
@ -246,7 +248,10 @@ public class GraphQLHttpService {
// Empty Get/Post requests to / will be redirected to /graphql using 308 Permanent Redirect
private void handleEmptyRequestAndRedirect(final RoutingContext routingContext) {
HttpServerResponse response = routingContext.response();
final HttpServerResponse response = routingContext.response();
if (response.closed()) {
return;
}
response.setStatusCode(HttpResponseStatus.PERMANENT_REDIRECT.code());
response.putHeader("Location", "/graphql");
response.end();
@ -262,11 +267,7 @@ public class GraphQLHttpService {
switch (request.method()) {
case GET:
final String queryString = request.getParam("query");
if (queryString == null) {
query = "";
} else {
query = queryString;
}
query = Objects.requireNonNullElse(queryString, "");
operationName = request.getParam("operationName");
final String variableString = request.getParam("variables");
if (variableString != null) {
@ -282,26 +283,14 @@ public class GraphQLHttpService {
final GraphQLJsonRequest jsonRequest =
Json.decodeValue(requestBody, GraphQLJsonRequest.class);
final String jsonQuery = jsonRequest.getQuery();
if (jsonQuery == null) {
query = "";
} else {
query = jsonQuery;
}
query = Objects.requireNonNullElse(jsonQuery, "");
operationName = jsonRequest.getOperationName();
Map<String, Object> jsonVariables = jsonRequest.getVariables();
if (jsonVariables != null) {
variables = jsonVariables;
} else {
variables = Collections.emptyMap();
}
final Map<String, Object> jsonVariables = jsonRequest.getVariables();
variables = Objects.requireNonNullElse(jsonVariables, Collections.emptyMap());
} else {
// treat all else as application/graphql
final String requestQuery = routingContext.getBodyAsString().trim();
if (requestQuery == null) {
query = "";
} else {
query = requestQuery;
}
query = Objects.requireNonNullElse(requestQuery, "");
operationName = null;
variables = Collections.emptyMap();
}
@ -326,6 +315,9 @@ public class GraphQLHttpService {
},
false,
(res) -> {
if (response.closed()) {
return;
}
response.putHeader("Content-Type", MediaType.JSON_UTF_8.toString());
if (res.failed()) {
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
@ -393,11 +385,13 @@ public class GraphQLHttpService {
private void handleGraphQLError(final RoutingContext routingContext, final Exception ex) {
LOG.debug("Error handling GraphQL request", ex);
routingContext
.response()
final HttpServerResponse response = routingContext.response();
if (!response.closed()) {
response
.setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
.end(Json.encode(new GraphQLErrorResponse(ex.getMessage())));
}
}
private String buildCorsRegexFromConfig() {
if (config.getCorsAllowedDomains().isEmpty()) {

@ -194,11 +194,10 @@ public class JsonRpcHttpService {
natService.ifNatEnvironment(
NatMethod.UPNP,
natManager -> {
natManager ->
((UpnpNatManager) natManager)
.requestPortForward(
config.getPort(), NetworkProtocol.TCP, NatServiceType.JSON_RPC);
});
config.getPort(), NetworkProtocol.TCP, NatServiceType.JSON_RPC));
return;
}
@ -343,12 +342,14 @@ public class JsonRpcHttpService {
|| (hostHeader.isPresent() && hostIsInWhitelist(hostHeader.get()))) {
event.next();
} else {
event
.response()
final HttpServerResponse response = event.response();
if (!response.closed()) {
response
.setStatusCode(403)
.putHeader("Content-Type", "application/json; charset=utf-8")
.end("{\"message\":\"Host not authorized.\"}");
}
}
};
}
@ -413,7 +414,7 @@ public class JsonRpcHttpService {
private void handleJsonRPCRequest(final RoutingContext routingContext) {
// first check token if authentication is required
String token = getAuthToken(routingContext);
final String token = getAuthToken(routingContext);
if (authenticationService.isPresent() && token == null) {
// no auth token when auth required
handleJsonRpcUnauthorizedError(routingContext, null, JsonRpcError.UNAUTHORIZED);
@ -425,9 +426,7 @@ public class JsonRpcHttpService {
AuthenticationUtils.getUser(
authenticationService,
token,
user -> {
handleJsonSingleRequest(routingContext, new JsonObject(json), user);
});
user -> handleJsonSingleRequest(routingContext, new JsonObject(json), user));
} else {
final JsonArray array = new JsonArray(json);
if (array.size() < 1) {
@ -437,9 +436,7 @@ public class JsonRpcHttpService {
AuthenticationUtils.getUser(
authenticationService,
token,
user -> {
handleJsonBatchRequest(routingContext, array, user);
});
user -> handleJsonBatchRequest(routingContext, array, user));
}
} catch (final DecodeException ex) {
handleJsonRpcError(routingContext, null, JsonRpcError.PARSE_ERROR);
@ -468,9 +465,12 @@ public class JsonRpcHttpService {
}
final JsonRpcResponse jsonRpcResponse = (JsonRpcResponse) res.result();
response.setStatusCode(status(jsonRpcResponse).code());
response.putHeader("Content-Type", APPLICATION_JSON);
response.end(serialize(jsonRpcResponse));
if (!response.closed()) {
response
.setStatusCode(status(jsonRpcResponse).code())
.putHeader("Content-Type", APPLICATION_JSON)
.end(serialize(jsonRpcResponse));
}
});
}
@ -529,11 +529,12 @@ public class JsonRpcHttpService {
CompositeFuture.all(responses)
.setHandler(
(res) -> {
final HttpServerResponse response = routingContext.response();
if (response.closed()) {
return;
}
if (res.failed()) {
routingContext
.response()
.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
.end();
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
return;
}
final JsonRpcResponse[] completed =
@ -542,7 +543,7 @@ public class JsonRpcHttpService {
.filter(this::isNonEmptyResponses)
.toArray(JsonRpcResponse[]::new);
routingContext.response().end(Json.encode(completed));
response.end(Json.encode(completed));
});
}
@ -612,19 +613,23 @@ public class JsonRpcHttpService {
private void handleJsonRpcError(
final RoutingContext routingContext, final Object id, final JsonRpcError error) {
routingContext
.response()
final HttpServerResponse response = routingContext.response();
if (!response.closed()) {
response
.setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
.end(Json.encode(new JsonRpcErrorResponse(id, error)));
}
}
private void handleJsonRpcUnauthorizedError(
final RoutingContext routingContext, final Object id, final JsonRpcError error) {
routingContext
.response()
final HttpServerResponse response = routingContext.response();
if (!response.closed()) {
response
.setStatusCode(HttpResponseStatus.UNAUTHORIZED.code())
.end(Json.encode(new JsonRpcErrorResponse(id, error)));
}
}
private JsonRpcResponse errorResponse(final Object id, final JsonRpcError error) {
return new JsonRpcErrorResponse(id, error);

@ -193,10 +193,12 @@ public class AuthenticationService {
final JsonObject responseBody = new JsonObject().put("token", token);
final HttpServerResponse response = routingContext.response();
if (!response.closed()) {
response.setStatusCode(200);
response.putHeader("Content-Type", "application/json");
response.end(responseBody.encode());
}
}
});
}

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.api.jsonrpc.health;
import static java.util.Collections.singletonMap;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
@ -48,11 +49,13 @@ public final class HealthService {
statusCode = UNHEALTHY_STATUS_CODE;
statusText = UNHEALTHY_STATUS_TEXT;
}
routingContext
.response()
final HttpServerResponse response = routingContext.response();
if (!response.closed()) {
response
.setStatusCode(statusCode)
.end(new JsonObject(singletonMap("status", statusText)).encodePrettily());
}
}
@FunctionalInterface
public interface HealthCheck {

@ -33,6 +33,7 @@ import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.Router;
@ -166,16 +167,17 @@ public class WebSocketService {
.handler(AuthenticationService::handleDisabledLogin);
}
router
.route()
.handler(
http ->
http.response()
.setStatusCode(400)
.end("Websocket endpoint can't handle HTTP requests"));
router.route().handler(WebSocketService::handleHttpNotSupported);
return router;
}
private static void handleHttpNotSupported(final RoutingContext http) {
final HttpServerResponse response = http.response();
if (!response.closed()) {
response.setStatusCode(400).end("Websocket endpoint can't handle HTTP requests");
}
}
private Handler<AsyncResult<HttpServer>> startHandler(final CompletableFuture<?> resultFuture) {
return res -> {
if (res.succeeded()) {
@ -234,12 +236,14 @@ public class WebSocketService {
if (hasWhitelistedHostnameHeader(Optional.ofNullable(event.request().host()))) {
event.next();
} else {
event
.response()
final HttpServerResponse response = event.response();
if (!response.closed()) {
response
.setStatusCode(403)
.putHeader("Content-Type", "application/json; charset=utf-8")
.end("{\"message\":\"Host not authorized.\"}");
}
}
};
}

@ -137,11 +137,13 @@ class MetricsHttpService implements MetricsService {
|| (hostHeader.isPresent() && hostIsInWhitelist(hostHeader.get()))) {
event.next();
} else {
event
.response()
final HttpServerResponse response = event.response();
if (!response.closed()) {
response
.setStatusCode(403)
.putHeader("Content-Type", "text/plain; charset=utf-8")
.end("Host not authorized.");
.putHeader("Content-Type", "application/json; charset=utf-8")
.end("{\"message\":\"Host not authorized.\"}");
}
}
};
}
@ -206,11 +208,13 @@ class MetricsHttpService implements MetricsService {
},
false,
(res) -> {
if (response.closed()) {
// Request for metrics closed before response was generated
return;
}
if (res.failed()) {
LOG.error("Request for metrics failed", res.cause());
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
} else if (response.closed()) {
LOG.trace("Request for metrics closed before response was generated");
} else {
response.setStatusCode(HttpResponseStatus.OK.code());
response.putHeader("Content-Type", TextFormat.CONTENT_TYPE_004);

Loading…
Cancel
Save