diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java index f0db8b6cae..b4ae7ec87c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java @@ -29,7 +29,6 @@ import java.time.Instant; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +85,7 @@ public class NewPooledTransactionHashesMessageProcessor { LOG.atTrace() .setMessage( - "Received pooled transaction hashes message from {}... incoming hashes {}, incoming list {}") + "Received pooled transaction hashes message from {} incoming hashes {}, incoming list {}") .addArgument(() -> peer == null ? null : peer.getLoggableId()) .addArgument(incomingTransactionHashes::size) .addArgument(incomingTransactionHashes) @@ -121,7 +120,7 @@ public class NewPooledTransactionHashesMessageProcessor { bufferedTask.addHashes( incomingTransactionHashes.stream() .filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty()) - .collect(Collectors.toList())); + .toList()); } catch (final RLPException ex) { if (peer != null) { LOG.debug( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java index f9773ddd23..da95900429 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java @@ -20,20 +20,34 @@ import static org.hyperledger.besu.ethereum.core.Transaction.toHashList; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PeerTransactionTracker implements EthPeer.DisconnectCallback { + private static final Logger LOG = LoggerFactory.getLogger(PeerTransactionTracker.class); + private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000; + + private final EthPeers ethPeers; private final Map> seenTransactions = new ConcurrentHashMap<>(); private final Map> transactionsToSend = new ConcurrentHashMap<>(); private final Map> transactionHashesToSend = new ConcurrentHashMap<>(); + public PeerTransactionTracker(final EthPeers ethPeers) { + this.ethPeers = ethPeers; + } + public void reset() { seenTransactions.clear(); transactionsToSend.clear(); @@ -119,8 +133,46 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback { @Override public void onDisconnect(final EthPeer peer) { - seenTransactions.remove(peer); - transactionsToSend.remove(peer); - transactionHashesToSend.remove(peer); + LOG.atTrace().setMessage("onDisconnect for peer {}").addArgument(peer::getLoggableId).log(); + + // here we reconcile all the trackers with the active peers, since due to the asynchronous + // processing of incoming messages it could seldom happen that a tracker is recreated just + // after a peer was disconnected, resulting in a memory leak. + final Set trackedPeers = new HashSet<>(seenTransactions.keySet()); + trackedPeers.addAll(transactionsToSend.keySet()); + trackedPeers.addAll(transactionHashesToSend.keySet()); + + LOG.atTrace() + .setMessage("{} tracked peers ({})") + .addArgument(trackedPeers.size()) + .addArgument(() -> logPeerSet(trackedPeers)) + .log(); + + final Set connectedPeers = + ethPeers.streamAllPeers().collect(Collectors.toUnmodifiableSet()); + + final var disconnectedPeers = trackedPeers; + disconnectedPeers.removeAll(connectedPeers); + LOG.atTrace() + .setMessage("Removing {} transaction trackers for disconnected peers ({})") + .addArgument(disconnectedPeers.size()) + .addArgument(() -> logPeerSet(disconnectedPeers)) + .log(); + + disconnectedPeers.stream() + .forEach( + disconnectedPeer -> { + seenTransactions.remove(disconnectedPeer); + transactionsToSend.remove(disconnectedPeer); + transactionHashesToSend.remove(disconnectedPeer); + LOG.atTrace() + .setMessage("Removed transaction trackers for disconnected peer {}") + .addArgument(disconnectedPeer::getLoggableId) + .log(); + }); + } + + private String logPeerSet(final Set peers) { + return peers.stream().map(EthPeer::getLoggableId).collect(Collectors.joining(",")); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java index 2c667a9921..e2fa4936c8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java @@ -59,7 +59,8 @@ public class TransactionPoolFactory { final TransactionPoolMetrics metrics = new TransactionPoolMetrics(metricsSystem); - final PeerTransactionTracker transactionTracker = new PeerTransactionTracker(); + final PeerTransactionTracker transactionTracker = + new PeerTransactionTracker(ethContext.getEthPeers()); final TransactionsMessageSender transactionsMessageSender = new TransactionsMessageSender(transactionTracker); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java index 17b834cedf..232ccc0aae 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java @@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; @@ -57,6 +58,7 @@ public class BufferedGetPooledTransactionsFromPeerFetcherTest { @Mock TransactionPool transactionPool; @Mock EthContext ethContext; @Mock EthScheduler ethScheduler; + @Mock EthPeers ethPeers; private final BlockDataGenerator generator = new BlockDataGenerator(); @@ -67,7 +69,8 @@ public class BufferedGetPooledTransactionsFromPeerFetcherTest { @BeforeEach public void setup() { metricsSystem = new StubMetricsSystem(); - transactionTracker = new PeerTransactionTracker(); + when(ethContext.getEthPeers()).thenReturn(ethPeers); + transactionTracker = new PeerTransactionTracker(ethPeers); when(ethContext.getScheduler()).thenReturn(ethScheduler); ScheduledFuture mock = mock(ScheduledFuture.class); fetcher = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java index caf16aac5c..1922255676 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java @@ -242,7 +242,7 @@ public abstract class AbstractTransactionPoolTest { doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture()); doReturn(ethScheduler).when(ethContext).getScheduler(); - peerTransactionTracker = new PeerTransactionTracker(); + peerTransactionTracker = new PeerTransactionTracker(ethContext.getEthPeers()); transactionBroadcaster = spy( new TransactionBroadcaster( diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java index 6a72271bb1..4e544d39d0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java @@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection; import org.hyperledger.besu.ethereum.eth.messages.EthPV65; import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage; @@ -47,6 +48,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; public class NewPooledTransactionHashesMessageSenderTest { + private final EthPeers ethPeers = mock(EthPeers.class); private final EthPeer peer1 = mock(EthPeer.class); private final EthPeer peer2 = mock(EthPeer.class); @@ -63,7 +65,7 @@ public class NewPooledTransactionHashesMessageSenderTest { @BeforeEach public void setUp() { - transactionTracker = new PeerTransactionTracker(); + transactionTracker = new PeerTransactionTracker(ethPeers); messageSender = new NewPooledTransactionHashesMessageSender(transactionTracker); final Transaction tx = mock(Transaction.class); pendingTransactions = mock(PendingTransactions.class); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTrackerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTrackerTest.java index e1f1c83f58..520f664148 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTrackerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTrackerTest.java @@ -16,20 +16,26 @@ package org.hyperledger.besu.ethereum.eth.transactions; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; + +import java.util.List; +import java.util.stream.Stream; import com.google.common.collect.ImmutableSet; import org.junit.jupiter.api.Test; public class PeerTransactionTrackerTest { + private final EthPeers ethPeers = mock(EthPeers.class); private final EthPeer ethPeer1 = mock(EthPeer.class); private final EthPeer ethPeer2 = mock(EthPeer.class); private final BlockDataGenerator generator = new BlockDataGenerator(); - private final PeerTransactionTracker tracker = new PeerTransactionTracker(); + private final PeerTransactionTracker tracker = new PeerTransactionTracker(ethPeers); private final Transaction transaction1 = generator.transaction(); private final Transaction transaction2 = generator.transaction(); private final Transaction transaction3 = generator.transaction(); @@ -79,6 +85,7 @@ public class PeerTransactionTrackerTest { tracker.addToPeerSendQueue(ethPeer1, transaction2); tracker.addToPeerSendQueue(ethPeer2, transaction3); + when(ethPeers.streamAllPeers()).thenReturn(Stream.of(ethPeer2)); tracker.onDisconnect(ethPeer1); assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer2); @@ -90,4 +97,32 @@ public class PeerTransactionTrackerTest { assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(transaction1); assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(transaction3); } + + @Test + public void shouldClearDataForAllDisconnectedPeers() { + tracker.markTransactionsAsSeen(ethPeer1, List.of(transaction1)); + tracker.markTransactionsAsSeen(ethPeer2, List.of(transaction2)); + + when(ethPeers.streamAllPeers()).thenReturn(Stream.of(ethPeer2)); + tracker.onDisconnect(ethPeer1); + + // false because tracker removed for ethPeer1 + assertThat(tracker.hasPeerSeenTransaction(ethPeer1, transaction1)).isFalse(); + assertThat(tracker.hasPeerSeenTransaction(ethPeer2, transaction2)).isTrue(); + + // simulate a concurrent interaction, that just after the disconnection of the peer, + // recreates the transaction tackers for it + tracker.markTransactionsAsSeen(ethPeer1, List.of(transaction1)); + // ethPeer1 is here again, due to the above interaction with the tracker + assertThat(tracker.hasPeerSeenTransaction(ethPeer1, transaction1)).isTrue(); + + // disconnection of ethPeers2 will reconcile the tracker, removing also all the other + // disconnected peers + when(ethPeers.streamAllPeers()).thenReturn(Stream.of()); + tracker.onDisconnect(ethPeer2); + + // since no peers are connected, all the transaction trackers have been removed + assertThat(tracker.hasPeerSeenTransaction(ethPeer1, transaction1)).isFalse(); + assertThat(tracker.hasPeerSeenTransaction(ethPeer2, transaction2)).isFalse(); + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSenderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSenderTest.java index 02a2d248bc..85c38fd8fe 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSenderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSenderTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; @@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; public class TransactionsMessageSenderTest { + private final EthPeers ethPeers = mock(EthPeers.class); private final EthPeer peer1 = mock(EthPeer.class); private final EthPeer peer2 = mock(EthPeer.class); @@ -46,7 +48,7 @@ public class TransactionsMessageSenderTest { private final Transaction transaction2 = generator.transaction(); private final Transaction transaction3 = generator.transaction(); - private final PeerTransactionTracker transactionTracker = new PeerTransactionTracker(); + private final PeerTransactionTracker transactionTracker = new PeerTransactionTracker(ethPeers); private final TransactionsMessageSender messageSender = new TransactionsMessageSender(transactionTracker);