From f869b740f9f114eca0f337ce59608facc62e8a90 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Mon, 19 Nov 2018 09:35:39 +1000 Subject: [PATCH] Remove reference counting from MessageData (#264) * Remove release/retain requirement from MessageData. MessageData now wraps a BytesValue rather than a Netty ByteBuf so we no longer need to call release/retain and pass that through to the underlying ByteBuf. Entirely removed support for a BytesValue wrapping a Netty ByteBuf as it does not provide any way to release the underlying ByteBuf. * Add BytesValue.copyTo(byte[]) to avoid the need to call getArrayUnsafe() to copy the data. --- .../ibft/ibftmessage/AbstractIbftMessage.java | 19 +-- .../ibftmessage/IbftPrePrepareMessage.java | 15 +- .../ibft/ibftmessage/IbftPrepareMessage.java | 15 +- .../ibftmessage/IbftRoundChangeMessage.java | 15 +- .../IbftSignedMessageData.java | 8 + .../ibft/network/IbftNetworkPeersTest.java | 8 +- .../eth/manager/EthProtocolManager.java | 7 - .../ethereum/eth/manager/EthServer.java | 144 ++++++++---------- .../ethereum/eth/manager/RequestManager.java | 4 - .../eth/messages/BlockBodiesMessage.java | 18 +-- .../eth/messages/BlockHeadersMessage.java | 18 +-- .../eth/messages/GetBlockBodiesMessage.java | 18 +-- .../eth/messages/GetBlockHeadersMessage.java | 29 ++-- .../eth/messages/GetNodeDataMessage.java | 18 +-- .../eth/messages/GetReceiptsMessage.java | 18 +-- .../eth/messages/NewBlockHashesMessage.java | 17 +-- .../eth/messages/NewBlockMessage.java | 16 +- .../eth/messages/NodeDataMessage.java | 18 +-- .../eth/messages/ReceiptsMessage.java | 18 +-- .../ethereum/eth/messages/StatusMessage.java | 16 +- .../eth/messages/TransactionsMessage.java | 20 +-- .../eth/sync/BlockPropagationManager.java | 4 - .../tasks/AbstractGetHeadersFromPeerTask.java | 58 ++++--- .../eth/sync/tasks/GetBodiesFromPeerTask.java | 40 +++-- .../TransactionsMessageProcessor.java | 2 - .../eth/manager/EthProtocolManagerTest.java | 16 -- .../eth/manager/MockPeerConnection.java | 1 - .../eth/manager/RequestManagerTest.java | 4 +- .../eth/manager/RespondingEthPeer.java | 76 +++------ .../eth/messages/BlockBodiesMessageTest.java | 26 +--- .../eth/messages/BlockHeadersMessageTest.java | 22 +-- .../messages/GetBlockBodiesMessageTest.java | 18 +-- .../messages/GetBlockHeadersMessageTest.java | 41 ++--- .../eth/messages/GetNodeDataMessageTest.java | 23 +-- .../eth/messages/GetReceiptsMessageTest.java | 23 +-- .../messages/NewBlockHashesMessageTest.java | 18 +-- .../eth/messages/NewBlockMessageTest.java | 10 +- .../eth/messages/NodeDataMessageTest.java | 23 +-- .../eth/messages/ReceiptsMessageTest.java | 23 +-- .../eth/messages/StatusMessageTest.java | 6 +- .../eth/messages/TransactionsMessageTest.java | 23 +-- .../ethereum/p2p/testing/MockNetworkTest.java | 11 +- .../ethereum/p2p/NetworkMemoryPool.java | 26 ---- .../pantheon/ethereum/p2p/NetworkRunner.java | 30 ++-- .../ethereum/p2p/api/MessageData.java | 16 +- .../ethereum/p2p/netty/ApiHandler.java | 3 - .../p2p/netty/CapabilityMultiplexer.java | 16 +- .../pantheon/ethereum/p2p/netty/DeFramer.java | 10 +- .../p2p/netty/NettyPeerConnection.java | 2 - .../ethereum/p2p/rlpx/framing/Framer.java | 121 ++++++--------- .../rlpx/handshake/ecies/ECIESHandshaker.java | 4 +- .../ethereum/p2p/utils/ByteBufUtils.java | 51 ------- .../p2p/wire/AbstractMessageData.java | 25 +-- .../pantheon/ethereum/p2p/wire/PeerInfo.java | 14 -- .../ethereum/p2p/wire/RawMessage.java | 4 +- .../p2p/wire/messages/DisconnectMessage.java | 20 +-- .../p2p/wire/messages/EmptyMessage.java | 13 +- .../p2p/wire/messages/HelloMessage.java | 22 +-- .../p2p/netty/CapabilityMultiplexerTest.java | 12 +- .../ethereum/p2p/rlpx/framing/FramerTest.java | 9 +- .../p2p/wire/WireMessagesSedesTest.java | 12 +- .../pantheon/util/bytes/BytesValue.java | 36 +---- .../bytes/BytesValueImplementationsTest.java | 89 ++++------- 63 files changed, 429 insertions(+), 1033 deletions(-) delete mode 100644 ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkMemoryPool.java delete mode 100644 ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/utils/ByteBufUtils.java diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessage.java index 7095862695..1a0cca471e 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessage.java @@ -13,28 +13,13 @@ package tech.pegasys.pantheon.consensus.ibft.ibftmessage; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; -import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; - -import io.netty.buffer.ByteBuf; +import tech.pegasys.pantheon.util.bytes.BytesValue; public abstract class AbstractIbftMessage extends AbstractMessageData { - protected AbstractIbftMessage(final ByteBuf data) { + protected AbstractIbftMessage(final BytesValue data) { super(data); } public abstract IbftSignedMessageData decode(); - - protected static ByteBuf writeMessageToByteBuf( - final IbftSignedMessageData ibftSignedMessageData) { - - BytesValueRLPOutput rlpEncode = new BytesValueRLPOutput(); - ibftSignedMessageData.writeTo(rlpEncode); - - final ByteBuf data = NetworkMemoryPool.allocate(rlpEncode.encodedSize()); - data.writeBytes(rlpEncode.encoded().extractArray()); - - return data; - } } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrePrepareMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrePrepareMessage.java index 70f8fbad3d..18fe8d141f 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrePrepareMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrePrepareMessage.java @@ -14,24 +14,20 @@ package tech.pegasys.pantheon.consensus.ibft.ibftmessage; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftUnsignedPrePrepareMessageData; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.BytesValue; -import io.netty.buffer.ByteBuf; - public class IbftPrePrepareMessage extends AbstractIbftMessage { private static final int MESSAGE_CODE = IbftV2.PRE_PREPARE; - private IbftPrePrepareMessage(final ByteBuf data) { + private IbftPrePrepareMessage(final BytesValue data) { super(data); } public static IbftPrePrepareMessage fromMessage(final MessageData message) { if (message instanceof IbftPrePrepareMessage) { - message.retain(); return (IbftPrePrepareMessage) message; } final int code = message.getCode(); @@ -40,21 +36,18 @@ public class IbftPrePrepareMessage extends AbstractIbftMessage { String.format("Message has code %d and thus is not a PrePrepareMessage", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new IbftPrePrepareMessage(data); + return new IbftPrePrepareMessage(message.getData()); } @Override public IbftSignedMessageData decode() { - return IbftSignedMessageData.readIbftSignedPrePrepareMessageDataFrom( - RLP.input(BytesValue.wrapBuffer(data))); + return IbftSignedMessageData.readIbftSignedPrePrepareMessageDataFrom(RLP.input(data)); } public static IbftPrePrepareMessage create( final IbftSignedMessageData ibftPrepareMessageDecoded) { - return new IbftPrePrepareMessage(writeMessageToByteBuf(ibftPrepareMessageDecoded)); + return new IbftPrePrepareMessage(ibftPrepareMessageDecoded.encode()); } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrepareMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrepareMessage.java index b1d77b3556..4c44aff9b0 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrepareMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrepareMessage.java @@ -14,24 +14,20 @@ package tech.pegasys.pantheon.consensus.ibft.ibftmessage; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftUnsignedPrepareMessageData; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.BytesValue; -import io.netty.buffer.ByteBuf; - public class IbftPrepareMessage extends AbstractIbftMessage { private static final int MESSAGE_CODE = IbftV2.PREPARE; - private IbftPrepareMessage(final ByteBuf data) { + private IbftPrepareMessage(final BytesValue data) { super(data); } public static IbftPrepareMessage fromMessage(final MessageData message) { if (message instanceof IbftPrepareMessage) { - message.retain(); return (IbftPrepareMessage) message; } final int code = message.getCode(); @@ -40,21 +36,18 @@ public class IbftPrepareMessage extends AbstractIbftMessage { String.format("Message has code %d and thus is not a PrepareMessage", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new IbftPrepareMessage(data); + return new IbftPrepareMessage(message.getData()); } @Override public IbftSignedMessageData decode() { - return IbftSignedMessageData.readIbftSignedPrepareMessageDataFrom( - RLP.input(BytesValue.wrapBuffer(data))); + return IbftSignedMessageData.readIbftSignedPrepareMessageDataFrom(RLP.input(data)); } public static IbftPrepareMessage create( final IbftSignedMessageData ibftPrepareMessageDecoded) { - return new IbftPrepareMessage(writeMessageToByteBuf(ibftPrepareMessageDecoded)); + return new IbftPrepareMessage(ibftPrepareMessageDecoded.encode()); } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftRoundChangeMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftRoundChangeMessage.java index bfdd00e495..cd6375babe 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftRoundChangeMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftRoundChangeMessage.java @@ -14,24 +14,20 @@ package tech.pegasys.pantheon.consensus.ibft.ibftmessage; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftUnsignedRoundChangeMessageData; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.BytesValue; -import io.netty.buffer.ByteBuf; - public class IbftRoundChangeMessage extends AbstractIbftMessage { private static final int MESSAGE_CODE = IbftV2.ROUND_CHANGE; - private IbftRoundChangeMessage(final ByteBuf data) { + private IbftRoundChangeMessage(final BytesValue data) { super(data); } public static IbftRoundChangeMessage fromMessage(final MessageData message) { if (message instanceof IbftRoundChangeMessage) { - message.retain(); return (IbftRoundChangeMessage) message; } final int code = message.getCode(); @@ -40,21 +36,18 @@ public class IbftRoundChangeMessage extends AbstractIbftMessage { String.format("Message has code %d and thus is not a RoundChangeMessage", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new IbftRoundChangeMessage(data); + return new IbftRoundChangeMessage(message.getData()); } @Override public IbftSignedMessageData decode() { - return IbftSignedMessageData.readIbftSignedRoundChangeMessageDataFrom( - RLP.input(BytesValue.wrapBuffer(data))); + return IbftSignedMessageData.readIbftSignedRoundChangeMessageDataFrom(RLP.input(data)); } public static IbftRoundChangeMessage create( final IbftSignedMessageData ibftPrepareMessageDecoded) { - return new IbftRoundChangeMessage(writeMessageToByteBuf(ibftPrepareMessageDecoded)); + return new IbftRoundChangeMessage(ibftPrepareMessageDecoded.encode()); } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessagedata/IbftSignedMessageData.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessagedata/IbftSignedMessageData.java index 1cfec1e0b6..0f26204f2c 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessagedata/IbftSignedMessageData.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessagedata/IbftSignedMessageData.java @@ -15,8 +15,10 @@ package tech.pegasys.pantheon.consensus.ibft.ibftmessagedata; import tech.pegasys.pantheon.crypto.SECP256K1.Signature; import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.core.Util; +import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.ethereum.rlp.RLPInput; import tech.pegasys.pantheon.ethereum.rlp.RLPOutput; +import tech.pegasys.pantheon.util.bytes.BytesValue; public class IbftSignedMessageData { @@ -51,6 +53,12 @@ public class IbftSignedMessageData { output.endList(); } + public BytesValue encode() { + final BytesValueRLPOutput rlpEncode = new BytesValueRLPOutput(); + writeTo(rlpEncode); + return rlpEncode.encoded(); + } + public static IbftSignedMessageData readIbftSignedPrePrepareMessageDataFrom(final RLPInput rlpInput) { diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java index 1a90a07038..5297447e7b 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java @@ -12,7 +12,6 @@ */ package tech.pegasys.pantheon.consensus.ibft.network; -import static io.netty.buffer.Unpooled.EMPTY_BUFFER; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -29,6 +28,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.math.BigInteger; import java.util.List; @@ -74,7 +74,7 @@ public class IbftNetworkPeersTest { peers.peerAdded(peer); } - final MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER); + final MessageData messageToSend = new RawMessage(1, BytesValue.EMPTY); peers.multicastToValidators(messageToSend); verify(peerConnections.get(0), times(1)).sendForProtocol("IBF", messageToSend); @@ -93,9 +93,9 @@ public class IbftNetworkPeersTest { final IbftNetworkPeers peers = new IbftNetworkPeers(validatorProvider); // only add peer connections 1, 2 & 3, none of which should be invoked. - Lists.newArrayList(1, 2, 3).stream().forEach(i -> peers.peerAdded(peerConnections.get(i))); + Lists.newArrayList(1, 2, 3).forEach(i -> peers.peerAdded(peerConnections.get(i))); - final MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER); + final MessageData messageToSend = new RawMessage(1, BytesValue.EMPTY); peers.multicastToValidators(messageToSend); verify(peerConnections.get(0), never()).sendForProtocol(any(), any()); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java index 3509527039..a8a0a13b79 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java @@ -244,8 +244,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { // Parsing errors can happen when clients broadcast network ids outside of the int range, // So just disconnect with "subprotocol" error rather than "breach of protocol". peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); - } finally { - status.release(); } } @@ -266,15 +264,10 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { .forEach( peer -> { try { - // Send(msg) will release the NewBlockMessage's internal buffer, thus it must be - // retained - // prior to transmission - then released on exit from function. - newBlockMessage.retain(); peer.send(newBlockMessage); } catch (final PeerNotConnected ex) { // Peers may disconnect while traversing the list, this is a normal occurrence. } }); - newBlockMessage.release(); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java index ebc2dc8619..7125c01139 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java @@ -118,112 +118,96 @@ class EthServer { static MessageData constructGetHeadersResponse( final Blockchain blockchain, final MessageData message, final int requestLimit) { final GetBlockHeadersMessage getHeaders = GetBlockHeadersMessage.readFrom(message); - try { - final Optional hash = getHeaders.hash(); - final int skip = getHeaders.skip(); - final int maxHeaders = Math.min(requestLimit, getHeaders.maxHeaders()); - final boolean reversed = getHeaders.reverse(); - final BlockHeader firstHeader; - if (hash.isPresent()) { - final Hash startHash = hash.get(); - firstHeader = blockchain.getBlockHeader(startHash).orElse(null); - } else { - final long firstNumber = getHeaders.blockNumber().getAsLong(); - firstHeader = blockchain.getBlockHeader(firstNumber).orElse(null); - } - final Collection resp; - if (firstHeader == null) { - resp = Collections.emptyList(); - } else { - resp = Lists.newArrayList(firstHeader); - final long numberDelta = reversed ? -(skip + 1) : (skip + 1); - for (int i = 1; i < maxHeaders; i++) { - final long blockNumber = firstHeader.getNumber() + i * numberDelta; - if (blockNumber < BlockHeader.GENESIS_BLOCK_NUMBER) { - break; - } - final Optional maybeHeader = blockchain.getBlockHeader(blockNumber); - if (maybeHeader.isPresent()) { - resp.add(maybeHeader.get()); - } else { - break; - } + final Optional hash = getHeaders.hash(); + final int skip = getHeaders.skip(); + final int maxHeaders = Math.min(requestLimit, getHeaders.maxHeaders()); + final boolean reversed = getHeaders.reverse(); + final BlockHeader firstHeader; + if (hash.isPresent()) { + final Hash startHash = hash.get(); + firstHeader = blockchain.getBlockHeader(startHash).orElse(null); + } else { + final long firstNumber = getHeaders.blockNumber().getAsLong(); + firstHeader = blockchain.getBlockHeader(firstNumber).orElse(null); + } + final Collection resp; + if (firstHeader == null) { + resp = Collections.emptyList(); + } else { + resp = Lists.newArrayList(firstHeader); + final long numberDelta = reversed ? -(skip + 1) : (skip + 1); + for (int i = 1; i < maxHeaders; i++) { + final long blockNumber = firstHeader.getNumber() + i * numberDelta; + if (blockNumber < BlockHeader.GENESIS_BLOCK_NUMBER) { + break; + } + final Optional maybeHeader = blockchain.getBlockHeader(blockNumber); + if (maybeHeader.isPresent()) { + resp.add(maybeHeader.get()); + } else { + break; } } - return BlockHeadersMessage.create(resp); - } finally { - getHeaders.release(); } + return BlockHeadersMessage.create(resp); } static MessageData constructGetBodiesResponse( final Blockchain blockchain, final MessageData message, final int requestLimit) { final GetBlockBodiesMessage getBlockBodiesMessage = GetBlockBodiesMessage.readFrom(message); - try { - final Iterable hashes = getBlockBodiesMessage.hashes(); + final Iterable hashes = getBlockBodiesMessage.hashes(); - final Collection bodies = new ArrayList<>(); - int count = 0; - for (final Hash hash : hashes) { - if (count >= requestLimit) { - break; - } - count++; - final Optional maybeBody = blockchain.getBlockBody(hash); - if (!maybeBody.isPresent()) { - continue; - } - bodies.add(maybeBody.get()); + final Collection bodies = new ArrayList<>(); + int count = 0; + for (final Hash hash : hashes) { + if (count >= requestLimit) { + break; } - return BlockBodiesMessage.create(bodies); - } finally { - getBlockBodiesMessage.release(); + count++; + final Optional maybeBody = blockchain.getBlockBody(hash); + if (!maybeBody.isPresent()) { + continue; + } + bodies.add(maybeBody.get()); } + return BlockBodiesMessage.create(bodies); } static MessageData constructGetReceiptsResponse( final Blockchain blockchain, final MessageData message, final int requestLimit) { final GetReceiptsMessage getReceipts = GetReceiptsMessage.readFrom(message); - try { - final Iterable hashes = getReceipts.hashes(); + final Iterable hashes = getReceipts.hashes(); - final List> receipts = new ArrayList<>(); - int count = 0; - for (final Hash hash : hashes) { - if (count >= requestLimit) { - break; - } - count++; - final Optional> maybeReceipts = blockchain.getTxReceipts(hash); - if (!maybeReceipts.isPresent()) { - continue; - } - receipts.add(maybeReceipts.get()); + final List> receipts = new ArrayList<>(); + int count = 0; + for (final Hash hash : hashes) { + if (count >= requestLimit) { + break; } - return ReceiptsMessage.create(receipts); - } finally { - getReceipts.release(); + count++; + final Optional> maybeReceipts = blockchain.getTxReceipts(hash); + if (!maybeReceipts.isPresent()) { + continue; + } + receipts.add(maybeReceipts.get()); } + return ReceiptsMessage.create(receipts); } static MessageData constructGetNodeDataResponse( final MessageData message, final int requestLimit) { final GetNodeDataMessage getNodeDataMessage = GetNodeDataMessage.readFrom(message); - try { - final Iterable hashes = getNodeDataMessage.hashes(); + final Iterable hashes = getNodeDataMessage.hashes(); - final List nodeData = new ArrayList<>(); - int count = 0; - for (final Hash hash : hashes) { - if (count >= requestLimit) { - break; - } - count++; - // TODO: Lookup node data and add it to the list + final List nodeData = new ArrayList<>(); + int count = 0; + for (final Hash hash : hashes) { + if (count >= requestLimit) { + break; } - return NodeDataMessage.create(nodeData); - } finally { - getNodeDataMessage.release(); + count++; + // TODO: Lookup node data and add it to the list } + return NodeDataMessage.create(nodeData); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java index 03b3799135..aa626db960 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java @@ -156,7 +156,6 @@ public class RequestManager { if (closed) { return; } - message.retain(); bufferedResponses.add(new Response(false, message)); dispatchBufferedResponses(); } @@ -168,9 +167,6 @@ public class RequestManager { Response response = bufferedResponses.poll(); while (response != null) { responseCallback.exec(response.closed, response.message, peer); - if (response.message != null) { - response.message.release(); - } response = bufferedResponses.poll(); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java index 2438969a61..afac3193d9 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java @@ -16,20 +16,16 @@ import tech.pegasys.pantheon.ethereum.core.BlockBody; import tech.pegasys.pantheon.ethereum.core.BlockHashFunction; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.util.bytes.BytesValue; -import io.netty.buffer.ByteBuf; - public final class BlockBodiesMessage extends AbstractMessageData { public static BlockBodiesMessage readFrom(final MessageData message) { if (message instanceof BlockBodiesMessage) { - message.retain(); return (BlockBodiesMessage) message; } final int code = message.getCode(); @@ -37,9 +33,7 @@ public final class BlockBodiesMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a BlockBodiesMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new BlockBodiesMessage(data); + return new BlockBodiesMessage(message.getData()); } public static BlockBodiesMessage create(final Iterable bodies) { @@ -47,12 +41,10 @@ public final class BlockBodiesMessage extends AbstractMessageData { tmp.startList(); bodies.forEach(body -> body.writeTo(tmp)); tmp.endList(); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new BlockBodiesMessage(data); + return new BlockBodiesMessage(tmp.encoded()); } - private BlockBodiesMessage(final ByteBuf data) { + private BlockBodiesMessage(final BytesValue data) { super(data); } @@ -64,9 +56,7 @@ public final class BlockBodiesMessage extends AbstractMessageData { public Iterable bodies(final ProtocolSchedule protocolSchedule) { final BlockHashFunction blockHashFunction = ScheduleBasedBlockHashFunction.create(protocolSchedule); - final byte[] tmp = new byte[data.readableBytes()]; - data.getBytes(0, tmp); - return new BytesValueRLPInput(BytesValue.wrap(tmp), false) + return new BytesValueRLPInput(data, false) .readList(rlp -> BlockBody.readFrom(rlp, blockHashFunction)); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java index 0d247bdc28..2d3bd941f7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java @@ -16,7 +16,6 @@ import tech.pegasys.pantheon.ethereum.core.BlockHashFunction; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -25,13 +24,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Iterator; -import io.netty.buffer.ByteBuf; - public final class BlockHeadersMessage extends AbstractMessageData { public static BlockHeadersMessage readFrom(final MessageData message) { if (message instanceof BlockHeadersMessage) { - message.retain(); return (BlockHeadersMessage) message; } final int code = message.getCode(); @@ -39,9 +35,7 @@ public final class BlockHeadersMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a BlockHeadersMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new BlockHeadersMessage(data); + return new BlockHeadersMessage(message.getData()); } public static BlockHeadersMessage create(final Iterable headers) { @@ -51,12 +45,10 @@ public final class BlockHeadersMessage extends AbstractMessageData { header.writeTo(tmp); } tmp.endList(); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new BlockHeadersMessage(data); + return new BlockHeadersMessage(tmp.encoded()); } - private BlockHeadersMessage(final ByteBuf data) { + private BlockHeadersMessage(final BytesValue data) { super(data); } @@ -68,9 +60,7 @@ public final class BlockHeadersMessage extends AbstractMessageData { public Iterator getHeaders(final ProtocolSchedule protocolSchedule) { final BlockHashFunction blockHashFunction = ScheduleBasedBlockHashFunction.create(protocolSchedule); - final byte[] headers = new byte[data.readableBytes()]; - data.getBytes(0, headers); - return new BytesValueRLPInput(BytesValue.wrap(headers), false) + return new BytesValueRLPInput(data, false) .readList(rlp -> BlockHeader.readFrom(rlp, blockHashFunction)) .iterator(); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessage.java index 036b019280..f6254876a0 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessage.java @@ -13,7 +13,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; import java.util.Collection; -import io.netty.buffer.ByteBuf; - public final class GetBlockBodiesMessage extends AbstractMessageData { public static GetBlockBodiesMessage readFrom(final MessageData message) { if (message instanceof GetBlockBodiesMessage) { - message.retain(); return (GetBlockBodiesMessage) message; } final int code = message.getCode(); @@ -38,9 +34,7 @@ public final class GetBlockBodiesMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a GetBlockBodiesMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new GetBlockBodiesMessage(data); + return new GetBlockBodiesMessage(message.getData()); } public static GetBlockBodiesMessage create(final Iterable hashes) { @@ -48,12 +42,10 @@ public final class GetBlockBodiesMessage extends AbstractMessageData { tmp.startList(); hashes.forEach(tmp::writeBytesValue); tmp.endList(); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new GetBlockBodiesMessage(data); + return new GetBlockBodiesMessage(tmp.encoded()); } - private GetBlockBodiesMessage(final ByteBuf data) { + private GetBlockBodiesMessage(final BytesValue data) { super(data); } @@ -63,9 +55,7 @@ public final class GetBlockBodiesMessage extends AbstractMessageData { } public Iterable hashes() { - final byte[] tmp = new byte[data.readableBytes()]; - data.getBytes(0, tmp); - final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false); + final RLPInput input = new BytesValueRLPInput(data, false); input.enterList(); final Collection hashes = new ArrayList<>(); while (!input.isEndOfCurrentList()) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessage.java index 1dab09b595..919157c39f 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessage.java @@ -15,19 +15,17 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import static com.google.common.base.Preconditions.checkArgument; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; +import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.ethereum.rlp.RLPInput; import tech.pegasys.pantheon.ethereum.rlp.RLPOutput; +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Optional; import java.util.OptionalLong; -import io.netty.buffer.ByteBuf; - /** PV62 GetBlockHeaders Message. */ public final class GetBlockHeadersMessage extends AbstractMessageData { @@ -35,7 +33,6 @@ public final class GetBlockHeadersMessage extends AbstractMessageData { public static GetBlockHeadersMessage readFrom(final MessageData message) { if (message instanceof GetBlockHeadersMessage) { - message.retain(); return (GetBlockHeadersMessage) message; } final int code = message.getCode(); @@ -43,9 +40,7 @@ public final class GetBlockHeadersMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a GetBlockHeadersMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new GetBlockHeadersMessage(data); + return new GetBlockHeadersMessage(message.getData()); } public static GetBlockHeadersMessage create( @@ -54,9 +49,7 @@ public final class GetBlockHeadersMessage extends AbstractMessageData { GetBlockHeadersData.create(blockNum, maxHeaders, skip, reverse); final BytesValueRLPOutput tmp = new BytesValueRLPOutput(); getBlockHeadersData.writeTo(tmp); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new GetBlockHeadersMessage(data); + return new GetBlockHeadersMessage(tmp.encoded()); } public static GetBlockHeadersMessage create( @@ -65,12 +58,10 @@ public final class GetBlockHeadersMessage extends AbstractMessageData { GetBlockHeadersData.create(hash, maxHeaders, skip, reverse); final BytesValueRLPOutput tmp = new BytesValueRLPOutput(); getBlockHeadersData.writeTo(tmp); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new GetBlockHeadersMessage(data); + return new GetBlockHeadersMessage(tmp.encoded()); } - private GetBlockHeadersMessage(final ByteBuf data) { + private GetBlockHeadersMessage(final BytesValue data) { super(data); } @@ -113,7 +104,7 @@ public final class GetBlockHeadersMessage extends AbstractMessageData { private GetBlockHeadersData getBlockHeadersData() { if (getBlockHeadersData == null) { - getBlockHeadersData = GetBlockHeadersData.readFrom(ByteBufUtils.toRLPInput(data)); + getBlockHeadersData = GetBlockHeadersData.readFrom(RLP.input(data)); } return getBlockHeadersData; } @@ -160,9 +151,9 @@ public final class GetBlockHeadersMessage extends AbstractMessageData { blockNumber = OptionalLong.of(input.readLongScalar()); } - int maxHeaders = input.readIntScalar(); - int skip = input.readIntScalar(); - boolean reverse = input.readIntScalar() != 0; + final int maxHeaders = input.readIntScalar(); + final int skip = input.readIntScalar(); + final boolean reverse = input.readIntScalar() != 0; input.leaveList(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessage.java index 6e76919fc0..904230d7d1 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessage.java @@ -13,7 +13,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; import java.util.Collection; -import io.netty.buffer.ByteBuf; - public final class GetNodeDataMessage extends AbstractMessageData { public static GetNodeDataMessage readFrom(final MessageData message) { if (message instanceof GetNodeDataMessage) { - message.retain(); return (GetNodeDataMessage) message; } final int code = message.getCode(); @@ -38,9 +34,7 @@ public final class GetNodeDataMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a GetNodeDataMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new GetNodeDataMessage(data); + return new GetNodeDataMessage(message.getData()); } public static GetNodeDataMessage create(final Iterable hashes) { @@ -48,12 +42,10 @@ public final class GetNodeDataMessage extends AbstractMessageData { tmp.startList(); hashes.forEach(tmp::writeBytesValue); tmp.endList(); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new GetNodeDataMessage(data); + return new GetNodeDataMessage(tmp.encoded()); } - private GetNodeDataMessage(final ByteBuf data) { + private GetNodeDataMessage(final BytesValue data) { super(data); } @@ -63,9 +55,7 @@ public final class GetNodeDataMessage extends AbstractMessageData { } public Iterable hashes() { - final byte[] tmp = new byte[data.readableBytes()]; - data.getBytes(0, tmp); - final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false); + final RLPInput input = new BytesValueRLPInput(data, false); input.enterList(); final Collection hashes = new ArrayList<>(); while (!input.isEndOfCurrentList()) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessage.java index 7a0bb69aa9..f13ae89842 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessage.java @@ -13,7 +13,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; import java.util.Collection; -import io.netty.buffer.ByteBuf; - public final class GetReceiptsMessage extends AbstractMessageData { public static GetReceiptsMessage readFrom(final MessageData message) { if (message instanceof GetReceiptsMessage) { - message.retain(); return (GetReceiptsMessage) message; } final int code = message.getCode(); @@ -38,9 +34,7 @@ public final class GetReceiptsMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a GetReceipts.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new GetReceiptsMessage(data); + return new GetReceiptsMessage(message.getData()); } public static GetReceiptsMessage create(final Iterable hashes) { @@ -48,12 +42,10 @@ public final class GetReceiptsMessage extends AbstractMessageData { tmp.startList(); hashes.forEach(tmp::writeBytesValue); tmp.endList(); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new GetReceiptsMessage(data); + return new GetReceiptsMessage(tmp.encoded()); } - private GetReceiptsMessage(final ByteBuf data) { + private GetReceiptsMessage(final BytesValue data) { super(data); } @@ -63,9 +55,7 @@ public final class GetReceiptsMessage extends AbstractMessageData { } public Iterable hashes() { - final byte[] tmp = new byte[data.readableBytes()]; - data.getBytes(0, tmp); - final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false); + final RLPInput input = new BytesValueRLPInput(data, false); input.enterList(); final Collection hashes = new ArrayList<>(); while (!input.isEndOfCurrentList()) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessage.java index 7cb3f1d889..f2b036d175 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessage.java @@ -13,7 +13,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -24,13 +23,11 @@ import java.util.Iterator; import java.util.Objects; import com.google.common.collect.Iterators; -import io.netty.buffer.ByteBuf; public final class NewBlockHashesMessage extends AbstractMessageData { public static NewBlockHashesMessage readFrom(final MessageData message) { if (message instanceof NewBlockHashesMessage) { - message.retain(); return (NewBlockHashesMessage) message; } final int code = message.getCode(); @@ -38,9 +35,7 @@ public final class NewBlockHashesMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a NewBlockHashesMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new NewBlockHashesMessage(data); + return new NewBlockHashesMessage(message.getData()); } public static NewBlockHashesMessage create( @@ -54,12 +49,10 @@ public final class NewBlockHashesMessage extends AbstractMessageData { tmp.endList(); } tmp.endList(); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new NewBlockHashesMessage(data); + return new NewBlockHashesMessage(tmp.encoded()); } - private NewBlockHashesMessage(final ByteBuf data) { + private NewBlockHashesMessage(final BytesValue data) { super(data); } @@ -69,9 +62,7 @@ public final class NewBlockHashesMessage extends AbstractMessageData { } public Iterator getNewHashes() { - final byte[] hashes = new byte[data.readableBytes()]; - data.getBytes(0, hashes); - return new BytesValueRLPInput(BytesValue.wrap(hashes), false) + return new BytesValueRLPInput(data, false) .readList( rlpInput -> { rlpInput.enterList(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessage.java index beb4983f87..b3765f705d 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessage.java @@ -16,9 +16,7 @@ import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHashFunction; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.ethereum.rlp.RLP; @@ -27,15 +25,13 @@ import tech.pegasys.pantheon.ethereum.rlp.RLPOutput; import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; -import io.netty.buffer.ByteBuf; - public class NewBlockMessage extends AbstractMessageData { private static final int MESSAGE_CODE = EthPV62.NEW_BLOCK; private NewBlockMessageData messageFields = null; - private NewBlockMessage(final ByteBuf data) { + private NewBlockMessage(final BytesValue data) { super(data); } @@ -48,13 +44,11 @@ public class NewBlockMessage extends AbstractMessageData { final NewBlockMessageData msgData = new NewBlockMessageData(block, totalDifficulty); final BytesValueRLPOutput out = new BytesValueRLPOutput(); msgData.writeTo(out); - final ByteBuf data = ByteBufUtils.fromRLPOutput(out); - return new NewBlockMessage(data); + return new NewBlockMessage(out.encoded()); } public static NewBlockMessage readFrom(final MessageData message) { if (message instanceof NewBlockMessage) { - message.retain(); return (NewBlockMessage) message; } final int code = message.getCode(); @@ -62,9 +56,7 @@ public class NewBlockMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a NewBlockMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new NewBlockMessage(data); + return new NewBlockMessage(message.getData()); } public Block block(final ProtocolSchedule protocolSchedule) { @@ -77,7 +69,7 @@ public class NewBlockMessage extends AbstractMessageData { private NewBlockMessageData messageFields(final ProtocolSchedule protocolSchedule) { if (messageFields == null) { - final RLPInput input = RLP.input(BytesValue.wrap(ByteBufUtils.toByteArray(data))); + final RLPInput input = RLP.input(data); messageFields = NewBlockMessageData.readFrom(input, protocolSchedule); } return messageFields; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessage.java index cbf4b0e43b..534a6f04a4 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessage.java @@ -12,7 +12,6 @@ */ package tech.pegasys.pantheon.ethereum.eth.messages; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -23,13 +22,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; import java.util.Collection; -import io.netty.buffer.ByteBuf; - public final class NodeDataMessage extends AbstractMessageData { public static NodeDataMessage readFrom(final MessageData message) { if (message instanceof NodeDataMessage) { - message.retain(); return (NodeDataMessage) message; } final int code = message.getCode(); @@ -37,9 +33,7 @@ public final class NodeDataMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a NodeDataMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new NodeDataMessage(data); + return new NodeDataMessage(message.getData()); } public static NodeDataMessage create(final Iterable nodeData) { @@ -47,12 +41,10 @@ public final class NodeDataMessage extends AbstractMessageData { tmp.startList(); nodeData.forEach(tmp::writeBytesValue); tmp.endList(); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new NodeDataMessage(data); + return new NodeDataMessage(tmp.encoded()); } - private NodeDataMessage(final ByteBuf data) { + private NodeDataMessage(final BytesValue data) { super(data); } @@ -62,9 +54,7 @@ public final class NodeDataMessage extends AbstractMessageData { } public Iterable nodeData() { - final byte[] tmp = new byte[data.readableBytes()]; - data.getBytes(0, tmp); - final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false); + final RLPInput input = new BytesValueRLPInput(data, false); input.enterList(); final Collection nodeData = new ArrayList<>(); while (!input.isEndOfCurrentList()) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessage.java index 5a13259169..f22753701f 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessage.java @@ -13,7 +13,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; import java.util.List; -import io.netty.buffer.ByteBuf; - public final class ReceiptsMessage extends AbstractMessageData { public static ReceiptsMessage readFrom(final MessageData message) { if (message instanceof ReceiptsMessage) { - message.retain(); return (ReceiptsMessage) message; } final int code = message.getCode(); @@ -38,9 +34,7 @@ public final class ReceiptsMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a ReceiptsMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new ReceiptsMessage(data); + return new ReceiptsMessage(message.getData()); } public static ReceiptsMessage create(final List> receipts) { @@ -53,12 +47,10 @@ public final class ReceiptsMessage extends AbstractMessageData { tmp.endList(); }); tmp.endList(); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new ReceiptsMessage(data); + return new ReceiptsMessage(tmp.encoded()); } - private ReceiptsMessage(final ByteBuf data) { + private ReceiptsMessage(final BytesValue data) { super(data); } @@ -68,9 +60,7 @@ public final class ReceiptsMessage extends AbstractMessageData { } public List> receipts() { - final byte[] tmp = new byte[data.readableBytes()]; - data.getBytes(0, tmp); - final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false); + final RLPInput input = new BytesValueRLPInput(data, false); input.enterList(); final List> receipts = new ArrayList<>(); while (input.nextIsList()) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessage.java index 62ffdd59eb..189316bbd2 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessage.java @@ -13,9 +13,7 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.ethereum.rlp.RLP; @@ -25,13 +23,11 @@ import tech.pegasys.pantheon.util.bytes.Bytes32; import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; -import io.netty.buffer.ByteBuf; - public final class StatusMessage extends AbstractMessageData { private EthStatus status; - public StatusMessage(final ByteBuf data) { + public StatusMessage(final BytesValue data) { super(data); } @@ -45,14 +41,12 @@ public final class StatusMessage extends AbstractMessageData { new EthStatus(protocolVersion, networkId, totalDifficulty, bestHash, genesisHash); final BytesValueRLPOutput out = new BytesValueRLPOutput(); status.writeTo(out); - final ByteBuf data = ByteBufUtils.fromRLPOutput(out); - return new StatusMessage(data); + return new StatusMessage(out.encoded()); } public static StatusMessage readFrom(final MessageData message) { if (message instanceof StatusMessage) { - message.retain(); return (StatusMessage) message; } final int code = message.getCode(); @@ -60,9 +54,7 @@ public final class StatusMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a StatusMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new StatusMessage(data); + return new StatusMessage(message.getData()); } @Override @@ -99,7 +91,7 @@ public final class StatusMessage extends AbstractMessageData { private EthStatus status() { if (status == null) { - final RLPInput input = RLP.input(BytesValue.wrap(ByteBufUtils.toByteArray(data))); + final RLPInput input = RLP.input(data); status = EthStatus.readFrom(input); } return status; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessage.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessage.java index b8dc95dae2..3ae8cef6cd 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessage.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessage.java @@ -13,7 +13,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Transaction; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Iterator; import java.util.function.Function; -import io.netty.buffer.ByteBuf; - public class TransactionsMessage extends AbstractMessageData { public static TransactionsMessage readFrom(final MessageData message) { if (message instanceof TransactionsMessage) { - message.retain(); return (TransactionsMessage) message; } final int code = message.getCode(); @@ -38,9 +34,7 @@ public class TransactionsMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a TransactionsMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new TransactionsMessage(data); + return new TransactionsMessage(message.getData()); } public static TransactionsMessage create(final Iterable transactions) { @@ -50,12 +44,10 @@ public class TransactionsMessage extends AbstractMessageData { transaction.writeTo(tmp); } tmp.endList(); - final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize()); - data.writeBytes(tmp.encoded().extractArray()); - return new TransactionsMessage(data); + return new TransactionsMessage(tmp.encoded()); } - private TransactionsMessage(final ByteBuf data) { + private TransactionsMessage(final BytesValue data) { super(data); } @@ -66,10 +58,6 @@ public class TransactionsMessage extends AbstractMessageData { public Iterator transactions( final Function transactionReader) { - final byte[] transactions = new byte[data.readableBytes()]; - data.getBytes(0, transactions); - return new BytesValueRLPInput(BytesValue.wrap(transactions), false) - .readList(transactionReader) - .iterator(); + return new BytesValueRLPInput(data, false).readList(transactionReader).iterator(); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index bf2205b119..b3fa3022ce 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -159,8 +159,6 @@ public class BlockPropagationManager { importOrSavePendingBlock(block); } catch (final RLPException e) { message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); - } finally { - newBlockMessage.release(); } } @@ -210,8 +208,6 @@ public class BlockPropagationManager { } } catch (final RLPException e) { message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); - } finally { - newBlockHashesMessage.release(); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java index 2a86838577..2c176cf254 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java @@ -71,42 +71,38 @@ public abstract class AbstractGetHeadersFromPeerTask } final BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(message); - try { - final Iterator headers = headersMessage.getHeaders(protocolSchedule); - if (!headers.hasNext()) { - // Message contains no data - nothing to do - return Optional.empty(); - } + final Iterator headers = headersMessage.getHeaders(protocolSchedule); + if (!headers.hasNext()) { + // Message contains no data - nothing to do + return Optional.empty(); + } + + final BlockHeader firstHeader = headers.next(); + if (!matchesFirstHeader(firstHeader)) { + // This isn't our message - nothing to do + return Optional.empty(); + } - final BlockHeader firstHeader = headers.next(); - if (!matchesFirstHeader(firstHeader)) { - // This isn't our message - nothing to do + final List headersList = new ArrayList<>(); + headersList.add(firstHeader); + long prevNumber = firstHeader.getNumber(); + + final int expectedDelta = reverse ? -(skip + 1) : (skip + 1); + while (headers.hasNext()) { + final BlockHeader header = headers.next(); + if (header.getNumber() != prevNumber + expectedDelta) { + // Skip doesn't match, this isn't our data return Optional.empty(); } - - final List headersList = new ArrayList<>(); - headersList.add(firstHeader); - long prevNumber = firstHeader.getNumber(); - - final int expectedDelta = reverse ? -(skip + 1) : (skip + 1); - while (headers.hasNext()) { - final BlockHeader header = headers.next(); - if (header.getNumber() != prevNumber + expectedDelta) { - // Skip doesn't match, this isn't our data - return Optional.empty(); - } - prevNumber = header.getNumber(); - headersList.add(header); - if (headersList.size() == count) { - break; - } + prevNumber = header.getNumber(); + headersList.add(header); + if (headersList.size() == count) { + break; } - - LOG.debug("Received {} of {} headers requested from peer.", headersList.size(), count); - return Optional.of(headersList); - } finally { - headersMessage.release(); } + + LOG.debug("Received {} of {} headers requested from peer.", headersList.size(), count); + return Optional.of(headersList); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java index 65110eadfe..fdb90093c7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java @@ -99,31 +99,27 @@ public class GetBodiesFromPeerTask extends AbstractPeerRequestTask bodies = Lists.newArrayList(bodiesMessage.bodies(protocolSchedule)); - if (bodies.size() == 0) { - // Message contains no data - nothing to do - return Optional.empty(); - } else if (bodies.size() > headers.size()) { - // Message doesn't match our request - nothing to do - return Optional.empty(); - } + final List bodies = Lists.newArrayList(bodiesMessage.bodies(protocolSchedule)); + if (bodies.size() == 0) { + // Message contains no data - nothing to do + return Optional.empty(); + } else if (bodies.size() > headers.size()) { + // Message doesn't match our request - nothing to do + return Optional.empty(); + } - final List blocks = new ArrayList<>(); - for (final BlockBody body : bodies) { - final List headers = bodyToHeaders.get(new BodyIdentifier(body)); - if (headers == null) { - // This message contains unrelated bodies - exit - return Optional.empty(); - } - headers.forEach(h -> blocks.add(new Block(h, body))); - // Clear processed headers - headers.clear(); + final List blocks = new ArrayList<>(); + for (final BlockBody body : bodies) { + final List headers = bodyToHeaders.get(new BodyIdentifier(body)); + if (headers == null) { + // This message contains unrelated bodies - exit + return Optional.empty(); } - return Optional.of(blocks); - } finally { - bodiesMessage.release(); + headers.forEach(h -> blocks.add(new Block(h, body))); + // Clear processed headers + headers.clear(); } + return Optional.of(blocks); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java index 229af27f05..0cd4f6ce3a 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java @@ -53,8 +53,6 @@ class TransactionsMessageProcessor { if (peer != null) { peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL); } - } finally { - transactionsMessage.release(); } } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java index 8badfb4a62..0b95ecf204 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java @@ -188,7 +188,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < blockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i); } - message.release(); done.complete(null); }; final PeerConnection peer = setupPeer(ethManager, onSend); @@ -221,7 +220,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < limit; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i); } - message.release(); done.complete(null); }; final PeerConnection peer = setupPeer(ethManager, onSend); @@ -251,7 +249,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < blockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(endBlock - i); } - message.release(); done.complete(null); }; final PeerConnection peer = setupPeer(ethManager, onSend); @@ -283,7 +280,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < blockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i * (skip + 1)); } - message.release(); done.complete(null); }; final PeerConnection peer = setupPeer(ethManager, onSend); @@ -316,7 +312,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < blockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(endBlock - i * (skip + 1)); } - message.release(); done.complete(null); }; final PeerConnection peer = setupPeer(ethManager, onSend); @@ -369,7 +364,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < 2; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i); } - message.release(); done.complete(null); }; final PeerConnection peer = setupPeer(ethManager, onSend); @@ -397,7 +391,6 @@ public final class EthProtocolManagerTest { final List headers = Lists.newArrayList(headersMsg.getHeaders(protocolSchedule)); assertThat(headers.size()).isEqualTo(0); - message.release(); done.complete(null); }; final PeerConnection peer = setupPeer(ethManager, onSend); @@ -438,7 +431,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < blockCount; i++) { assertThat(expectedBlocks[i].getBody()).isEqualTo(bodies.get(i)); } - message.release(); done.complete(null); }; @@ -483,7 +475,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < limit; i++) { assertThat(expectedBlocks[i].getBody()).isEqualTo(bodies.get(i)); } - message.release(); done.complete(null); }; @@ -520,7 +511,6 @@ public final class EthProtocolManagerTest { Lists.newArrayList(blocksMessage.bodies(protocolSchedule)); assertThat(bodies.size()).isEqualTo(1); assertThat(expectedBlock.getBody()).isEqualTo(bodies.get(0)); - message.release(); done.complete(null); }; @@ -562,7 +552,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < blockCount; i++) { assertThat(expectedReceipts.get(i)).isEqualTo(receipts.get(i)); } - message.release(); done.complete(null); }; @@ -606,7 +595,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < limit; i++) { assertThat(expectedReceipts.get(i)).isEqualTo(receipts.get(i)); } - message.release(); done.complete(null); }; @@ -623,7 +611,6 @@ public final class EthProtocolManagerTest { try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) { // Setup blocks query final long blockNumber = blockchain.getChainHeadBlockNumber() - 5; - final int blockCount = 2; final BlockHeader header = blockchain.getBlockHeader(blockNumber).get(); final List expectedReceipts = blockchain.getTxReceipts(header.getHash()).get(); @@ -644,7 +631,6 @@ public final class EthProtocolManagerTest { Lists.newArrayList(receiptsMessage.receipts()); assertThat(receipts.size()).isEqualTo(1); assertThat(expectedReceipts).isEqualTo(receipts.get(0)); - message.release(); done.complete(null); }; @@ -699,7 +685,6 @@ public final class EthProtocolManagerTest { for (final NewBlockMessage msg : messageSentCaptor.getAllValues()) { assertThat(msg.block(protocolSchdeule)).isEqualTo(minedBlock); assertThat(msg.totalDifficulty(protocolSchdeule)).isEqualTo(expectedTotalDifficulty); - msg.release(); } assertThat(receivingPeerCaptor.getAllValues().containsAll(peers)).isTrue(); @@ -740,7 +725,6 @@ public final class EthProtocolManagerTest { for (int i = 0; i < receivedBlockCount; i++) { assertThat(headers.get(i).getNumber()).isEqualTo(receivedBlockCount - 1 - i); } - message.release(); done.complete(null); }; diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java index cb52968449..8b69f49b5c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java @@ -54,7 +54,6 @@ class MockPeerConnection implements PeerConnection { @Override public void send(final Capability capability, final MessageData message) throws PeerNotConnected { if (disconnected) { - message.release(); throw new PeerNotConnected("MockPeerConnection disconnected"); } onSend.exec(capability, message, this); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManagerTest.java index 4f456a3d55..934c230ee7 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManagerTest.java @@ -22,6 +22,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; import java.util.Collections; @@ -31,7 +32,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import io.netty.buffer.Unpooled; import org.junit.Test; public class RequestManagerTest { @@ -212,7 +212,7 @@ public class RequestManagerTest { } private EthMessage mockMessage(final EthPeer peer) { - return new EthMessage(peer, new RawMessage(1, Unpooled.buffer())); + return new EthMessage(peer, new RawMessage(1, BytesValue.EMPTY)); } private EthPeer createPeer() { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java index d938ba314a..53311dff9c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java @@ -151,21 +151,12 @@ public class RespondingEthPeer { final List currentMessages = new ArrayList<>(outgoingMessages); outgoingMessages.clear(); for (final OutgoingMessage msg : currentMessages) { - try { - final Optional maybeResponse = - responder.respond(msg.capability, msg.messageData); - maybeResponse.ifPresent( - (response) -> { - try { - ethProtocolManager.processMessage( - msg.capability, new DefaultMessage(peerConnection, response)); - } finally { - response.release(); - } - }); - } finally { - msg.messageData.release(); - } + final Optional maybeResponse = + responder.respond(msg.capability, msg.messageData); + maybeResponse.ifPresent( + (response) -> + ethProtocolManager.processMessage( + msg.capability, new DefaultMessage(peerConnection, response))); } return currentMessages.size() > 0; } @@ -229,51 +220,34 @@ public class RespondingEthPeer { switch (msg.getCode()) { case EthPV62.GET_BLOCK_HEADERS: final BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(originalResponse); - try { - final List originalHeaders = - Lists.newArrayList(headersMessage.getHeaders(protocolSchedule)); - final List partialHeaders = - originalHeaders.subList(0, (int) (originalHeaders.size() * portion)); - partialResponse = BlockHeadersMessage.create(partialHeaders); - } finally { - headersMessage.release(); - } + final List originalHeaders = + Lists.newArrayList(headersMessage.getHeaders(protocolSchedule)); + final List partialHeaders = + originalHeaders.subList(0, (int) (originalHeaders.size() * portion)); + partialResponse = BlockHeadersMessage.create(partialHeaders); break; case EthPV62.GET_BLOCK_BODIES: final BlockBodiesMessage bodiesMessage = BlockBodiesMessage.readFrom(originalResponse); - try { - final List originalBodies = - Lists.newArrayList(bodiesMessage.bodies(protocolSchedule)); - final List partialBodies = - originalBodies.subList(0, (int) (originalBodies.size() * portion)); - partialResponse = BlockBodiesMessage.create(partialBodies); - } finally { - bodiesMessage.release(); - } + final List originalBodies = + Lists.newArrayList(bodiesMessage.bodies(protocolSchedule)); + final List partialBodies = + originalBodies.subList(0, (int) (originalBodies.size() * portion)); + partialResponse = BlockBodiesMessage.create(partialBodies); break; case EthPV63.GET_RECEIPTS: final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(originalResponse); - try { - final List> originalReceipts = - Lists.newArrayList(receiptsMessage.receipts()); - final List> partialReceipts = - originalReceipts.subList(0, (int) (originalReceipts.size() * portion)); - partialResponse = ReceiptsMessage.create(partialReceipts); - } finally { - receiptsMessage.release(); - } + final List> originalReceipts = + Lists.newArrayList(receiptsMessage.receipts()); + final List> partialReceipts = + originalReceipts.subList(0, (int) (originalReceipts.size() * portion)); + partialResponse = ReceiptsMessage.create(partialReceipts); break; case EthPV63.GET_NODE_DATA: final NodeDataMessage nodeDataMessage = NodeDataMessage.readFrom(originalResponse); - try { - final List originalNodeData = - Lists.newArrayList(nodeDataMessage.nodeData()); - final List partialNodeData = - originalNodeData.subList(0, (int) (originalNodeData.size() * portion)); - partialResponse = NodeDataMessage.create(partialNodeData); - } finally { - nodeDataMessage.release(); - } + final List originalNodeData = Lists.newArrayList(nodeDataMessage.nodeData()); + final List partialNodeData = + originalNodeData.subList(0, (int) (originalNodeData.size() * portion)); + partialResponse = NodeDataMessage.create(partialNodeData); break; } return Optional.of(partialResponse); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessageTest.java index ee9051c645..92ba1fd26d 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessageTest.java @@ -18,7 +18,6 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.development.DevelopmentProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHashFunction; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -33,7 +32,6 @@ import java.util.Iterator; import java.util.List; import com.google.common.io.Resources; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -61,23 +59,15 @@ public final class BlockBodiesMessageTest { rlp -> BlockHeader.readFrom(rlp, MainnetBlockHashFunction::createHash)))); } final MessageData initialMessage = BlockBodiesMessage.create(bodies); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV62.BLOCK_BODIES, rawBuffer); + final MessageData raw = new RawMessage(EthPV62.BLOCK_BODIES, initialMessage.getData()); final BlockBodiesMessage message = BlockBodiesMessage.readFrom(raw); - try { - final Iterator readBodies = - message - .bodies( - DevelopmentProtocolSchedule.create(GenesisConfigFile.DEFAULT.getConfigOptions())) - .iterator(); - for (int i = 0; i < 50; ++i) { - Assertions.assertThat(readBodies.next()).isEqualTo(bodies.get(i)); - } - } finally { - message.release(); - initialMessage.release(); - raw.release(); + final Iterator readBodies = + message + .bodies( + DevelopmentProtocolSchedule.create(GenesisConfigFile.DEFAULT.getConfigOptions())) + .iterator(); + for (int i = 0; i < 50; ++i) { + Assertions.assertThat(readBodies.next()).isEqualTo(bodies.get(i)); } } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java index e23275b9d9..b445f641b8 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java @@ -16,7 +16,6 @@ import tech.pegasys.pantheon.config.GenesisConfigFile; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.development.DevelopmentProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHashFunction; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -31,7 +30,6 @@ import java.util.Iterator; import java.util.List; import com.google.common.io.Resources; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -55,21 +53,13 @@ public final class BlockHeadersMessageTest { oneBlock.skipNext(); } final MessageData initialMessage = BlockHeadersMessage.create(headers); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV62.BLOCK_HEADERS, rawBuffer); + final MessageData raw = new RawMessage(EthPV62.BLOCK_HEADERS, initialMessage.getData()); final BlockHeadersMessage message = BlockHeadersMessage.readFrom(raw); - try { - final Iterator readHeaders = - message.getHeaders( - DevelopmentProtocolSchedule.create(GenesisConfigFile.DEFAULT.getConfigOptions())); - for (int i = 0; i < 50; ++i) { - Assertions.assertThat(readHeaders.next()).isEqualTo(headers.get(i)); - } - } finally { - message.release(); - initialMessage.release(); - raw.release(); + final Iterator readHeaders = + message.getHeaders( + DevelopmentProtocolSchedule.create(GenesisConfigFile.DEFAULT.getConfigOptions())); + for (int i = 0; i < 50; ++i) { + Assertions.assertThat(readHeaders.next()).isEqualTo(headers.get(i)); } } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessageTest.java index 3fafeda4c8..ac4b5d77df 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessageTest.java @@ -15,7 +15,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHashFunction; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -30,7 +29,6 @@ import java.util.Iterator; import java.util.List; import com.google.common.io.Resources; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -54,19 +52,11 @@ public final class GetBlockBodiesMessageTest { oneBlock.skipNext(); } final MessageData initialMessage = GetBlockBodiesMessage.create(hashes); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_BODIES, rawBuffer); + final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_BODIES, initialMessage.getData()); final GetBlockBodiesMessage message = GetBlockBodiesMessage.readFrom(raw); - try { - final Iterator readHeaders = message.hashes().iterator(); - for (int i = 0; i < 50; ++i) { - Assertions.assertThat(readHeaders.next()).isEqualTo(hashes.get(i)); - } - } finally { - message.release(); - initialMessage.release(); - raw.release(); + final Iterator readHeaders = message.hashes().iterator(); + for (int i = 0; i < 50; ++i) { + Assertions.assertThat(readHeaders.next()).isEqualTo(hashes.get(i)); } } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessageTest.java index adf1e34273..95f5dba446 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessageTest.java @@ -13,14 +13,12 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Arrays; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -34,21 +32,13 @@ public final class GetBlockHeadersMessageTest { final int maxHeaders = 128; final GetBlockHeadersMessage initialMessage = GetBlockHeadersMessage.create(hash, maxHeaders, skip, reverse); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_HEADERS, rawBuffer); + final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_HEADERS, initialMessage.getData()); final GetBlockHeadersMessage message = GetBlockHeadersMessage.readFrom(raw); - try { - Assertions.assertThat(message.blockNumber()).isEmpty(); - Assertions.assertThat(message.hash().get()).isEqualTo(hash); - Assertions.assertThat(message.reverse()).isEqualTo(reverse); - Assertions.assertThat(message.skip()).isEqualTo(skip); - Assertions.assertThat(message.maxHeaders()).isEqualTo(maxHeaders); - } finally { - initialMessage.release(); - raw.release(); - message.release(); - } + Assertions.assertThat(message.blockNumber()).isEmpty(); + Assertions.assertThat(message.hash().get()).isEqualTo(hash); + Assertions.assertThat(message.reverse()).isEqualTo(reverse); + Assertions.assertThat(message.skip()).isEqualTo(skip); + Assertions.assertThat(message.maxHeaders()).isEqualTo(maxHeaders); } } @@ -60,20 +50,13 @@ public final class GetBlockHeadersMessageTest { final int maxHeaders = 128; final GetBlockHeadersMessage initialMessage = GetBlockHeadersMessage.create(blockNum, maxHeaders, skip, reverse); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_HEADERS, rawBuffer); + final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_HEADERS, initialMessage.getData()); final GetBlockHeadersMessage message = GetBlockHeadersMessage.readFrom(raw); - try { - Assertions.assertThat(initialMessage.blockNumber().getAsLong()).isEqualTo(blockNum); - Assertions.assertThat(initialMessage.hash()).isEmpty(); - Assertions.assertThat(initialMessage.reverse()).isEqualTo(reverse); - Assertions.assertThat(initialMessage.skip()).isEqualTo(skip); - Assertions.assertThat(initialMessage.maxHeaders()).isEqualTo(maxHeaders); - } finally { - initialMessage.release(); - raw.release(); - message.release(); - } + Assertions.assertThat(message.blockNumber().getAsLong()).isEqualTo(blockNum); + Assertions.assertThat(message.hash()).isEmpty(); + Assertions.assertThat(message.reverse()).isEqualTo(reverse); + Assertions.assertThat(message.skip()).isEqualTo(skip); + Assertions.assertThat(message.maxHeaders()).isEqualTo(maxHeaders); } } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessageTest.java index 262ebaecdf..b4267e637e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessageTest.java @@ -13,24 +13,21 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; public final class GetNodeDataMessageTest { @Test - public void roundTripTest() throws IOException { + public void roundTripTest() { // Generate some hashes final BlockDataGenerator gen = new BlockDataGenerator(1); final List hashes = new ArrayList<>(); @@ -42,22 +39,14 @@ public final class GetNodeDataMessageTest { // Perform round-trip transformation // Create GetNodeData, copy it to a generic message, then read back into a GetNodeData message final MessageData initialMessage = GetNodeDataMessage.create(hashes); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV63.GET_NODE_DATA, rawBuffer); + final MessageData raw = new RawMessage(EthPV63.GET_NODE_DATA, initialMessage.getData()); final GetNodeDataMessage message = GetNodeDataMessage.readFrom(raw); // Read hashes back out after round trip and check they match originals. - try { - final Iterator readData = message.hashes().iterator(); - for (int i = 0; i < hashCount; ++i) { - Assertions.assertThat(readData.next()).isEqualTo(hashes.get(i)); - } - Assertions.assertThat(readData.hasNext()).isFalse(); - } finally { - message.release(); - initialMessage.release(); - raw.release(); + final Iterator readData = message.hashes().iterator(); + for (int i = 0; i < hashCount; ++i) { + Assertions.assertThat(readData.next()).isEqualTo(hashes.get(i)); } + Assertions.assertThat(readData.hasNext()).isFalse(); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessageTest.java index 89760a8317..5903140e9a 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessageTest.java @@ -13,24 +13,21 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; public final class GetReceiptsMessageTest { @Test - public void roundTripTest() throws IOException { + public void roundTripTest() { // Generate some hashes final BlockDataGenerator gen = new BlockDataGenerator(1); final List hashes = new ArrayList<>(); @@ -43,22 +40,14 @@ public final class GetReceiptsMessageTest { // Create GetReceipts message, copy it to a generic message, then read back into a GetReceipts // message final MessageData initialMessage = GetReceiptsMessage.create(hashes); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV63.GET_RECEIPTS, rawBuffer); + final MessageData raw = new RawMessage(EthPV63.GET_RECEIPTS, initialMessage.getData()); final GetReceiptsMessage message = GetReceiptsMessage.readFrom(raw); // Read hashes back out after round trip and check they match originals. - try { - final Iterator readData = message.hashes().iterator(); - for (int i = 0; i < hashCount; ++i) { - Assertions.assertThat(readData.next()).isEqualTo(hashes.get(i)); - } - Assertions.assertThat(readData.hasNext()).isFalse(); - } finally { - message.release(); - initialMessage.release(); - raw.release(); + final Iterator readData = message.hashes().iterator(); + for (int i = 0; i < hashCount; ++i) { + Assertions.assertThat(readData.next()).isEqualTo(hashes.get(i)); } + Assertions.assertThat(readData.hasNext()).isFalse(); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessageTest.java index 197353b0e6..c9904f901a 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessageTest.java @@ -14,7 +14,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHashFunction; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput; @@ -29,7 +28,6 @@ import java.util.Iterator; import java.util.List; import com.google.common.io.Resources; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -55,19 +53,11 @@ public final class NewBlockHashesMessageTest { oneBlock.skipNext(); } final MessageData initialMessage = NewBlockHashesMessage.create(hashes); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV62.NEW_BLOCK_HASHES, rawBuffer); + final MessageData raw = new RawMessage(EthPV62.NEW_BLOCK_HASHES, initialMessage.getData()); final NewBlockHashesMessage message = NewBlockHashesMessage.readFrom(raw); - try { - final Iterator readHeaders = message.getNewHashes(); - for (int i = 0; i < 50; ++i) { - Assertions.assertThat(readHeaders.next()).isEqualTo(hashes.get(i)); - } - } finally { - message.release(); - initialMessage.release(); - raw.release(); + final Iterator readHeaders = message.getNewHashes(); + for (int i = 0; i < 50; ++i) { + Assertions.assertThat(readHeaders.next()).isEqualTo(hashes.get(i)); } } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessageTest.java index 9a183e93d9..7eac4acacb 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessageTest.java @@ -18,14 +18,12 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; -import io.netty.buffer.Unpooled; import org.junit.Test; public class NewBlockMessageTest { @@ -56,10 +54,7 @@ public class NewBlockMessageTest { tmp.writeUInt256Scalar(totalDifficulty); tmp.endList(); - final BytesValue msgPayload = tmp.encoded(); - - final RawMessage rawMsg = - new RawMessage(EthPV62.NEW_BLOCK, Unpooled.wrappedBuffer(tmp.encoded().extractArray())); + final RawMessage rawMsg = new RawMessage(EthPV62.NEW_BLOCK, tmp.encoded()); final NewBlockMessage newBlockMsg = NewBlockMessage.readFrom(rawMsg); @@ -71,8 +66,7 @@ public class NewBlockMessageTest { @Test public void readFromMessageWithWrongCodeThrows() { - final ProtocolSchedule protSchedule = MainnetProtocolSchedule.create(); - final RawMessage rawMsg = new RawMessage(EthPV62.BLOCK_HEADERS, NetworkMemoryPool.allocate(1)); + final RawMessage rawMsg = new RawMessage(EthPV62.BLOCK_HEADERS, BytesValue.of(0)); assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> NewBlockMessage.readFrom(rawMsg)); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessageTest.java index a07d5c8a64..f4e1fd0b9d 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessageTest.java @@ -12,25 +12,22 @@ */ package tech.pegasys.pantheon.ethereum.eth.messages; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; import tech.pegasys.pantheon.util.bytes.BytesValue; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; public final class NodeDataMessageTest { @Test - public void roundTripTest() throws IOException { + public void roundTripTest() { // Generate some data final BlockDataGenerator gen = new BlockDataGenerator(1); final List nodeData = new ArrayList<>(); @@ -42,22 +39,14 @@ public final class NodeDataMessageTest { // Perform round-trip transformation // Create specific message, copy it to a generic message, then read back into a specific format final MessageData initialMessage = NodeDataMessage.create(nodeData); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV63.NODE_DATA, rawBuffer); + final MessageData raw = new RawMessage(EthPV63.NODE_DATA, initialMessage.getData()); final NodeDataMessage message = NodeDataMessage.readFrom(raw); // Read data back out after round trip and check they match originals. - try { - final Iterator readData = message.nodeData().iterator(); - for (int i = 0; i < nodeCount; ++i) { - Assertions.assertThat(readData.next()).isEqualTo(nodeData.get(i)); - } - Assertions.assertThat(readData.hasNext()).isFalse(); - } finally { - message.release(); - initialMessage.release(); - raw.release(); + final Iterator readData = message.nodeData().iterator(); + for (int i = 0; i < nodeCount; ++i) { + Assertions.assertThat(readData.next()).isEqualTo(nodeData.get(i)); } + Assertions.assertThat(readData.hasNext()).isFalse(); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessageTest.java index bac6a65093..9f79f9c66f 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessageTest.java @@ -13,24 +13,21 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; public final class ReceiptsMessageTest { @Test - public void roundTripTest() throws IOException { + public void roundTripTest() { // Generate some data final BlockDataGenerator gen = new BlockDataGenerator(1); final List> receipts = new ArrayList<>(); @@ -47,22 +44,14 @@ public final class ReceiptsMessageTest { // Perform round-trip transformation // Create specific message, copy it to a generic message, then read back into a specific format final MessageData initialMessage = ReceiptsMessage.create(receipts); - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV63.RECEIPTS, rawBuffer); + final MessageData raw = new RawMessage(EthPV63.RECEIPTS, initialMessage.getData()); final ReceiptsMessage message = ReceiptsMessage.readFrom(raw); // Read data back out after round trip and check they match originals. - try { - final Iterator> readData = message.receipts().iterator(); - for (int i = 0; i < dataCount; ++i) { - Assertions.assertThat(readData.next()).isEqualTo(receipts.get(i)); - } - Assertions.assertThat(readData.hasNext()).isFalse(); - } finally { - message.release(); - initialMessage.release(); - raw.release(); + final Iterator> readData = message.receipts().iterator(); + for (int i = 0; i < dataCount; ++i) { + Assertions.assertThat(readData.next()).isEqualTo(receipts.get(i)); } + Assertions.assertThat(readData.hasNext()).isFalse(); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessageTest.java index ca21611fdb..8ea8b853e8 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessageTest.java @@ -22,8 +22,6 @@ import tech.pegasys.pantheon.util.uint.UInt256; import java.util.Random; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.junit.Test; public class StatusMessageTest { @@ -56,9 +54,7 @@ public class StatusMessageTest { final MessageData msg = StatusMessage.create(version, networkId, td, bestHash, genesisHash); // Make a message copy from serialized data and check deserialized results - final ByteBuf buffer = Unpooled.buffer(msg.getSize(), msg.getSize()); - msg.writeTo(buffer); - final StatusMessage copy = new StatusMessage(buffer); + final StatusMessage copy = new StatusMessage(msg.getData()); assertThat(copy.protocolVersion()).isEqualTo(version); assertThat(copy.networkId()).isEqualTo(networkId); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessageTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessageTest.java index 057bf3b952..4b97d62188 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessageTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessageTest.java @@ -13,24 +13,21 @@ package tech.pegasys.pantheon.ethereum.eth.messages; import tech.pegasys.pantheon.ethereum.core.Transaction; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; public class TransactionsMessageTest { @Test - public void transactionRoundTrip() throws IOException { + public void transactionRoundTrip() { // Setup list of transactions final int txCount = 20; final BlockDataGenerator gen = new BlockDataGenerator(1); @@ -42,23 +39,15 @@ public class TransactionsMessageTest { // Create TransactionsMessage final MessageData initialMessage = TransactionsMessage.create(transactions); // Read message into a generic RawMessage - final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize()); - initialMessage.writeTo(rawBuffer); - final MessageData raw = new RawMessage(EthPV62.TRANSACTIONS, rawBuffer); + final MessageData raw = new RawMessage(EthPV62.TRANSACTIONS, initialMessage.getData()); // Transform back to a TransactionsMessage from RawMessage final TransactionsMessage message = TransactionsMessage.readFrom(raw); // Check that transactions match original inputs after transformations - try { - final Iterator readTransactions = message.transactions(Transaction::readFrom); - for (int i = 0; i < txCount; ++i) { - Assertions.assertThat(readTransactions.next()).isEqualTo(transactions.get(i)); - } - Assertions.assertThat(readTransactions.hasNext()).isFalse(); - } finally { - message.release(); - initialMessage.release(); - raw.release(); + final Iterator readTransactions = message.transactions(Transaction::readFrom); + for (int i = 0; i < txCount; ++i) { + Assertions.assertThat(readTransactions.next()).isEqualTo(transactions.get(i)); } + Assertions.assertThat(readTransactions.hasNext()).isFalse(); } } diff --git a/ethereum/mock-p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetworkTest.java b/ethereum/mock-p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetworkTest.java index d938f813ec..152472d7d3 100644 --- a/ethereum/mock-p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetworkTest.java +++ b/ethereum/mock-p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetworkTest.java @@ -12,7 +12,6 @@ */ package tech.pegasys.pantheon.ethereum.p2p.testing; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.Message; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork; @@ -30,7 +29,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; -import io.netty.buffer.ByteBuf; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -71,18 +69,15 @@ public final class MockNetworkTest { // Validate Message Exchange final int size = 128; - final ByteBuf dataSent = NetworkMemoryPool.allocate(size); final byte[] data = new byte[size]; ThreadLocalRandom.current().nextBytes(data); - dataSent.writeBytes(data); final int code = 0x74; final PeerConnection connection = optionalConnection.get(); - connection.send(cap, new RawMessage(code, dataSent)); + connection.send(cap, new RawMessage(code, BytesValue.wrap(data))); final Message receivedMessage = messageFuture.get(); final MessageData receivedMessageData = receivedMessage.getData(); - final ByteBuf receiveBuffer = NetworkMemoryPool.allocate(size); - receivedMessageData.writeTo(receiveBuffer); - Assertions.assertThat(receiveBuffer.compareTo(dataSent)).isEqualTo(0); + Assertions.assertThat(receivedMessageData.getData().compareTo(BytesValue.wrap(data))) + .isEqualTo(0); Assertions.assertThat(receivedMessage.getConnection().getPeer().getNodeId()) .isEqualTo(two.getId()); Assertions.assertThat(receivedMessageData.getSize()).isEqualTo(size); diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkMemoryPool.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkMemoryPool.java deleted file mode 100644 index 4974f23564..0000000000 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkMemoryPool.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2018 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.ethereum.p2p; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocator; - -public class NetworkMemoryPool { - - private static final ByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(); - - public static ByteBuf allocate(final int size) { - return ALLOCATOR.ioBuffer(0, size); - } -} diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkRunner.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkRunner.java index d9e0cbb01d..63de82ccf4 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkRunner.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkRunner.java @@ -12,7 +12,6 @@ */ package tech.pegasys.pantheon.ethereum.p2p; -import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork; import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; @@ -115,24 +114,19 @@ public class NetworkRunner implements AutoCloseable { network.subscribe( cap, message -> { - final MessageData data = message.getData(); - try { - final int code = message.getData().getCode(); - if (!protocol.isValidMessageCode(cap.getVersion(), code)) { - // Handle invalid messsages by disconnecting - LOG.debug( - "Invalid message code ({}-{}, {}) received from peer, disconnecting from:", - cap.getName(), - cap.getVersion(), - code, - message.getConnection()); - message.getConnection().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); - return; - } - protocolManager.processMessage(cap, message); - } finally { - data.release(); + final int code = message.getData().getCode(); + if (!protocol.isValidMessageCode(cap.getVersion(), code)) { + // Handle invalid messages by disconnecting + LOG.debug( + "Invalid message code ({}-{}, {}) received from peer, disconnecting from:", + cap.getName(), + cap.getVersion(), + code, + message.getConnection()); + message.getConnection().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); + return; } + protocolManager.processMessage(cap, message); }); } } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/MessageData.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/MessageData.java index 26298c3cba..e2d2ff43d9 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/MessageData.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/MessageData.java @@ -12,7 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.p2p.api; -import io.netty.buffer.ByteBuf; +import tech.pegasys.pantheon.util.bytes.BytesValue; /** A P2P Network Message's Data. */ public interface MessageData { @@ -20,7 +20,7 @@ public interface MessageData { /** * Returns the size of the message. * - * @return Number of bytes {@link #writeTo(ByteBuf)} will write to an output buffer. + * @return Number of bytes in this data. */ int getSize(); @@ -32,15 +32,9 @@ public interface MessageData { int getCode(); /** - * Puts the message's body into the given {@link ByteBuf}. + * Get the serialized representation for this message * - * @param output ByteBuf to write the message to + * @return the serialized representation of this message */ - void writeTo(ByteBuf output); - - /** Releases the memory underlying this message. */ - void release(); - - /** Retains (increments its reference count) the memory underlying this message once. */ - void retain(); + BytesValue getData(); } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/ApiHandler.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/ApiHandler.java index 4529f99ee0..a2f31045f7 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/ApiHandler.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/ApiHandler.java @@ -90,10 +90,7 @@ final class ApiHandler extends SimpleChannelInboundHandler { "Received Wire DISCONNECT, but unable to parse reason. Peer: {}", connection.getPeer().getClientId(), e); - } finally { - disconnect.release(); } - connection.terminateConnection(reason, true); } return; diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexer.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexer.java index d0809d5118..c7f51fb44f 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexer.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexer.java @@ -17,6 +17,7 @@ import static java.util.Comparator.comparing; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; import java.util.Comparator; @@ -31,7 +32,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableRangeMap; import com.google.common.collect.ImmutableRangeMap.Builder; import com.google.common.collect.Range; -import io.netty.buffer.ByteBuf; public class CapabilityMultiplexer { @@ -108,18 +108,8 @@ public class CapabilityMultiplexer { } @Override - public void writeTo(final ByteBuf output) { - originalMessage.writeTo(output); - } - - @Override - public void release() { - originalMessage.release(); - } - - @Override - public void retain() { - originalMessage.retain(); + public BytesValue getData() { + return originalMessage.getData(); } }; } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java index b4db486574..5d8a651c6d 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java @@ -71,14 +71,13 @@ final class DeFramer extends ByteToMessageDecoder { // Decode first hello and use the payload to modify pipeline final PeerInfo peerInfo; try { - peerInfo = parsePeerInfo(message); + peerInfo = HelloMessage.readFrom(message).getPeerInfo(); } catch (final RLPException e) { LOG.debug("Received invalid HELLO message", e); connectFuture.completeExceptionally(e); ctx.close(); return; } - message.release(); LOG.debug("Received HELLO message: {}", peerInfo); if (peerInfo.getVersion() >= 5) { LOG.debug("Enable compression for p2pVersion: {}", peerInfo.getVersion()); @@ -115,13 +114,6 @@ final class DeFramer extends ByteToMessageDecoder { } } - private PeerInfo parsePeerInfo(final MessageData message) { - final HelloMessage helloMessage = HelloMessage.readFrom(message); - final PeerInfo peerInfo = helloMessage.getPeerInfo(); - helloMessage.release(); - return peerInfo; - } - @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable throwable) throws Exception { diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java index 4c53cc008b..c4b4ce3509 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java @@ -69,7 +69,6 @@ final class NettyPeerConnection implements PeerConnection { @Override public void send(final Capability capability, final MessageData message) throws PeerNotConnected { if (isDisconnected()) { - message.release(); throw new PeerNotConnected("Attempt to send message to a closed peer connection"); } if (capability != null) { @@ -77,7 +76,6 @@ final class NettyPeerConnection implements PeerConnection { final SubProtocol subProtocol = multiplexer.subProtocol(capability); if (subProtocol == null || !subProtocol.isValidMessageCode(capability.getVersion(), message.getCode())) { - message.release(); throw new UnsupportedOperationException( "Attempt to send unsupported message (" + message.getCode() diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java index 0de9ec2f26..f491f347d5 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java @@ -16,10 +16,8 @@ import static io.netty.buffer.ByteBufUtil.hexDump; import static io.netty.buffer.Unpooled.wrappedBuffer; import static org.bouncycastle.pqc.math.linearalgebra.ByteUtils.xor; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.HandshakeSecrets; -import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.ethereum.rlp.RlpUtils; @@ -254,7 +252,7 @@ public class Framer { final int id = idbv.isZero() || idbv.size() == 0 ? 0 : idbv.get(0); // Write message data to ByteBuf, decompressing as necessary - final ByteBuf data; + final BytesValue data; if (compressionEnabled) { // Decompress data before writing to ByteBuf final byte[] compressedMessageData = Arrays.copyOfRange(frameData, 1, frameData.length - pad); @@ -263,13 +261,11 @@ public class Framer { compressor.uncompressedLength(compressedMessageData) < LENGTH_MAX_MESSAGE_FRAME, "Message size in excess of maximum length."); final byte[] decompressedMessageData = compressor.decompress(compressedMessageData); - data = NetworkMemoryPool.allocate(decompressedMessageData.length); - data.writeBytes(decompressedMessageData); + data = BytesValue.wrap(decompressedMessageData); } else { // Move data to a ByteBuf final int messageLength = frameSize - LENGTH_MESSAGE_ID; - data = NetworkMemoryPool.allocate(messageLength); - data.writeBytes(frameData, 1, messageLength); + data = BytesValue.wrap(frameData, 1, messageLength); } return new RawMessage(id, data); @@ -295,76 +291,57 @@ public class Framer { message.getSize() < LENGTH_MAX_MESSAGE_FRAME, "Message size in excess of maximum length."); // Compress message if (compressionEnabled) { - try { - // Extract data from message - final ByteBuf tmp = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(tmp); - // Compress data - final byte[] uncompressed = ByteBufUtils.toByteArray(tmp); - final byte[] compressed = compressor.compress(uncompressed); - tmp.release(); - // Construct new, compressed message - final ByteBuf compressedBuf = NetworkMemoryPool.allocate(compressed.length); - compressedBuf.writeBytes(compressed); - frameAndReleaseMessage(new RawMessage(message.getCode(), compressedBuf), output); - } finally { - // We have to release the original message because frameAndRelease only released the - // compressed copy. - message.release(); - } + // Extract data from message + // Compress data + final byte[] compressed = compressor.compress(message.getData().getArrayUnsafe()); + // Construct new, compressed message + frameMessage(new RawMessage(message.getCode(), BytesValue.wrap(compressed)), output); } else { - frameAndReleaseMessage(message, output); + frameMessage(message, output); } } @VisibleForTesting - void frameAndReleaseMessage(final MessageData message, final ByteBuf buf) { - try { - final int frameSize = message.getSize() + LENGTH_MESSAGE_ID; - final int pad = padding16(frameSize); - - final byte id = (byte) message.getCode(); - - // Generate the header data. - final byte[] h = new byte[LENGTH_HEADER_DATA]; - h[0] = (byte) ((frameSize >> 16) & 0xff); - h[1] = (byte) ((frameSize >> 8) & 0xff); - h[2] = (byte) (frameSize & 0xff); - System.arraycopy(PROTOCOL_HEADER, 0, h, LENGTH_FRAME_SIZE, PROTOCOL_HEADER.length); - Arrays.fill(h, LENGTH_FRAME_SIZE + PROTOCOL_HEADER.length, h.length - 1, (byte) 0x00); - encryptor.processBytes(h, 0, LENGTH_HEADER_DATA, h, 0); - - // Generate the header MAC. - byte[] hMac = Arrays.copyOf(secrets.getEgressMac(), LENGTH_MAC); - macEncryptor.processBlock(hMac, 0, hMac, 0); - hMac = secrets.updateEgress(xor(h, hMac)).getEgressMac(); - hMac = Arrays.copyOf(hMac, LENGTH_MAC); - buf.writeBytes(h).writeBytes(hMac); - - // Encrypt payload. - final byte[] f = new byte[frameSize + pad]; - - final BytesValue bv = id == 0 ? RLP.NULL : RLP.encodeOne(BytesValue.of(id)); - assert bv.size() == 1; - f[0] = bv.get(0); - - // Zero-padded to 16-byte boundary. - final ByteBuf tmp = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(tmp); - tmp.getBytes(tmp.readerIndex(), f, 1, tmp.readableBytes()); - encryptor.processBytes(f, 0, f.length, f, 0); - tmp.release(); - - // Calculate the frame MAC. - final byte[] fMacSeed = Arrays.copyOf(secrets.updateEgress(f).getEgressMac(), LENGTH_MAC); - byte[] fMac = new byte[16]; - macEncryptor.processBlock(fMacSeed, 0, fMac, 0); - fMac = Arrays.copyOf(secrets.updateEgress(xor(fMac, fMacSeed)).getEgressMac(), LENGTH_MAC); - - buf.writeBytes(f).writeBytes(fMac); - } finally { - message.release(); - } + void frameMessage(final MessageData message, final ByteBuf buf) { + final int frameSize = message.getSize() + LENGTH_MESSAGE_ID; + final int pad = padding16(frameSize); + + final byte id = (byte) message.getCode(); + + // Generate the header data. + final byte[] h = new byte[LENGTH_HEADER_DATA]; + h[0] = (byte) ((frameSize >> 16) & 0xff); + h[1] = (byte) ((frameSize >> 8) & 0xff); + h[2] = (byte) (frameSize & 0xff); + System.arraycopy(PROTOCOL_HEADER, 0, h, LENGTH_FRAME_SIZE, PROTOCOL_HEADER.length); + Arrays.fill(h, LENGTH_FRAME_SIZE + PROTOCOL_HEADER.length, h.length - 1, (byte) 0x00); + encryptor.processBytes(h, 0, LENGTH_HEADER_DATA, h, 0); + + // Generate the header MAC. + byte[] hMac = Arrays.copyOf(secrets.getEgressMac(), LENGTH_MAC); + macEncryptor.processBlock(hMac, 0, hMac, 0); + hMac = secrets.updateEgress(xor(h, hMac)).getEgressMac(); + hMac = Arrays.copyOf(hMac, LENGTH_MAC); + buf.writeBytes(h).writeBytes(hMac); + + // Encrypt payload. + final byte[] f = new byte[frameSize + pad]; + + final BytesValue bv = id == 0 ? RLP.NULL : RLP.encodeOne(BytesValue.of(id)); + assert bv.size() == 1; + f[0] = bv.get(0); + + // Zero-padded to 16-byte boundary. + message.getData().copyTo(f, 0, 1); + encryptor.processBytes(f, 0, f.length, f, 0); + + // Calculate the frame MAC. + final byte[] fMacSeed = Arrays.copyOf(secrets.updateEgress(f).getEgressMac(), LENGTH_MAC); + byte[] fMac = new byte[16]; + macEncryptor.processBlock(fMacSeed, 0, fMac, 0); + fMac = Arrays.copyOf(secrets.updateEgress(xor(fMac, fMacSeed)).getEgressMac(), LENGTH_MAC); + + buf.writeBytes(f).writeBytes(fMac); } private static int padding16(final int size) { diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/handshake/ecies/ECIESHandshaker.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/handshake/ecies/ECIESHandshaker.java index 0cc0241c53..06179a3da9 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/handshake/ecies/ECIESHandshaker.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/handshake/ecies/ECIESHandshaker.java @@ -178,7 +178,9 @@ public class ECIESHandshaker implements Handshaker { buf.markReaderIndex(); final ByteBuf bufferedBytes = buf.readSlice(expectedLength); - BytesValue bytes = BytesValue.wrapBuffer(bufferedBytes); + final byte[] encryptedBytes = new byte[bufferedBytes.readableBytes()]; + bufferedBytes.getBytes(0, encryptedBytes); + BytesValue bytes = BytesValue.wrap(encryptedBytes); BytesValue encryptedMsg = bytes; try { diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/utils/ByteBufUtils.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/utils/ByteBufUtils.java deleted file mode 100644 index d03cc008c3..0000000000 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/utils/ByteBufUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2018 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.ethereum.p2p.utils; - -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; -import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; -import tech.pegasys.pantheon.ethereum.rlp.RLP; -import tech.pegasys.pantheon.ethereum.rlp.RLPInput; -import tech.pegasys.pantheon.util.bytes.BytesValue; - -import io.netty.buffer.ByteBuf; - -/** Utility methods for working with {@link ByteBuf}'s. */ -public class ByteBufUtils { - - private ByteBufUtils() {} - - public static byte[] toByteArray(final ByteBuf buffer) { - final byte[] bytes = new byte[buffer.readableBytes()]; - buffer.getBytes(buffer.readerIndex(), bytes); - return bytes; - } - - /** - * Creates an {@link RLPInput} for the data in buffer. The data is copied from - * buffer so that the {@link RLPInput} and any data read from it are safe to use even after - * buffer is released. - * - * @param buffer the data to read as RLP - * @return an {@link RLPInput} for the data in buffer - */ - public static RLPInput toRLPInput(final ByteBuf buffer) { - return RLP.input(BytesValue.wrap(toByteArray(buffer))); - } - - public static ByteBuf fromRLPOutput(final BytesValueRLPOutput out) { - final ByteBuf data = NetworkMemoryPool.allocate(out.encodedSize()); - data.writeBytes(out.encoded().extractArray()); - return data; - } -} diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/AbstractMessageData.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/AbstractMessageData.java index 4e87b44c8f..80da64bced 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/AbstractMessageData.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/AbstractMessageData.java @@ -13,36 +13,23 @@ package tech.pegasys.pantheon.ethereum.p2p.wire; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; - -import io.netty.buffer.ByteBuf; +import tech.pegasys.pantheon.util.bytes.BytesValue; public abstract class AbstractMessageData implements MessageData { - protected final ByteBuf data; + protected final BytesValue data; - protected AbstractMessageData(final ByteBuf data) { + protected AbstractMessageData(final BytesValue data) { this.data = data; } @Override public final int getSize() { - return data.readableBytes(); - } - - @Override - public final void writeTo(final ByteBuf output) { - data.markReaderIndex(); - output.writeBytes(data); - data.resetReaderIndex(); - } - - @Override - public final void release() { - data.release(); + return data.size(); } @Override - public final void retain() { - data.retain(); + public BytesValue getData() { + return data; } } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/PeerInfo.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/PeerInfo.java index ae9ab4ce4a..0fbaac1802 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/PeerInfo.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/PeerInfo.java @@ -14,8 +14,6 @@ package tech.pegasys.pantheon.ethereum.p2p.wire; import static tech.pegasys.pantheon.util.bytes.BytesValue.wrap; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; -import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.ethereum.rlp.RLPInput; import tech.pegasys.pantheon.ethereum.rlp.RLPOutput; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -26,8 +24,6 @@ import java.util.Collections; import java.util.List; import java.util.Objects; -import io.netty.buffer.ByteBuf; - /** * Encapsulates information about a peer, including their protocol version, client ID, capabilities * and other. @@ -96,16 +92,6 @@ public class PeerInfo { out.endList(); } - public ByteBuf toByteBuf() { - // TODO: we should have a RLPOutput type based on ByteBuf - final BytesValueRLPOutput out = new BytesValueRLPOutput(); - writeTo(out); - - final ByteBuf data = NetworkMemoryPool.allocate(out.encodedSize()); - data.writeBytes(out.encoded().extractArray()); - return data; - } - @Override public String toString() { final StringBuilder sb = new StringBuilder("PeerInfo{"); diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java index 6729f6bdcf..d371b7a9b1 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java @@ -12,13 +12,13 @@ */ package tech.pegasys.pantheon.ethereum.p2p.wire; -import io.netty.buffer.ByteBuf; +import tech.pegasys.pantheon.util.bytes.BytesValue; public final class RawMessage extends AbstractMessageData { private final int code; - public RawMessage(final int code, final ByteBuf data) { + public RawMessage(final int code, final BytesValue data) { super(data); this.code = code; } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/DisconnectMessage.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/DisconnectMessage.java index 4d53fc3ba5..16e9c03e0e 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/DisconnectMessage.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/DisconnectMessage.java @@ -15,22 +15,20 @@ package tech.pegasys.pantheon.ethereum.p2p.wire.messages; import static com.google.common.base.Preconditions.checkArgument; import static tech.pegasys.pantheon.util.Preconditions.checkGuard; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.WireProtocolException; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; +import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.ethereum.rlp.RLPInput; import tech.pegasys.pantheon.ethereum.rlp.RLPOutput; +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.stream.Stream; -import io.netty.buffer.ByteBuf; - public final class DisconnectMessage extends AbstractMessageData { - private DisconnectMessage(final ByteBuf data) { + private DisconnectMessage(final BytesValue data) { super(data); } @@ -38,14 +36,12 @@ public final class DisconnectMessage extends AbstractMessageData { final Data data = new Data(reason); final BytesValueRLPOutput out = new BytesValueRLPOutput(); data.writeTo(out); - final ByteBuf buf = ByteBufUtils.fromRLPOutput(out); - return new DisconnectMessage(buf); + return new DisconnectMessage(out.encoded()); } public static DisconnectMessage readFrom(final MessageData message) { if (message instanceof DisconnectMessage) { - message.retain(); return (DisconnectMessage) message; } final int code = message.getCode(); @@ -53,9 +49,7 @@ public final class DisconnectMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a DisconnectMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new DisconnectMessage(data); + return new DisconnectMessage(message.getData()); } @Override @@ -64,7 +58,7 @@ public final class DisconnectMessage extends AbstractMessageData { } public DisconnectReason getReason() { - return Data.readFrom(ByteBufUtils.toRLPInput(data)).getReason(); + return Data.readFrom(RLP.input(data)).getReason(); } @Override @@ -105,7 +99,7 @@ public final class DisconnectMessage extends AbstractMessageData { * @see ÐΞVp2p Wire * Protocol */ - public static enum DisconnectReason { + public enum DisconnectReason { REQUESTED((byte) 0x00), TCP_SUBSYSTEM_ERROR((byte) 0x01), BREACH_OF_PROTOCOL((byte) 0x02), diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/EmptyMessage.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/EmptyMessage.java index 0b86ae0e4c..489cfe34ca 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/EmptyMessage.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/EmptyMessage.java @@ -13,8 +13,7 @@ package tech.pegasys.pantheon.ethereum.p2p.wire.messages; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; - -import io.netty.buffer.ByteBuf; +import tech.pegasys.pantheon.util.bytes.BytesValue; /** A message without a body. */ abstract class EmptyMessage implements MessageData { @@ -25,11 +24,7 @@ abstract class EmptyMessage implements MessageData { } @Override - public final void writeTo(final ByteBuf output) {} - - @Override - public final void release() {} - - @Override - public final void retain() {} + public BytesValue getData() { + return BytesValue.EMPTY; + } } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/HelloMessage.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/HelloMessage.java index b7ee563db0..92cc401ebc 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/HelloMessage.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/HelloMessage.java @@ -12,36 +12,28 @@ */ package tech.pegasys.pantheon.ethereum.p2p.wire.messages; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; - -import io.netty.buffer.ByteBuf; +import tech.pegasys.pantheon.ethereum.rlp.RLP; +import tech.pegasys.pantheon.util.bytes.BytesValue; public final class HelloMessage extends AbstractMessageData { - private HelloMessage(final ByteBuf data) { + private HelloMessage(final BytesValue data) { super(data); } public static HelloMessage create(final PeerInfo peerInfo) { final BytesValueRLPOutput out = new BytesValueRLPOutput(); peerInfo.writeTo(out); - final ByteBuf buf = ByteBufUtils.fromRLPOutput(out); - - return new HelloMessage(buf); - } - public static HelloMessage create(final ByteBuf data) { - return new HelloMessage(data); + return new HelloMessage(out.encoded()); } public static HelloMessage readFrom(final MessageData message) { if (message instanceof HelloMessage) { - message.retain(); return (HelloMessage) message; } final int code = message.getCode(); @@ -49,9 +41,7 @@ public final class HelloMessage extends AbstractMessageData { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a HelloMessage.", code)); } - final ByteBuf data = NetworkMemoryPool.allocate(message.getSize()); - message.writeTo(data); - return new HelloMessage(data); + return new HelloMessage(message.getData()); } @Override @@ -60,7 +50,7 @@ public final class HelloMessage extends AbstractMessageData { } public PeerInfo getPeerInfo() { - return PeerInfo.readFrom(ByteBufUtils.toRLPInput(data)); + return PeerInfo.readFrom(RLP.input(data)); } @Override diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexerTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexerTest.java index 51b116ae60..087ae94011 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexerTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexerTest.java @@ -14,19 +14,18 @@ package tech.pegasys.pantheon.ethereum.p2p.netty; import static org.assertj.core.api.Assertions.assertThat; -import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.netty.CapabilityMultiplexer.ProtocolMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; -import io.netty.buffer.ByteBuf; import org.junit.Test; public class CapabilityMultiplexerTest { @@ -63,8 +62,7 @@ public class CapabilityMultiplexerTest { assertThat(multiplexerB.getAgreedCapabilities()).isEqualTo(expectedCaps); // Multiplex a message and check the value - final ByteBuf ethData = NetworkMemoryPool.allocate(5); - ethData.writeBytes(new byte[] {1, 2, 3, 4, 5}); + final BytesValue ethData = BytesValue.of(1, 2, 3, 4, 5); final int ethCode = 1; final MessageData ethMessage = new RawMessage(ethCode, ethData); // Check offset @@ -74,15 +72,13 @@ public class CapabilityMultiplexerTest { assertThat(multiplexerB.multiplex(eth62, ethMessage).getCode()) .isEqualTo(ethCode + expectedOffset); // Check data is unchanged - final ByteBuf multiplexedData = NetworkMemoryPool.allocate(ethMessage.getSize()); - multiplexerA.multiplex(eth62, ethMessage).writeTo(multiplexedData); + final BytesValue multiplexedData = multiplexerA.multiplex(eth62, ethMessage).getData(); assertThat(multiplexedData).isEqualTo(ethData); // Demultiplex and check value final MessageData multiplexedEthMessage = new RawMessage(ethCode + expectedOffset, ethData); ProtocolMessage demultiplexed = multiplexerA.demultiplex(multiplexedEthMessage); - final ByteBuf demultiplexedData = NetworkMemoryPool.allocate(ethMessage.getSize()); - demultiplexed.getMessage().writeTo(demultiplexedData); + final BytesValue demultiplexedData = ethMessage.getData(); // Check returned result assertThat(demultiplexed.getMessage().getCode()).isEqualTo(ethCode); assertThat(demultiplexed.getCapability()).isEqualTo(eth62); diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java index 904972a850..350ab3ab9d 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java @@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.HandshakeSecrets; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.io.IOException; import java.util.List; @@ -52,8 +53,7 @@ public class FramerTest { final byte[] byteArray = new byte[0xFFFFFF]; new Random().nextBytes(byteArray); - final ByteBuf buf = wrappedBuffer(byteArray); - final MessageData ethMessage = new RawMessage(0x00, buf); + final MessageData ethMessage = new RawMessage(0x00, BytesValue.wrap(byteArray)); final HandshakeSecrets secrets = new HandshakeSecrets(aes, mac, mac); final Framer framer = new Framer(secrets); @@ -77,11 +77,10 @@ public class FramerTest { framer.enableCompression(); final byte[] byteArray = Snappy.compress(new byte[0x1000000]); - final ByteBuf buf = wrappedBuffer(byteArray); - final MessageData ethMessage = new RawMessage(0x00, buf); + final MessageData ethMessage = new RawMessage(0x00, BytesValue.wrap(byteArray)); final ByteBuf framedMessage = Unpooled.buffer(); - framer.frameAndReleaseMessage(ethMessage, framedMessage); + framer.frameMessage(ethMessage, framedMessage); final HandshakeSecrets deframeSecrets = secretsFrom(td, true); final Framer deframer = new Framer(deframeSecrets); diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/wire/WireMessagesSedesTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/wire/WireMessagesSedesTest.java index 21dee8530d..b76cf01eec 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/wire/WireMessagesSedesTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/wire/WireMessagesSedesTest.java @@ -15,10 +15,10 @@ package tech.pegasys.pantheon.ethereum.p2p.wire; import static io.netty.buffer.ByteBufUtil.decodeHexDump; import static org.assertj.core.api.Assertions.assertThat; +import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput; import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.BytesValue; -import io.netty.buffer.ByteBuf; import org.junit.Test; public class WireMessagesSedesTest { @@ -45,7 +45,8 @@ public class WireMessagesSedesTest { } private static void assertSedesWorks(final byte[] data) { - final PeerInfo peerInfo = PeerInfo.readFrom(RLP.input(BytesValue.wrap(data))); + final BytesValue input = BytesValue.wrap(data); + final PeerInfo peerInfo = PeerInfo.readFrom(RLP.input(input)); assertThat(peerInfo.getClientId()).isNotBlank(); assertThat(peerInfo.getCapabilities()).isNotEmpty(); @@ -54,9 +55,8 @@ public class WireMessagesSedesTest { assertThat(peerInfo.getVersion()).isEqualTo(5); // Re-serialize and check that data matches - final ByteBuf buffer = peerInfo.toByteBuf(); - final byte[] serialized = new byte[buffer.readableBytes()]; - buffer.getBytes(buffer.readerIndex(), serialized); - assertThat(serialized).isEqualTo(data); + final BytesValueRLPOutput out = new BytesValueRLPOutput(); + peerInfo.writeTo(out); + assertThat(out.encoded()).isEqualTo(input); } } diff --git a/util/src/main/java/tech/pegasys/pantheon/util/bytes/BytesValue.java b/util/src/main/java/tech/pegasys/pantheon/util/bytes/BytesValue.java index 82285a15d4..89c37f5544 100644 --- a/util/src/main/java/tech/pegasys/pantheon/util/bytes/BytesValue.java +++ b/util/src/main/java/tech/pegasys/pantheon/util/bytes/BytesValue.java @@ -19,7 +19,6 @@ import java.nio.ByteBuffer; import java.security.MessageDigest; import com.google.common.annotations.VisibleForTesting; -import io.netty.buffer.ByteBuf; import io.vertx.core.buffer.Buffer; /** @@ -157,30 +156,6 @@ public interface BytesValue extends Comparable { return MutableBytesValue.wrapBuffer(buffer, offset, size); } - /** - * Wraps a full Netty {@link ByteBuf} as a {@link BytesValue}. - * - * @param buffer The buffer to wrap. - * @return A {@link BytesValue} that exposes the bytes of {@code buffer}. - */ - static BytesValue wrapBuffer(final ByteBuf buffer) { - return wrapBuffer(buffer, buffer.readerIndex(), buffer.readableBytes()); - } - - /** - * Wraps a slice of a Netty {@link ByteBuf} as a {@link BytesValue}. - * - * @param buffer The buffer to wrap. - * @param offset The offset in {@code buffer} from which to expose the bytes in the returned - * value. That is, {@code wrapBuffer(buffer, i, 1).get(0) == buffer.getByte(i)}. - * @param size The size of the returned value. - * @return A {@link BytesValue} that exposes the equivalent of {@code buffer.getBytes(offset, - * offset + size)} (but without copying said bytes). - */ - static BytesValue wrapBuffer(final ByteBuf buffer, final int offset, final int size) { - return MutableBytesValue.wrapBuffer(buffer, offset, size); - } - /** * Wraps a {@link ByteBuffer} as a {@link BytesValue}. * @@ -467,15 +442,8 @@ public interface BytesValue extends Comparable { } } - /** - * Appends the bytes of this value to the provided Netty {@link ByteBuf}. - * - * @param buffer The {@link ByteBuf} to which to append this value. - */ - default void appendTo(final ByteBuf buffer) { - for (int i = 0; i < size(); i++) { - buffer.writeByte(get(i)); - } + default void copyTo(final byte[] dest, final int srcPos, final int destPos) { + System.arraycopy(getArrayUnsafe(), srcPos, dest, destPos, size() - srcPos); } /** diff --git a/util/src/test/java/tech/pegasys/pantheon/util/bytes/BytesValueImplementationsTest.java b/util/src/test/java/tech/pegasys/pantheon/util/bytes/BytesValueImplementationsTest.java index bdf58d47a9..da4ef37723 100644 --- a/util/src/test/java/tech/pegasys/pantheon/util/bytes/BytesValueImplementationsTest.java +++ b/util/src/test/java/tech/pegasys/pantheon/util/bytes/BytesValueImplementationsTest.java @@ -30,8 +30,6 @@ import java.util.Collection; import java.util.function.Function; import com.google.common.io.BaseEncoding; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.vertx.core.buffer.Buffer; import org.junit.Test; import org.junit.runner.RunWith; @@ -72,12 +70,6 @@ public class BytesValueImplementationsTest { (BytesValueCreator) (b) -> BytesValue.wrapBuffer(buffer(b)), (BytesValueSliceCreator) (b, start, len) -> BytesValue.wrapBuffer(buffer(b), start, len) }, - { - "BytesValue.wrapBuffer() (Netty ByteBuf)", - (BytesValueCreator) (b) -> BytesValue.wrapBuffer(byteBuf(b)), - (BytesValueSliceCreator) - (b, start, len) -> BytesValue.wrapBuffer(byteBuf(b), start, len) - }, { "BytesValue.wrapBuffer() (nio ByteBuffer)", (BytesValueCreator) (b) -> BytesValue.wrapBuffer(byteBuffer(b)), @@ -95,12 +87,6 @@ public class BytesValueImplementationsTest { (BytesValueSliceCreator) (b, start, len) -> MutableBytesValue.wrapBuffer(buffer(b), start, len) }, - { - "MutableBytesValue.wrapBuffer() (Netty ByteBuf)", - (BytesValueCreator) (b) -> MutableBytesValue.wrapBuffer(byteBuf(b)), - (BytesValueSliceCreator) - (b, start, len) -> MutableBytesValue.wrapBuffer(byteBuf(b), start, len) - }, { "MutableBytesValue.wrapBuffer() (nio ByteBuffer)", (BytesValueCreator) (b) -> MutableBytesValue.wrapBuffer(byteBuffer(b)), @@ -123,12 +109,6 @@ public class BytesValueImplementationsTest { (BytesValueSliceCreator) (b, start, len) -> new MutableBufferWrappingBytesValue(buffer(b), start, len) }, - { - MutableByteBufWrappingBytesValue.class.getSimpleName(), - (BytesValueCreator) (b) -> new MutableByteBufWrappingBytesValue(byteBuf(b)), - (BytesValueSliceCreator) - (b, start, len) -> new MutableByteBufWrappingBytesValue(byteBuf(b), start, len), - }, { MutableByteBufferWrappingBytesValue.class.getSimpleName(), (BytesValueCreator) (b) -> new MutableByteBufferWrappingBytesValue(byteBuffer(b)), @@ -143,10 +123,6 @@ public class BytesValueImplementationsTest { return ByteBuffer.wrap(bytes); } - private static ByteBuf byteBuf(final byte[] bytes) { - return Unpooled.copiedBuffer(bytes); - } - private static Buffer buffer(final byte[] bytes) { return Buffer.buffer(bytes); } @@ -155,10 +131,8 @@ public class BytesValueImplementationsTest { return Buffer.buffer(fromHexString(hex).getArrayUnsafe()); } - private static ByteBuf hexToByteBuf(final String hex) { - final byte[] bytes = fromHexString(hex).getArrayUnsafe(); - return Unpooled.unreleasableBuffer(Unpooled.buffer(bytes.length, Integer.MAX_VALUE)) - .writeBytes(bytes); + private static byte[] hexToByteArray(final String hex) { + return fromHexString(hex).getArrayUnsafe(); } private BytesValue fromHex(final String hex) { @@ -166,7 +140,7 @@ public class BytesValueImplementationsTest { if (hex.substring(0, 2).equals("0x")) { hexVal = hex.substring((2)); } - byte[] bytes = BaseEncoding.base16().decode(hexVal); + final byte[] bytes = BaseEncoding.base16().decode(hexVal); return creator.create(bytes); } @@ -354,43 +328,38 @@ public class BytesValueImplementationsTest { } @Test - public void appending() { - assertAppendTo(BytesValue.EMPTY, Buffer.buffer(), BytesValue.EMPTY); - assertAppendTo(BytesValue.EMPTY, hexToBuffer("0x1234"), fromHex("0x1234")); - assertAppendTo(fromHex("0x1234"), Buffer.buffer(), fromHex("0x1234")); - assertAppendTo(fromHex("0x5678"), hexToBuffer("0x1234"), fromHex("0x12345678")); + public void appendingToBuffer() { + assertAppendToBuffer(BytesValue.EMPTY, Buffer.buffer(), BytesValue.EMPTY); + assertAppendToBuffer(BytesValue.EMPTY, hexToBuffer("0x1234"), fromHex("0x1234")); + assertAppendToBuffer(fromHex("0x1234"), Buffer.buffer(), fromHex("0x1234")); + assertAppendToBuffer(fromHex("0x5678"), hexToBuffer("0x1234"), fromHex("0x12345678")); } - private void assertAppendTo( + private void assertAppendToBuffer( final BytesValue toAppend, final Buffer buffer, final BytesValue expected) { toAppend.appendTo(buffer); assertEquals(expected, BytesValue.wrap(buffer.getBytes())); } @Test - public void appendingToByteBuf() { - final byte[] bytes0 = new byte[0]; - final byte[] bytes1 = new byte[0]; - assertAppendToByteBuf( - BytesValue.EMPTY, - Unpooled.unreleasableBuffer(Unpooled.buffer(bytes0.length, Integer.MAX_VALUE)) - .writeBytes(bytes0), - BytesValue.EMPTY); - assertAppendToByteBuf(BytesValue.EMPTY, hexToByteBuf("0x1234"), fromHex("0x1234")); - assertAppendToByteBuf( - fromHex("0x1234"), - Unpooled.unreleasableBuffer(Unpooled.buffer(bytes1.length, Integer.MAX_VALUE)) - .writeBytes(bytes1), - fromHex("0x1234")); - assertAppendToByteBuf(fromHex("0x5678"), hexToByteBuf("0x1234"), fromHex("0x12345678")); - } - - private void assertAppendToByteBuf( - final BytesValue toAppend, final ByteBuf buffer, final BytesValue expected) { - toAppend.appendTo(buffer); - final byte[] arr = new byte[buffer.writerIndex()]; - buffer.getBytes(0, arr); - assertEquals(expected, BytesValue.wrap(arr)); + public void copyingToByteArray() { + assertCopyToByteArray(BytesValue.EMPTY, 0, new byte[0], 0, BytesValue.EMPTY); + assertCopyToByteArray(BytesValue.EMPTY, 0, hexToByteArray("0x1234"), 2, fromHex("0x1234")); + assertCopyToByteArray(fromHex("0x1234"), 0, new byte[2], 0, fromHex("0x1234")); + assertCopyToByteArray(fromHex("0x1234"), 1, new byte[2], 0, fromHex("0x3400")); + assertCopyToByteArray(fromHex("0x1234"), 1, new byte[2], 1, fromHex("0x0034")); + assertCopyToByteArray( + fromHex("0x5678"), 0, Arrays.copyOf(hexToByteArray("0x1234"), 4), 2, fromHex("0x12345678")); + } + + private void assertCopyToByteArray( + final BytesValue toAppend, + final int srcPos, + final byte[] dest, + final int destPos, + final BytesValue expected) { + toAppend.copyTo(dest, srcPos, destPos); + assertEquals(expected, BytesValue.wrap(dest)); } @SuppressWarnings("DoNotInvokeMessageDigestDirectly") @@ -646,11 +615,11 @@ public class BytesValueImplementationsTest { @FunctionalInterface private interface BytesValueCreator { - public BytesValue create(byte[] bytes); + BytesValue create(byte[] bytes); } @FunctionalInterface private interface BytesValueSliceCreator { - public BytesValue create(byte[] bytes, int start, int length); + BytesValue create(byte[] bytes, int start, int length); } }