[Issue-3867] Limit outbound eth message sizes (#4034)

Signed-off-by: Meredith Baxter <meredith.baxter@palm.io>
pull/4059/head
mbaxter 2 years ago committed by GitHub
parent 6aa88129eb
commit 3dce6a93ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 8
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  3. 7
      consensus/ibftlegacy/src/test/java/org/hyperledger/besu/consensus/ibftlegacy/protocol/Istanbul99ProtocolManagerTest.java
  4. 4
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/AdminJsonRpcHttpServiceTest.java
  5. 10
      ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/AdminPeersTest.java
  6. 14
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java
  7. 9
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  8. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java
  9. 187
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthServer.java
  10. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/BlockBodiesMessage.java
  11. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/BlockHeadersMessage.java
  12. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NodeDataMessage.java
  13. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/PooledTransactionsMessage.java
  14. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/ReceiptsMessage.java
  15. 10
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java
  16. 24
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  17. 393
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthServerTest.java
  18. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java
  19. 15
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java
  20. 9
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java
  21. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java
  22. 14
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java
  23. 8
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java
  24. 8
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java
  25. 9
      ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java
  26. 10
      ethereum/rlp/src/main/java/org/hyperledger/besu/ethereum/rlp/AbstractRLPOutput.java
  27. 5
      ethereum/rlp/src/main/java/org/hyperledger/besu/ethereum/rlp/RLP.java

@ -12,6 +12,7 @@
### Bug Fixes
- Fixed a snapsync issue that can sometimes block the healing step [#3920](https://github.com/hyperledger/besu/pull/3920)
- Support free gas networks in the London fee market [#4003](https://github.com/hyperledger/besu/pull/4003)
- Limit the size of outgoing eth subprotocol messages. [#4034](https://github.com/hyperledger/besu/pull/4034)
- Fixed a state root mismatch issue on bonsai that may appear occasionally [#4041](https://github.com/hyperledger/besu/pull/4041)
## 22.4.3

@ -336,9 +336,15 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
prunerConfiguration));
}
}
final int maxMessageSize = ethereumWireProtocolConfiguration.getMaxMessageSize();
final EthPeers ethPeers =
new EthPeers(
getSupportedProtocol(), clock, metricsSystem, maxPeers, messagePermissioningProviders);
getSupportedProtocol(),
clock,
metricsSystem,
maxPeers,
maxMessageSize,
messagePermissioningProviders);
final EthMessages ethMessages = new EthMessages();
final EthMessages snapMessages = new EthMessages();

@ -107,7 +107,12 @@ public class Istanbul99ProtocolManagerTest {
final CompletableFuture<Void> done = new CompletableFuture<>();
final EthScheduler ethScheduler = new DeterministicEthScheduler(() -> false);
EthPeers peers =
new EthPeers(Istanbul99Protocol.NAME, TestClock.fixed(), new NoOpMetricsSystem(), 25);
new EthPeers(
Istanbul99Protocol.NAME,
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
EthMessages messages = new EthMessages();
final BigInteger networkId = BigInteger.ONE;

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.api.jsonrpc;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerInfo;
@ -76,6 +77,7 @@ public class AdminJsonRpcHttpServiceTest extends JsonRpcHttpServiceTestBase {
"eth",
c -> {},
List.of(),
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE,
TestClock.fixed(),
Collections.emptyList()));
peerList.add(
@ -84,6 +86,7 @@ public class AdminJsonRpcHttpServiceTest extends JsonRpcHttpServiceTestBase {
"eth",
c -> {},
List.of(),
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE,
TestClock.fixed(),
Collections.emptyList()));
peerList.add(
@ -92,6 +95,7 @@ public class AdminJsonRpcHttpServiceTest extends JsonRpcHttpServiceTestBase {
"eth",
c -> {},
List.of(),
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE,
TestClock.fixed(),
Collections.emptyList()));

@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorR
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.PeerResult;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.P2PDisabledException;
@ -117,7 +118,14 @@ public class AdminPeersTest {
new InetSocketAddress("1.2.3.4", 9876),
new InetSocketAddress("4.3.2.1", 6789));
final EthPeer ethPeer =
new EthPeer(p, "eth", c -> {}, List.of(), TestClock.fixed(), Collections.emptyList());
new EthPeer(
p,
"eth",
c -> {},
List.of(),
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE,
TestClock.fixed(),
Collections.emptyList());
return Lists.newArrayList(ethPeer);
}

@ -86,6 +86,7 @@ public class EthPeer implements Comparable<EthPeer> {
private Optional<BlockHeader> checkpointHeader = Optional.empty();
private final String protocolName;
private final int maxMessageSize;
private final Clock clock;
private final List<NodeMessagePermissioningProvider> permissioningProviders;
private final ChainState chainHeadState = new ChainState();
@ -124,10 +125,12 @@ public class EthPeer implements Comparable<EthPeer> {
final String protocolName,
final Consumer<EthPeer> onStatusesExchanged,
final List<PeerValidator> peerValidators,
final int maxMessageSize,
final Clock clock,
final List<NodeMessagePermissioningProvider> permissioningProviders) {
this.connection = connection;
this.protocolName = protocolName;
this.maxMessageSize = maxMessageSize;
this.clock = clock;
this.permissioningProviders = permissioningProviders;
this.onStatusesExchanged.set(onStatusesExchanged);
@ -243,6 +246,17 @@ public class EthPeer implements Comparable<EthPeer> {
}
return null;
}
// Check message size is within limits
if (messageData.getSize() > maxMessageSize) {
// This is a bug or else a misconfiguration of the max message size.
LOG.error(
"Sending {} message to peer ({}) which exceeds local message size limit of {} bytes. Message code: {}, Message Size: {}",
protocolName,
connection.getRemoteEnode(),
maxMessageSize,
messageData.getCode(),
messageData.getSize());
}
if (requestManagers.containsKey(protocolName)) {
final Map<Integer, RequestManager> managers = this.requestManagers.get(protocolName);

@ -54,6 +54,7 @@ public class EthPeers {
private final Clock clock;
private final List<NodeMessagePermissioningProvider> permissioningProviders;
private final int maxPeers;
private final int maxMessageSize;
private final Subscribers<ConnectCallback> connectCallbacks = Subscribers.create();
private final Subscribers<DisconnectCallback> disconnectCallbacks = Subscribers.create();
private final Collection<PendingPeerRequest> pendingRequests = new CopyOnWriteArrayList<>();
@ -62,8 +63,9 @@ public class EthPeers {
final String protocolName,
final Clock clock,
final MetricsSystem metricsSystem,
final int maxPeers) {
this(protocolName, clock, metricsSystem, maxPeers, Collections.emptyList());
final int maxPeers,
final int maxMessageSize) {
this(protocolName, clock, metricsSystem, maxPeers, maxMessageSize, Collections.emptyList());
}
public EthPeers(
@ -71,11 +73,13 @@ public class EthPeers {
final Clock clock,
final MetricsSystem metricsSystem,
final int maxPeers,
final int maxMessageSize,
final List<NodeMessagePermissioningProvider> permissioningProviders) {
this.protocolName = protocolName;
this.clock = clock;
this.permissioningProviders = permissioningProviders;
this.maxPeers = maxPeers;
this.maxMessageSize = maxMessageSize;
metricsSystem.createIntegerGauge(
BesuMetricCategory.PEERS,
"pending_peer_requests_current",
@ -91,6 +95,7 @@ public class EthPeers {
protocolName,
this::invokeConnectionCallbacks,
peerValidators,
maxMessageSize,
clock,
permissioningProviders);
connections.putIfAbsent(peerConnection, peer);

@ -246,7 +246,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
if (messageData.getSize() > maxMessageSize) {
LOG.warn(
"Received message exceeding size limit of {} bytes: {} bytes. Disconnecting from {}",
"Received message (code: {}) exceeding size limit of {} bytes: {} bytes. Disconnecting from {}",
Integer.toString(code, 16),
maxMessageSize,
messageData.getSize(),
ethPeer);

@ -36,15 +36,14 @@ import org.hyperledger.besu.ethereum.eth.messages.PooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import com.google.common.collect.Lists;
import org.apache.tuweni.bytes.Bytes;
class EthServer {
@ -69,47 +68,63 @@ class EthServer {
}
private void registerResponseConstructors() {
final int maxMessageSize = ethereumWireProtocolConfiguration.getMaxMessageSize();
ethMessages.registerResponseConstructor(
EthPV62.GET_BLOCK_HEADERS,
messageData ->
constructGetHeadersResponse(
blockchain,
messageData,
ethereumWireProtocolConfiguration.getMaxGetBlockHeaders()));
ethereumWireProtocolConfiguration.getMaxGetBlockHeaders(),
maxMessageSize));
ethMessages.registerResponseConstructor(
EthPV62.GET_BLOCK_BODIES,
messageData ->
constructGetBodiesResponse(
blockchain, messageData, ethereumWireProtocolConfiguration.getMaxGetBlockBodies()));
blockchain,
messageData,
ethereumWireProtocolConfiguration.getMaxGetBlockBodies(),
maxMessageSize));
ethMessages.registerResponseConstructor(
EthPV63.GET_RECEIPTS,
messageData ->
constructGetReceiptsResponse(
blockchain, messageData, ethereumWireProtocolConfiguration.getMaxGetReceipts()));
blockchain,
messageData,
ethereumWireProtocolConfiguration.getMaxGetReceipts(),
maxMessageSize));
ethMessages.registerResponseConstructor(
EthPV63.GET_NODE_DATA,
messageData ->
constructGetNodeDataResponse(
worldStateArchive,
messageData,
ethereumWireProtocolConfiguration.getMaxGetNodeData()));
ethereumWireProtocolConfiguration.getMaxGetNodeData(),
maxMessageSize));
ethMessages.registerResponseConstructor(
EthPV65.GET_POOLED_TRANSACTIONS,
messageData ->
constructGetPooledTransactionsResponse(
transactionPool,
messageData,
ethereumWireProtocolConfiguration.getMaxGetPooledTransactions()));
ethereumWireProtocolConfiguration.getMaxGetPooledTransactions(),
maxMessageSize));
}
static MessageData constructGetHeadersResponse(
final Blockchain blockchain, final MessageData message, final int requestLimit) {
final Blockchain blockchain,
final MessageData message,
final int requestLimit,
final int maxMessageSize) {
// Extract parameters from request
final GetBlockHeadersMessage getHeaders = GetBlockHeadersMessage.readFrom(message);
final Optional<Hash> hash = getHeaders.hash();
final int skip = getHeaders.skip();
final int maxHeaders = Math.min(requestLimit, getHeaders.maxHeaders());
final boolean reversed = getHeaders.reverse();
final BlockHeader firstHeader;
// Query first header by hash or number depending on request arguments
if (hash.isPresent()) {
final Hash startHash = hash.get();
firstHeader = blockchain.getBlockHeader(startHash).orElse(null);
@ -117,34 +132,58 @@ class EthServer {
final long firstNumber = getHeaders.blockNumber().getAsLong();
firstHeader = blockchain.getBlockHeader(firstNumber).orElse(null);
}
final Collection<BlockHeader> resp;
// The initial header was not found, nothing to return
if (firstHeader == null) {
resp = Collections.emptyList();
} else {
resp = Lists.newArrayList(firstHeader);
final long numberDelta = reversed ? -(skip + 1) : (skip + 1);
for (int i = 1; i < maxHeaders; i++) {
final long blockNumber = firstHeader.getNumber() + i * numberDelta;
if (blockNumber < BlockHeader.GENESIS_BLOCK_NUMBER) {
break;
}
final Optional<BlockHeader> maybeHeader = blockchain.getBlockHeader(blockNumber);
if (maybeHeader.isPresent()) {
resp.add(maybeHeader.get());
} else {
break;
}
return BlockHeadersMessage.create(Collections.emptyList());
}
// Encode the first header
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
final Bytes firstEncodedHeader = RLP.encode(firstHeader::writeTo);
if (responseSizeEstimate + firstEncodedHeader.size() > maxMessageSize) {
return BlockHeadersMessage.create(Collections.emptyList());
}
responseSizeEstimate += firstEncodedHeader.size();
rlp.writeRaw(firstEncodedHeader);
// Collect and encode the remaining headers
final long numberDelta = reversed ? -(skip + 1) : (skip + 1);
for (int i = 1; i < maxHeaders; i++) {
final long blockNumber = firstHeader.getNumber() + i * numberDelta;
if (blockNumber < BlockHeader.GENESIS_BLOCK_NUMBER) {
break;
}
final Optional<BlockHeader> maybeHeader = blockchain.getBlockHeader(blockNumber);
if (maybeHeader.isEmpty()) {
break;
}
final BytesValueRLPOutput headerRlp = new BytesValueRLPOutput();
maybeHeader.get().writeTo(headerRlp);
final int encodedSize = headerRlp.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(headerRlp.encoded());
}
return BlockHeadersMessage.create(resp);
rlp.endList();
return BlockHeadersMessage.createUnsafe(rlp.encoded());
}
static MessageData constructGetBodiesResponse(
final Blockchain blockchain, final MessageData message, final int requestLimit) {
final Blockchain blockchain,
final MessageData message,
final int requestLimit,
final int maxMessageSize) {
final GetBlockBodiesMessage getBlockBodiesMessage = GetBlockBodiesMessage.readFrom(message);
final Iterable<Hash> hashes = getBlockBodiesMessage.hashes();
final Collection<BlockBody> bodies = new ArrayList<>();
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
int count = 0;
for (final Hash hash : hashes) {
if (count >= requestLimit) {
@ -152,20 +191,35 @@ class EthServer {
}
count++;
final Optional<BlockBody> maybeBody = blockchain.getBlockBody(hash);
if (!maybeBody.isPresent()) {
if (maybeBody.isEmpty()) {
continue;
}
bodies.add(maybeBody.get());
final BlockBody body = maybeBody.get();
final BytesValueRLPOutput bodyOutput = new BytesValueRLPOutput();
body.writeTo(bodyOutput);
final int encodedSize = bodyOutput.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(bodyOutput.encoded());
}
return BlockBodiesMessage.create(bodies);
rlp.endList();
return BlockBodiesMessage.createUnsafe(rlp.encoded());
}
static MessageData constructGetReceiptsResponse(
final Blockchain blockchain, final MessageData message, final int requestLimit) {
final Blockchain blockchain,
final MessageData message,
final int requestLimit,
final int maxMessageSize) {
final GetReceiptsMessage getReceipts = GetReceiptsMessage.readFrom(message);
final Iterable<Hash> hashes = getReceipts.hashes();
final List<List<TransactionReceipt>> receipts = new ArrayList<>();
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
int count = 0;
for (final Hash hash : hashes) {
if (count >= requestLimit) {
@ -173,21 +227,38 @@ class EthServer {
}
count++;
final Optional<List<TransactionReceipt>> maybeReceipts = blockchain.getTxReceipts(hash);
if (!maybeReceipts.isPresent()) {
if (maybeReceipts.isEmpty()) {
continue;
}
receipts.add(maybeReceipts.get());
final BytesValueRLPOutput encodedReceipts = new BytesValueRLPOutput();
encodedReceipts.startList();
maybeReceipts.get().forEach(r -> r.writeTo(encodedReceipts));
encodedReceipts.endList();
final int encodedSize = encodedReceipts.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(encodedReceipts.encoded());
}
return ReceiptsMessage.create(receipts);
rlp.endList();
return ReceiptsMessage.createUnsafe(rlp.encoded());
}
static MessageData constructGetPooledTransactionsResponse(
final TransactionPool transactionPool, final MessageData message, final int requestLimit) {
final TransactionPool transactionPool,
final MessageData message,
final int requestLimit,
final int maxMessageSize) {
final GetPooledTransactionsMessage getPooledTransactions =
GetPooledTransactionsMessage.readFrom(message);
final Iterable<Hash> hashes = getPooledTransactions.pooledTransactions();
final List<Transaction> tx = new ArrayList<>();
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
int count = 0;
for (final Hash hash : hashes) {
if (count >= requestLimit) {
@ -198,19 +269,32 @@ class EthServer {
if (maybeTx.isEmpty()) {
continue;
}
tx.add(maybeTx.get());
final BytesValueRLPOutput txRlp = new BytesValueRLPOutput();
maybeTx.get().writeTo(txRlp);
final int encodedSize = txRlp.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(txRlp.encoded());
}
return PooledTransactionsMessage.create(tx);
rlp.endList();
return PooledTransactionsMessage.createUnsafe(rlp.encoded());
}
static MessageData constructGetNodeDataResponse(
final WorldStateArchive worldStateArchive,
final MessageData message,
final int requestLimit) {
final int requestLimit,
final int maxMessageSize) {
final GetNodeDataMessage getNodeDataMessage = GetNodeDataMessage.readFrom(message);
final Iterable<Hash> hashes = getNodeDataMessage.hashes();
final List<Bytes> nodeData = new ArrayList<>();
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
int count = 0;
for (final Hash hash : hashes) {
if (count >= requestLimit) {
@ -218,8 +302,23 @@ class EthServer {
}
count++;
worldStateArchive.getNodeData(hash).ifPresent(nodeData::add);
final Optional<Bytes> maybeNodeData = worldStateArchive.getNodeData(hash);
if (maybeNodeData.isEmpty()) {
continue;
}
final BytesValueRLPOutput rlpNodeData = new BytesValueRLPOutput();
rlpNodeData.writeBytes(maybeNodeData.get());
final int encodedSize = rlpNodeData.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(rlpNodeData.encoded());
}
return NodeDataMessage.create(nodeData);
rlp.endList();
return NodeDataMessage.createUnsafe(rlp.encoded());
}
}

@ -47,6 +47,17 @@ public final class BlockBodiesMessage extends AbstractMessageData {
return new BlockBodiesMessage(tmp.encoded());
}
/**
* Create a message with raw, already encoded body data. No checks are performed to validate the
* rlp-encoded data.
*
* @param data An rlp-encoded list of block bodies
* @return A new BlockBodiesMessage
*/
public static BlockBodiesMessage createUnsafe(final Bytes data) {
return new BlockBodiesMessage(data);
}
private BlockBodiesMessage(final Bytes data) {
super(data);
}

@ -56,6 +56,17 @@ public final class BlockHeadersMessage extends AbstractMessageData {
return new BlockHeadersMessage(tmp.encoded());
}
/**
* Create a message with raw, already encoded data. No checks are performed to validate the
* rlp-encoded data.
*
* @param data An rlp-encoded list of headers
* @return A new BlockHeadersMessage
*/
public static BlockHeadersMessage createUnsafe(final Bytes data) {
return new BlockHeadersMessage(data);
}
private BlockHeadersMessage(final Bytes data) {
super(data);
}

@ -47,6 +47,17 @@ public final class NodeDataMessage extends AbstractMessageData {
return new NodeDataMessage(tmp.encoded());
}
/**
* Create a message with raw, already encoded data. No checks are performed to validate the
* rlp-encoded data.
*
* @param data An rlp-encoded list of node data
* @return A new NodeDataMessage
*/
public static NodeDataMessage createUnsafe(final Bytes data) {
return new NodeDataMessage(data);
}
private NodeDataMessage(final Bytes data) {
super(data);
}

@ -45,6 +45,17 @@ public final class PooledTransactionsMessage extends AbstractMessageData {
return new PooledTransactionsMessage(out.encoded());
}
/**
* Create a message with raw, already encoded data. No checks are performed to validate the
* rlp-encoded data.
*
* @param data An rlp-encoded list of transactions
* @return A new PooledTransactionsMessage
*/
public static PooledTransactionsMessage createUnsafe(final Bytes data) {
return new PooledTransactionsMessage(data);
}
public static PooledTransactionsMessage readFrom(final MessageData message) {
if (message instanceof PooledTransactionsMessage) {
return (PooledTransactionsMessage) message;

@ -53,6 +53,17 @@ public final class ReceiptsMessage extends AbstractMessageData {
return new ReceiptsMessage(tmp.encoded());
}
/**
* Create a message with raw, already encoded data. No checks are performed to validate the
* rlp-encoded data.
*
* @param data An rlp-encoded list of sets of receipts
* @return A new ReceiptsMessage
*/
public static ReceiptsMessage createUnsafe(final Bytes data) {
return new ReceiptsMessage(data);
}
private ReceiptsMessage(final Bytes data) {
super(data);
}

@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
@ -462,6 +463,7 @@ public class EthPeerTest {
"foo",
onPeerReady,
Collections.emptyList(),
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE,
clock,
Collections.emptyList());
}
@ -474,7 +476,13 @@ public class EthPeerTest {
// Use a non-eth protocol name to ensure that EthPeer with sub-protocols such as Istanbul
// that extend the sub-protocol work correctly
return new EthPeer(
peerConnection, "foo", onPeerReady, peerValidators, clock, permissioningProviders);
peerConnection,
"foo",
onPeerReady,
peerValidators,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE,
clock,
permissioningProviders);
}
@FunctionalInterface

@ -137,7 +137,13 @@ public class EthProtocolManagerTestUtil {
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool,
final EthProtocolConfiguration configuration) {
EthPeers peers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), new NoOpMetricsSystem(), 25);
EthPeers peers =
new EthPeers(
EthProtocol.NAME,
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
EthMessages messages = new EthMessages();
return create(
@ -158,7 +164,13 @@ public class EthProtocolManagerTestUtil {
final TransactionPool transactionPool,
final EthProtocolConfiguration configuration,
final ForkIdManager forkIdManager) {
EthPeers peers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), new NoOpMetricsSystem(), 25);
EthPeers peers =
new EthPeers(
EthProtocol.NAME,
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
EthMessages messages = new EthMessages();
return create(
@ -175,7 +187,13 @@ public class EthProtocolManagerTestUtil {
public static EthProtocolManager create(
final Blockchain blockchain, final EthScheduler ethScheduler) {
EthPeers peers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), new NoOpMetricsSystem(), 25);
EthPeers peers =
new EthPeers(
EthProtocol.NAME,
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
EthMessages messages = new EthMessages();
return create(

@ -14,84 +14,393 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetNodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetPooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.PooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.junit.Before;
import org.junit.Test;
public class EthServerTest {
private static final Bytes VALUE1 = Bytes.of(1);
private static final Bytes VALUE2 = Bytes.of(2);
private static final Bytes VALUE3 = Bytes.of(3);
private static final Hash HASH1 = Hash.hash(VALUE1);
private static final Hash HASH2 = Hash.hash(VALUE2);
private static final Hash HASH3 = Hash.hash(VALUE3);
private final BlockDataGenerator dataGenerator = new BlockDataGenerator(0);
private final Blockchain blockchain = mock(Blockchain.class);
private final WorldStateArchive worldStateArchive = mock(WorldStateArchive.class);
private final TransactionPool transactionPool = mock(TransactionPool.class);
private final EthPeer ethPeer = mock(EthPeer.class);
private final EthMessages ethMessages = new EthMessages();
@Before
public void setUp() {
@Test
public void shouldHandleDataBeingUnavailableWhenRespondingToNodeDataRequests() {
final Map<Hash, Bytes> nodeData = setupNodeData(1);
setupEthServer();
final List<Hash> hashes = new ArrayList<>(nodeData.keySet());
hashes.add(dataGenerator.hash()); // Add unknown hash
final List<Bytes> expectedResult = new ArrayList<>(nodeData.values());
assertThat(ethMessages.dispatch(new EthMessage(ethPeer, GetNodeDataMessage.create(hashes))))
.contains(NodeDataMessage.create(expectedResult));
}
@Test
public void shouldLimitNumberOfResponsesToNodeDataRequests() {
final int limit = 2;
final EthProtocolConfiguration ethConfig =
EthProtocolConfiguration.builder()
.maxGetBlockHeaders(limit)
.maxGetBlockBodies(limit)
.maxGetReceipts(limit)
.maxGetNodeData(limit)
.maxGetPooledTransactions(limit)
.build();
final Map<Hash, Bytes> nodeData = setupNodeData(3);
setupEthServer(b -> b.maxGetNodeData(limit));
new EthServer(blockchain, worldStateArchive, transactionPool, ethMessages, ethConfig);
final List<Hash> hashes = new ArrayList<>(nodeData.keySet());
final List<Bytes> expectedResult =
hashes.stream().limit(limit).map(nodeData::get).collect(Collectors.toList());
assertThat(ethMessages.dispatch(new EthMessage(ethPeer, GetNodeDataMessage.create(hashes))))
.contains(NodeDataMessage.create(expectedResult));
}
@Test
public void shouldLimitTheNumberOfNodeDataResponsesLookedUpNotTheNumberReturned() {
final Map<Hash, Bytes> nodeData = setupNodeData(2);
setupEthServer(b -> b.maxGetNodeData(2));
final List<Hash> knownHashes = new ArrayList<>(nodeData.keySet());
final List<Hash> hashes =
List.of(
knownHashes.get(0),
dataGenerator.hash(), // Insert a hash that will return an empty response
knownHashes.get(1));
final List<Bytes> expectedResult = singletonList(nodeData.get(knownHashes.get(0)));
assertThat(ethMessages.dispatch(new EthMessage(ethPeer, GetNodeDataMessage.create(hashes))))
.contains(NodeDataMessage.create(expectedResult));
}
@Test
public void shouldLimitNodeDataByMessageSize() {
final Map<Hash, Bytes> nodeData = setupNodeData(10);
final List<Hash> hashes = new ArrayList<>(nodeData.keySet());
int sizeLimit = RLP.MAX_PREFIX_SIZE;
final List<Bytes> expectedResult = new ArrayList<>();
for (int i = 0; i < 4; i++) {
final Bytes data = nodeData.get(hashes.get(i));
expectedResult.add(data);
sizeLimit += calculateRlpEncodedSize(data);
}
final int messageSizeLimit = sizeLimit;
setupEthServer(b -> b.maxMessageSize(messageSizeLimit));
assertThat(ethMessages.dispatch(new EthMessage(ethPeer, GetNodeDataMessage.create(hashes))))
.contains(NodeDataMessage.create(expectedResult));
}
@Test
public void shouldLimitBlockHeadersByMessageSize() {
final List<Block> blocks = setupBlocks(10);
final List<BlockHeader> expectedHeaders = new ArrayList<>();
int sizeLimit = RLP.MAX_PREFIX_SIZE;
for (int i = 0; i < 4; i++) {
final BlockHeader header = blocks.get(i).getHeader();
sizeLimit += calculateRlpEncodedSize(header);
expectedHeaders.add(header);
}
final int msgSizeLimit = sizeLimit;
setupEthServer(b -> b.maxMessageSize(msgSizeLimit));
// Request all blocks, which will exceed the limit
final BlockHeader firstHeader = blocks.get(0).getHeader();
final GetBlockHeadersMessage headersMsg =
GetBlockHeadersMessage.create(firstHeader.getHash(), blocks.size(), 0, false);
final EthMessage ethMsg = new EthMessage(ethPeer, headersMsg);
// Check response
final BlockHeadersMessage expectedMsg = BlockHeadersMessage.create(expectedHeaders);
final Optional<MessageData> result = ethMessages.dispatch(ethMsg);
assertThat(result).contains(expectedMsg);
}
@Test
public void shouldLimitBlockHeadersByCount() {
final int blockCount = 10;
final int limit = 6;
final List<Block> blocks = setupBlocks(blockCount);
final List<BlockHeader> expectedHeaders =
blocks.stream().limit(limit).map(Block::getHeader).collect(Collectors.toList());
setupEthServer(b -> b.maxGetBlockHeaders(limit));
// Request all blocks, which will exceed the limit
final BlockHeader firstHeader = blocks.get(0).getHeader();
final GetBlockHeadersMessage headersMsg =
GetBlockHeadersMessage.create(firstHeader.getHash(), blocks.size(), 0, false);
final EthMessage ethMsg = new EthMessage(ethPeer, headersMsg);
// Check response
final BlockHeadersMessage expectedMsg = BlockHeadersMessage.create(expectedHeaders);
final Optional<MessageData> result = ethMessages.dispatch(ethMsg);
assertThat(result).contains(expectedMsg);
}
@Test
public void shouldLimitBlockBodiesByMessageSize() {
final List<Block> blocks = setupBlocks(10);
final List<BlockBody> expectedBodies = new ArrayList<>();
int sizeLimit = RLP.MAX_PREFIX_SIZE;
for (int i = 0; i < 4; i++) {
final BlockBody body = blocks.get(i).getBody();
sizeLimit += calculateRlpEncodedSize(body);
expectedBodies.add(body);
}
final int msgSizeLimit = sizeLimit;
setupEthServer(b -> b.maxMessageSize(msgSizeLimit));
// Request all blocks, which will exceed the limit
final List<Hash> blockHashes = blocks.stream().map(Block::getHash).collect(Collectors.toList());
final GetBlockBodiesMessage bodiesMsg = GetBlockBodiesMessage.create(blockHashes);
final EthMessage ethMsg = new EthMessage(ethPeer, bodiesMsg);
// Check response
final BlockBodiesMessage expectedMsg = BlockBodiesMessage.create(expectedBodies);
final Optional<MessageData> result = ethMessages.dispatch(ethMsg);
assertThat(result).contains(expectedMsg);
}
@Test
public void shouldHandleDataBeingUnavailableWhenRespondingToNodeDataRequests() throws Exception {
when(worldStateArchive.getNodeData(HASH1)).thenReturn(Optional.of(VALUE1));
when(worldStateArchive.getNodeData(HASH2)).thenReturn(Optional.empty());
assertThat(
ethMessages.dispatch(
new EthMessage(ethPeer, GetNodeDataMessage.create(asList(HASH1, HASH2)))))
.contains(NodeDataMessage.create(singletonList(VALUE1)));
public void shouldLimitBlockBodiesByCount() {
final int blockCount = 10;
final int limit = 6;
final List<Block> blocks = setupBlocks(blockCount);
final List<BlockBody> expectedBodies =
blocks.stream().limit(limit).map(Block::getBody).collect(Collectors.toList());
setupEthServer(b -> b.maxGetBlockBodies(limit));
// Request all blocks, which will exceed the limit
final List<Hash> blockHashes = blocks.stream().map(Block::getHash).collect(Collectors.toList());
final GetBlockBodiesMessage bodiesMsg = GetBlockBodiesMessage.create(blockHashes);
final EthMessage ethMsg = new EthMessage(ethPeer, bodiesMsg);
// Check response
final BlockBodiesMessage expectedMsg = BlockBodiesMessage.create(expectedBodies);
final Optional<MessageData> result = ethMessages.dispatch(ethMsg);
assertThat(result).contains(expectedMsg);
}
@Test
public void shouldLimitNumberOfResponsesToNodeDataRequests() throws Exception {
when(worldStateArchive.getNodeData(HASH1)).thenReturn(Optional.of(VALUE1));
when(worldStateArchive.getNodeData(HASH2)).thenReturn(Optional.of(VALUE2));
assertThat(
ethMessages.dispatch(
new EthMessage(ethPeer, GetNodeDataMessage.create(asList(HASH1, HASH2, HASH3)))))
.contains(NodeDataMessage.create(asList(VALUE1, VALUE2)));
public void shouldLimitTxReceiptsByMessageSize() {
final Map<Hash, List<TransactionReceipt>> receiptsByHash = setupBlockReceipts(10);
final List<Hash> hashes = new ArrayList<>(receiptsByHash.keySet());
final List<List<TransactionReceipt>> expectedResults = new ArrayList<>();
int sizeLimit = RLP.MAX_PREFIX_SIZE;
for (int i = 0; i < 4; i++) {
final List<TransactionReceipt> receipts = receiptsByHash.get(hashes.get(i));
sizeLimit += calculateRlpEncodedSize(receipts);
expectedResults.add(receipts);
}
final int msgSizeLimit = sizeLimit;
setupEthServer(b -> b.maxMessageSize(msgSizeLimit));
// Request all records, which will exceed the limit
final GetReceiptsMessage bodiesMsg = GetReceiptsMessage.create(hashes);
final EthMessage ethMsg = new EthMessage(ethPeer, bodiesMsg);
// Check response
final ReceiptsMessage expectedMsg = ReceiptsMessage.create(expectedResults);
final Optional<MessageData> result = ethMessages.dispatch(ethMsg);
assertThat(result).contains(expectedMsg);
}
@Test
public void shouldLimitTheNumberOfNodeDataResponsesLookedUpNotTheNumberReturned()
throws Exception {
when(worldStateArchive.getNodeData(HASH1)).thenReturn(Optional.of(VALUE1));
when(worldStateArchive.getNodeData(HASH2)).thenReturn(Optional.empty());
when(worldStateArchive.getNodeData(HASH3)).thenReturn(Optional.of(VALUE3));
assertThat(
ethMessages.dispatch(
new EthMessage(ethPeer, GetNodeDataMessage.create(asList(HASH1, HASH2, HASH3)))))
.contains(NodeDataMessage.create(singletonList(VALUE1)));
public void shouldLimitTxReceiptsByCount() {
final int limit = 6;
final Map<Hash, List<TransactionReceipt>> receiptsByHash = setupBlockReceipts(10);
final List<Hash> hashes = new ArrayList<>(receiptsByHash.keySet());
final List<List<TransactionReceipt>> expectedResults =
receiptsByHash.values().stream().limit(limit).collect(Collectors.toList());
setupEthServer(b -> b.maxGetReceipts(limit));
// Request all records, which will exceed the limit
final GetReceiptsMessage bodiesMsg = GetReceiptsMessage.create(hashes);
final EthMessage ethMsg = new EthMessage(ethPeer, bodiesMsg);
// Check response
final ReceiptsMessage expectedMsg = ReceiptsMessage.create(expectedResults);
final Optional<MessageData> result = ethMessages.dispatch(ethMsg);
assertThat(result).contains(expectedMsg);
}
@Test
public void shouldLimitTransactionsByMessageSize() {
final List<Transaction> transactions = setupTransactions(10);
final List<Transaction> expectedResult = new ArrayList<>();
int sizeLimit = RLP.MAX_PREFIX_SIZE;
for (int i = 0; i < 4; i++) {
final Transaction tx = transactions.get(i);
sizeLimit += calculateRlpEncodedSize(tx);
expectedResult.add(tx);
}
final int msgSizeLimit = sizeLimit;
setupEthServer(b -> b.maxMessageSize(msgSizeLimit));
// Request all hashes, which will exceed the limit
final List<Hash> hashes =
transactions.stream().map(Transaction::getHash).collect(Collectors.toList());
final GetPooledTransactionsMessage msgData = GetPooledTransactionsMessage.create(hashes);
final EthMessage ethMsg = new EthMessage(ethPeer, msgData);
// Check response
final PooledTransactionsMessage expectedMsg = PooledTransactionsMessage.create(expectedResult);
final Optional<MessageData> result = ethMessages.dispatch(ethMsg);
assertThat(result).contains(expectedMsg);
}
@Test
public void shouldLimitTransactionsByCount() {
final int limit = 6;
final List<Transaction> transactions = setupTransactions(10);
final List<Transaction> expectedResult =
transactions.stream().limit(limit).collect(Collectors.toList());
setupEthServer(b -> b.maxGetPooledTransactions(limit));
// Request all hashes, which will exceed the limit
final List<Hash> hashes =
transactions.stream().map(Transaction::getHash).collect(Collectors.toList());
final GetPooledTransactionsMessage msgData = GetPooledTransactionsMessage.create(hashes);
final EthMessage ethMsg = new EthMessage(ethPeer, msgData);
// Check response
final PooledTransactionsMessage expectedMsg = PooledTransactionsMessage.create(expectedResult);
final Optional<MessageData> result = ethMessages.dispatch(ethMsg);
assertThat(result).contains(expectedMsg);
}
private void setupEthServer() {
setupEthServer(Function.identity());
}
private void setupEthServer(
final Function<EthProtocolConfiguration.Builder, EthProtocolConfiguration.Builder>
configModifier) {
final EthProtocolConfiguration.Builder configBuilder = EthProtocolConfiguration.builder();
final EthProtocolConfiguration ethConfig = configModifier.apply(configBuilder).build();
new EthServer(blockchain, worldStateArchive, transactionPool, ethMessages, ethConfig);
}
private Map<Hash, Bytes> setupNodeData(final int count) {
// Return empty value unless otherwise specified
when(worldStateArchive.getNodeData(any())).thenReturn(Optional.empty());
final Map<Hash, Bytes> nodeDataByHash = new HashMap<>();
for (int i = 0; i < count; i++) {
final Hash hash = dataGenerator.hash();
final Bytes data = dataGenerator.bytesValue(10, 30);
when(worldStateArchive.getNodeData(hash)).thenReturn(Optional.of(data));
nodeDataByHash.put(hash, data);
}
return nodeDataByHash;
}
private List<Block> setupBlocks(final int count) {
final List<Block> blocks = dataGenerator.blockSequence(count);
for (Block block : blocks) {
when(blockchain.getBlockBody(block.getHash())).thenReturn(Optional.of(block.getBody()));
when(blockchain.getBlockHeader(block.getHash())).thenReturn(Optional.of(block.getHeader()));
when(blockchain.getBlockHeader(block.getHeader().getNumber()))
.thenReturn(Optional.of(block.getHeader()));
}
return blocks;
}
private Map<Hash, List<TransactionReceipt>> setupBlockReceipts(final int count) {
final Map<Hash, List<TransactionReceipt>> txReceiptsByHash = new HashMap<>();
final List<Block> blocks = dataGenerator.blockSequence(count);
for (Block block : blocks) {
final List<TransactionReceipt> receipts = dataGenerator.receipts(block);
when(blockchain.getTxReceipts(block.getHash())).thenReturn(Optional.of(receipts));
txReceiptsByHash.put(block.getHash(), receipts);
}
return txReceiptsByHash;
}
private List<Transaction> setupTransactions(final int count) {
List<Transaction> txs =
Stream.generate(dataGenerator::transaction).limit(count).collect(Collectors.toList());
for (Transaction tx : txs) {
when(transactionPool.getTransactionByHash(tx.getHash())).thenReturn(Optional.of(tx));
}
return txs;
}
private int calculateRlpEncodedSize(final BlockBody blockBody) {
return RLP.encode(blockBody::writeTo).size();
}
private int calculateRlpEncodedSize(final BlockHeader header) {
return RLP.encode(header::writeTo).size();
}
private int calculateRlpEncodedSize(final Transaction tx) {
return RLP.encode(tx::writeTo).size();
}
private int calculateRlpEncodedSize(final Bytes data) {
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.writeBytes(data);
return rlp.encodedSize();
}
private int calculateRlpEncodedSize(final List<TransactionReceipt> receipts) {
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
receipts.forEach(r -> r.writeTo(rlp));
rlp.endList();
return rlp.encodedSize();
}
}

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.manager;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
@ -304,6 +305,7 @@ public class RequestManagerTest {
EthProtocol.NAME,
onPeerReady,
Collections.emptyList(),
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE,
TestClock.fixed(),
Collections.emptyList());
}

@ -28,6 +28,7 @@ import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
@ -246,23 +247,27 @@ public class RespondingEthPeer {
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool) {
final int maxMsgSize = EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE;
return (cap, msg) -> {
MessageData response = null;
switch (msg.getCode()) {
case EthPV62.GET_BLOCK_HEADERS:
response = EthServer.constructGetHeadersResponse(blockchain, msg, 200);
response = EthServer.constructGetHeadersResponse(blockchain, msg, 200, maxMsgSize);
break;
case EthPV62.GET_BLOCK_BODIES:
response = EthServer.constructGetBodiesResponse(blockchain, msg, 200);
response = EthServer.constructGetBodiesResponse(blockchain, msg, 200, maxMsgSize);
break;
case EthPV63.GET_RECEIPTS:
response = EthServer.constructGetReceiptsResponse(blockchain, msg, 200);
response = EthServer.constructGetReceiptsResponse(blockchain, msg, 200, maxMsgSize);
break;
case EthPV63.GET_NODE_DATA:
response = EthServer.constructGetNodeDataResponse(worldStateArchive, msg, 200);
response =
EthServer.constructGetNodeDataResponse(worldStateArchive, msg, 200, maxMsgSize);
break;
case EthPV65.GET_POOLED_TRANSACTIONS:
response = EthServer.constructGetPooledTransactionsResponse(transactionPool, msg, 200);
response =
EthServer.constructGetPooledTransactionsResponse(
transactionPool, msg, 200, maxMsgSize);
}
return Optional.ofNullable(response);
};

@ -85,7 +85,14 @@ public abstract class AbstractMessageTaskTest<T, R> {
public void setupTest() {
peersDoTimeout = new AtomicBoolean(false);
peerCountToTimeout = new AtomicInteger(0);
ethPeers = spy(new EthPeers(EthProtocol.NAME, TestClock.fixed(), metricsSystem, 25));
ethPeers =
spy(
new EthPeers(
EthProtocol.NAME,
TestClock.fixed(),
metricsSystem,
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE));
final EthMessages ethMessages = new EthMessages();
final EthScheduler ethScheduler =
new DeterministicEthScheduler(

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.manager.ethtaskutils;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection;
@ -163,6 +164,7 @@ public abstract class PeerMessageTaskTest<T>
EthProtocol.NAME,
onPeerReady,
Collections.emptyList(),
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE,
TestClock.fixed(),
Collections.emptyList());
}

@ -606,7 +606,12 @@ public abstract class AbstractBlockPropagationManagerTest {
.thenReturn(new CompletableFuture<>());
final EthContext ethContext =
new EthContext(
new EthPeers("eth", TestClock.fixed(), metricsSystem, 25),
new EthPeers(
"eth",
TestClock.fixed(),
metricsSystem,
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE),
new EthMessages(),
ethScheduler);
final BlockPropagationManager blockPropagationManager =
@ -665,7 +670,12 @@ public abstract class AbstractBlockPropagationManagerTest {
});
final EthContext ethContext =
new EthContext(
new EthPeers("eth", TestClock.fixed(), metricsSystem, 25),
new EthPeers(
"eth",
TestClock.fixed(),
metricsSystem,
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE),
new EthMessages(),
ethScheduler);
final BlockPropagationManager blockPropagationManager =

@ -124,7 +124,13 @@ public class TestNode implements Closeable {
final EthMessages ethMessages = new EthMessages();
final EthPeers ethPeers = new EthPeers(EthProtocol.NAME, TestClock.fixed(), metricsSystem, 25);
final EthPeers ethPeers =
new EthPeers(
EthProtocol.NAME,
TestClock.fixed(),
metricsSystem,
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
final EthScheduler scheduler = new EthScheduler(1, 1, 1, metricsSystem);
final EthContext ethContext = new EthContext(ethPeers, ethMessages, scheduler);

@ -63,7 +63,13 @@ public class TransactionPoolFactoryTest {
when(blockchain.getBlockByNumber(anyLong())).thenReturn(Optional.of(mock(Block.class)));
when(blockchain.getBlockHashByNumber(anyLong())).thenReturn(Optional.of(mock(Hash.class)));
when(context.getBlockchain()).thenReturn(blockchain);
final EthPeers ethPeers = new EthPeers("ETH", TestClock.fixed(), new NoOpMetricsSystem(), 25);
final EthPeers ethPeers =
new EthPeers(
"ETH",
TestClock.fixed(),
new NoOpMetricsSystem(),
25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
final EthContext ethContext = mock(EthContext.class);
when(ethContext.getEthMessages()).thenReturn(mock(EthMessages.class));
when(ethContext.getEthPeers()).thenReturn(ethPeers);

@ -33,6 +33,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
@ -190,7 +191,13 @@ public class RetestethContext {
// mining support
final EthPeers ethPeers = new EthPeers("reteseth", retestethClock, metricsSystem, 0);
final EthPeers ethPeers =
new EthPeers(
"reteseth",
retestethClock,
metricsSystem,
0,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
final SyncState syncState = new SyncState(blockchain, ethPeers);
ethScheduler = new EthScheduler(1, 1, 1, 1, metricsSystem);

@ -36,7 +36,7 @@ abstract class AbstractRLPOutput implements RLPOutput {
* (that is, the list that starts at the ith LIST_MARKER in 'values').
*
* With that information gathered, encoded() can write its output in a single walk of 'values':
* values can encoded directly, and every time we read a list marker, we use the corresponding
* values can be encoded directly, and every time we read a list marker, we use the corresponding
* payload size to write the proper prefix and continue.
*
* The main remaining aspect is how the values of 'payloadSizes' are computed. Computing the size
@ -44,15 +44,15 @@ abstract class AbstractRLPOutput implements RLPOutput {
* to the running size. The difficulty is with nesting: when we start a new list, we need to
* track both the sizes of the previous list and the new one. To deal with that, we use the small
* stack 'parentListStack': it stores the index in 'payloadSizes' of every currently "open" lists.
* In other words, payloadSises[parentListStack[stackSize - 1]] corresponds to the size of the
* In other words, payloadSizes[parentListStack[stackSize - 1]] corresponds to the size of the
* current list, the one to which newly added value are currently written (until the next call
* to 'endList()' that is, while payloadSises[parentListStack[stackSize - 2]] would be the size
* to 'endList()' that is, while payloadSizes[parentListStack[stackSize - 2]] would be the size
* of the parent list, ....
*
* Note that when a new value is added, we add its size only the currently running list. We should
* add that size to that of any parent list as well, but we do so indirectly when a list is
* finished: when 'endList()' is called, we add the size of the full list we just finished (and
* whose size we have now have completely) to its parent size.
* whose size we have now completed) to its parent size.
*
* Side-note: this class internally and informally use "element" to refer to a non list items.
*/
@ -65,7 +65,7 @@ abstract class AbstractRLPOutput implements RLPOutput {
private final BitSet rlpEncoded = new BitSet();
// First element is the total size of everything (the encoding may be a single non-list item, so
// this handle that case more easily; we need that value to size out final output). Following
// this handles that case more easily; we need that value to size out final output). Following
// elements holds the size of the payload of the ith list in 'values'.
private int[] payloadSizes = new int[8];
private int listsCount = 1; // number of lists current in 'values' + 1.

@ -30,6 +30,11 @@ public abstract class RLP {
public static final Bytes EMPTY_LIST;
// RLP encoding requires payloads to be less thatn 2^64 bytes in length
// As a result, the longest RLP strings will have a prefix composed of 1 byte encoding the type
// of string followed by at most 8 bytes describing the length of the string
public static final int MAX_PREFIX_SIZE = 9;
static {
final BytesValueRLPOutput out = new BytesValueRLPOutput();
out.startList();

Loading…
Cancel
Save