diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactions.java index b7f57a2e9c..28a72749df 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactions.java @@ -45,7 +45,6 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Optional; import java.util.OptionalLong; -import java.util.Queue; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -67,7 +66,7 @@ public class PendingTransactions { private final int maxTransactionRetentionHours; private final Clock clock; - private final Queue newPooledHashes; + private final EvictingQueue newPooledHashes; private final Map pendingTransactions = new ConcurrentHashMap<>(); private final NavigableSet prioritizedTransactions = new TreeSet<>( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index 480bd3a1f8..099d75211d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -47,7 +47,9 @@ import org.hyperledger.besu.plugin.services.metrics.Counter; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import java.util.Collection; +import java.util.ConcurrentModificationException; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Set; @@ -77,7 +79,7 @@ public class TransactionPool implements BlockAddedObserver { private final Wei minTransactionGasPrice; private final LabelledMetric duplicateTransactionCounter; private final PeerTransactionTracker peerTransactionTracker; - private final Optional peerPendingTransactionTracker; + private final Optional maybePeerPendingTransactionTracker; private final Optional eip1559; private final TransactionPriceCalculator frontierPriceCalculator = TransactionPriceCalculator.frontier(); @@ -94,7 +96,7 @@ public class TransactionPool implements BlockAddedObserver { final SyncState syncState, final EthContext ethContext, final PeerTransactionTracker peerTransactionTracker, - final Optional peerPendingTransactionTracker, + final Optional maybePeerPendingTransactionTracker, final Wei minTransactionGasPrice, final MetricsSystem metricsSystem, final Optional eip1559, @@ -106,7 +108,7 @@ public class TransactionPool implements BlockAddedObserver { this.pendingTransactionBatchAddedListener = pendingTransactionBatchAddedListener; this.syncState = syncState; this.peerTransactionTracker = peerTransactionTracker; - this.peerPendingTransactionTracker = peerPendingTransactionTracker; + this.maybePeerPendingTransactionTracker = maybePeerPendingTransactionTracker; this.minTransactionGasPrice = minTransactionGasPrice; this.eip1559 = eip1559; this.configuration = configuration; @@ -126,12 +128,19 @@ public class TransactionPool implements BlockAddedObserver { for (final Transaction transaction : localTransactions) { peerTransactionTracker.addToPeerSendQueue(peer, transaction); } - peerPendingTransactionTracker.ifPresent( + maybePeerPendingTransactionTracker.ifPresent( peerPendingTransactionTracker -> { if (peerPendingTransactionTracker.isPeerSupported(peer, EthProtocol.ETH65)) { - final Collection hashes = getNewPooledHashes(); - for (final Hash hash : hashes) { - peerPendingTransactionTracker.addToPeerSendQueue(peer, hash); + Iterator hashIterator = getNewPooledHashes().iterator(); + while (hashIterator.hasNext()) { + try { + peerPendingTransactionTracker.addToPeerSendQueue(peer, hashIterator.next()); + } catch (ConcurrentModificationException __) { + // The hash got evicted somehow. + // For example, it was pushed out by new pooled transaction hashes or we got the + // full transaction and evicted it manually. + // Either way, we don't care and can continue. + } } } });