From a01b588a129d2896500f49ed12a5ba7479abd0b7 Mon Sep 17 00:00:00 2001 From: "Ratan (Rai) Sur" Date: Thu, 29 Oct 2020 16:34:18 -0400 Subject: [PATCH] Thread Safe Evicting Queue Access (#1498) Signed-off-by: Ratan Rai Sur --- .../eth/transactions/PendingTransactions.java | 7 +-- .../eth/transactions/TransactionPool.java | 44 ++++++------------- 2 files changed, 17 insertions(+), 34 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactions.java index a5bdd84ec8..46031dc0bc 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactions.java @@ -38,7 +38,6 @@ import java.time.Clock; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -375,8 +374,10 @@ public class PendingTransactions { } } - Collection getNewPooledHashes() { - return newPooledHashes; + List getNewPooledHashes() { + synchronized (newPooledHashes) { + return List.copyOf(newPooledHashes); + } } /** diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index c13afe5762..9e5d26c5f4 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -47,10 +47,7 @@ import org.hyperledger.besu.plugin.services.metrics.Counter; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import java.util.Collection; -import java.util.ConcurrentModificationException; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; import java.util.Optional; import java.util.Set; @@ -124,34 +121,19 @@ public class TransactionPool implements BlockAddedObserver { } void handleConnect(final EthPeer peer) { - final List localTransactions = getLocalTransactions(); - for (final Transaction transaction : localTransactions) { - peerTransactionTracker.addToPeerSendQueue(peer, transaction); - } - maybePeerPendingTransactionTracker.ifPresent( - peerPendingTransactionTracker -> { - if (peerPendingTransactionTracker.isPeerSupported(peer, EthProtocol.ETH65)) { - Iterator hashIterator = getNewPooledHashes().iterator(); - while (hashIterator.hasNext()) { - try { - peerPendingTransactionTracker.addToPeerSendQueue(peer, hashIterator.next()); - } catch (ConcurrentModificationException __) { - // The hash got evicted somehow. - // For example, it was pushed out by new pooled transaction hashes or we got the - // full transaction and evicted it manually. - // Either way, we don't care and can continue. - } - } - } - }); - } - - private List getLocalTransactions() { - return pendingTransactions.getLocalTransactions(); - } - - public Collection getNewPooledHashes() { - return pendingTransactions.getNewPooledHashes(); + pendingTransactions + .getLocalTransactions() + .forEach(transaction -> peerTransactionTracker.addToPeerSendQueue(peer, transaction)); + + maybePeerPendingTransactionTracker + .filter( + peerPendingTransactionTracker -> + peerPendingTransactionTracker.isPeerSupported(peer, EthProtocol.ETH65)) + .ifPresent( + peerPendingTransactionTracker -> + pendingTransactions + .getNewPooledHashes() + .forEach(hash -> peerPendingTransactionTracker.addToPeerSendQueue(peer, hash))); } public boolean addTransactionHash(final Hash transactionHash) {