[NC-1772] Avoid allocating and returning a ByteBuf from MessageFramer.frame. (#116)

We can write directly to the target instead and avoid the caller needing to release the buffer.
Fixes memory leak when clients are repeatedly connecting and disconnecting.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 63b2578b2f
commit a2e28b1603
  1. 2
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/AbstractHandshakeHandler.java
  2. 2
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/MessageFramer.java
  3. 20
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java
  4. 8
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java

@ -144,7 +144,7 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
|| outboundMessage.getData().getCode() != WireMessageCodes.HELLO) { || outboundMessage.getData().getCode() != WireMessageCodes.HELLO) {
throw new IllegalStateException("First wire message sent wasn't a HELLO."); throw new IllegalStateException("First wire message sent wasn't a HELLO.");
} }
out.writeBytes(framer.frame(outboundMessage.getData())); framer.frame(outboundMessage.getData(), out);
context.pipeline().remove(this); context.pipeline().remove(this);
} }
} }

@ -32,6 +32,6 @@ final class MessageFramer extends MessageToByteEncoder<OutboundMessage> {
@Override @Override
protected void encode( protected void encode(
final ChannelHandlerContext ctx, final OutboundMessage msg, final ByteBuf out) { final ChannelHandlerContext ctx, final OutboundMessage msg, final ByteBuf out) {
out.writeBytes(framer.frame(multiplexer.multiplex(msg.getCapability(), msg.getData()))); framer.frame(multiplexer.multiplex(msg.getCapability(), msg.getData()), out);
} }
} }

@ -288,9 +288,9 @@ public class Framer {
* MACs. * MACs.
* *
* @param message The message to frame. * @param message The message to frame.
* @return The framed message, as byte buffer. * @param output The {@link ByteBuf} to write framed data to.
*/ */
public synchronized ByteBuf frame(final MessageData message) { public synchronized void frame(final MessageData message, final ByteBuf output) {
Preconditions.checkArgument( Preconditions.checkArgument(
message.getSize() < LENGTH_MAX_MESSAGE_FRAME, "Message size in excess of maximum length."); message.getSize() < LENGTH_MAX_MESSAGE_FRAME, "Message size in excess of maximum length.");
// Compress message // Compress message
@ -306,24 +306,22 @@ public class Framer {
// Construct new, compressed message // Construct new, compressed message
final ByteBuf compressedBuf = NetworkMemoryPool.allocate(compressed.length); final ByteBuf compressedBuf = NetworkMemoryPool.allocate(compressed.length);
compressedBuf.writeBytes(compressed); compressedBuf.writeBytes(compressed);
return frameAndReleaseMessage(new RawMessage(message.getCode(), compressedBuf)); frameAndReleaseMessage(new RawMessage(message.getCode(), compressedBuf), output);
} finally { } finally {
// We have to release the original message because frameAndRelease only released the // We have to release the original message because frameAndRelease only released the
// compressed copy. // compressed copy.
message.release(); message.release();
} }
} else { } else {
return frameAndReleaseMessage(message); frameAndReleaseMessage(message, output);
} }
} }
@VisibleForTesting @VisibleForTesting
ByteBuf frameAndReleaseMessage(final MessageData message) { void frameAndReleaseMessage(final MessageData message, final ByteBuf buf) {
try { try {
final int frameSize = message.getSize() + LENGTH_MESSAGE_ID; final int frameSize = message.getSize() + LENGTH_MESSAGE_ID;
final int pad = padding16(frameSize); final int pad = padding16(frameSize);
final int bufSize = LENGTH_FULL_HEADER + (frameSize + pad + LENGTH_MAC);
final ByteBuf buf = NetworkMemoryPool.allocate(bufSize);
final byte id = (byte) message.getCode(); final byte id = (byte) message.getCode();
@ -343,9 +341,6 @@ public class Framer {
hMac = Arrays.copyOf(hMac, LENGTH_MAC); hMac = Arrays.copyOf(hMac, LENGTH_MAC);
buf.writeBytes(h).writeBytes(hMac); buf.writeBytes(h).writeBytes(hMac);
// Sanity check.
assert buf.writerIndex() == LENGTH_FULL_HEADER;
// Encrypt payload. // Encrypt payload.
final byte[] f = new byte[frameSize + pad]; final byte[] f = new byte[frameSize + pad];
@ -367,11 +362,6 @@ public class Framer {
fMac = Arrays.copyOf(secrets.updateEgress(xor(fMac, fMacSeed)).getEgressMac(), LENGTH_MAC); fMac = Arrays.copyOf(secrets.updateEgress(xor(fMac, fMacSeed)).getEgressMac(), LENGTH_MAC);
buf.writeBytes(f).writeBytes(fMac); buf.writeBytes(f).writeBytes(fMac);
// Sanity check: all expected bytes are written.
assert buf.writerIndex() == LENGTH_FULL_HEADER + (frameSize + pad + LENGTH_MAC);
return buf;
} finally { } finally {
message.release(); message.release();
} }

@ -59,7 +59,7 @@ public class FramerTest {
final Framer framer = new Framer(secrets); final Framer framer = new Framer(secrets);
assertThatExceptionOfType(IllegalArgumentException.class) assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> framer.frame(ethMessage)) .isThrownBy(() -> framer.frame(ethMessage, Unpooled.buffer()))
.withMessageContaining("Message size in excess of maximum length."); .withMessageContaining("Message size in excess of maximum length.");
} }
@ -80,7 +80,8 @@ public class FramerTest {
final ByteBuf buf = wrappedBuffer(byteArray); final ByteBuf buf = wrappedBuffer(byteArray);
final MessageData ethMessage = new RawMessage(0x00, buf); final MessageData ethMessage = new RawMessage(0x00, buf);
final ByteBuf framedMessage = framer.frameAndReleaseMessage(ethMessage); final ByteBuf framedMessage = Unpooled.buffer();
framer.frameAndReleaseMessage(ethMessage, framedMessage);
final HandshakeSecrets deframeSecrets = secretsFrom(td, true); final HandshakeSecrets deframeSecrets = secretsFrom(td, true);
final Framer deframer = new Framer(deframeSecrets); final Framer deframer = new Framer(deframeSecrets);
@ -186,7 +187,8 @@ public class FramerTest {
framer = new Framer(secrets); framer = new Framer(secrets);
for (int i = 0; i < decrypted.size(); i++) { for (int i = 0; i < decrypted.size(); i++) {
final ByteBuf b = framer.frame(decrypted.get(i)); final ByteBuf b = Unpooled.buffer();
framer.frame(decrypted.get(i), b);
final byte[] enc = new byte[b.readableBytes()]; final byte[] enc = new byte[b.readableBytes()];
b.readBytes(enc); b.readBytes(enc);
final byte[] expected = decodeHexDump(messages.get(i).get("data").asText()); final byte[] expected = decodeHexDump(messages.get(i).get("data").asText());

Loading…
Cancel
Save