Remove reference counting from MessageData (#264)

* Remove release/retain requirement from MessageData.

MessageData now wraps a BytesValue rather than a Netty ByteBuf so we no longer need to call release/retain and pass that through to the underlying ByteBuf.
Entirely removed support for a BytesValue wrapping a Netty ByteBuf as it does not provide any way to release the underlying ByteBuf.

* Add BytesValue.copyTo(byte[]) to avoid the need to call getArrayUnsafe() to copy the data.
Adrian Sutton 6 years ago committed by GitHub
parent 9d88a55155
commit f869b740f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessage.java
  2. 15
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrePrepareMessage.java
  3. 15
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftPrepareMessage.java
  4. 15
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/IbftRoundChangeMessage.java
  5. 8
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessagedata/IbftSignedMessageData.java
  6. 8
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java
  7. 7
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java
  8. 16
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java
  9. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManager.java
  10. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessage.java
  11. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessage.java
  12. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessage.java
  13. 29
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessage.java
  14. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessage.java
  15. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessage.java
  16. 17
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessage.java
  17. 16
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessage.java
  18. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessage.java
  19. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessage.java
  20. 16
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessage.java
  21. 20
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessage.java
  22. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java
  23. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java
  24. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java
  25. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java
  26. 16
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java
  27. 1
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java
  28. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RequestManagerTest.java
  29. 32
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/RespondingEthPeer.java
  30. 12
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockBodiesMessageTest.java
  31. 12
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/BlockHeadersMessageTest.java
  32. 12
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockBodiesMessageTest.java
  33. 31
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetBlockHeadersMessageTest.java
  34. 15
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetNodeDataMessageTest.java
  35. 15
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/GetReceiptsMessageTest.java
  36. 12
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockHashesMessageTest.java
  37. 10
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NewBlockMessageTest.java
  38. 15
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/NodeDataMessageTest.java
  39. 15
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/ReceiptsMessageTest.java
  40. 6
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/StatusMessageTest.java
  41. 15
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessageTest.java
  42. 11
      ethereum/mock-p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/testing/MockNetworkTest.java
  43. 26
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkMemoryPool.java
  44. 8
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/NetworkRunner.java
  45. 16
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/api/MessageData.java
  46. 3
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/ApiHandler.java
  47. 16
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexer.java
  48. 10
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java
  49. 2
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyPeerConnection.java
  50. 39
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java
  51. 4
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/handshake/ecies/ECIESHandshaker.java
  52. 51
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/utils/ByteBufUtils.java
  53. 25
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/AbstractMessageData.java
  54. 14
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/PeerInfo.java
  55. 4
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java
  56. 20
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/DisconnectMessage.java
  57. 13
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/EmptyMessage.java
  58. 22
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/messages/HelloMessage.java
  59. 12
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/CapabilityMultiplexerTest.java
  60. 9
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java
  61. 12
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/wire/WireMessagesSedesTest.java
  62. 36
      util/src/main/java/tech/pegasys/pantheon/util/bytes/BytesValue.java
  63. 89
      util/src/test/java/tech/pegasys/pantheon/util/bytes/BytesValueImplementationsTest.java

@ -13,28 +13,13 @@
package tech.pegasys.pantheon.consensus.ibft.ibftmessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import io.netty.buffer.ByteBuf;
import tech.pegasys.pantheon.util.bytes.BytesValue;
public abstract class AbstractIbftMessage extends AbstractMessageData {
protected AbstractIbftMessage(final ByteBuf data) {
protected AbstractIbftMessage(final BytesValue data) {
super(data);
}
public abstract IbftSignedMessageData<?> decode();
protected static ByteBuf writeMessageToByteBuf(
final IbftSignedMessageData<?> ibftSignedMessageData) {
BytesValueRLPOutput rlpEncode = new BytesValueRLPOutput();
ibftSignedMessageData.writeTo(rlpEncode);
final ByteBuf data = NetworkMemoryPool.allocate(rlpEncode.encodedSize());
data.writeBytes(rlpEncode.encoded().extractArray());
return data;
}
}

@ -14,24 +14,20 @@ package tech.pegasys.pantheon.consensus.ibft.ibftmessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftUnsignedPrePrepareMessageData;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import io.netty.buffer.ByteBuf;
public class IbftPrePrepareMessage extends AbstractIbftMessage {
private static final int MESSAGE_CODE = IbftV2.PRE_PREPARE;
private IbftPrePrepareMessage(final ByteBuf data) {
private IbftPrePrepareMessage(final BytesValue data) {
super(data);
}
public static IbftPrePrepareMessage fromMessage(final MessageData message) {
if (message instanceof IbftPrePrepareMessage) {
message.retain();
return (IbftPrePrepareMessage) message;
}
final int code = message.getCode();
@ -40,21 +36,18 @@ public class IbftPrePrepareMessage extends AbstractIbftMessage {
String.format("Message has code %d and thus is not a PrePrepareMessage", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new IbftPrePrepareMessage(data);
return new IbftPrePrepareMessage(message.getData());
}
@Override
public IbftSignedMessageData<IbftUnsignedPrePrepareMessageData> decode() {
return IbftSignedMessageData.readIbftSignedPrePrepareMessageDataFrom(
RLP.input(BytesValue.wrapBuffer(data)));
return IbftSignedMessageData.readIbftSignedPrePrepareMessageDataFrom(RLP.input(data));
}
public static IbftPrePrepareMessage create(
final IbftSignedMessageData<IbftUnsignedPrePrepareMessageData> ibftPrepareMessageDecoded) {
return new IbftPrePrepareMessage(writeMessageToByteBuf(ibftPrepareMessageDecoded));
return new IbftPrePrepareMessage(ibftPrepareMessageDecoded.encode());
}
@Override

@ -14,24 +14,20 @@ package tech.pegasys.pantheon.consensus.ibft.ibftmessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftUnsignedPrepareMessageData;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import io.netty.buffer.ByteBuf;
public class IbftPrepareMessage extends AbstractIbftMessage {
private static final int MESSAGE_CODE = IbftV2.PREPARE;
private IbftPrepareMessage(final ByteBuf data) {
private IbftPrepareMessage(final BytesValue data) {
super(data);
}
public static IbftPrepareMessage fromMessage(final MessageData message) {
if (message instanceof IbftPrepareMessage) {
message.retain();
return (IbftPrepareMessage) message;
}
final int code = message.getCode();
@ -40,21 +36,18 @@ public class IbftPrepareMessage extends AbstractIbftMessage {
String.format("Message has code %d and thus is not a PrepareMessage", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new IbftPrepareMessage(data);
return new IbftPrepareMessage(message.getData());
}
@Override
public IbftSignedMessageData<IbftUnsignedPrepareMessageData> decode() {
return IbftSignedMessageData.readIbftSignedPrepareMessageDataFrom(
RLP.input(BytesValue.wrapBuffer(data)));
return IbftSignedMessageData.readIbftSignedPrepareMessageDataFrom(RLP.input(data));
}
public static IbftPrepareMessage create(
final IbftSignedMessageData<IbftUnsignedPrepareMessageData> ibftPrepareMessageDecoded) {
return new IbftPrepareMessage(writeMessageToByteBuf(ibftPrepareMessageDecoded));
return new IbftPrepareMessage(ibftPrepareMessageDecoded.encode());
}
@Override

@ -14,24 +14,20 @@ package tech.pegasys.pantheon.consensus.ibft.ibftmessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftUnsignedRoundChangeMessageData;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import io.netty.buffer.ByteBuf;
public class IbftRoundChangeMessage extends AbstractIbftMessage {
private static final int MESSAGE_CODE = IbftV2.ROUND_CHANGE;
private IbftRoundChangeMessage(final ByteBuf data) {
private IbftRoundChangeMessage(final BytesValue data) {
super(data);
}
public static IbftRoundChangeMessage fromMessage(final MessageData message) {
if (message instanceof IbftRoundChangeMessage) {
message.retain();
return (IbftRoundChangeMessage) message;
}
final int code = message.getCode();
@ -40,21 +36,18 @@ public class IbftRoundChangeMessage extends AbstractIbftMessage {
String.format("Message has code %d and thus is not a RoundChangeMessage", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new IbftRoundChangeMessage(data);
return new IbftRoundChangeMessage(message.getData());
}
@Override
public IbftSignedMessageData<IbftUnsignedRoundChangeMessageData> decode() {
return IbftSignedMessageData.readIbftSignedRoundChangeMessageDataFrom(
RLP.input(BytesValue.wrapBuffer(data)));
return IbftSignedMessageData.readIbftSignedRoundChangeMessageDataFrom(RLP.input(data));
}
public static IbftRoundChangeMessage create(
final IbftSignedMessageData<IbftUnsignedRoundChangeMessageData> ibftPrepareMessageDecoded) {
return new IbftRoundChangeMessage(writeMessageToByteBuf(ibftPrepareMessageDecoded));
return new IbftRoundChangeMessage(ibftPrepareMessageDecoded.encode());
}
@Override

@ -15,8 +15,10 @@ package tech.pegasys.pantheon.consensus.ibft.ibftmessagedata;
import tech.pegasys.pantheon.crypto.SECP256K1.Signature;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
import tech.pegasys.pantheon.ethereum.rlp.RLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;
public class IbftSignedMessageData<M extends AbstractIbftUnsignedMessageData> {
@ -51,6 +53,12 @@ public class IbftSignedMessageData<M extends AbstractIbftUnsignedMessageData> {
output.endList();
}
public BytesValue encode() {
final BytesValueRLPOutput rlpEncode = new BytesValueRLPOutput();
writeTo(rlpEncode);
return rlpEncode.encoded();
}
public static IbftSignedMessageData<IbftUnsignedPrePrepareMessageData>
readIbftSignedPrePrepareMessageDataFrom(final RLPInput rlpInput) {

@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.consensus.ibft.network;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -29,6 +28,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.math.BigInteger;
import java.util.List;
@ -74,7 +74,7 @@ public class IbftNetworkPeersTest {
peers.peerAdded(peer);
}
final MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER);
final MessageData messageToSend = new RawMessage(1, BytesValue.EMPTY);
peers.multicastToValidators(messageToSend);
verify(peerConnections.get(0), times(1)).sendForProtocol("IBF", messageToSend);
@ -93,9 +93,9 @@ public class IbftNetworkPeersTest {
final IbftNetworkPeers peers = new IbftNetworkPeers(validatorProvider);
// only add peer connections 1, 2 & 3, none of which should be invoked.
Lists.newArrayList(1, 2, 3).stream().forEach(i -> peers.peerAdded(peerConnections.get(i)));
Lists.newArrayList(1, 2, 3).forEach(i -> peers.peerAdded(peerConnections.get(i)));
final MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER);
final MessageData messageToSend = new RawMessage(1, BytesValue.EMPTY);
peers.multicastToValidators(messageToSend);
verify(peerConnections.get(0), never()).sendForProtocol(any(), any());

@ -244,8 +244,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
// Parsing errors can happen when clients broadcast network ids outside of the int range,
// So just disconnect with "subprotocol" error rather than "breach of protocol".
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} finally {
status.release();
}
}
@ -266,15 +264,10 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
.forEach(
peer -> {
try {
// Send(msg) will release the NewBlockMessage's internal buffer, thus it must be
// retained
// prior to transmission - then released on exit from function.
newBlockMessage.retain();
peer.send(newBlockMessage);
} catch (final PeerNotConnected ex) {
// Peers may disconnect while traversing the list, this is a normal occurrence.
}
});
newBlockMessage.release();
}
}

@ -118,7 +118,6 @@ class EthServer {
static MessageData constructGetHeadersResponse(
final Blockchain blockchain, final MessageData message, final int requestLimit) {
final GetBlockHeadersMessage getHeaders = GetBlockHeadersMessage.readFrom(message);
try {
final Optional<Hash> hash = getHeaders.hash();
final int skip = getHeaders.skip();
final int maxHeaders = Math.min(requestLimit, getHeaders.maxHeaders());
@ -151,15 +150,11 @@ class EthServer {
}
}
return BlockHeadersMessage.create(resp);
} finally {
getHeaders.release();
}
}
static MessageData constructGetBodiesResponse(
final Blockchain blockchain, final MessageData message, final int requestLimit) {
final GetBlockBodiesMessage getBlockBodiesMessage = GetBlockBodiesMessage.readFrom(message);
try {
final Iterable<Hash> hashes = getBlockBodiesMessage.hashes();
final Collection<BlockBody> bodies = new ArrayList<>();
@ -176,15 +171,11 @@ class EthServer {
bodies.add(maybeBody.get());
}
return BlockBodiesMessage.create(bodies);
} finally {
getBlockBodiesMessage.release();
}
}
static MessageData constructGetReceiptsResponse(
final Blockchain blockchain, final MessageData message, final int requestLimit) {
final GetReceiptsMessage getReceipts = GetReceiptsMessage.readFrom(message);
try {
final Iterable<Hash> hashes = getReceipts.hashes();
final List<List<TransactionReceipt>> receipts = new ArrayList<>();
@ -201,15 +192,11 @@ class EthServer {
receipts.add(maybeReceipts.get());
}
return ReceiptsMessage.create(receipts);
} finally {
getReceipts.release();
}
}
static MessageData constructGetNodeDataResponse(
final MessageData message, final int requestLimit) {
final GetNodeDataMessage getNodeDataMessage = GetNodeDataMessage.readFrom(message);
try {
final Iterable<Hash> hashes = getNodeDataMessage.hashes();
final List<BytesValue> nodeData = new ArrayList<>();
@ -222,8 +209,5 @@ class EthServer {
// TODO: Lookup node data and add it to the list
}
return NodeDataMessage.create(nodeData);
} finally {
getNodeDataMessage.release();
}
}
}

@ -156,7 +156,6 @@ public class RequestManager {
if (closed) {
return;
}
message.retain();
bufferedResponses.add(new Response(false, message));
dispatchBufferedResponses();
}
@ -168,9 +167,6 @@ public class RequestManager {
Response response = bufferedResponses.poll();
while (response != null) {
responseCallback.exec(response.closed, response.message, peer);
if (response.message != null) {
response.message.release();
}
response = bufferedResponses.poll();
}
}

@ -16,20 +16,16 @@ import tech.pegasys.pantheon.ethereum.core.BlockBody;
import tech.pegasys.pantheon.ethereum.core.BlockHashFunction;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import io.netty.buffer.ByteBuf;
public final class BlockBodiesMessage extends AbstractMessageData {
public static BlockBodiesMessage readFrom(final MessageData message) {
if (message instanceof BlockBodiesMessage) {
message.retain();
return (BlockBodiesMessage) message;
}
final int code = message.getCode();
@ -37,9 +33,7 @@ public final class BlockBodiesMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a BlockBodiesMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new BlockBodiesMessage(data);
return new BlockBodiesMessage(message.getData());
}
public static BlockBodiesMessage create(final Iterable<BlockBody> bodies) {
@ -47,12 +41,10 @@ public final class BlockBodiesMessage extends AbstractMessageData {
tmp.startList();
bodies.forEach(body -> body.writeTo(tmp));
tmp.endList();
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new BlockBodiesMessage(data);
return new BlockBodiesMessage(tmp.encoded());
}
private BlockBodiesMessage(final ByteBuf data) {
private BlockBodiesMessage(final BytesValue data) {
super(data);
}
@ -64,9 +56,7 @@ public final class BlockBodiesMessage extends AbstractMessageData {
public <C> Iterable<BlockBody> bodies(final ProtocolSchedule<C> protocolSchedule) {
final BlockHashFunction blockHashFunction =
ScheduleBasedBlockHashFunction.create(protocolSchedule);
final byte[] tmp = new byte[data.readableBytes()];
data.getBytes(0, tmp);
return new BytesValueRLPInput(BytesValue.wrap(tmp), false)
return new BytesValueRLPInput(data, false)
.readList(rlp -> BlockBody.readFrom(rlp, blockHashFunction));
}
}

@ -16,7 +16,6 @@ import tech.pegasys.pantheon.ethereum.core.BlockHashFunction;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -25,13 +24,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Iterator;
import io.netty.buffer.ByteBuf;
public final class BlockHeadersMessage extends AbstractMessageData {
public static BlockHeadersMessage readFrom(final MessageData message) {
if (message instanceof BlockHeadersMessage) {
message.retain();
return (BlockHeadersMessage) message;
}
final int code = message.getCode();
@ -39,9 +35,7 @@ public final class BlockHeadersMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a BlockHeadersMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new BlockHeadersMessage(data);
return new BlockHeadersMessage(message.getData());
}
public static BlockHeadersMessage create(final Iterable<BlockHeader> headers) {
@ -51,12 +45,10 @@ public final class BlockHeadersMessage extends AbstractMessageData {
header.writeTo(tmp);
}
tmp.endList();
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new BlockHeadersMessage(data);
return new BlockHeadersMessage(tmp.encoded());
}
private BlockHeadersMessage(final ByteBuf data) {
private BlockHeadersMessage(final BytesValue data) {
super(data);
}
@ -68,9 +60,7 @@ public final class BlockHeadersMessage extends AbstractMessageData {
public <C> Iterator<BlockHeader> getHeaders(final ProtocolSchedule<C> protocolSchedule) {
final BlockHashFunction blockHashFunction =
ScheduleBasedBlockHashFunction.create(protocolSchedule);
final byte[] headers = new byte[data.readableBytes()];
data.getBytes(0, headers);
return new BytesValueRLPInput(BytesValue.wrap(headers), false)
return new BytesValueRLPInput(data, false)
.readList(rlp -> BlockHeader.readFrom(rlp, blockHashFunction))
.iterator();
}

@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.Collection;
import io.netty.buffer.ByteBuf;
public final class GetBlockBodiesMessage extends AbstractMessageData {
public static GetBlockBodiesMessage readFrom(final MessageData message) {
if (message instanceof GetBlockBodiesMessage) {
message.retain();
return (GetBlockBodiesMessage) message;
}
final int code = message.getCode();
@ -38,9 +34,7 @@ public final class GetBlockBodiesMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a GetBlockBodiesMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new GetBlockBodiesMessage(data);
return new GetBlockBodiesMessage(message.getData());
}
public static GetBlockBodiesMessage create(final Iterable<Hash> hashes) {
@ -48,12 +42,10 @@ public final class GetBlockBodiesMessage extends AbstractMessageData {
tmp.startList();
hashes.forEach(tmp::writeBytesValue);
tmp.endList();
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new GetBlockBodiesMessage(data);
return new GetBlockBodiesMessage(tmp.encoded());
}
private GetBlockBodiesMessage(final ByteBuf data) {
private GetBlockBodiesMessage(final BytesValue data) {
super(data);
}
@ -63,9 +55,7 @@ public final class GetBlockBodiesMessage extends AbstractMessageData {
}
public Iterable<Hash> hashes() {
final byte[] tmp = new byte[data.readableBytes()];
data.getBytes(0, tmp);
final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false);
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
final Collection<Hash> hashes = new ArrayList<>();
while (!input.isEndOfCurrentList()) {

@ -15,19 +15,17 @@ package tech.pegasys.pantheon.ethereum.eth.messages;
import static com.google.common.base.Preconditions.checkArgument;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
import tech.pegasys.pantheon.ethereum.rlp.RLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Optional;
import java.util.OptionalLong;
import io.netty.buffer.ByteBuf;
/** PV62 GetBlockHeaders Message. */
public final class GetBlockHeadersMessage extends AbstractMessageData {
@ -35,7 +33,6 @@ public final class GetBlockHeadersMessage extends AbstractMessageData {
public static GetBlockHeadersMessage readFrom(final MessageData message) {
if (message instanceof GetBlockHeadersMessage) {
message.retain();
return (GetBlockHeadersMessage) message;
}
final int code = message.getCode();
@ -43,9 +40,7 @@ public final class GetBlockHeadersMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a GetBlockHeadersMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new GetBlockHeadersMessage(data);
return new GetBlockHeadersMessage(message.getData());
}
public static GetBlockHeadersMessage create(
@ -54,9 +49,7 @@ public final class GetBlockHeadersMessage extends AbstractMessageData {
GetBlockHeadersData.create(blockNum, maxHeaders, skip, reverse);
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
getBlockHeadersData.writeTo(tmp);
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new GetBlockHeadersMessage(data);
return new GetBlockHeadersMessage(tmp.encoded());
}
public static GetBlockHeadersMessage create(
@ -65,12 +58,10 @@ public final class GetBlockHeadersMessage extends AbstractMessageData {
GetBlockHeadersData.create(hash, maxHeaders, skip, reverse);
final BytesValueRLPOutput tmp = new BytesValueRLPOutput();
getBlockHeadersData.writeTo(tmp);
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new GetBlockHeadersMessage(data);
return new GetBlockHeadersMessage(tmp.encoded());
}
private GetBlockHeadersMessage(final ByteBuf data) {
private GetBlockHeadersMessage(final BytesValue data) {
super(data);
}
@ -113,7 +104,7 @@ public final class GetBlockHeadersMessage extends AbstractMessageData {
private GetBlockHeadersData getBlockHeadersData() {
if (getBlockHeadersData == null) {
getBlockHeadersData = GetBlockHeadersData.readFrom(ByteBufUtils.toRLPInput(data));
getBlockHeadersData = GetBlockHeadersData.readFrom(RLP.input(data));
}
return getBlockHeadersData;
}
@ -160,9 +151,9 @@ public final class GetBlockHeadersMessage extends AbstractMessageData {
blockNumber = OptionalLong.of(input.readLongScalar());
}
int maxHeaders = input.readIntScalar();
int skip = input.readIntScalar();
boolean reverse = input.readIntScalar() != 0;
final int maxHeaders = input.readIntScalar();
final int skip = input.readIntScalar();
final boolean reverse = input.readIntScalar() != 0;
input.leaveList();

@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.Collection;
import io.netty.buffer.ByteBuf;
public final class GetNodeDataMessage extends AbstractMessageData {
public static GetNodeDataMessage readFrom(final MessageData message) {
if (message instanceof GetNodeDataMessage) {
message.retain();
return (GetNodeDataMessage) message;
}
final int code = message.getCode();
@ -38,9 +34,7 @@ public final class GetNodeDataMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a GetNodeDataMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new GetNodeDataMessage(data);
return new GetNodeDataMessage(message.getData());
}
public static GetNodeDataMessage create(final Iterable<Hash> hashes) {
@ -48,12 +42,10 @@ public final class GetNodeDataMessage extends AbstractMessageData {
tmp.startList();
hashes.forEach(tmp::writeBytesValue);
tmp.endList();
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new GetNodeDataMessage(data);
return new GetNodeDataMessage(tmp.encoded());
}
private GetNodeDataMessage(final ByteBuf data) {
private GetNodeDataMessage(final BytesValue data) {
super(data);
}
@ -63,9 +55,7 @@ public final class GetNodeDataMessage extends AbstractMessageData {
}
public Iterable<Hash> hashes() {
final byte[] tmp = new byte[data.readableBytes()];
data.getBytes(0, tmp);
final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false);
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
final Collection<Hash> hashes = new ArrayList<>();
while (!input.isEndOfCurrentList()) {

@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.Collection;
import io.netty.buffer.ByteBuf;
public final class GetReceiptsMessage extends AbstractMessageData {
public static GetReceiptsMessage readFrom(final MessageData message) {
if (message instanceof GetReceiptsMessage) {
message.retain();
return (GetReceiptsMessage) message;
}
final int code = message.getCode();
@ -38,9 +34,7 @@ public final class GetReceiptsMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a GetReceipts.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new GetReceiptsMessage(data);
return new GetReceiptsMessage(message.getData());
}
public static GetReceiptsMessage create(final Iterable<Hash> hashes) {
@ -48,12 +42,10 @@ public final class GetReceiptsMessage extends AbstractMessageData {
tmp.startList();
hashes.forEach(tmp::writeBytesValue);
tmp.endList();
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new GetReceiptsMessage(data);
return new GetReceiptsMessage(tmp.encoded());
}
private GetReceiptsMessage(final ByteBuf data) {
private GetReceiptsMessage(final BytesValue data) {
super(data);
}
@ -63,9 +55,7 @@ public final class GetReceiptsMessage extends AbstractMessageData {
}
public Iterable<Hash> hashes() {
final byte[] tmp = new byte[data.readableBytes()];
data.getBytes(0, tmp);
final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false);
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
final Collection<Hash> hashes = new ArrayList<>();
while (!input.isEndOfCurrentList()) {

@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -24,13 +23,11 @@ import java.util.Iterator;
import java.util.Objects;
import com.google.common.collect.Iterators;
import io.netty.buffer.ByteBuf;
public final class NewBlockHashesMessage extends AbstractMessageData {
public static NewBlockHashesMessage readFrom(final MessageData message) {
if (message instanceof NewBlockHashesMessage) {
message.retain();
return (NewBlockHashesMessage) message;
}
final int code = message.getCode();
@ -38,9 +35,7 @@ public final class NewBlockHashesMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a NewBlockHashesMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new NewBlockHashesMessage(data);
return new NewBlockHashesMessage(message.getData());
}
public static NewBlockHashesMessage create(
@ -54,12 +49,10 @@ public final class NewBlockHashesMessage extends AbstractMessageData {
tmp.endList();
}
tmp.endList();
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new NewBlockHashesMessage(data);
return new NewBlockHashesMessage(tmp.encoded());
}
private NewBlockHashesMessage(final ByteBuf data) {
private NewBlockHashesMessage(final BytesValue data) {
super(data);
}
@ -69,9 +62,7 @@ public final class NewBlockHashesMessage extends AbstractMessageData {
}
public Iterator<NewBlockHashesMessage.NewBlockHash> getNewHashes() {
final byte[] hashes = new byte[data.readableBytes()];
data.getBytes(0, hashes);
return new BytesValueRLPInput(BytesValue.wrap(hashes), false)
return new BytesValueRLPInput(data, false)
.readList(
rlpInput -> {
rlpInput.enterList();

@ -16,9 +16,7 @@ import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHashFunction;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
@ -27,15 +25,13 @@ import tech.pegasys.pantheon.ethereum.rlp.RLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
import io.netty.buffer.ByteBuf;
public class NewBlockMessage extends AbstractMessageData {
private static final int MESSAGE_CODE = EthPV62.NEW_BLOCK;
private NewBlockMessageData messageFields = null;
private NewBlockMessage(final ByteBuf data) {
private NewBlockMessage(final BytesValue data) {
super(data);
}
@ -48,13 +44,11 @@ public class NewBlockMessage extends AbstractMessageData {
final NewBlockMessageData msgData = new NewBlockMessageData(block, totalDifficulty);
final BytesValueRLPOutput out = new BytesValueRLPOutput();
msgData.writeTo(out);
final ByteBuf data = ByteBufUtils.fromRLPOutput(out);
return new NewBlockMessage(data);
return new NewBlockMessage(out.encoded());
}
public static NewBlockMessage readFrom(final MessageData message) {
if (message instanceof NewBlockMessage) {
message.retain();
return (NewBlockMessage) message;
}
final int code = message.getCode();
@ -62,9 +56,7 @@ public class NewBlockMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a NewBlockMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new NewBlockMessage(data);
return new NewBlockMessage(message.getData());
}
public <C> Block block(final ProtocolSchedule<C> protocolSchedule) {
@ -77,7 +69,7 @@ public class NewBlockMessage extends AbstractMessageData {
private <C> NewBlockMessageData messageFields(final ProtocolSchedule<C> protocolSchedule) {
if (messageFields == null) {
final RLPInput input = RLP.input(BytesValue.wrap(ByteBufUtils.toByteArray(data)));
final RLPInput input = RLP.input(data);
messageFields = NewBlockMessageData.readFrom(input, protocolSchedule);
}
return messageFields;

@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -23,13 +22,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.Collection;
import io.netty.buffer.ByteBuf;
public final class NodeDataMessage extends AbstractMessageData {
public static NodeDataMessage readFrom(final MessageData message) {
if (message instanceof NodeDataMessage) {
message.retain();
return (NodeDataMessage) message;
}
final int code = message.getCode();
@ -37,9 +33,7 @@ public final class NodeDataMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a NodeDataMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new NodeDataMessage(data);
return new NodeDataMessage(message.getData());
}
public static NodeDataMessage create(final Iterable<BytesValue> nodeData) {
@ -47,12 +41,10 @@ public final class NodeDataMessage extends AbstractMessageData {
tmp.startList();
nodeData.forEach(tmp::writeBytesValue);
tmp.endList();
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new NodeDataMessage(data);
return new NodeDataMessage(tmp.encoded());
}
private NodeDataMessage(final ByteBuf data) {
private NodeDataMessage(final BytesValue data) {
super(data);
}
@ -62,9 +54,7 @@ public final class NodeDataMessage extends AbstractMessageData {
}
public Iterable<BytesValue> nodeData() {
final byte[] tmp = new byte[data.readableBytes()];
data.getBytes(0, tmp);
final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false);
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
final Collection<BytesValue> nodeData = new ArrayList<>();
while (!input.isEndOfCurrentList()) {

@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.List;
import io.netty.buffer.ByteBuf;
public final class ReceiptsMessage extends AbstractMessageData {
public static ReceiptsMessage readFrom(final MessageData message) {
if (message instanceof ReceiptsMessage) {
message.retain();
return (ReceiptsMessage) message;
}
final int code = message.getCode();
@ -38,9 +34,7 @@ public final class ReceiptsMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a ReceiptsMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new ReceiptsMessage(data);
return new ReceiptsMessage(message.getData());
}
public static ReceiptsMessage create(final List<List<TransactionReceipt>> receipts) {
@ -53,12 +47,10 @@ public final class ReceiptsMessage extends AbstractMessageData {
tmp.endList();
});
tmp.endList();
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new ReceiptsMessage(data);
return new ReceiptsMessage(tmp.encoded());
}
private ReceiptsMessage(final ByteBuf data) {
private ReceiptsMessage(final BytesValue data) {
super(data);
}
@ -68,9 +60,7 @@ public final class ReceiptsMessage extends AbstractMessageData {
}
public List<List<TransactionReceipt>> receipts() {
final byte[] tmp = new byte[data.readableBytes()];
data.getBytes(0, tmp);
final RLPInput input = new BytesValueRLPInput(BytesValue.wrap(tmp), false);
final RLPInput input = new BytesValueRLPInput(data, false);
input.enterList();
final List<List<TransactionReceipt>> receipts = new ArrayList<>();
while (input.nextIsList()) {

@ -13,9 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
@ -25,13 +23,11 @@ import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
import io.netty.buffer.ByteBuf;
public final class StatusMessage extends AbstractMessageData {
private EthStatus status;
public StatusMessage(final ByteBuf data) {
public StatusMessage(final BytesValue data) {
super(data);
}
@ -45,14 +41,12 @@ public final class StatusMessage extends AbstractMessageData {
new EthStatus(protocolVersion, networkId, totalDifficulty, bestHash, genesisHash);
final BytesValueRLPOutput out = new BytesValueRLPOutput();
status.writeTo(out);
final ByteBuf data = ByteBufUtils.fromRLPOutput(out);
return new StatusMessage(data);
return new StatusMessage(out.encoded());
}
public static StatusMessage readFrom(final MessageData message) {
if (message instanceof StatusMessage) {
message.retain();
return (StatusMessage) message;
}
final int code = message.getCode();
@ -60,9 +54,7 @@ public final class StatusMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a StatusMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new StatusMessage(data);
return new StatusMessage(message.getData());
}
@Override
@ -99,7 +91,7 @@ public final class StatusMessage extends AbstractMessageData {
private EthStatus status() {
if (status == null) {
final RLPInput input = RLP.input(BytesValue.wrap(ByteBufUtils.toByteArray(data)));
final RLPInput input = RLP.input(data);
status = EthStatus.readFrom(input);
}
return status;

@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -24,13 +23,10 @@ import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Iterator;
import java.util.function.Function;
import io.netty.buffer.ByteBuf;
public class TransactionsMessage extends AbstractMessageData {
public static TransactionsMessage readFrom(final MessageData message) {
if (message instanceof TransactionsMessage) {
message.retain();
return (TransactionsMessage) message;
}
final int code = message.getCode();
@ -38,9 +34,7 @@ public class TransactionsMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a TransactionsMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new TransactionsMessage(data);
return new TransactionsMessage(message.getData());
}
public static TransactionsMessage create(final Iterable<Transaction> transactions) {
@ -50,12 +44,10 @@ public class TransactionsMessage extends AbstractMessageData {
transaction.writeTo(tmp);
}
tmp.endList();
final ByteBuf data = NetworkMemoryPool.allocate(tmp.encodedSize());
data.writeBytes(tmp.encoded().extractArray());
return new TransactionsMessage(data);
return new TransactionsMessage(tmp.encoded());
}
private TransactionsMessage(final ByteBuf data) {
private TransactionsMessage(final BytesValue data) {
super(data);
}
@ -66,10 +58,6 @@ public class TransactionsMessage extends AbstractMessageData {
public Iterator<Transaction> transactions(
final Function<RLPInput, Transaction> transactionReader) {
final byte[] transactions = new byte[data.readableBytes()];
data.getBytes(0, transactions);
return new BytesValueRLPInput(BytesValue.wrap(transactions), false)
.readList(transactionReader)
.iterator();
return new BytesValueRLPInput(data, false).readList(transactionReader).iterator();
}
}

@ -159,8 +159,6 @@ public class BlockPropagationManager<C> {
importOrSavePendingBlock(block);
} catch (final RLPException e) {
message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
} finally {
newBlockMessage.release();
}
}
@ -210,8 +208,6 @@ public class BlockPropagationManager<C> {
}
} catch (final RLPException e) {
message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
} finally {
newBlockHashesMessage.release();
}
}

@ -71,7 +71,6 @@ public abstract class AbstractGetHeadersFromPeerTask
}
final BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(message);
try {
final Iterator<BlockHeader> headers = headersMessage.getHeaders(protocolSchedule);
if (!headers.hasNext()) {
// Message contains no data - nothing to do
@ -104,9 +103,6 @@ public abstract class AbstractGetHeadersFromPeerTask
LOG.debug("Received {} of {} headers requested from peer.", headersList.size(), count);
return Optional.of(headersList);
} finally {
headersMessage.release();
}
}
@Override

@ -99,7 +99,6 @@ public class GetBodiesFromPeerTask<C> extends AbstractPeerRequestTask<List<Block
}
final BlockBodiesMessage bodiesMessage = BlockBodiesMessage.readFrom(message);
try {
final List<BlockBody> bodies = Lists.newArrayList(bodiesMessage.bodies(protocolSchedule));
if (bodies.size() == 0) {
// Message contains no data - nothing to do
@ -121,9 +120,6 @@ public class GetBodiesFromPeerTask<C> extends AbstractPeerRequestTask<List<Block
headers.clear();
}
return Optional.of(blocks);
} finally {
bodiesMessage.release();
}
}
@Override

@ -53,8 +53,6 @@ class TransactionsMessageProcessor {
if (peer != null) {
peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
}
} finally {
transactionsMessage.release();
}
}
}

@ -188,7 +188,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < blockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i);
}
message.release();
done.complete(null);
};
final PeerConnection peer = setupPeer(ethManager, onSend);
@ -221,7 +220,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < limit; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i);
}
message.release();
done.complete(null);
};
final PeerConnection peer = setupPeer(ethManager, onSend);
@ -251,7 +249,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < blockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(endBlock - i);
}
message.release();
done.complete(null);
};
final PeerConnection peer = setupPeer(ethManager, onSend);
@ -283,7 +280,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < blockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i * (skip + 1));
}
message.release();
done.complete(null);
};
final PeerConnection peer = setupPeer(ethManager, onSend);
@ -316,7 +312,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < blockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(endBlock - i * (skip + 1));
}
message.release();
done.complete(null);
};
final PeerConnection peer = setupPeer(ethManager, onSend);
@ -369,7 +364,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < 2; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(startBlock + i);
}
message.release();
done.complete(null);
};
final PeerConnection peer = setupPeer(ethManager, onSend);
@ -397,7 +391,6 @@ public final class EthProtocolManagerTest {
final List<BlockHeader> headers =
Lists.newArrayList(headersMsg.getHeaders(protocolSchedule));
assertThat(headers.size()).isEqualTo(0);
message.release();
done.complete(null);
};
final PeerConnection peer = setupPeer(ethManager, onSend);
@ -438,7 +431,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < blockCount; i++) {
assertThat(expectedBlocks[i].getBody()).isEqualTo(bodies.get(i));
}
message.release();
done.complete(null);
};
@ -483,7 +475,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < limit; i++) {
assertThat(expectedBlocks[i].getBody()).isEqualTo(bodies.get(i));
}
message.release();
done.complete(null);
};
@ -520,7 +511,6 @@ public final class EthProtocolManagerTest {
Lists.newArrayList(blocksMessage.bodies(protocolSchedule));
assertThat(bodies.size()).isEqualTo(1);
assertThat(expectedBlock.getBody()).isEqualTo(bodies.get(0));
message.release();
done.complete(null);
};
@ -562,7 +552,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < blockCount; i++) {
assertThat(expectedReceipts.get(i)).isEqualTo(receipts.get(i));
}
message.release();
done.complete(null);
};
@ -606,7 +595,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < limit; i++) {
assertThat(expectedReceipts.get(i)).isEqualTo(receipts.get(i));
}
message.release();
done.complete(null);
};
@ -623,7 +611,6 @@ public final class EthProtocolManagerTest {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1)) {
// Setup blocks query
final long blockNumber = blockchain.getChainHeadBlockNumber() - 5;
final int blockCount = 2;
final BlockHeader header = blockchain.getBlockHeader(blockNumber).get();
final List<TransactionReceipt> expectedReceipts =
blockchain.getTxReceipts(header.getHash()).get();
@ -644,7 +631,6 @@ public final class EthProtocolManagerTest {
Lists.newArrayList(receiptsMessage.receipts());
assertThat(receipts.size()).isEqualTo(1);
assertThat(expectedReceipts).isEqualTo(receipts.get(0));
message.release();
done.complete(null);
};
@ -699,7 +685,6 @@ public final class EthProtocolManagerTest {
for (final NewBlockMessage msg : messageSentCaptor.getAllValues()) {
assertThat(msg.block(protocolSchdeule)).isEqualTo(minedBlock);
assertThat(msg.totalDifficulty(protocolSchdeule)).isEqualTo(expectedTotalDifficulty);
msg.release();
}
assertThat(receivingPeerCaptor.getAllValues().containsAll(peers)).isTrue();
@ -740,7 +725,6 @@ public final class EthProtocolManagerTest {
for (int i = 0; i < receivedBlockCount; i++) {
assertThat(headers.get(i).getNumber()).isEqualTo(receivedBlockCount - 1 - i);
}
message.release();
done.complete(null);
};

@ -54,7 +54,6 @@ class MockPeerConnection implements PeerConnection {
@Override
public void send(final Capability capability, final MessageData message) throws PeerNotConnected {
if (disconnected) {
message.release();
throw new PeerNotConnected("MockPeerConnection disconnected");
}
onSend.exec(capability, message, this);

@ -22,6 +22,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.Collections;
@ -31,7 +32,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import io.netty.buffer.Unpooled;
import org.junit.Test;
public class RequestManagerTest {
@ -212,7 +212,7 @@ public class RequestManagerTest {
}
private EthMessage mockMessage(final EthPeer peer) {
return new EthMessage(peer, new RawMessage(1, Unpooled.buffer()));
return new EthMessage(peer, new RawMessage(1, BytesValue.EMPTY));
}
private EthPeer createPeer() {

@ -151,21 +151,12 @@ public class RespondingEthPeer {
final List<OutgoingMessage> currentMessages = new ArrayList<>(outgoingMessages);
outgoingMessages.clear();
for (final OutgoingMessage msg : currentMessages) {
try {
final Optional<MessageData> maybeResponse =
responder.respond(msg.capability, msg.messageData);
maybeResponse.ifPresent(
(response) -> {
try {
(response) ->
ethProtocolManager.processMessage(
msg.capability, new DefaultMessage(peerConnection, response));
} finally {
response.release();
}
});
} finally {
msg.messageData.release();
}
msg.capability, new DefaultMessage(peerConnection, response)));
}
return currentMessages.size() > 0;
}
@ -229,51 +220,34 @@ public class RespondingEthPeer {
switch (msg.getCode()) {
case EthPV62.GET_BLOCK_HEADERS:
final BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(originalResponse);
try {
final List<BlockHeader> originalHeaders =
Lists.newArrayList(headersMessage.getHeaders(protocolSchedule));
final List<BlockHeader> partialHeaders =
originalHeaders.subList(0, (int) (originalHeaders.size() * portion));
partialResponse = BlockHeadersMessage.create(partialHeaders);
} finally {
headersMessage.release();
}
break;
case EthPV62.GET_BLOCK_BODIES:
final BlockBodiesMessage bodiesMessage = BlockBodiesMessage.readFrom(originalResponse);
try {
final List<BlockBody> originalBodies =
Lists.newArrayList(bodiesMessage.bodies(protocolSchedule));
final List<BlockBody> partialBodies =
originalBodies.subList(0, (int) (originalBodies.size() * portion));
partialResponse = BlockBodiesMessage.create(partialBodies);
} finally {
bodiesMessage.release();
}
break;
case EthPV63.GET_RECEIPTS:
final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(originalResponse);
try {
final List<List<TransactionReceipt>> originalReceipts =
Lists.newArrayList(receiptsMessage.receipts());
final List<List<TransactionReceipt>> partialReceipts =
originalReceipts.subList(0, (int) (originalReceipts.size() * portion));
partialResponse = ReceiptsMessage.create(partialReceipts);
} finally {
receiptsMessage.release();
}
break;
case EthPV63.GET_NODE_DATA:
final NodeDataMessage nodeDataMessage = NodeDataMessage.readFrom(originalResponse);
try {
final List<BytesValue> originalNodeData =
Lists.newArrayList(nodeDataMessage.nodeData());
final List<BytesValue> originalNodeData = Lists.newArrayList(nodeDataMessage.nodeData());
final List<BytesValue> partialNodeData =
originalNodeData.subList(0, (int) (originalNodeData.size() * portion));
partialResponse = NodeDataMessage.create(partialNodeData);
} finally {
nodeDataMessage.release();
}
break;
}
return Optional.of(partialResponse);

@ -18,7 +18,6 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.development.DevelopmentProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHashFunction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -33,7 +32,6 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.io.Resources;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@ -61,11 +59,8 @@ public final class BlockBodiesMessageTest {
rlp -> BlockHeader.readFrom(rlp, MainnetBlockHashFunction::createHash))));
}
final MessageData initialMessage = BlockBodiesMessage.create(bodies);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV62.BLOCK_BODIES, rawBuffer);
final MessageData raw = new RawMessage(EthPV62.BLOCK_BODIES, initialMessage.getData());
final BlockBodiesMessage message = BlockBodiesMessage.readFrom(raw);
try {
final Iterator<BlockBody> readBodies =
message
.bodies(
@ -74,10 +69,5 @@ public final class BlockBodiesMessageTest {
for (int i = 0; i < 50; ++i) {
Assertions.assertThat(readBodies.next()).isEqualTo(bodies.get(i));
}
} finally {
message.release();
initialMessage.release();
raw.release();
}
}
}

@ -16,7 +16,6 @@ import tech.pegasys.pantheon.config.GenesisConfigFile;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.development.DevelopmentProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHashFunction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -31,7 +30,6 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.io.Resources;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@ -55,21 +53,13 @@ public final class BlockHeadersMessageTest {
oneBlock.skipNext();
}
final MessageData initialMessage = BlockHeadersMessage.create(headers);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV62.BLOCK_HEADERS, rawBuffer);
final MessageData raw = new RawMessage(EthPV62.BLOCK_HEADERS, initialMessage.getData());
final BlockHeadersMessage message = BlockHeadersMessage.readFrom(raw);
try {
final Iterator<BlockHeader> readHeaders =
message.getHeaders(
DevelopmentProtocolSchedule.create(GenesisConfigFile.DEFAULT.getConfigOptions()));
for (int i = 0; i < 50; ++i) {
Assertions.assertThat(readHeaders.next()).isEqualTo(headers.get(i));
}
} finally {
message.release();
initialMessage.release();
raw.release();
}
}
}

@ -15,7 +15,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHashFunction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -30,7 +29,6 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.io.Resources;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@ -54,19 +52,11 @@ public final class GetBlockBodiesMessageTest {
oneBlock.skipNext();
}
final MessageData initialMessage = GetBlockBodiesMessage.create(hashes);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_BODIES, rawBuffer);
final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_BODIES, initialMessage.getData());
final GetBlockBodiesMessage message = GetBlockBodiesMessage.readFrom(raw);
try {
final Iterator<Hash> readHeaders = message.hashes().iterator();
for (int i = 0; i < 50; ++i) {
Assertions.assertThat(readHeaders.next()).isEqualTo(hashes.get(i));
}
} finally {
message.release();
initialMessage.release();
raw.release();
}
}
}

@ -13,14 +13,12 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Arrays;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@ -34,21 +32,13 @@ public final class GetBlockHeadersMessageTest {
final int maxHeaders = 128;
final GetBlockHeadersMessage initialMessage =
GetBlockHeadersMessage.create(hash, maxHeaders, skip, reverse);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_HEADERS, rawBuffer);
final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_HEADERS, initialMessage.getData());
final GetBlockHeadersMessage message = GetBlockHeadersMessage.readFrom(raw);
try {
Assertions.assertThat(message.blockNumber()).isEmpty();
Assertions.assertThat(message.hash().get()).isEqualTo(hash);
Assertions.assertThat(message.reverse()).isEqualTo(reverse);
Assertions.assertThat(message.skip()).isEqualTo(skip);
Assertions.assertThat(message.maxHeaders()).isEqualTo(maxHeaders);
} finally {
initialMessage.release();
raw.release();
message.release();
}
}
}
@ -60,20 +50,13 @@ public final class GetBlockHeadersMessageTest {
final int maxHeaders = 128;
final GetBlockHeadersMessage initialMessage =
GetBlockHeadersMessage.create(blockNum, maxHeaders, skip, reverse);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_HEADERS, rawBuffer);
final MessageData raw = new RawMessage(EthPV62.GET_BLOCK_HEADERS, initialMessage.getData());
final GetBlockHeadersMessage message = GetBlockHeadersMessage.readFrom(raw);
try {
Assertions.assertThat(initialMessage.blockNumber().getAsLong()).isEqualTo(blockNum);
Assertions.assertThat(initialMessage.hash()).isEmpty();
Assertions.assertThat(initialMessage.reverse()).isEqualTo(reverse);
Assertions.assertThat(initialMessage.skip()).isEqualTo(skip);
Assertions.assertThat(initialMessage.maxHeaders()).isEqualTo(maxHeaders);
} finally {
initialMessage.release();
raw.release();
message.release();
}
Assertions.assertThat(message.blockNumber().getAsLong()).isEqualTo(blockNum);
Assertions.assertThat(message.hash()).isEmpty();
Assertions.assertThat(message.reverse()).isEqualTo(reverse);
Assertions.assertThat(message.skip()).isEqualTo(skip);
Assertions.assertThat(message.maxHeaders()).isEqualTo(maxHeaders);
}
}
}

@ -13,24 +13,21 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
public final class GetNodeDataMessageTest {
@Test
public void roundTripTest() throws IOException {
public void roundTripTest() {
// Generate some hashes
final BlockDataGenerator gen = new BlockDataGenerator(1);
final List<Hash> hashes = new ArrayList<>();
@ -42,22 +39,14 @@ public final class GetNodeDataMessageTest {
// Perform round-trip transformation
// Create GetNodeData, copy it to a generic message, then read back into a GetNodeData message
final MessageData initialMessage = GetNodeDataMessage.create(hashes);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV63.GET_NODE_DATA, rawBuffer);
final MessageData raw = new RawMessage(EthPV63.GET_NODE_DATA, initialMessage.getData());
final GetNodeDataMessage message = GetNodeDataMessage.readFrom(raw);
// Read hashes back out after round trip and check they match originals.
try {
final Iterator<Hash> readData = message.hashes().iterator();
for (int i = 0; i < hashCount; ++i) {
Assertions.assertThat(readData.next()).isEqualTo(hashes.get(i));
}
Assertions.assertThat(readData.hasNext()).isFalse();
} finally {
message.release();
initialMessage.release();
raw.release();
}
}
}

@ -13,24 +13,21 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
public final class GetReceiptsMessageTest {
@Test
public void roundTripTest() throws IOException {
public void roundTripTest() {
// Generate some hashes
final BlockDataGenerator gen = new BlockDataGenerator(1);
final List<Hash> hashes = new ArrayList<>();
@ -43,22 +40,14 @@ public final class GetReceiptsMessageTest {
// Create GetReceipts message, copy it to a generic message, then read back into a GetReceipts
// message
final MessageData initialMessage = GetReceiptsMessage.create(hashes);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV63.GET_RECEIPTS, rawBuffer);
final MessageData raw = new RawMessage(EthPV63.GET_RECEIPTS, initialMessage.getData());
final GetReceiptsMessage message = GetReceiptsMessage.readFrom(raw);
// Read hashes back out after round trip and check they match originals.
try {
final Iterator<Hash> readData = message.hashes().iterator();
for (int i = 0; i < hashCount; ++i) {
Assertions.assertThat(readData.next()).isEqualTo(hashes.get(i));
}
Assertions.assertThat(readData.hasNext()).isFalse();
} finally {
message.release();
initialMessage.release();
raw.release();
}
}
}

@ -14,7 +14,6 @@ package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHashFunction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPInput;
@ -29,7 +28,6 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.io.Resources;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@ -55,19 +53,11 @@ public final class NewBlockHashesMessageTest {
oneBlock.skipNext();
}
final MessageData initialMessage = NewBlockHashesMessage.create(hashes);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV62.NEW_BLOCK_HASHES, rawBuffer);
final MessageData raw = new RawMessage(EthPV62.NEW_BLOCK_HASHES, initialMessage.getData());
final NewBlockHashesMessage message = NewBlockHashesMessage.readFrom(raw);
try {
final Iterator<NewBlockHashesMessage.NewBlockHash> readHeaders = message.getNewHashes();
for (int i = 0; i < 50; ++i) {
Assertions.assertThat(readHeaders.next()).isEqualTo(hashes.get(i));
}
} finally {
message.release();
initialMessage.release();
raw.release();
}
}
}

@ -18,14 +18,12 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
import io.netty.buffer.Unpooled;
import org.junit.Test;
public class NewBlockMessageTest {
@ -56,10 +54,7 @@ public class NewBlockMessageTest {
tmp.writeUInt256Scalar(totalDifficulty);
tmp.endList();
final BytesValue msgPayload = tmp.encoded();
final RawMessage rawMsg =
new RawMessage(EthPV62.NEW_BLOCK, Unpooled.wrappedBuffer(tmp.encoded().extractArray()));
final RawMessage rawMsg = new RawMessage(EthPV62.NEW_BLOCK, tmp.encoded());
final NewBlockMessage newBlockMsg = NewBlockMessage.readFrom(rawMsg);
@ -71,8 +66,7 @@ public class NewBlockMessageTest {
@Test
public void readFromMessageWithWrongCodeThrows() {
final ProtocolSchedule<Void> protSchedule = MainnetProtocolSchedule.create();
final RawMessage rawMsg = new RawMessage(EthPV62.BLOCK_HEADERS, NetworkMemoryPool.allocate(1));
final RawMessage rawMsg = new RawMessage(EthPV62.BLOCK_HEADERS, BytesValue.of(0));
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> NewBlockMessage.readFrom(rawMsg));

@ -12,25 +12,22 @@
*/
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
public final class NodeDataMessageTest {
@Test
public void roundTripTest() throws IOException {
public void roundTripTest() {
// Generate some data
final BlockDataGenerator gen = new BlockDataGenerator(1);
final List<BytesValue> nodeData = new ArrayList<>();
@ -42,22 +39,14 @@ public final class NodeDataMessageTest {
// Perform round-trip transformation
// Create specific message, copy it to a generic message, then read back into a specific format
final MessageData initialMessage = NodeDataMessage.create(nodeData);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV63.NODE_DATA, rawBuffer);
final MessageData raw = new RawMessage(EthPV63.NODE_DATA, initialMessage.getData());
final NodeDataMessage message = NodeDataMessage.readFrom(raw);
// Read data back out after round trip and check they match originals.
try {
final Iterator<BytesValue> readData = message.nodeData().iterator();
for (int i = 0; i < nodeCount; ++i) {
Assertions.assertThat(readData.next()).isEqualTo(nodeData.get(i));
}
Assertions.assertThat(readData.hasNext()).isFalse();
} finally {
message.release();
initialMessage.release();
raw.release();
}
}
}

@ -13,24 +13,21 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
public final class ReceiptsMessageTest {
@Test
public void roundTripTest() throws IOException {
public void roundTripTest() {
// Generate some data
final BlockDataGenerator gen = new BlockDataGenerator(1);
final List<List<TransactionReceipt>> receipts = new ArrayList<>();
@ -47,22 +44,14 @@ public final class ReceiptsMessageTest {
// Perform round-trip transformation
// Create specific message, copy it to a generic message, then read back into a specific format
final MessageData initialMessage = ReceiptsMessage.create(receipts);
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV63.RECEIPTS, rawBuffer);
final MessageData raw = new RawMessage(EthPV63.RECEIPTS, initialMessage.getData());
final ReceiptsMessage message = ReceiptsMessage.readFrom(raw);
// Read data back out after round trip and check they match originals.
try {
final Iterator<List<TransactionReceipt>> readData = message.receipts().iterator();
for (int i = 0; i < dataCount; ++i) {
Assertions.assertThat(readData.next()).isEqualTo(receipts.get(i));
}
Assertions.assertThat(readData.hasNext()).isFalse();
} finally {
message.release();
initialMessage.release();
raw.release();
}
}
}

@ -22,8 +22,6 @@ import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.Random;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
public class StatusMessageTest {
@ -56,9 +54,7 @@ public class StatusMessageTest {
final MessageData msg = StatusMessage.create(version, networkId, td, bestHash, genesisHash);
// Make a message copy from serialized data and check deserialized results
final ByteBuf buffer = Unpooled.buffer(msg.getSize(), msg.getSize());
msg.writeTo(buffer);
final StatusMessage copy = new StatusMessage(buffer);
final StatusMessage copy = new StatusMessage(msg.getData());
assertThat(copy.protocolVersion()).isEqualTo(version);
assertThat(copy.networkId()).isEqualTo(networkId);

@ -13,24 +13,21 @@
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
public class TransactionsMessageTest {
@Test
public void transactionRoundTrip() throws IOException {
public void transactionRoundTrip() {
// Setup list of transactions
final int txCount = 20;
final BlockDataGenerator gen = new BlockDataGenerator(1);
@ -42,23 +39,15 @@ public class TransactionsMessageTest {
// Create TransactionsMessage
final MessageData initialMessage = TransactionsMessage.create(transactions);
// Read message into a generic RawMessage
final ByteBuf rawBuffer = NetworkMemoryPool.allocate(initialMessage.getSize());
initialMessage.writeTo(rawBuffer);
final MessageData raw = new RawMessage(EthPV62.TRANSACTIONS, rawBuffer);
final MessageData raw = new RawMessage(EthPV62.TRANSACTIONS, initialMessage.getData());
// Transform back to a TransactionsMessage from RawMessage
final TransactionsMessage message = TransactionsMessage.readFrom(raw);
// Check that transactions match original inputs after transformations
try {
final Iterator<Transaction> readTransactions = message.transactions(Transaction::readFrom);
for (int i = 0; i < txCount; ++i) {
Assertions.assertThat(readTransactions.next()).isEqualTo(transactions.get(i));
}
Assertions.assertThat(readTransactions.hasNext()).isFalse();
} finally {
message.release();
initialMessage.release();
raw.release();
}
}
}

@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.testing;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork;
@ -30,7 +29,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import io.netty.buffer.ByteBuf;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@ -71,18 +69,15 @@ public final class MockNetworkTest {
// Validate Message Exchange
final int size = 128;
final ByteBuf dataSent = NetworkMemoryPool.allocate(size);
final byte[] data = new byte[size];
ThreadLocalRandom.current().nextBytes(data);
dataSent.writeBytes(data);
final int code = 0x74;
final PeerConnection connection = optionalConnection.get();
connection.send(cap, new RawMessage(code, dataSent));
connection.send(cap, new RawMessage(code, BytesValue.wrap(data)));
final Message receivedMessage = messageFuture.get();
final MessageData receivedMessageData = receivedMessage.getData();
final ByteBuf receiveBuffer = NetworkMemoryPool.allocate(size);
receivedMessageData.writeTo(receiveBuffer);
Assertions.assertThat(receiveBuffer.compareTo(dataSent)).isEqualTo(0);
Assertions.assertThat(receivedMessageData.getData().compareTo(BytesValue.wrap(data)))
.isEqualTo(0);
Assertions.assertThat(receivedMessage.getConnection().getPeer().getNodeId())
.isEqualTo(two.getId());
Assertions.assertThat(receivedMessageData.getSize()).isEqualTo(size);

@ -1,26 +0,0 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
public class NetworkMemoryPool {
private static final ByteBufAllocator ALLOCATOR = new PooledByteBufAllocator();
public static ByteBuf allocate(final int size) {
return ALLOCATOR.ioBuffer(0, size);
}
}

@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.p2p;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork;
import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
@ -115,11 +114,9 @@ public class NetworkRunner implements AutoCloseable {
network.subscribe(
cap,
message -> {
final MessageData data = message.getData();
try {
final int code = message.getData().getCode();
if (!protocol.isValidMessageCode(cap.getVersion(), code)) {
// Handle invalid messsages by disconnecting
// Handle invalid messages by disconnecting
LOG.debug(
"Invalid message code ({}-{}, {}) received from peer, disconnecting from:",
cap.getName(),
@ -130,9 +127,6 @@ public class NetworkRunner implements AutoCloseable {
return;
}
protocolManager.processMessage(cap, message);
} finally {
data.release();
}
});
}
}

@ -12,7 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.api;
import io.netty.buffer.ByteBuf;
import tech.pegasys.pantheon.util.bytes.BytesValue;
/** A P2P Network Message's Data. */
public interface MessageData {
@ -20,7 +20,7 @@ public interface MessageData {
/**
* Returns the size of the message.
*
* @return Number of bytes {@link #writeTo(ByteBuf)} will write to an output buffer.
* @return Number of bytes in this data.
*/
int getSize();
@ -32,15 +32,9 @@ public interface MessageData {
int getCode();
/**
* Puts the message's body into the given {@link ByteBuf}.
* Get the serialized representation for this message
*
* @param output ByteBuf to write the message to
* @return the serialized representation of this message
*/
void writeTo(ByteBuf output);
/** Releases the memory underlying this message. */
void release();
/** Retains (increments its reference count) the memory underlying this message once. */
void retain();
BytesValue getData();
}

@ -90,10 +90,7 @@ final class ApiHandler extends SimpleChannelInboundHandler<MessageData> {
"Received Wire DISCONNECT, but unable to parse reason. Peer: {}",
connection.getPeer().getClientId(),
e);
} finally {
disconnect.release();
}
connection.terminateConnection(reason, true);
}
return;

@ -17,6 +17,7 @@ import static java.util.Comparator.comparing;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.ArrayList;
import java.util.Comparator;
@ -31,7 +32,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableRangeMap;
import com.google.common.collect.ImmutableRangeMap.Builder;
import com.google.common.collect.Range;
import io.netty.buffer.ByteBuf;
public class CapabilityMultiplexer {
@ -108,18 +108,8 @@ public class CapabilityMultiplexer {
}
@Override
public void writeTo(final ByteBuf output) {
originalMessage.writeTo(output);
}
@Override
public void release() {
originalMessage.release();
}
@Override
public void retain() {
originalMessage.retain();
public BytesValue getData() {
return originalMessage.getData();
}
};
}

@ -71,14 +71,13 @@ final class DeFramer extends ByteToMessageDecoder {
// Decode first hello and use the payload to modify pipeline
final PeerInfo peerInfo;
try {
peerInfo = parsePeerInfo(message);
peerInfo = HelloMessage.readFrom(message).getPeerInfo();
} catch (final RLPException e) {
LOG.debug("Received invalid HELLO message", e);
connectFuture.completeExceptionally(e);
ctx.close();
return;
}
message.release();
LOG.debug("Received HELLO message: {}", peerInfo);
if (peerInfo.getVersion() >= 5) {
LOG.debug("Enable compression for p2pVersion: {}", peerInfo.getVersion());
@ -115,13 +114,6 @@ final class DeFramer extends ByteToMessageDecoder {
}
}
private PeerInfo parsePeerInfo(final MessageData message) {
final HelloMessage helloMessage = HelloMessage.readFrom(message);
final PeerInfo peerInfo = helloMessage.getPeerInfo();
helloMessage.release();
return peerInfo;
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable throwable)
throws Exception {

@ -69,7 +69,6 @@ final class NettyPeerConnection implements PeerConnection {
@Override
public void send(final Capability capability, final MessageData message) throws PeerNotConnected {
if (isDisconnected()) {
message.release();
throw new PeerNotConnected("Attempt to send message to a closed peer connection");
}
if (capability != null) {
@ -77,7 +76,6 @@ final class NettyPeerConnection implements PeerConnection {
final SubProtocol subProtocol = multiplexer.subProtocol(capability);
if (subProtocol == null
|| !subProtocol.isValidMessageCode(capability.getVersion(), message.getCode())) {
message.release();
throw new UnsupportedOperationException(
"Attempt to send unsupported message ("
+ message.getCode()

@ -16,10 +16,8 @@ import static io.netty.buffer.ByteBufUtil.hexDump;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.bouncycastle.pqc.math.linearalgebra.ByteUtils.xor;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.HandshakeSecrets;
import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.rlp.RlpUtils;
@ -254,7 +252,7 @@ public class Framer {
final int id = idbv.isZero() || idbv.size() == 0 ? 0 : idbv.get(0);
// Write message data to ByteBuf, decompressing as necessary
final ByteBuf data;
final BytesValue data;
if (compressionEnabled) {
// Decompress data before writing to ByteBuf
final byte[] compressedMessageData = Arrays.copyOfRange(frameData, 1, frameData.length - pad);
@ -263,13 +261,11 @@ public class Framer {
compressor.uncompressedLength(compressedMessageData) < LENGTH_MAX_MESSAGE_FRAME,
"Message size in excess of maximum length.");
final byte[] decompressedMessageData = compressor.decompress(compressedMessageData);
data = NetworkMemoryPool.allocate(decompressedMessageData.length);
data.writeBytes(decompressedMessageData);
data = BytesValue.wrap(decompressedMessageData);
} else {
// Move data to a ByteBuf
final int messageLength = frameSize - LENGTH_MESSAGE_ID;
data = NetworkMemoryPool.allocate(messageLength);
data.writeBytes(frameData, 1, messageLength);
data = BytesValue.wrap(frameData, 1, messageLength);
}
return new RawMessage(id, data);
@ -295,31 +291,18 @@ public class Framer {
message.getSize() < LENGTH_MAX_MESSAGE_FRAME, "Message size in excess of maximum length.");
// Compress message
if (compressionEnabled) {
try {
// Extract data from message
final ByteBuf tmp = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(tmp);
// Compress data
final byte[] uncompressed = ByteBufUtils.toByteArray(tmp);
final byte[] compressed = compressor.compress(uncompressed);
tmp.release();
final byte[] compressed = compressor.compress(message.getData().getArrayUnsafe());
// Construct new, compressed message
final ByteBuf compressedBuf = NetworkMemoryPool.allocate(compressed.length);
compressedBuf.writeBytes(compressed);
frameAndReleaseMessage(new RawMessage(message.getCode(), compressedBuf), output);
} finally {
// We have to release the original message because frameAndRelease only released the
// compressed copy.
message.release();
}
frameMessage(new RawMessage(message.getCode(), BytesValue.wrap(compressed)), output);
} else {
frameAndReleaseMessage(message, output);
frameMessage(message, output);
}
}
@VisibleForTesting
void frameAndReleaseMessage(final MessageData message, final ByteBuf buf) {
try {
void frameMessage(final MessageData message, final ByteBuf buf) {
final int frameSize = message.getSize() + LENGTH_MESSAGE_ID;
final int pad = padding16(frameSize);
@ -349,11 +332,8 @@ public class Framer {
f[0] = bv.get(0);
// Zero-padded to 16-byte boundary.
final ByteBuf tmp = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(tmp);
tmp.getBytes(tmp.readerIndex(), f, 1, tmp.readableBytes());
message.getData().copyTo(f, 0, 1);
encryptor.processBytes(f, 0, f.length, f, 0);
tmp.release();
// Calculate the frame MAC.
final byte[] fMacSeed = Arrays.copyOf(secrets.updateEgress(f).getEgressMac(), LENGTH_MAC);
@ -362,9 +342,6 @@ public class Framer {
fMac = Arrays.copyOf(secrets.updateEgress(xor(fMac, fMacSeed)).getEgressMac(), LENGTH_MAC);
buf.writeBytes(f).writeBytes(fMac);
} finally {
message.release();
}
}
private static int padding16(final int size) {

@ -178,7 +178,9 @@ public class ECIESHandshaker implements Handshaker {
buf.markReaderIndex();
final ByteBuf bufferedBytes = buf.readSlice(expectedLength);
BytesValue bytes = BytesValue.wrapBuffer(bufferedBytes);
final byte[] encryptedBytes = new byte[bufferedBytes.readableBytes()];
bufferedBytes.getBytes(0, encryptedBytes);
BytesValue bytes = BytesValue.wrap(encryptedBytes);
BytesValue encryptedMsg = bytes;
try {

@ -1,51 +0,0 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.utils;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import io.netty.buffer.ByteBuf;
/** Utility methods for working with {@link ByteBuf}'s. */
public class ByteBufUtils {
private ByteBufUtils() {}
public static byte[] toByteArray(final ByteBuf buffer) {
final byte[] bytes = new byte[buffer.readableBytes()];
buffer.getBytes(buffer.readerIndex(), bytes);
return bytes;
}
/**
* Creates an {@link RLPInput} for the data in <code>buffer</code>. The data is copied from <code>
* buffer</code> so that the {@link RLPInput} and any data read from it are safe to use even after
* <code>buffer</code> is released.
*
* @param buffer the data to read as RLP
* @return an {@link RLPInput} for the data in <code>buffer</code>
*/
public static RLPInput toRLPInput(final ByteBuf buffer) {
return RLP.input(BytesValue.wrap(toByteArray(buffer)));
}
public static ByteBuf fromRLPOutput(final BytesValueRLPOutput out) {
final ByteBuf data = NetworkMemoryPool.allocate(out.encodedSize());
data.writeBytes(out.encoded().extractArray());
return data;
}
}

@ -13,36 +13,23 @@
package tech.pegasys.pantheon.ethereum.p2p.wire;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import io.netty.buffer.ByteBuf;
import tech.pegasys.pantheon.util.bytes.BytesValue;
public abstract class AbstractMessageData implements MessageData {
protected final ByteBuf data;
protected final BytesValue data;
protected AbstractMessageData(final ByteBuf data) {
protected AbstractMessageData(final BytesValue data) {
this.data = data;
}
@Override
public final int getSize() {
return data.readableBytes();
}
@Override
public final void writeTo(final ByteBuf output) {
data.markReaderIndex();
output.writeBytes(data);
data.resetReaderIndex();
}
@Override
public final void release() {
data.release();
return data.size();
}
@Override
public final void retain() {
data.retain();
public BytesValue getData() {
return data;
}
}

@ -14,8 +14,6 @@ package tech.pegasys.pantheon.ethereum.p2p.wire;
import static tech.pegasys.pantheon.util.bytes.BytesValue.wrap;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
import tech.pegasys.pantheon.ethereum.rlp.RLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -26,8 +24,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import io.netty.buffer.ByteBuf;
/**
* Encapsulates information about a peer, including their protocol version, client ID, capabilities
* and other.
@ -96,16 +92,6 @@ public class PeerInfo {
out.endList();
}
public ByteBuf toByteBuf() {
// TODO: we should have a RLPOutput type based on ByteBuf
final BytesValueRLPOutput out = new BytesValueRLPOutput();
writeTo(out);
final ByteBuf data = NetworkMemoryPool.allocate(out.encodedSize());
data.writeBytes(out.encoded().extractArray());
return data;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("PeerInfo{");

@ -12,13 +12,13 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.wire;
import io.netty.buffer.ByteBuf;
import tech.pegasys.pantheon.util.bytes.BytesValue;
public final class RawMessage extends AbstractMessageData {
private final int code;
public RawMessage(final int code, final ByteBuf data) {
public RawMessage(final int code, final BytesValue data) {
super(data);
this.code = code;
}

@ -15,22 +15,20 @@ package tech.pegasys.pantheon.ethereum.p2p.wire.messages;
import static com.google.common.base.Preconditions.checkArgument;
import static tech.pegasys.pantheon.util.Preconditions.checkGuard;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.WireProtocolException;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
import tech.pegasys.pantheon.ethereum.rlp.RLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.stream.Stream;
import io.netty.buffer.ByteBuf;
public final class DisconnectMessage extends AbstractMessageData {
private DisconnectMessage(final ByteBuf data) {
private DisconnectMessage(final BytesValue data) {
super(data);
}
@ -38,14 +36,12 @@ public final class DisconnectMessage extends AbstractMessageData {
final Data data = new Data(reason);
final BytesValueRLPOutput out = new BytesValueRLPOutput();
data.writeTo(out);
final ByteBuf buf = ByteBufUtils.fromRLPOutput(out);
return new DisconnectMessage(buf);
return new DisconnectMessage(out.encoded());
}
public static DisconnectMessage readFrom(final MessageData message) {
if (message instanceof DisconnectMessage) {
message.retain();
return (DisconnectMessage) message;
}
final int code = message.getCode();
@ -53,9 +49,7 @@ public final class DisconnectMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a DisconnectMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new DisconnectMessage(data);
return new DisconnectMessage(message.getData());
}
@Override
@ -64,7 +58,7 @@ public final class DisconnectMessage extends AbstractMessageData {
}
public DisconnectReason getReason() {
return Data.readFrom(ByteBufUtils.toRLPInput(data)).getReason();
return Data.readFrom(RLP.input(data)).getReason();
}
@Override
@ -105,7 +99,7 @@ public final class DisconnectMessage extends AbstractMessageData {
* @see <a href="https://github.com/ethereum/wiki/wiki/%C3%90%CE%9EVp2p-Wire-Protocol">ÐΞVp2p Wire
* Protocol</a>
*/
public static enum DisconnectReason {
public enum DisconnectReason {
REQUESTED((byte) 0x00),
TCP_SUBSYSTEM_ERROR((byte) 0x01),
BREACH_OF_PROTOCOL((byte) 0x02),

@ -13,8 +13,7 @@
package tech.pegasys.pantheon.ethereum.p2p.wire.messages;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import io.netty.buffer.ByteBuf;
import tech.pegasys.pantheon.util.bytes.BytesValue;
/** A message without a body. */
abstract class EmptyMessage implements MessageData {
@ -25,11 +24,7 @@ abstract class EmptyMessage implements MessageData {
}
@Override
public final void writeTo(final ByteBuf output) {}
@Override
public final void release() {}
@Override
public final void retain() {}
public BytesValue getData() {
return BytesValue.EMPTY;
}
}

@ -12,36 +12,28 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.wire.messages;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.utils.ByteBufUtils;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import io.netty.buffer.ByteBuf;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.BytesValue;
public final class HelloMessage extends AbstractMessageData {
private HelloMessage(final ByteBuf data) {
private HelloMessage(final BytesValue data) {
super(data);
}
public static HelloMessage create(final PeerInfo peerInfo) {
final BytesValueRLPOutput out = new BytesValueRLPOutput();
peerInfo.writeTo(out);
final ByteBuf buf = ByteBufUtils.fromRLPOutput(out);
return new HelloMessage(buf);
}
public static HelloMessage create(final ByteBuf data) {
return new HelloMessage(data);
return new HelloMessage(out.encoded());
}
public static HelloMessage readFrom(final MessageData message) {
if (message instanceof HelloMessage) {
message.retain();
return (HelloMessage) message;
}
final int code = message.getCode();
@ -49,9 +41,7 @@ public final class HelloMessage extends AbstractMessageData {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a HelloMessage.", code));
}
final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new HelloMessage(data);
return new HelloMessage(message.getData());
}
@Override
@ -60,7 +50,7 @@ public final class HelloMessage extends AbstractMessageData {
}
public PeerInfo getPeerInfo() {
return PeerInfo.readFrom(ByteBufUtils.toRLPInput(data));
return PeerInfo.readFrom(RLP.input(data));
}
@Override

@ -14,19 +14,18 @@ package tech.pegasys.pantheon.ethereum.p2p.netty;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.netty.CapabilityMultiplexer.ProtocolMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import org.junit.Test;
public class CapabilityMultiplexerTest {
@ -63,8 +62,7 @@ public class CapabilityMultiplexerTest {
assertThat(multiplexerB.getAgreedCapabilities()).isEqualTo(expectedCaps);
// Multiplex a message and check the value
final ByteBuf ethData = NetworkMemoryPool.allocate(5);
ethData.writeBytes(new byte[] {1, 2, 3, 4, 5});
final BytesValue ethData = BytesValue.of(1, 2, 3, 4, 5);
final int ethCode = 1;
final MessageData ethMessage = new RawMessage(ethCode, ethData);
// Check offset
@ -74,15 +72,13 @@ public class CapabilityMultiplexerTest {
assertThat(multiplexerB.multiplex(eth62, ethMessage).getCode())
.isEqualTo(ethCode + expectedOffset);
// Check data is unchanged
final ByteBuf multiplexedData = NetworkMemoryPool.allocate(ethMessage.getSize());
multiplexerA.multiplex(eth62, ethMessage).writeTo(multiplexedData);
final BytesValue multiplexedData = multiplexerA.multiplex(eth62, ethMessage).getData();
assertThat(multiplexedData).isEqualTo(ethData);
// Demultiplex and check value
final MessageData multiplexedEthMessage = new RawMessage(ethCode + expectedOffset, ethData);
ProtocolMessage demultiplexed = multiplexerA.demultiplex(multiplexedEthMessage);
final ByteBuf demultiplexedData = NetworkMemoryPool.allocate(ethMessage.getSize());
demultiplexed.getMessage().writeTo(demultiplexedData);
final BytesValue demultiplexedData = ethMessage.getData();
// Check returned result
assertThat(demultiplexed.getMessage().getCode()).isEqualTo(ethCode);
assertThat(demultiplexed.getCapability()).isEqualTo(eth62);

@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.HandshakeSecrets;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.io.IOException;
import java.util.List;
@ -52,8 +53,7 @@ public class FramerTest {
final byte[] byteArray = new byte[0xFFFFFF];
new Random().nextBytes(byteArray);
final ByteBuf buf = wrappedBuffer(byteArray);
final MessageData ethMessage = new RawMessage(0x00, buf);
final MessageData ethMessage = new RawMessage(0x00, BytesValue.wrap(byteArray));
final HandshakeSecrets secrets = new HandshakeSecrets(aes, mac, mac);
final Framer framer = new Framer(secrets);
@ -77,11 +77,10 @@ public class FramerTest {
framer.enableCompression();
final byte[] byteArray = Snappy.compress(new byte[0x1000000]);
final ByteBuf buf = wrappedBuffer(byteArray);
final MessageData ethMessage = new RawMessage(0x00, buf);
final MessageData ethMessage = new RawMessage(0x00, BytesValue.wrap(byteArray));
final ByteBuf framedMessage = Unpooled.buffer();
framer.frameAndReleaseMessage(ethMessage, framedMessage);
framer.frameMessage(ethMessage, framedMessage);
final HandshakeSecrets deframeSecrets = secretsFrom(td, true);
final Framer deframer = new Framer(deframeSecrets);

@ -15,10 +15,10 @@ package tech.pegasys.pantheon.ethereum.p2p.wire;
import static io.netty.buffer.ByteBufUtil.decodeHexDump;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import io.netty.buffer.ByteBuf;
import org.junit.Test;
public class WireMessagesSedesTest {
@ -45,7 +45,8 @@ public class WireMessagesSedesTest {
}
private static void assertSedesWorks(final byte[] data) {
final PeerInfo peerInfo = PeerInfo.readFrom(RLP.input(BytesValue.wrap(data)));
final BytesValue input = BytesValue.wrap(data);
final PeerInfo peerInfo = PeerInfo.readFrom(RLP.input(input));
assertThat(peerInfo.getClientId()).isNotBlank();
assertThat(peerInfo.getCapabilities()).isNotEmpty();
@ -54,9 +55,8 @@ public class WireMessagesSedesTest {
assertThat(peerInfo.getVersion()).isEqualTo(5);
// Re-serialize and check that data matches
final ByteBuf buffer = peerInfo.toByteBuf();
final byte[] serialized = new byte[buffer.readableBytes()];
buffer.getBytes(buffer.readerIndex(), serialized);
assertThat(serialized).isEqualTo(data);
final BytesValueRLPOutput out = new BytesValueRLPOutput();
peerInfo.writeTo(out);
assertThat(out.encoded()).isEqualTo(input);
}
}

@ -19,7 +19,6 @@ import java.nio.ByteBuffer;
import java.security.MessageDigest;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
/**
@ -157,30 +156,6 @@ public interface BytesValue extends Comparable<BytesValue> {
return MutableBytesValue.wrapBuffer(buffer, offset, size);
}
/**
* Wraps a full Netty {@link ByteBuf} as a {@link BytesValue}.
*
* @param buffer The buffer to wrap.
* @return A {@link BytesValue} that exposes the bytes of {@code buffer}.
*/
static BytesValue wrapBuffer(final ByteBuf buffer) {
return wrapBuffer(buffer, buffer.readerIndex(), buffer.readableBytes());
}
/**
* Wraps a slice of a Netty {@link ByteBuf} as a {@link BytesValue}.
*
* @param buffer The buffer to wrap.
* @param offset The offset in {@code buffer} from which to expose the bytes in the returned
* value. That is, {@code wrapBuffer(buffer, i, 1).get(0) == buffer.getByte(i)}.
* @param size The size of the returned value.
* @return A {@link BytesValue} that exposes the equivalent of {@code buffer.getBytes(offset,
* offset + size)} (but without copying said bytes).
*/
static BytesValue wrapBuffer(final ByteBuf buffer, final int offset, final int size) {
return MutableBytesValue.wrapBuffer(buffer, offset, size);
}
/**
* Wraps a {@link ByteBuffer} as a {@link BytesValue}.
*
@ -467,15 +442,8 @@ public interface BytesValue extends Comparable<BytesValue> {
}
}
/**
* Appends the bytes of this value to the provided Netty {@link ByteBuf}.
*
* @param buffer The {@link ByteBuf} to which to append this value.
*/
default void appendTo(final ByteBuf buffer) {
for (int i = 0; i < size(); i++) {
buffer.writeByte(get(i));
}
default void copyTo(final byte[] dest, final int srcPos, final int destPos) {
System.arraycopy(getArrayUnsafe(), srcPos, dest, destPos, size() - srcPos);
}
/**

@ -30,8 +30,6 @@ import java.util.Collection;
import java.util.function.Function;
import com.google.common.io.BaseEncoding;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.vertx.core.buffer.Buffer;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -72,12 +70,6 @@ public class BytesValueImplementationsTest {
(BytesValueCreator) (b) -> BytesValue.wrapBuffer(buffer(b)),
(BytesValueSliceCreator) (b, start, len) -> BytesValue.wrapBuffer(buffer(b), start, len)
},
{
"BytesValue.wrapBuffer() (Netty ByteBuf)",
(BytesValueCreator) (b) -> BytesValue.wrapBuffer(byteBuf(b)),
(BytesValueSliceCreator)
(b, start, len) -> BytesValue.wrapBuffer(byteBuf(b), start, len)
},
{
"BytesValue.wrapBuffer() (nio ByteBuffer)",
(BytesValueCreator) (b) -> BytesValue.wrapBuffer(byteBuffer(b)),
@ -95,12 +87,6 @@ public class BytesValueImplementationsTest {
(BytesValueSliceCreator)
(b, start, len) -> MutableBytesValue.wrapBuffer(buffer(b), start, len)
},
{
"MutableBytesValue.wrapBuffer() (Netty ByteBuf)",
(BytesValueCreator) (b) -> MutableBytesValue.wrapBuffer(byteBuf(b)),
(BytesValueSliceCreator)
(b, start, len) -> MutableBytesValue.wrapBuffer(byteBuf(b), start, len)
},
{
"MutableBytesValue.wrapBuffer() (nio ByteBuffer)",
(BytesValueCreator) (b) -> MutableBytesValue.wrapBuffer(byteBuffer(b)),
@ -123,12 +109,6 @@ public class BytesValueImplementationsTest {
(BytesValueSliceCreator)
(b, start, len) -> new MutableBufferWrappingBytesValue(buffer(b), start, len)
},
{
MutableByteBufWrappingBytesValue.class.getSimpleName(),
(BytesValueCreator) (b) -> new MutableByteBufWrappingBytesValue(byteBuf(b)),
(BytesValueSliceCreator)
(b, start, len) -> new MutableByteBufWrappingBytesValue(byteBuf(b), start, len),
},
{
MutableByteBufferWrappingBytesValue.class.getSimpleName(),
(BytesValueCreator) (b) -> new MutableByteBufferWrappingBytesValue(byteBuffer(b)),
@ -143,10 +123,6 @@ public class BytesValueImplementationsTest {
return ByteBuffer.wrap(bytes);
}
private static ByteBuf byteBuf(final byte[] bytes) {
return Unpooled.copiedBuffer(bytes);
}
private static Buffer buffer(final byte[] bytes) {
return Buffer.buffer(bytes);
}
@ -155,10 +131,8 @@ public class BytesValueImplementationsTest {
return Buffer.buffer(fromHexString(hex).getArrayUnsafe());
}
private static ByteBuf hexToByteBuf(final String hex) {
final byte[] bytes = fromHexString(hex).getArrayUnsafe();
return Unpooled.unreleasableBuffer(Unpooled.buffer(bytes.length, Integer.MAX_VALUE))
.writeBytes(bytes);
private static byte[] hexToByteArray(final String hex) {
return fromHexString(hex).getArrayUnsafe();
}
private BytesValue fromHex(final String hex) {
@ -166,7 +140,7 @@ public class BytesValueImplementationsTest {
if (hex.substring(0, 2).equals("0x")) {
hexVal = hex.substring((2));
}
byte[] bytes = BaseEncoding.base16().decode(hexVal);
final byte[] bytes = BaseEncoding.base16().decode(hexVal);
return creator.create(bytes);
}
@ -354,43 +328,38 @@ public class BytesValueImplementationsTest {
}
@Test
public void appending() {
assertAppendTo(BytesValue.EMPTY, Buffer.buffer(), BytesValue.EMPTY);
assertAppendTo(BytesValue.EMPTY, hexToBuffer("0x1234"), fromHex("0x1234"));
assertAppendTo(fromHex("0x1234"), Buffer.buffer(), fromHex("0x1234"));
assertAppendTo(fromHex("0x5678"), hexToBuffer("0x1234"), fromHex("0x12345678"));
public void appendingToBuffer() {
assertAppendToBuffer(BytesValue.EMPTY, Buffer.buffer(), BytesValue.EMPTY);
assertAppendToBuffer(BytesValue.EMPTY, hexToBuffer("0x1234"), fromHex("0x1234"));
assertAppendToBuffer(fromHex("0x1234"), Buffer.buffer(), fromHex("0x1234"));
assertAppendToBuffer(fromHex("0x5678"), hexToBuffer("0x1234"), fromHex("0x12345678"));
}
private void assertAppendTo(
private void assertAppendToBuffer(
final BytesValue toAppend, final Buffer buffer, final BytesValue expected) {
toAppend.appendTo(buffer);
assertEquals(expected, BytesValue.wrap(buffer.getBytes()));
}
@Test
public void appendingToByteBuf() {
final byte[] bytes0 = new byte[0];
final byte[] bytes1 = new byte[0];
assertAppendToByteBuf(
BytesValue.EMPTY,
Unpooled.unreleasableBuffer(Unpooled.buffer(bytes0.length, Integer.MAX_VALUE))
.writeBytes(bytes0),
BytesValue.EMPTY);
assertAppendToByteBuf(BytesValue.EMPTY, hexToByteBuf("0x1234"), fromHex("0x1234"));
assertAppendToByteBuf(
fromHex("0x1234"),
Unpooled.unreleasableBuffer(Unpooled.buffer(bytes1.length, Integer.MAX_VALUE))
.writeBytes(bytes1),
fromHex("0x1234"));
assertAppendToByteBuf(fromHex("0x5678"), hexToByteBuf("0x1234"), fromHex("0x12345678"));
}
private void assertAppendToByteBuf(
final BytesValue toAppend, final ByteBuf buffer, final BytesValue expected) {
toAppend.appendTo(buffer);
final byte[] arr = new byte[buffer.writerIndex()];
buffer.getBytes(0, arr);
assertEquals(expected, BytesValue.wrap(arr));
public void copyingToByteArray() {
assertCopyToByteArray(BytesValue.EMPTY, 0, new byte[0], 0, BytesValue.EMPTY);
assertCopyToByteArray(BytesValue.EMPTY, 0, hexToByteArray("0x1234"), 2, fromHex("0x1234"));
assertCopyToByteArray(fromHex("0x1234"), 0, new byte[2], 0, fromHex("0x1234"));
assertCopyToByteArray(fromHex("0x1234"), 1, new byte[2], 0, fromHex("0x3400"));
assertCopyToByteArray(fromHex("0x1234"), 1, new byte[2], 1, fromHex("0x0034"));
assertCopyToByteArray(
fromHex("0x5678"), 0, Arrays.copyOf(hexToByteArray("0x1234"), 4), 2, fromHex("0x12345678"));
}
private void assertCopyToByteArray(
final BytesValue toAppend,
final int srcPos,
final byte[] dest,
final int destPos,
final BytesValue expected) {
toAppend.copyTo(dest, srcPos, destPos);
assertEquals(expected, BytesValue.wrap(dest));
}
@SuppressWarnings("DoNotInvokeMessageDigestDirectly")
@ -646,11 +615,11 @@ public class BytesValueImplementationsTest {
@FunctionalInterface
private interface BytesValueCreator {
public BytesValue create(byte[] bytes);
BytesValue create(byte[] bytes);
}
@FunctionalInterface
private interface BytesValueSliceCreator {
public BytesValue create(byte[] bytes, int start, int length);
BytesValue create(byte[] bytes, int start, int length);
}
}

Loading…
Cancel
Save