mirror of https://github.com/hyperledger/besu
Stream JSON RPC responses to avoid creating big JSON strings in memory (#3076)
* Stream JSON RPC responses to avoid creating big JSON string in memory for large responses Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net> * Adapt code to last development on result with Optionals Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net> * Log an error if there is an IOException during the streaming of the response Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net> * Remove the intermediate String object creation, writing directly to a Buffer Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net> * Implement response streaming for web socket Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net> * Fix log messages Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net> * Move inner classes to outer level, to avoid too big class files Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net> * Fix copyright Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>pull/3225/head
parent
c4a81f5f11
commit
6b47c8fc4b
@ -0,0 +1,74 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu contributors |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.api.jsonrpc; |
||||
|
||||
import java.io.IOException; |
||||
import java.io.OutputStream; |
||||
import java.util.concurrent.Semaphore; |
||||
|
||||
import io.vertx.core.buffer.Buffer; |
||||
import io.vertx.core.http.HttpServerResponse; |
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
class JsonResponseStreamer extends OutputStream { |
||||
|
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
|
||||
private final HttpServerResponse response; |
||||
private final Semaphore paused = new Semaphore(0); |
||||
private final byte[] singleByteBuf = new byte[1]; |
||||
private boolean chunked = false; |
||||
|
||||
public JsonResponseStreamer(final HttpServerResponse response) { |
||||
this.response = response; |
||||
} |
||||
|
||||
@Override |
||||
public void write(final int b) throws IOException { |
||||
singleByteBuf[0] = (byte) b; |
||||
write(singleByteBuf, 0, 1); |
||||
} |
||||
|
||||
@Override |
||||
public void write(final byte[] bbuf, final int off, final int len) throws IOException { |
||||
if (!chunked) { |
||||
response.setChunked(true); |
||||
chunked = true; |
||||
} |
||||
|
||||
if (response.writeQueueFull()) { |
||||
LOG.debug("HttpResponse write queue is full pausing streaming"); |
||||
response.drainHandler(e -> paused.release()); |
||||
try { |
||||
paused.acquire(); |
||||
LOG.debug("HttpResponse write queue is not accepting more data, resuming streaming"); |
||||
} catch (InterruptedException ex) { |
||||
Thread.currentThread().interrupt(); |
||||
throw new IOException( |
||||
"Interrupted while waiting for HttpServerResponse to drain the write queue", ex); |
||||
} |
||||
} |
||||
|
||||
Buffer buf = Buffer.buffer(len); |
||||
buf.appendBytes(bbuf, off, len); |
||||
response.write(buf); |
||||
} |
||||
|
||||
@Override |
||||
public void close() throws IOException { |
||||
response.end(); |
||||
} |
||||
} |
@ -0,0 +1,83 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu contributors |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket; |
||||
|
||||
import java.io.IOException; |
||||
import java.io.OutputStream; |
||||
import java.util.concurrent.Semaphore; |
||||
|
||||
import io.vertx.core.buffer.Buffer; |
||||
import io.vertx.core.http.ServerWebSocket; |
||||
import io.vertx.core.http.WebSocketFrame; |
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
class JsonResponseStreamer extends OutputStream { |
||||
|
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
private static final Buffer EMPTY_BUFFER = Buffer.buffer(); |
||||
|
||||
private final ServerWebSocket response; |
||||
private final Semaphore paused = new Semaphore(0); |
||||
private final byte[] singleByteBuf = new byte[1]; |
||||
private boolean firstFrame = true; |
||||
private Buffer buffer = EMPTY_BUFFER; |
||||
|
||||
public JsonResponseStreamer(final ServerWebSocket response) { |
||||
this.response = response; |
||||
} |
||||
|
||||
@Override |
||||
public void write(final int b) throws IOException { |
||||
singleByteBuf[0] = (byte) b; |
||||
write(singleByteBuf, 0, 1); |
||||
} |
||||
|
||||
@Override |
||||
public void write(final byte[] bbuf, final int off, final int len) throws IOException { |
||||
if (buffer != EMPTY_BUFFER) { |
||||
writeFrame(buffer, false); |
||||
} |
||||
Buffer buf = Buffer.buffer(len); |
||||
buf.appendBytes(bbuf, off, len); |
||||
buffer = buf; |
||||
} |
||||
|
||||
private void writeFrame(final Buffer buf, final boolean isFinal) throws IOException { |
||||
if (response.writeQueueFull()) { |
||||
LOG.debug("WebSocketResponse write queue is full pausing streaming"); |
||||
response.drainHandler(e -> paused.release()); |
||||
try { |
||||
paused.acquire(); |
||||
LOG.debug("WebSocketResponse write queue is not accepting more data, resuming streaming"); |
||||
} catch (InterruptedException ex) { |
||||
Thread.currentThread().interrupt(); |
||||
throw new IOException( |
||||
"Interrupted while waiting for HttpServerResponse to drain the write queue", ex); |
||||
} |
||||
} |
||||
if (firstFrame) { |
||||
response.writeFrame(WebSocketFrame.textFrame(buf.toString(), isFinal)); |
||||
firstFrame = false; |
||||
} else { |
||||
response.writeFrame(WebSocketFrame.continuationFrame(buf, isFinal)); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void close() throws IOException { |
||||
writeFrame(buffer, true); |
||||
} |
||||
} |
@ -0,0 +1,120 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu contributors |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.api.jsonrpc; |
||||
|
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.argThat; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import io.vertx.core.Handler; |
||||
import io.vertx.core.buffer.Buffer; |
||||
import io.vertx.core.http.HttpServerResponse; |
||||
import org.junit.Test; |
||||
import org.mockito.ArgumentMatcher; |
||||
import org.mockito.invocation.InvocationOnMock; |
||||
|
||||
public class JsonResponseStreamerTest { |
||||
|
||||
@Test |
||||
public void writeSingleChar() throws IOException { |
||||
HttpServerResponse httpResponse = mock(HttpServerResponse.class); |
||||
|
||||
JsonResponseStreamer streamer = new JsonResponseStreamer(httpResponse); |
||||
streamer.write('x'); |
||||
|
||||
verify(httpResponse).write(argThat(bufferContains("x"))); |
||||
} |
||||
|
||||
@Test |
||||
public void writeString() throws IOException { |
||||
HttpServerResponse httpResponse = mock(HttpServerResponse.class); |
||||
|
||||
JsonResponseStreamer streamer = new JsonResponseStreamer(httpResponse); |
||||
streamer.write("xyz".getBytes(StandardCharsets.UTF_8)); |
||||
|
||||
verify(httpResponse).write(argThat(bufferContains("xyz"))); |
||||
} |
||||
|
||||
@Test |
||||
public void writeSubString() throws IOException { |
||||
HttpServerResponse httpResponse = mock(HttpServerResponse.class); |
||||
|
||||
JsonResponseStreamer streamer = new JsonResponseStreamer(httpResponse); |
||||
streamer.write("abcxyz".getBytes(StandardCharsets.UTF_8), 1, 3); |
||||
|
||||
verify(httpResponse).write(argThat(bufferContains("bcx"))); |
||||
} |
||||
|
||||
@Test |
||||
public void writeTwice() throws IOException { |
||||
HttpServerResponse httpResponse = mock(HttpServerResponse.class); |
||||
|
||||
JsonResponseStreamer streamer = new JsonResponseStreamer(httpResponse); |
||||
streamer.write("xyz".getBytes(StandardCharsets.UTF_8)); |
||||
streamer.write('\n'); |
||||
|
||||
verify(httpResponse).write(argThat(bufferContains("xyz"))); |
||||
verify(httpResponse).write(argThat(bufferContains("\n"))); |
||||
} |
||||
|
||||
@Test |
||||
public void writeStringAndClose() throws IOException { |
||||
HttpServerResponse httpResponse = mock(HttpServerResponse.class); |
||||
|
||||
try (JsonResponseStreamer streamer = new JsonResponseStreamer(httpResponse)) { |
||||
streamer.write("xyz".getBytes(StandardCharsets.UTF_8)); |
||||
} |
||||
|
||||
verify(httpResponse).write(argThat(bufferContains("xyz"))); |
||||
verify(httpResponse).end(); |
||||
} |
||||
|
||||
@Test |
||||
public void waitQueueIsDrained() throws IOException { |
||||
HttpServerResponse httpResponse = mock(HttpServerResponse.class); |
||||
when(httpResponse.writeQueueFull()).thenReturn(Boolean.TRUE, Boolean.FALSE); |
||||
|
||||
when(httpResponse.drainHandler(any())).then(this::emptyQueueAfterAWhile); |
||||
|
||||
try (JsonResponseStreamer streamer = new JsonResponseStreamer(httpResponse)) { |
||||
streamer.write("xyz".getBytes(StandardCharsets.UTF_8)); |
||||
streamer.write("123".getBytes(StandardCharsets.UTF_8)); |
||||
} |
||||
|
||||
verify(httpResponse).write(argThat(bufferContains("xyz"))); |
||||
verify(httpResponse).write(argThat(bufferContains("123"))); |
||||
verify(httpResponse).end(); |
||||
} |
||||
|
||||
private HttpServerResponse emptyQueueAfterAWhile(final InvocationOnMock invocation) { |
||||
Handler<Void> handler = invocation.getArgument(0); |
||||
|
||||
Executors.newSingleThreadScheduledExecutor() |
||||
.schedule(() -> handler.handle(null), 1, TimeUnit.SECONDS); |
||||
|
||||
return (HttpServerResponse) invocation.getMock(); |
||||
} |
||||
|
||||
private ArgumentMatcher<Buffer> bufferContains(final String text) { |
||||
return buf -> buf.toString().equals(text); |
||||
} |
||||
} |
@ -0,0 +1,112 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu contributors |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket; |
||||
|
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.argThat; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import io.vertx.core.Handler; |
||||
import io.vertx.core.http.ServerWebSocket; |
||||
import io.vertx.core.http.WebSocketFrame; |
||||
import org.junit.Test; |
||||
import org.mockito.ArgumentMatcher; |
||||
import org.mockito.invocation.InvocationOnMock; |
||||
|
||||
public class JsonResponseStreamerTest { |
||||
|
||||
@Test |
||||
public void writeSingleChar() throws IOException { |
||||
final ServerWebSocket response = mock(ServerWebSocket.class); |
||||
|
||||
try (JsonResponseStreamer streamer = new JsonResponseStreamer(response)) { |
||||
streamer.write('x'); |
||||
} |
||||
|
||||
verify(response).writeFrame(argThat(frameContains("x", true))); |
||||
} |
||||
|
||||
@Test |
||||
public void writeString() throws IOException { |
||||
final ServerWebSocket response = mock(ServerWebSocket.class); |
||||
|
||||
try (JsonResponseStreamer streamer = new JsonResponseStreamer(response)) { |
||||
streamer.write("xyz".getBytes(StandardCharsets.UTF_8), 0, 3); |
||||
} |
||||
|
||||
verify(response).writeFrame(argThat(frameContains("xyz", true))); |
||||
} |
||||
|
||||
@Test |
||||
public void writeSubString() throws IOException { |
||||
final ServerWebSocket response = mock(ServerWebSocket.class); |
||||
|
||||
try (JsonResponseStreamer streamer = new JsonResponseStreamer(response)) { |
||||
streamer.write("abcxyz".getBytes(StandardCharsets.UTF_8), 1, 3); |
||||
} |
||||
|
||||
verify(response).writeFrame(argThat(frameContains("bcx", true))); |
||||
} |
||||
|
||||
@Test |
||||
public void writeTwice() throws IOException { |
||||
final ServerWebSocket response = mock(ServerWebSocket.class); |
||||
|
||||
try (JsonResponseStreamer streamer = new JsonResponseStreamer(response)) { |
||||
streamer.write("xyz".getBytes(StandardCharsets.UTF_8)); |
||||
streamer.write('\n'); |
||||
} |
||||
|
||||
verify(response).writeFrame(argThat(frameContains("xyz", false))); |
||||
verify(response).writeFrame(argThat(frameContains("\n", true))); |
||||
} |
||||
|
||||
@Test |
||||
public void waitQueueIsDrained() throws IOException { |
||||
final ServerWebSocket response = mock(ServerWebSocket.class); |
||||
|
||||
when(response.writeQueueFull()).thenReturn(Boolean.TRUE, Boolean.FALSE); |
||||
|
||||
when(response.drainHandler(any())).then(this::emptyQueueAfterAWhile); |
||||
|
||||
try (JsonResponseStreamer streamer = new JsonResponseStreamer(response)) { |
||||
streamer.write("xyz".getBytes(StandardCharsets.UTF_8)); |
||||
streamer.write("123".getBytes(StandardCharsets.UTF_8)); |
||||
} |
||||
|
||||
verify(response).writeFrame(argThat(frameContains("xyz", false))); |
||||
verify(response).writeFrame(argThat(frameContains("123", true))); |
||||
} |
||||
|
||||
private ServerWebSocket emptyQueueAfterAWhile(final InvocationOnMock invocation) { |
||||
Handler<Void> handler = invocation.getArgument(0); |
||||
|
||||
Executors.newSingleThreadScheduledExecutor() |
||||
.schedule(() -> handler.handle(null), 1, TimeUnit.SECONDS); |
||||
|
||||
return (ServerWebSocket) invocation.getMock(); |
||||
} |
||||
|
||||
private ArgumentMatcher<WebSocketFrame> frameContains(final String text, final boolean isFinal) { |
||||
return frame -> frame.textData().equals(text) && frame.isFinal() == isFinal; |
||||
} |
||||
} |
Loading…
Reference in new issue