Tune transaction synchronization parameter to adapt to mainnet traffic (#3610)

* Tune transaction worker queue capacity to adapt to mainnet

Currently Besu handles hundreds of transaction message per seconds,
so having a queue of 1M messages result in a lot of expired messages,
make sense to reduce the capacity to drop incoming messages that could not
be handled anyway.

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Tune the number transactions seen by peer to adapt to mainnet traffic

Currently on mainnet there are many thousand of pending transactions
exchanged between peers, but Besu has a short memory of what has been
exchanged with a specific peer, with the result that the same transaction
is often exchanged back and forth with the same peer, expecially when the
peer is another Besu.

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Improve trace log of transaction exchanges with other peers

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Update CHANGELOG

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/3617/head
Fabio Di Fabio 3 years ago committed by GitHub
parent 8e38e1daac
commit 0d8f17553c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      CHANGELOG.md
  2. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java
  3. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerPendingTransactionTracker.java
  4. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java
  5. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageProcessor.java
  6. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsMessageSender.java
  7. 18
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessor.java
  8. 21
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSender.java

@ -6,6 +6,7 @@
- Remove the experimental flag for bonsai tries CLI options '--data-storage-format' and '--bonsai-maximum-back-layers-to-load' [#3578](https://github.com/hyperledger/besu/pull/3578)
### Additions and Improvements
- Tune transaction synchronization parameter to adapt to mainnet traffic [#3610](https://github.com/hyperledger/besu/pull/3610)
## 22.1.2
@ -14,7 +15,7 @@
- Execution specific RPC endpoint [#3378](https://github.com/hyperledger/besu/issues/3378)
- Adds JWT authentication to Engine APIs
- Supports kiln V2.1 spec
- Tracing APIs
- Tracing APIs
- new API methods: trace_rawTransaction, trace_get, trace_callMany
- added revertReason to trace APIs including: trace_transaction, trace_get, trace_call, trace_callMany, and trace_rawTransaction
- Allow mining beneficiary to transition at specific blocks for ibft2 and qbft consensus mechanisms. [#3115](https://github.com/hyperledger/besu/issues/3115)

@ -46,7 +46,7 @@ public class EthScheduler {
private final Duration defaultTimeout = Duration.ofSeconds(5);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final CountDownLatch shutdown = new CountDownLatch(1);
private static final int TX_WORKER_CAPACITY = 1000000;
private static final int TX_WORKER_CAPACITY = 1_000;
protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler;

@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class PeerPendingTransactionTracker implements EthPeer.DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 10_000;
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Hash>> transactionsToSend = new ConcurrentHashMap<>();
private final AbstractPendingTransactionsSorter pendingTransactions;

@ -28,7 +28,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 10_000;
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionsToSend = new ConcurrentHashMap<>();

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions;
import static java.time.Instant.now;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
@ -30,6 +31,7 @@ import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
@ -96,7 +98,14 @@ public class PendingTransactionsMessageProcessor {
private void processNewPooledTransactionHashesMessage(
final EthPeer peer, final NewPooledTransactionHashesMessage transactionsMessage) {
try {
LOG.trace("Received pooled transaction hashes message from {}", peer);
final List<Hash> incomingTransactionHashes = transactionsMessage.pendingTransactions();
traceLambda(
LOG,
"Received pooled transaction hashes message from {}, incoming hashes {}, incoming list {}",
peer::toString,
incomingTransactionHashes::size,
incomingTransactionHashes::toString);
transactionTracker.markTransactionsHashesAsSeen(
peer, transactionsMessage.pendingTransactions());

@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage;
@ -23,8 +25,11 @@ import java.util.List;
import java.util.stream.StreamSupport;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class PendingTransactionsMessageSender {
private static final Logger LOG = LoggerFactory.getLogger(PendingTransactionsMessageSender.class);
private final PeerPendingTransactionTracker transactionTracker;
@ -44,6 +49,13 @@ class PendingTransactionsMessageSender {
transactionTracker.claimTransactionsToSendToPeer(peer),
TransactionPoolConfiguration.MAX_PENDING_TRANSACTIONS_HASHES)) {
try {
traceLambda(
LOG,
"Sending transaction hashes to peer {}, transaction hashes count {}, list {}",
peer::toString,
hashes::size,
hashes::toString);
peer.send(NewPooledTransactionHashesMessage.create(hashes));
} catch (final PeerNotConnected __) {
break;

@ -15,7 +15,9 @@
package org.hyperledger.besu.ethereum.eth.transactions;
import static java.time.Instant.now;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
@ -26,8 +28,10 @@ import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
@ -73,9 +77,15 @@ class TransactionsMessageProcessor {
private void processTransactionsMessage(
final EthPeer peer, final TransactionsMessage transactionsMessage) {
try {
LOG.trace("Received transactions message from {}", peer);
final List<Transaction> readTransactions = transactionsMessage.transactions();
traceLambda(
LOG,
"Received transactions message from {}, incoming transactions {}, incoming list {}",
peer::toString,
readTransactions::size,
() -> toHashList(readTransactions));
final Set<Transaction> transactions = Sets.newHashSet(readTransactions);
transactionTracker.markTransactionsAsSeen(peer, transactions);
transactionPool.addRemoteTransactions(transactions);
@ -86,4 +96,8 @@ class TransactionsMessageProcessor {
}
}
}
private List<Hash> toHashList(final Collection<Transaction> txs) {
return txs.stream().map(Transaction::getHash).collect(Collectors.toList());
}
}

@ -14,12 +14,18 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.LimitedTransactionsMessages;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
@ -45,7 +51,16 @@ class TransactionsMessageSender {
while (!allTxToSend.isEmpty()) {
final LimitedTransactionsMessages limitedTransactionsMessages =
LimitedTransactionsMessages.createLimited(allTxToSend);
LOG.trace("Sending transactions to peer {} TRANSACTIONS count {}", peer, allTxToSend.size());
final Set<Transaction> includedTransactions =
limitedTransactionsMessages.getIncludedTransactions();
traceLambda(
LOG,
"Sending transactions to peer {} all transactions count {}, "
+ "single message transactions {}, single message list {}",
peer::toString,
allTxToSend::size,
includedTransactions::size,
() -> toHashList(includedTransactions));
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions());
try {
peer.send(limitedTransactionsMessages.getTransactionsMessage());
@ -54,4 +69,8 @@ class TransactionsMessageSender {
}
}
}
private List<Hash> toHashList(final Collection<Transaction> txs) {
return txs.stream().map(Transaction::getHash).collect(Collectors.toList());
}
}

Loading…
Cancel
Save