@ -22,7 +22,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer ;
import org.hyperledger.besu.ethereum.eth.manager.task.BufferedGetPooledTransactionsFromPeerFetcher ;
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage ;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState ;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason ;
import org.hyperledger.besu.ethereum.rlp.RLPException ;
import org.hyperledger.besu.metrics.BesuMetricCategory ;
@ -34,6 +33,7 @@ import java.time.Duration;
import java.time.Instant ;
import java.util.List ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.function.Supplier ;
import java.util.stream.Collectors ;
import org.slf4j.Logger ;
@ -55,6 +55,7 @@ public class NewPooledTransactionHashesMessageProcessor {
private final TransactionPoolConfiguration transactionPoolConfiguration ;
private final EthContext ethContext ;
private final MetricsSystem metricsSystem ;
private final Supplier < Boolean > shouldProcessMessages ;
public NewPooledTransactionHashesMessageProcessor (
final PeerTransactionTracker transactionTracker ,
@ -62,12 +63,13 @@ public class NewPooledTransactionHashesMessageProcessor {
final TransactionPoolConfiguration transactionPoolConfiguration ,
final EthContext ethContext ,
final MetricsSystem metricsSystem ,
final SyncState syncState ) {
final Supplier < Boolean > shouldProcessMessages ) {
this . transactionTracker = transactionTracker ;
this . transactionPool = transactionPool ;
this . transactionPoolConfiguration = transactionPoolConfiguration ;
this . ethContext = ethContext ;
this . metricsSystem = metricsSystem ;
this . shouldProcessMessages = shouldProcessMessages ;
this . totalSkippedNewPooledTransactionHashesMessageCounter =
new RunnableCounter (
metricsSystem . createCounter (
@ -108,25 +110,26 @@ public class NewPooledTransactionHashesMessageProcessor {
incomingTransactionHashes : : size ,
incomingTransactionHashes : : toString ) ;
final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask =
scheduledTasks . computeIfAbsent (
peer ,
ethPeer - > {
ethContext
. getScheduler ( )
. scheduleFutureTask (
new FetcherCreatorTask ( peer ) ,
transactionPoolConfiguration . getEth65TrxAnnouncedBufferingPeriod ( ) ) ;
return new BufferedGetPooledTransactionsFromPeerFetcher (
ethContext , peer , transactionPool , transactionTracker , metricsSystem ) ;
} ) ;
bufferedTask . addHashes (
incomingTransactionHashes . stream ( )
. filter ( hash - > transactionPool . getTransactionByHash ( hash ) . isEmpty ( ) )
. collect ( Collectors . toList ( ) ) ) ;
if ( shouldProcessMessages . get ( ) ) {
final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask =
scheduledTasks . computeIfAbsent (
peer ,
ethPeer - > {
ethContext
. getScheduler ( )
. scheduleFutureTask (
new FetcherCreatorTask ( peer ) ,
transactionPoolConfiguration . getEth65TrxAnnouncedBufferingPeriod ( ) ) ;
return new BufferedGetPooledTransactionsFromPeerFetcher (
ethContext , peer , transactionPool , transactionTracker , metricsSystem ) ;
} ) ;
bufferedTask . addHashes (
incomingTransactionHashes . stream ( )
. filter ( hash - > transactionPool . getTransactionByHash ( hash ) . isEmpty ( ) )
. collect ( Collectors . toList ( ) ) ) ;
}
} catch ( final RLPException ex ) {
if ( peer ! = null ) {
LOG . debug (