Thread Safe Evicting Queue Access (#1498)

Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
pull/1501/head
Ratan (Rai) Sur 4 years ago committed by GitHub
parent 52814d537e
commit a01b588a12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactions.java
  2. 44
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java

@ -38,7 +38,6 @@ import java.time.Clock;
import java.time.Instant; import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -375,8 +374,10 @@ public class PendingTransactions {
} }
} }
Collection<Hash> getNewPooledHashes() { List<Hash> getNewPooledHashes() {
return newPooledHashes; synchronized (newPooledHashes) {
return List.copyOf(newPooledHashes);
}
} }
/** /**

@ -47,10 +47,7 @@ import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import java.util.Collection; import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -124,34 +121,19 @@ public class TransactionPool implements BlockAddedObserver {
} }
void handleConnect(final EthPeer peer) { void handleConnect(final EthPeer peer) {
final List<Transaction> localTransactions = getLocalTransactions(); pendingTransactions
for (final Transaction transaction : localTransactions) { .getLocalTransactions()
peerTransactionTracker.addToPeerSendQueue(peer, transaction); .forEach(transaction -> peerTransactionTracker.addToPeerSendQueue(peer, transaction));
}
maybePeerPendingTransactionTracker.ifPresent( maybePeerPendingTransactionTracker
peerPendingTransactionTracker -> { .filter(
if (peerPendingTransactionTracker.isPeerSupported(peer, EthProtocol.ETH65)) { peerPendingTransactionTracker ->
Iterator<Hash> hashIterator = getNewPooledHashes().iterator(); peerPendingTransactionTracker.isPeerSupported(peer, EthProtocol.ETH65))
while (hashIterator.hasNext()) { .ifPresent(
try { peerPendingTransactionTracker ->
peerPendingTransactionTracker.addToPeerSendQueue(peer, hashIterator.next()); pendingTransactions
} catch (ConcurrentModificationException __) { .getNewPooledHashes()
// The hash got evicted somehow. .forEach(hash -> peerPendingTransactionTracker.addToPeerSendQueue(peer, hash)));
// For example, it was pushed out by new pooled transaction hashes or we got the
// full transaction and evicted it manually.
// Either way, we don't care and can continue.
}
}
}
});
}
private List<Transaction> getLocalTransactions() {
return pendingTransactions.getLocalTransactions();
}
public Collection<Hash> getNewPooledHashes() {
return pendingTransactions.getNewPooledHashes();
} }
public boolean addTransactionHash(final Hash transactionHash) { public boolean addTransactionHash(final Hash transactionHash) {

Loading…
Cancel
Save