[PIE-1224] Different request limits for different request types (#1224)

* [PIE-1224] Different request limits for different request types

- create `EthLimits` with max size of response according to geth (https://github.com/ethereum/go-ethereum/blob/master/eth/downloader/downloader.go)
- update `EthServer` to use those constants limits instead of the default one
- remove requestLimit from EthServer and EthProtocolManager constructor
fix PIE-1224

* eth wire protocol request limits

- add fields in `SynchronizerConfiguration` to configure per type request limit
- update `EthServer` constructor to add new fields
- update `EthProtocolManager` and subclasses to support new fields
- update unit tests accordingly

* Update SynchronizerConfiguration.java

* Update EthProtocolManagerTestUtil.java

* Update EthServerTest.java

* Update EthProtocolManagerTest.java

* Update EthServerTest.java

* fix after review discussion

- remove per type request limit configuration from `SynchronizerConfiguration`.
- add `EthServer` constructor without new fields that use default values.
- update unit tests accordingly
- update instanciation of different `PantheonController` accordingly

* Update EthServerTest.java

* fix review

- spotless
- fix clean up review comment

* spotlessApply

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Abdelhamid Bakhta 6 years ago committed by GitHub
parent 5b3fd96326
commit 335ec91f7b
  1. 43
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java
  2. 42
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServer.java
  3. 140
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java
  4. 7
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  5. 2
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthServerTest.java
  6. 13
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java

@ -44,7 +44,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
static final int DEFAULT_REQUEST_LIMIT = 200;
private static final Logger LOG = LogManager.getLogger();
private static final List<Capability> FAST_SYNC_CAPS =
Collections.singletonList(EthProtocol.ETH63);
@ -70,8 +69,11 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final WorldStateArchive worldStateArchive,
final int networkId,
final boolean fastSyncEnabled,
final int requestLimit,
final EthScheduler scheduler) {
final EthScheduler scheduler,
final int maxGetBlockHeaders,
final int maxGetBlockBodies,
final int maxGetReceipts,
final int maxGetNodeData) {
this.networkId = networkId;
this.scheduler = scheduler;
this.blockchain = blockchain;
@ -87,10 +89,17 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
this.blockBroadcaster = new BlockBroadcaster(ethContext);
// Set up request handlers
new EthServer(blockchain, worldStateArchive, ethMessages, requestLimit);
new EthServer(
blockchain,
worldStateArchive,
ethMessages,
maxGetBlockHeaders,
maxGetBlockBodies,
maxGetReceipts,
maxGetNodeData);
}
EthProtocolManager(
public EthProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final int networkId,
@ -98,15 +107,17 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final int requestLimit,
final MetricsSystem metricsSystem) {
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
requestLimit,
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem));
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem),
EthServer.DEFAULT_MAX_GET_BLOCK_HEADERS,
EthServer.DEFAULT_MAX_GET_BLOCK_BODIES,
EthServer.DEFAULT_MAX_GET_RECEIPTS,
EthServer.DEFAULT_MAX_GET_NODE_DATA);
}
public EthProtocolManager(
@ -117,17 +128,21 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final int maxGetBlockHeaders,
final int maxGetBlockBodies,
final int maxGetReceipts,
final int maxGetNodeData) {
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers,
DEFAULT_REQUEST_LIMIT,
metricsSystem);
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem),
maxGetBlockHeaders,
maxGetBlockBodies,
maxGetReceipts,
maxGetNodeData);
}
public EthContext ethContext() {

@ -47,23 +47,51 @@ import org.apache.logging.log4j.Logger;
class EthServer {
private static final Logger LOG = LogManager.getLogger();
public static final int DEFAULT_MAX_GET_BLOCK_HEADERS = 192;
public static final int DEFAULT_MAX_GET_BLOCK_BODIES = 128;
public static final int DEFAULT_MAX_GET_RECEIPTS = 256;
public static final int DEFAULT_MAX_GET_NODE_DATA = 384;
private final Blockchain blockchain;
private final WorldStateArchive worldStateArchive;
private final EthMessages ethMessages;
private final int requestLimit;
private final int maxGetBlockHeaders;
private final int maxGetBlockBodies;
private final int maxGetReceipts;
private final int maxGetNodeData;
EthServer(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final EthMessages ethMessages,
final int requestLimit) {
final int maxGetBlockHeaders,
final int maxGetBlockBodies,
final int maxGetReceipts,
final int maxGetNodeData) {
this.blockchain = blockchain;
this.worldStateArchive = worldStateArchive;
this.ethMessages = ethMessages;
this.requestLimit = requestLimit;
this.maxGetBlockHeaders = maxGetBlockHeaders;
this.maxGetBlockBodies = maxGetBlockBodies;
this.maxGetReceipts = maxGetReceipts;
this.maxGetNodeData = maxGetNodeData;
this.setupListeners();
}
EthServer(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final EthMessages ethMessages) {
this(
blockchain,
worldStateArchive,
ethMessages,
DEFAULT_MAX_GET_BLOCK_HEADERS,
DEFAULT_MAX_GET_BLOCK_BODIES,
DEFAULT_MAX_GET_RECEIPTS,
DEFAULT_MAX_GET_NODE_DATA);
}
private void setupListeners() {
ethMessages.subscribe(EthPV62.GET_BLOCK_HEADERS, this::handleGetBlockHeaders);
ethMessages.subscribe(EthPV62.GET_BLOCK_BODIES, this::handleGetBlockBodies);
@ -75,7 +103,7 @@ class EthServer {
LOG.trace("Responding to GET_BLOCK_HEADERS request");
try {
final MessageData response =
constructGetHeadersResponse(blockchain, message.getData(), requestLimit);
constructGetHeadersResponse(blockchain, message.getData(), maxGetBlockHeaders);
message.getPeer().send(response);
} catch (final RLPException e) {
message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
@ -88,7 +116,7 @@ class EthServer {
LOG.trace("Responding to GET_BLOCK_BODIES request");
try {
final MessageData response =
constructGetBodiesResponse(blockchain, message.getData(), requestLimit);
constructGetBodiesResponse(blockchain, message.getData(), maxGetBlockBodies);
message.getPeer().send(response);
} catch (final RLPException e) {
message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
@ -101,7 +129,7 @@ class EthServer {
LOG.trace("Responding to GET_RECEIPTS request");
try {
final MessageData response =
constructGetReceiptsResponse(blockchain, message.getData(), requestLimit);
constructGetReceiptsResponse(blockchain, message.getData(), maxGetReceipts);
message.getPeer().send(response);
} catch (final RLPException e) {
message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
@ -114,7 +142,7 @@ class EthServer {
LOG.trace("Responding to GET_NODE_DATA request");
try {
final MessageData response =
constructGetNodeDataResponse(worldStateArchive, message.getData(), requestLimit);
constructGetNodeDataResponse(worldStateArchive, message.getData(), maxGetNodeData);
message.getPeer().send(response);
} catch (final RLPException e) {
message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL);

@ -116,7 +116,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {});
@ -136,7 +140,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer =
@ -157,7 +165,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer =
@ -189,7 +201,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer =
@ -221,7 +237,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final MessageData messageData =
GetBlockBodiesMessage.create(Collections.singletonList(gen.hash()));
final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {});
@ -245,7 +265,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final long startBlock = 5L;
final int blockCount = 5;
final MessageData messageData =
@ -285,8 +309,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem(),
limit,
new NoOpMetricsSystem())) {
limit,
limit,
limit)) {
final long startBlock = 5L;
final int blockCount = 10;
final MessageData messageData =
@ -325,7 +352,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final long endBlock = 10L;
final int blockCount = 5;
final MessageData messageData = GetBlockHeadersMessage.create(endBlock, blockCount, 0, true);
@ -363,7 +394,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final long startBlock = 5L;
final int blockCount = 5;
final int skip = 1;
@ -404,7 +439,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final long endBlock = 10L;
final int blockCount = 5;
final int skip = 1;
@ -466,7 +505,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final long startBlock = blockchain.getChainHeadBlockNumber() - 1L;
final int blockCount = 5;
final MessageData messageData =
@ -505,7 +548,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final long startBlock = blockchain.getChainHeadBlockNumber() + 1;
final int blockCount = 5;
final MessageData messageData =
@ -541,7 +588,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
// Setup blocks query
final long startBlock = blockchain.getChainHeadBlockNumber() - 5;
final int blockCount = 2;
@ -593,8 +644,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem(),
limit,
limit,
limit,
new NoOpMetricsSystem())) {
limit)) {
// Setup blocks query
final int blockCount = 10;
final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount;
@ -645,7 +699,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
// Setup blocks query
final long expectedBlockNumber = blockchain.getChainHeadBlockNumber() - 1;
final BlockHeader header = blockchain.getBlockHeader(expectedBlockNumber).get();
@ -690,7 +748,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
// Setup blocks query
final long startBlock = blockchain.getChainHeadBlockNumber() - 5;
final int blockCount = 2;
@ -741,8 +803,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem(),
limit,
new NoOpMetricsSystem())) {
limit,
limit,
limit)) {
// Setup blocks query
final int blockCount = 10;
final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount;
@ -792,7 +857,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
// Setup blocks query
final long blockNumber = blockchain.getChainHeadBlockNumber() - 5;
final BlockHeader header = blockchain.getBlockHeader(blockNumber).get();
@ -832,7 +901,18 @@ public final class EthProtocolManagerTest {
try (final EthProtocolManager ethManager =
new EthProtocolManager(
blockchain, worldStateArchive, 1, true, 1, 1, 1, new NoOpMetricsSystem())) {
blockchain,
worldStateArchive,
1,
true,
1,
1,
1,
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
// Setup node data query
final List<BytesValue> expectedResults = new ArrayList<>();
@ -882,7 +962,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem());
new NoOpMetricsSystem(),
200,
200,
200,
200);
// Define handler to validate response
final PeerSendHandler onSend = mock(PeerSendHandler.class);
@ -953,7 +1037,11 @@ public final class EthProtocolManagerTest {
1,
1,
1,
new NoOpMetricsSystem())) {
new NoOpMetricsSystem(),
200,
200,
200,
200)) {
final long startBlock = 1L;
final int requestedBlockCount = 13;
final int receivedBlockCount = 2;
@ -1013,7 +1101,15 @@ public final class EthProtocolManagerTest {
try (final EthProtocolManager ethManager =
new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, ethScheduler)) {
blockchain,
protocolContext.getWorldStateArchive(),
1,
true,
ethScheduler,
200,
200,
200,
200)) {
// Create a transaction pool. This has a side effect of registring a listener for the
// transactions message.

@ -44,12 +44,7 @@ public class EthProtocolManagerTestUtil {
final EthScheduler ethScheduler) {
final int networkId = 1;
return new EthProtocolManager(
blockchain,
worldStateArchive,
networkId,
false,
EthProtocolManager.DEFAULT_REQUEST_LIMIT,
ethScheduler);
blockchain, worldStateArchive, networkId, false, ethScheduler, 200, 200, 200, 200);
}
public static EthProtocolManager create(

@ -45,7 +45,7 @@ public class EthServerTest {
@Before
public void setUp() {
new EthServer(blockchain, worldStateArchive, ethMessages, 2);
new EthServer(blockchain, worldStateArchive, ethMessages, 2, 2, 2, 2);
}
@Test

@ -109,7 +109,18 @@ public class TestNode implements Closeable {
new ProtocolContext<>(blockchain, worldStateArchive, null);
final EthProtocolManager ethProtocolManager =
new EthProtocolManager(
blockchain, worldStateArchive, 1, false, 1, 1, 1, new NoOpMetricsSystem());
blockchain,
worldStateArchive,
1,
false,
1,
1,
1,
new NoOpMetricsSystem(),
200,
200,
200,
200);
final NetworkRunner networkRunner =
NetworkRunner.builder()

Loading…
Cancel
Save