Fix to avoid broadcasting full blob txs (#6835)

* separate queue for tx hashes

Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>

* Refinements

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Update tests

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Update CHANGELOG

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

* Refinements

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>

---------

Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
Co-authored-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
pull/6841/head
Fabio Di Fabio 8 months ago committed by GitHub
parent ceafa2a1e6
commit 3a2eb4e71e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSender.java
  3. 22
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java
  4. 37
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcaster.java
  5. 9
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java
  6. 58
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcasterTest.java

@ -34,6 +34,7 @@
- Don't enable the BFT mining coordinator when running sub commands such as `blocks export` [#6675](https://github.com/hyperledger/besu/pull/6675)
- In JSON-RPC return optional `v` fields for type 1 and type 2 transactions [#6762](https://github.com/hyperledger/besu/pull/6762)
- Fix Shanghai/QBFT block import bug when syncing new nodes [#6765](https://github.com/hyperledger/besu/pull/6765)
- Fix to avoid broadcasting full blob txs, instead of only the tx announcement, to a subset of nodes [#6835](https://github.com/hyperledger/besu/pull/6835)
### Download Links

@ -45,7 +45,7 @@ class NewPooledTransactionHashesMessageSender {
final Capability capability = peer.getConnection().capability(EthProtocol.NAME);
for (final List<Transaction> txBatch :
Iterables.partition(
transactionTracker.claimTransactionsToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
transactionTracker.claimTransactionHashesToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
try {
final List<Hash> txHashes = toHashList(txBatch);
LOG.atTrace()

@ -32,10 +32,12 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionsToSend = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionHashesToSend = new ConcurrentHashMap<>();
public void reset() {
seenTransactions.clear();
transactionsToSend.clear();
transactionHashesToSend.clear();
}
public synchronized void markTransactionsAsSeen(
@ -55,6 +57,15 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
}
}
public synchronized void addToPeerHashSendQueue(
final EthPeer peer, final Transaction transaction) {
if (!hasPeerSeenTransaction(peer, transaction)) {
transactionHashesToSend
.computeIfAbsent(peer, key -> createTransactionsSet())
.add(transaction);
}
}
public Iterable<EthPeer> getEthPeersWithUnsentTransactions() {
return transactionsToSend.keySet();
}
@ -69,6 +80,16 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
}
}
public synchronized Set<Transaction> claimTransactionHashesToSendToPeer(final EthPeer peer) {
final Set<Transaction> transactionHashesToSend = this.transactionHashesToSend.remove(peer);
if (transactionHashesToSend != null) {
markTransactionHashesAsSeen(peer, toHashList(transactionHashesToSend));
return transactionHashesToSend;
} else {
return emptySet();
}
}
public boolean hasSeenTransaction(final Hash txHash) {
return seenTransactions.values().stream().anyMatch(seen -> seen.contains(txHash));
}
@ -100,5 +121,6 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
public void onDisconnect(final EthPeer peer) {
seenTransactions.remove(peer);
transactionsToSend.remove(peer);
transactionHashesToSend.remove(peer);
}
}

@ -30,8 +30,10 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,16 +49,33 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
private final TransactionsMessageSender transactionsMessageSender;
private final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
private final EthContext ethContext;
private final Random random;
public TransactionBroadcaster(
final EthContext ethContext,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender) {
this(
ethContext,
transactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender,
null);
}
@VisibleForTesting
protected TransactionBroadcaster(
final EthContext ethContext,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
final Long seed) {
this.transactionTracker = transactionTracker;
this.transactionsMessageSender = transactionsMessageSender;
this.newPooledTransactionHashesMessageSender = newPooledTransactionHashesMessageSender;
this.ethContext = ethContext;
this.random = seed != null ? new Random(seed) : new Random();
}
public void relayTransactionPoolTo(
@ -65,7 +84,13 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) {
sendTransactionHashes(toTransactionList(pendingTransactions), List.of(peer));
} else {
sendFullTransactions(toTransactionList(pendingTransactions), List.of(peer));
// we need to exclude txs that support hash only broadcasting
final var fullBroadcastTxs =
pendingTransactions.stream()
.map(PendingTransaction::getTransaction)
.filter(tx -> !ANNOUNCE_HASH_ONLY_TX_TYPES.contains(tx.getType()))
.toList();
sendFullTransactions(fullBroadcastTxs, List.of(peer));
}
}
}
@ -77,7 +102,7 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
return;
}
final int numPeersToSendFullTransactions = (int) Math.ceil(Math.sqrt(currPeerCount));
final int numPeersToSendFullTransactions = (int) Math.round(Math.sqrt(currPeerCount));
final Map<Boolean, List<Transaction>> transactionByBroadcastMode =
transactions.stream()
@ -107,7 +132,7 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
numPeersToSendFullTransactions - sendOnlyFullTransactionPeers.size(),
sendOnlyHashPeers.size());
Collections.shuffle(sendOnlyHashPeers);
Collections.shuffle(sendOnlyHashPeers, random);
// move peers from the mixed list to reach the required size for full transaction peers
movePeersBetweenLists(sendOnlyHashPeers, sendMixedPeers, delta);
@ -121,7 +146,7 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
.addArgument(sendOnlyHashPeers::size)
.addArgument(sendMixedPeers::size)
.addArgument(sendOnlyFullTransactionPeers)
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers.toString())
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers)
.log();
sendToFullTransactionsPeers(
@ -141,7 +166,7 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
final Map<Boolean, List<Transaction>> txsByHashOnlyBroadcast,
final List<EthPeer> hashOnlyPeers) {
final List<Transaction> allTransactions =
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).collect(Collectors.toList());
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).toList();
sendTransactionHashes(allTransactions, hashOnlyPeers);
}
@ -175,7 +200,7 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
transaction -> transactionTracker.addToPeerHashSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(

@ -78,9 +78,9 @@ public class NewPooledTransactionHashesMessageSenderTest {
@Test
public void shouldSendPendingTransactionsToEachPeer() throws Exception {
transactionTracker.addToPeerSendQueue(peer1, transaction1);
transactionTracker.addToPeerSendQueue(peer1, transaction2);
transactionTracker.addToPeerSendQueue(peer2, transaction3);
transactionTracker.addToPeerHashSendQueue(peer1, transaction1);
transactionTracker.addToPeerHashSendQueue(peer1, transaction2);
transactionTracker.addToPeerHashSendQueue(peer2, transaction3);
List.of(peer1, peer2).forEach(messageSender::sendTransactionHashesToPeer);
@ -96,7 +96,8 @@ public class NewPooledTransactionHashesMessageSenderTest {
final Set<Transaction> transactions =
generator.transactions(6000).stream().collect(Collectors.toSet());
transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));
transactions.forEach(
transaction -> transactionTracker.addToPeerHashSendQueue(peer1, transaction));
messageSender.sendTransactionHashesToPeer(peer1);
final ArgumentCaptor<MessageData> messageDataArgumentCaptor =

@ -55,7 +55,7 @@ import org.mockito.quality.Strictness;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class TransactionBroadcasterTest {
private static final Long FIXED_RANDOM_SEED = 0L;
@Mock private EthContext ethContext;
@Mock private EthPeers ethPeers;
@Mock private EthScheduler ethScheduler;
@ -92,12 +92,14 @@ public class TransactionBroadcasterTest {
when(ethContext.getEthPeers()).thenReturn(ethPeers);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
// we use the fixed random seed to have a predictable shuffle of peers
txBroadcaster =
new TransactionBroadcaster(
ethContext,
transactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender);
newPooledTransactionHashesMessageSender,
FIXED_RANDOM_SEED);
}
@Test
@ -132,7 +134,7 @@ public class TransactionBroadcasterTest {
txBroadcaster.relayTransactionPoolTo(ethPeerWithEth65, pendingTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
sendTaskCapture.getValue().run();
@ -177,14 +179,16 @@ public class TransactionBroadcasterTest {
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));
txBroadcaster.onTransactionsAdded(txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65_3, ethPeerWithEth65_2, ethPeerWithEth65]
// so ethPeerWithEth65 and ethPeerWithEth65_2 are moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
sendTaskCapture.getAllValues().forEach(Runnable::run);
verify(transactionsMessageSender, times(2)).sendTransactionsToPeer(any(EthPeer.class));
verifyNoInteractions(newPooledTransactionHashesMessageSender);
verify(transactionsMessageSender).sendTransactionsToPeer(ethPeerWithEth65_2);
verify(newPooledTransactionHashesMessageSender).sendTransactionHashesToPeer(ethPeerWithEth65);
}
@Test
@ -196,10 +200,12 @@ public class TransactionBroadcasterTest {
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));
txBroadcaster.onTransactionsAdded(txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65_3, ethPeerWithEth65_2, ethPeerWithEth65]
// so ethPeerWithEth65 and ethPeerWithEth65_2 are moved to the mixed broadcast list
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_3, txs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65_3, txs);
sendTaskCapture.getAllValues().forEach(Runnable::run);
@ -218,8 +224,10 @@ public class TransactionBroadcasterTest {
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));
txBroadcaster.onTransactionsAdded(txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65, ethPeerWithEth65_2]
// so ethPeerWithEth65_2 is moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth65, txs);
@ -250,9 +258,11 @@ public class TransactionBroadcasterTest {
List<Transaction> txs = toTransactionList(setupTransactionPool(BLOB, 0, 1));
txBroadcaster.onTransactionsAdded(txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65, ethPeerWithEth65_2]
// so ethPeerWithEth65_2 is moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65_2, txs);
verifyNoTransactionAddedToPeerSendingQueue(ethPeerNoEth65);
sendTaskCapture.getAllValues().forEach(Runnable::run);
@ -268,7 +278,6 @@ public class TransactionBroadcasterTest {
@Test
public void onTransactionsAddedWithMixedPeersAndMixedBroadcastKind() {
List<EthPeer> eth65Peers = List.of(ethPeerWithEth65, ethPeerWithEth65_2);
when(ethPeers.peerCount()).thenReturn(3);
@ -285,9 +294,12 @@ public class TransactionBroadcasterTest {
mixedTxs.addAll(hashBroadcastTxs);
txBroadcaster.onTransactionsAdded(mixedTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, mixedTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, mixedTxs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65, ethPeerWithEth65_2]
// so ethPeerWithEth65_2 is moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, mixedTxs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65_2, hashBroadcastTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, fullBroadcastTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth65, fullBroadcastTxs);
sendTaskCapture.getAllValues().forEach(Runnable::run);
@ -348,6 +360,16 @@ public class TransactionBroadcasterTest {
.containsExactlyInAnyOrderElementsOf(transactions);
}
private void verifyTransactionAddedToPeerHashSendingQueue(
final EthPeer peer, final Collection<Transaction> transactions) {
ArgumentCaptor<Transaction> trackedTransactions = ArgumentCaptor.forClass(Transaction.class);
verify(transactionTracker, times(transactions.size()))
.addToPeerHashSendQueue(eq(peer), trackedTransactions.capture());
assertThat(trackedTransactions.getAllValues())
.containsExactlyInAnyOrderElementsOf(transactions);
}
private void verifyNoTransactionAddedToPeerSendingQueue(final EthPeer peer) {
verify(transactionTracker, times(0)).addToPeerSendQueue(eq(peer), any());

Loading…
Cancel
Save