Implement max message size rather than limiting with fixed number of transactions (#1271)

* Implement max message size rather then cap with fixed number of transactions

Adding transactions to the RLP until the message size exceeds the limit and then send that.

* fix final variables

* Update AbstractRLPOutput.java

add javadoc

* pr discussion

put this factory method on LimitedTransactionsMessages rather than TransactionsMessage since it returns a LimitedTransactionsMessages.

* SpotlessApply

* fix PR discussion

- simplify design
- remove useless code

* Update LimitedTransactionsMessages.java

* fix PR discussion

- simplify logic
- add tests

* Update AbstractRLPOutput.java

* Update ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/LimitedTransactionsMessages.java

Co-Authored-By: abdelhamidbakhta <45264458+abdelhamidbakhta@users.noreply.github.com>

* Update Transaction.java

* fix PR discussion

* fix PR discussion

- add tests

* Update BlockDataGenerator.java

* Update LimitedTransactionsMessagesTest.java

fix PR unit test

* Update LimitedTransactionsMessagesTest.java

* Update LimitedTransactionsMessagesTest.java

Use LinkedHashSet to preserve order.

* Update LimitedTransactionsMessagesTest.java

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Abdelhamid Bakhta 6 years ago committed by GitHub
parent 622a03350b
commit fbf5db7828
  1. 43
      ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/BlockDataGenerator.java
  2. 69
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/LimitedTransactionsMessages.java
  3. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/TransactionsMessage.java
  4. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageSender.java
  5. 106
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/messages/LimitedTransactionsMessagesTest.java
  6. 19
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageSenderTest.java
  7. 1
      ethereum/rlp/src/main/java/tech/pegasys/pantheon/ethereum/rlp/AbstractRLPOutput.java

@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.core;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.stream.Collectors.toSet;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;
import tech.pegasys.pantheon.crypto.SECP256K1;
@ -31,6 +32,8 @@ import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.stream.IntStream;
public class BlockDataGenerator {
private final Random random;
@ -241,6 +244,18 @@ public class BlockDataGenerator {
return new BlockBody(options.getTransactions(defaultTxs), ommers);
}
public Transaction transaction(final BytesValue payload) {
return Transaction.builder()
.nonce(positiveLong())
.gasPrice(Wei.wrap(bytes32()))
.gasLimit(positiveLong())
.to(address())
.value(Wei.wrap(bytes32()))
.payload(payload)
.chainId(1)
.signAndBuild(SECP256K1.KeyPair.generate());
}
public Transaction transaction() {
return Transaction.builder()
.nonce(positiveLong())
@ -253,6 +268,34 @@ public class BlockDataGenerator {
.signAndBuild(SECP256K1.KeyPair.generate());
}
public Set<Transaction> transactions(final int n) {
Wei gasPrice = Wei.wrap(bytes32());
long gasLimit = positiveLong();
Address to = address();
Wei value = Wei.wrap(bytes32());
int chainId = 1;
Bytes32 payload = bytes32();
SECP256K1.Signature signature = SECP256K1.sign(payload, SECP256K1.KeyPair.generate());
final Set<Transaction> txs =
IntStream.range(0, n)
.parallel()
.mapToObj(
v ->
new Transaction(
v,
gasPrice,
gasLimit,
Optional.of(to),
value,
signature,
payload,
to,
chainId))
.collect(toSet());
return txs;
}
public TransactionReceipt receipt(final long cumulativeGasUsed) {
return new TransactionReceipt(hash(), cumulativeGasUsed, Arrays.asList(log(), log()));
}

@ -0,0 +1,69 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.messages;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.HashSet;
import java.util.Set;
public final class LimitedTransactionsMessages {
private static final int LIMIT = 1048576;
private final TransactionsMessage transactionsMessage;
private final Set<Transaction> includedTransactions;
public LimitedTransactionsMessages(
final TransactionsMessage transactionsMessage, final Set<Transaction> includedTransactions) {
this.transactionsMessage = transactionsMessage;
this.includedTransactions = includedTransactions;
}
public static LimitedTransactionsMessages createLimited(
final Iterable<Transaction> transactions) {
final Set<Transaction> includedTransactions = new HashSet<>();
final BytesValueRLPOutput message = new BytesValueRLPOutput();
int messageSize = 0;
message.startList();
for (final Transaction transaction : transactions) {
final BytesValueRLPOutput encodedTransaction = new BytesValueRLPOutput();
transaction.writeTo(encodedTransaction);
BytesValue encodedBytes = encodedTransaction.encoded();
// Break if individual transaction size exceeds limit
if (encodedBytes.size() > LIMIT && (messageSize != 0)) {
break;
}
message.writeRLPUnsafe(encodedBytes);
includedTransactions.add(transaction);
// Check if last transaction to add to the message
messageSize += encodedBytes.size();
if (messageSize > LIMIT) {
break;
}
}
message.endList();
return new LimitedTransactionsMessages(
new TransactionsMessage(message.encoded()), includedTransactions);
}
public final TransactionsMessage getTransactionsMessage() {
return transactionsMessage;
}
public final Set<Transaction> getIncludedTransactions() {
return includedTransactions;
}
}

@ -47,7 +47,7 @@ public class TransactionsMessage extends AbstractMessageData {
return new TransactionsMessage(tmp.encoded());
}
private TransactionsMessage(final BytesValue data) {
TransactionsMessage(final BytesValue data) {
super(data);
}

@ -12,18 +12,16 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;
import static java.util.stream.Collectors.toSet;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.LimitedTransactionsMessages;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import java.util.Set;
import java.util.stream.StreamSupport;
class TransactionsMessageSender {
private static final int MAX_BATCH_SIZE = 10;
private final PeerTransactionTracker transactionTracker;
public TransactionsMessageSender(final PeerTransactionTracker transactionTracker) {
@ -31,17 +29,19 @@ class TransactionsMessageSender {
}
public void sendTransactionsToPeers() {
transactionTracker.getEthPeersWithUnsentTransactions().forEach(this::sendTransactionsToPeer);
StreamSupport.stream(transactionTracker.getEthPeersWithUnsentTransactions().spliterator(), true)
.parallel()
.forEach(this::sendTransactionsToPeer);
}
private void sendTransactionsToPeer(final EthPeer peer) {
final Set<Transaction> allTxToSend = transactionTracker.claimTransactionsToSendToPeer(peer);
while (!allTxToSend.isEmpty()) {
final Set<Transaction> subsetToSend =
allTxToSend.stream().limit(MAX_BATCH_SIZE).collect(toSet());
allTxToSend.removeAll(subsetToSend);
final LimitedTransactionsMessages limitedTransactionsMessages =
LimitedTransactionsMessages.createLimited(allTxToSend);
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions());
try {
peer.send(TransactionsMessage.create(subsetToSend));
peer.send(limitedTransactionsMessages.getTransactionsMessage());
} catch (final PeerNotConnected e) {
return;
}

@ -0,0 +1,106 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.messages;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import org.junit.Test;
public class LimitedTransactionsMessagesTest {
private static final int LIMIT = 1048576;
private final BlockDataGenerator generator = new BlockDataGenerator();
private final Set<Transaction> sampleTxs = generator.transactions(1);
private final TransactionsMessage sampleTransactionMessages =
TransactionsMessage.create(sampleTxs);
private final LimitedTransactionsMessages sampleLimitedTransactionsMessages =
new LimitedTransactionsMessages(sampleTransactionMessages, sampleTxs);
@Test
public void createLimited() {
final Set<Transaction> txs = generator.transactions(6000);
final LimitedTransactionsMessages firstMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(5219, firstMessage.getIncludedTransactions().size());
txs.removeAll(firstMessage.getIncludedTransactions());
assertEquals(781, txs.size());
final LimitedTransactionsMessages secondMessage =
LimitedTransactionsMessages.createLimited(txs);
assertEquals(781, secondMessage.getIncludedTransactions().size());
txs.removeAll(secondMessage.getIncludedTransactions());
assertEquals(0, txs.size());
assertTrue(
(firstMessage.getTransactionsMessage().getSize()
+ secondMessage.getTransactionsMessage().getSize())
< 2 * LIMIT);
}
@Test
public void createLimitedWithFirstTransactionExceedingLimit() {
final Set<Transaction> txs = new HashSet<>();
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
final LimitedTransactionsMessages firstMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(2, firstMessage.getIncludedTransactions().size());
txs.removeAll(firstMessage.getIncludedTransactions());
assertEquals(1, txs.size());
final LimitedTransactionsMessages secondMessage =
LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, secondMessage.getIncludedTransactions().size());
txs.removeAll(secondMessage.getIncludedTransactions());
assertEquals(0, txs.size());
}
@Test
public void createLimitedWithFirstTransactionExceedingLimit_2() {
final Set<Transaction> txs = new LinkedHashSet<>();
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT + 100 - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
final LimitedTransactionsMessages firstMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, firstMessage.getIncludedTransactions().size());
txs.removeAll(firstMessage.getIncludedTransactions());
assertEquals(2, txs.size());
final LimitedTransactionsMessages secondMessage =
LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, secondMessage.getIncludedTransactions().size());
txs.removeAll(secondMessage.getIncludedTransactions());
assertEquals(1, txs.size());
final LimitedTransactionsMessages thirdMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, thirdMessage.getIncludedTransactions().size());
txs.removeAll(thirdMessage.getIncludedTransactions());
assertEquals(0, txs.size());
}
@Test
public void getTransactionsMessage() {
assertEquals(
sampleTransactionMessages, sampleLimitedTransactionsMessages.getTransactionsMessage());
}
@Test
public void getIncludedTransactions() {
assertEquals(sampleTxs, sampleLimitedTransactionsMessages.getIncludedTransactions());
}
}

@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.transactions;
import static com.google.common.collect.Sets.newHashSet;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
@ -30,7 +29,6 @@ import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import com.google.common.collect.Sets;
import org.junit.Test;
@ -40,6 +38,7 @@ public class TransactionsMessageSenderTest {
private final EthPeer peer1 = mock(EthPeer.class);
private final EthPeer peer2 = mock(EthPeer.class);
private final BlockDataGenerator generator = new BlockDataGenerator();
private final Transaction transaction1 = generator.transaction();
private final Transaction transaction2 = generator.transaction();
@ -63,14 +62,12 @@ public class TransactionsMessageSenderTest {
}
@Test
public void shouldSendTransactionsInBatches() throws Exception {
final Set<Transaction> fifteenTransactions =
IntStream.range(0, 15).mapToObj(number -> generator.transaction()).collect(toSet());
fifteenTransactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));
public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
final Set<Transaction> transactions = generator.transactions(6000);
messageSender.sendTransactionsToPeers();
transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));
messageSender.sendTransactionsToPeers();
final ArgumentCaptor<MessageData> messageDataArgumentCaptor =
ArgumentCaptor.forClass(MessageData.class);
verify(peer1, times(2)).send(messageDataArgumentCaptor.capture());
@ -82,10 +79,10 @@ public class TransactionsMessageSenderTest {
final Set<Transaction> firstBatch = getTransactionsFromMessage(sentMessages.get(0));
final Set<Transaction> secondBatch = getTransactionsFromMessage(sentMessages.get(1));
assertThat(firstBatch).hasSize(10);
assertThat(secondBatch).hasSize(5);
assertThat(firstBatch).hasSize(5219);
assertThat(secondBatch).hasSize(781);
assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(fifteenTransactions);
assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(transactions);
}
private MessageData transactionsMessageContaining(final Transaction... transactions) {

@ -140,7 +140,6 @@ abstract class AbstractRLPOutput implements RLPOutput {
checkState(stackSize == 1, "A list has been entered (startList()) but not left (endList())");
return payloadSizes[0];
}
/**
* Write the rlp encoded value to the provided {@link MutableBytesValue}
*

Loading…
Cancel
Save