diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cfb98c9c3..c44f403c87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 22.7.5 ### Additions and Improvements +- Avoid sending added block events to transaction pool, and processing incoming transactions during initial sync [#4457](https://github.com/hyperledger/besu/pull/4457) - When building a new proposal, keep the best block built until now instead of the last one [#4455](https://github.com/hyperledger/besu/pull/4455) ### Bug Fixes diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index f804ad23de..ad97be510a 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -386,7 +386,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides ethContext, clock, metricsSystem, - syncState::isInitialSyncPhaseDone, + syncState, miningParameters, transactionPoolConfiguration); diff --git a/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java b/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java index ab08f2010e..21d837ca3b 100644 --- a/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java +++ b/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java @@ -147,7 +147,7 @@ public class BesuEventsImplTest { mockEthContext, TestClock.system(ZoneId.systemDefault()), new NoOpMetricsSystem(), - syncState::isInitialSyncPhaseDone, + syncState, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), txPoolConfig); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java index e0e00dd4aa..7fc5bd5406 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java @@ -33,7 +33,6 @@ import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -55,21 +54,18 @@ public class NewPooledTransactionHashesMessageProcessor { private final TransactionPoolConfiguration transactionPoolConfiguration; private final EthContext ethContext; private final MetricsSystem metricsSystem; - private final Supplier shouldProcessMessages; public NewPooledTransactionHashesMessageProcessor( final PeerTransactionTracker transactionTracker, final TransactionPool transactionPool, final TransactionPoolConfiguration transactionPoolConfiguration, final EthContext ethContext, - final MetricsSystem metricsSystem, - final Supplier shouldProcessMessages) { + final MetricsSystem metricsSystem) { this.transactionTracker = transactionTracker; this.transactionPool = transactionPool; this.transactionPoolConfiguration = transactionPoolConfiguration; this.ethContext = ethContext; this.metricsSystem = metricsSystem; - this.shouldProcessMessages = shouldProcessMessages; this.totalSkippedNewPooledTransactionHashesMessageCounter = new RunnableCounter( metricsSystem.createCounter( @@ -110,26 +106,24 @@ public class NewPooledTransactionHashesMessageProcessor { incomingTransactionHashes::size, incomingTransactionHashes::toString); - if (shouldProcessMessages.get()) { - final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask = - scheduledTasks.computeIfAbsent( - peer, - ethPeer -> { - ethContext - .getScheduler() - .scheduleFutureTask( - new FetcherCreatorTask(peer), - transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod()); - - return new BufferedGetPooledTransactionsFromPeerFetcher( - ethContext, peer, transactionPool, transactionTracker, metricsSystem); - }); - - bufferedTask.addHashes( - incomingTransactionHashes.stream() - .filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty()) - .collect(Collectors.toList())); - } + final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask = + scheduledTasks.computeIfAbsent( + peer, + ethPeer -> { + ethContext + .getScheduler() + .scheduleFutureTask( + new FetcherCreatorTask(peer), + transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod()); + + return new BufferedGetPooledTransactionsFromPeerFetcher( + ethContext, peer, transactionPool, transactionTracker, metricsSystem); + }); + + bufferedTask.addHashes( + incomingTransactionHashes.stream() + .filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty()) + .collect(Collectors.toList())); } catch (final RLPException ex) { if (peer != null) { LOG.debug( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java index 02a01f8570..b2aea31c7a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.EthPV65; +import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter; import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter; import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter; @@ -28,9 +29,12 @@ import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.time.Clock; -import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TransactionPoolFactory { + private static final Logger LOG = LoggerFactory.getLogger(TransactionPoolFactory.class); public static TransactionPool createTransactionPool( final ProtocolSchedule protocolSchedule, @@ -38,7 +42,7 @@ public class TransactionPoolFactory { final EthContext ethContext, final Clock clock, final MetricsSystem metricsSystem, - final Supplier shouldProcessTransactions, + final SyncState syncState, final MiningParameters miningParameters, final TransactionPoolConfiguration transactionPoolConfiguration) { @@ -58,7 +62,7 @@ public class TransactionPoolFactory { protocolContext, ethContext, metricsSystem, - shouldProcessTransactions, + syncState, miningParameters, transactionPoolConfiguration, pendingTransactions, @@ -72,7 +76,7 @@ public class TransactionPoolFactory { final ProtocolContext protocolContext, final EthContext ethContext, final MetricsSystem metricsSystem, - final Supplier shouldProcessTransactions, + final SyncState syncState, final MiningParameters miningParameters, final TransactionPoolConfiguration transactionPoolConfiguration, final AbstractPendingTransactionsSorter pendingTransactions, @@ -100,7 +104,7 @@ public class TransactionPoolFactory { ethContext.getScheduler(), new TransactionsMessageProcessor(transactionTracker, transactionPool, metricsSystem), transactionPoolConfiguration.getTxMessageKeepAliveSeconds()); - ethContext.getEthMessages().subscribe(EthPV62.TRANSACTIONS, transactionsMessageHandler); + final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler = new NewPooledTransactionHashesMessageHandler( ethContext.getScheduler(), @@ -109,16 +113,47 @@ public class TransactionPoolFactory { transactionPool, transactionPoolConfiguration, ethContext, - metricsSystem, - shouldProcessTransactions), + metricsSystem), transactionPoolConfiguration.getTxMessageKeepAliveSeconds()); + + if (syncState.isInitialSyncPhaseDone()) { + enableTransactionPool( + protocolContext, + ethContext, + transactionTracker, + transactionPool, + transactionsMessageHandler, + pooledTransactionsMessageHandler); + } else { + syncState.subscribeCompletionReached( + () -> { + enableTransactionPool( + protocolContext, + ethContext, + transactionTracker, + transactionPool, + transactionsMessageHandler, + pooledTransactionsMessageHandler); + }); + } + + return transactionPool; + } + + private static void enableTransactionPool( + final ProtocolContext protocolContext, + final EthContext ethContext, + final PeerTransactionTracker transactionTracker, + final TransactionPool transactionPool, + final TransactionsMessageHandler transactionsMessageHandler, + final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) { + LOG.info("Enabling transaction pool"); + ethContext.getEthPeers().subscribeDisconnect(transactionTracker); + protocolContext.getBlockchain().observeBlockAdded(transactionPool); + ethContext.getEthMessages().subscribe(EthPV62.TRANSACTIONS, transactionsMessageHandler); ethContext .getEthMessages() .subscribe(EthPV65.NEW_POOLED_TRANSACTION_HASHES, pooledTransactionsMessageHandler); - - protocolContext.getBlockchain().observeBlockAdded(transactionPool); - ethContext.getEthPeers().subscribeDisconnect(transactionTracker); - return transactionPool; } private static AbstractPendingTransactionsSorter createPendingTransactionsSorter( diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java index aabeae8aa1..0c753597b4 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java @@ -56,6 +56,7 @@ import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage; import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage; import org.hyperledger.besu.ethereum.eth.messages.StatusMessage; import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage; +import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory; @@ -1084,7 +1085,7 @@ public final class EthProtocolManagerTest { ethManager.ethContext(), TestClock.system(ZoneId.systemDefault()), metricsSystem, - () -> true, + new SyncState(blockchain, ethManager.ethContext().getEthPeers()), new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), TransactionPoolConfiguration.DEFAULT); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java index 18b9f3aae4..9ab9b1eef2 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java @@ -107,7 +107,7 @@ public abstract class AbstractMessageTaskTest { ethContext, TestClock.system(ZoneId.systemDefault()), metricsSystem, - syncState::isInitialSyncPhaseDone, + syncState, new MiningParameters.Builder().minTransactionGasPrice(Wei.ONE).build(), TransactionPoolConfiguration.DEFAULT); ethProtocolManager = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java index 3545824c0a..c4b7914c68 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java @@ -34,7 +34,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage; -import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor.FetcherCreatorTask; import org.hyperledger.besu.metrics.StubMetricsSystem; @@ -57,7 +56,6 @@ public class NewPooledTransactionHashesMessageProcessorTest { @Mock private EthPeer peer1; @Mock private EthContext ethContext; @Mock private EthScheduler ethScheduler; - @Mock private SyncState syncState; private final BlockDataGenerator generator = new BlockDataGenerator(); private final Hash hash1 = generator.transaction().getHash(); @@ -72,15 +70,13 @@ public class NewPooledTransactionHashesMessageProcessorTest { metricsSystem = new StubMetricsSystem(); when(transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod()) .thenReturn(Duration.ofMillis(500)); - when(syncState.isInitialSyncPhaseDone()).thenReturn(true); messageHandler = new NewPooledTransactionHashesMessageProcessor( transactionTracker, transactionPool, transactionPoolConfiguration, ethContext, - metricsSystem, - syncState::isInitialSyncPhaseDone); + metricsSystem); when(ethContext.getScheduler()).thenReturn(ethScheduler); } @@ -187,17 +183,4 @@ public class NewPooledTransactionHashesMessageProcessorTest { verify(ethScheduler, times(1)) .scheduleFutureTask(any(FetcherCreatorTask.class), any(Duration.class)); } - - @Test - public void shouldNotAddTransactionsWhenDisabled() { - - when(syncState.isInitialSyncPhaseDone()).thenReturn(false); - messageHandler.processNewPooledTransactionHashesMessage( - peer1, - NewPooledTransactionHashesMessage.create(asList(hash1, hash2, hash3)), - now(), - ofMinutes(1)); - - verifyNoInteractions(transactionPool); - } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java index 515315c4e3..bc8e1db783 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java @@ -123,6 +123,7 @@ public class TestNode implements Closeable { final SyncState syncState = mock(SyncState.class); when(syncState.isInSync(anyLong())).thenReturn(true); + when(syncState.isInitialSyncPhaseDone()).thenReturn(true); final EthMessages ethMessages = new EthMessages(); @@ -144,7 +145,7 @@ public class TestNode implements Closeable { ethContext, TestClock.system(ZoneId.systemDefault()), metricsSystem, - syncState::isInitialSyncPhaseDone, + syncState, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), TransactionPoolConfiguration.DEFAULT); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java index 525a5b219e..7d2fa4f88c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java @@ -15,29 +15,31 @@ package org.hyperledger.besu.ethereum.eth.transactions; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.ProtocolContext; +import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; -import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthMessages; -import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.ForkIdManager; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; @@ -49,47 +51,173 @@ import java.math.BigInteger; import java.util.Collections; import java.util.Optional; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; -@SuppressWarnings("unchecked") +@RunWith(MockitoJUnitRunner.class) public class TransactionPoolFactoryTest { + @Mock ProtocolSchedule schedule; + @Mock ProtocolContext context; + @Mock MutableBlockchain blockchain; + @Mock EthContext ethContext; + @Mock EthMessages ethMessages; + @Mock EthScheduler ethScheduler; - @Test - public void testDisconnect() { - final ProtocolSchedule schedule = mock(ProtocolSchedule.class); - final ProtocolContext context = mock(ProtocolContext.class); - final MutableBlockchain blockchain = mock(MutableBlockchain.class); + @Mock GasPricePendingTransactionsSorter pendingTransactions; + @Mock PeerTransactionTracker peerTransactionTracker; + @Mock TransactionsMessageSender transactionsMessageSender; + + @Mock NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender; + + TransactionPool pool; + EthPeers ethPeers; + + SyncState syncState; + + EthProtocolManager ethProtocolManager; - when(blockchain.getBlockByNumber(anyLong())).thenReturn(Optional.of(mock(Block.class))); + @Before + public void setup() { when(blockchain.getBlockHashByNumber(anyLong())).thenReturn(Optional.of(mock(Hash.class))); when(context.getBlockchain()).thenReturn(blockchain); - final EthPeers ethPeers = + ethPeers = new EthPeers( "ETH", TestClock.fixed(), new NoOpMetricsSystem(), 25, EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE); - final EthContext ethContext = mock(EthContext.class); - when(ethContext.getEthMessages()).thenReturn(mock(EthMessages.class)); + when(ethContext.getEthMessages()).thenReturn(ethMessages); when(ethContext.getEthPeers()).thenReturn(ethPeers); - final EthScheduler ethScheduler = mock(EthScheduler.class); + when(ethContext.getScheduler()).thenReturn(ethScheduler); - final GasPricePendingTransactionsSorter pendingTransactions = - mock(GasPricePendingTransactionsSorter.class); - final PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class); - final TransactionsMessageSender transactionsMessageSender = - mock(TransactionsMessageSender.class); - doNothing().when(transactionsMessageSender).sendTransactionsToPeer(any(EthPeer.class)); - final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender = - mock(NewPooledTransactionHashesMessageSender.class); - final TransactionPool pool = + } + + @Test + public void disconnectNotInvokedBeforeInitialSyncIsDone() { + setupInitialSyncPhase(true); + final RespondingEthPeer ethPeer = + RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); + assertThat(ethPeer.getEthPeer()).isNotNull(); + assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse(); + ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING); + verifyNoInteractions(peerTransactionTracker); + } + + @Test + public void disconnectInvokedAfterInitialSyncIsDone() { + setupInitialSyncPhase(true); + final RespondingEthPeer ethPeer = + RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); + assertThat(ethPeer.getEthPeer()).isNotNull(); + assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse(); + + syncState.markInitialSyncPhaseAsDone(); + + ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING); + verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer()); + } + + @Test + public void disconnectInvokedIfNoInitialSync() { + setupInitialSyncPhase(false); + final RespondingEthPeer ethPeer = + RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); + assertThat(ethPeer.getEthPeer()).isNotNull(); + assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse(); + + ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING); + verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer()); + } + + @Test + public void notRegisteredToBlockAddedEventBeforeInitialSyncIsDone() { + setupInitialSyncPhase(true); + ArgumentCaptor blockAddedListeners = + ArgumentCaptor.forClass(BlockAddedObserver.class); + verify(blockchain, atLeastOnce()).observeBlockAdded(blockAddedListeners.capture()); + + assertThat(blockAddedListeners.getAllValues()).doesNotContain(pool); + } + + @Test + public void registeredToBlockAddedEventAfterInitialSyncIsDone() { + setupInitialSyncPhase(true); + syncState.markInitialSyncPhaseAsDone(); + + ArgumentCaptor blockAddedListeners = + ArgumentCaptor.forClass(BlockAddedObserver.class); + verify(blockchain, atLeastOnce()).observeBlockAdded(blockAddedListeners.capture()); + + assertThat(blockAddedListeners.getAllValues()).contains(pool); + } + + @Test + public void registeredToBlockAddedEventIfNoInitialSync() { + setupInitialSyncPhase(false); + + ArgumentCaptor blockAddedListeners = + ArgumentCaptor.forClass(BlockAddedObserver.class); + verify(blockchain, atLeastOnce()).observeBlockAdded(blockAddedListeners.capture()); + + assertThat(blockAddedListeners.getAllValues()).contains(pool); + } + + @Test + public void incomingTransactionMessageHandlersNotRegisteredBeforeInitialSyncIsDone() { + setupInitialSyncPhase(true); + ArgumentCaptor messageHandlers = + ArgumentCaptor.forClass(EthMessages.MessageCallback.class); + verify(ethMessages, atLeast(0)).subscribe(anyInt(), messageHandlers.capture()); + + assertThat(messageHandlers.getAllValues()) + .doesNotHaveAnyElementsOfTypes( + TransactionsMessageHandler.class, NewPooledTransactionHashesMessageHandler.class); + } + + @Test + public void incomingTransactionMessageHandlersRegisteredAfterInitialSyncIsDone() { + setupInitialSyncPhase(true); + syncState.markInitialSyncPhaseAsDone(); + + ArgumentCaptor messageHandlers = + ArgumentCaptor.forClass(EthMessages.MessageCallback.class); + verify(ethMessages, atLeast(0)).subscribe(anyInt(), messageHandlers.capture()); + + assertThat(messageHandlers.getAllValues()) + .hasAtLeastOneElementOfType(TransactionsMessageHandler.class); + assertThat(messageHandlers.getAllValues()) + .hasAtLeastOneElementOfType(NewPooledTransactionHashesMessageHandler.class); + } + + @Test + public void incomingTransactionMessageHandlersRegisteredIfNoInitialSync() { + setupInitialSyncPhase(false); + + ArgumentCaptor messageHandlers = + ArgumentCaptor.forClass(EthMessages.MessageCallback.class); + verify(ethMessages, atLeast(0)).subscribe(anyInt(), messageHandlers.capture()); + + assertThat(messageHandlers.getAllValues()) + .hasAtLeastOneElementOfType(TransactionsMessageHandler.class); + assertThat(messageHandlers.getAllValues()) + .hasAtLeastOneElementOfType(NewPooledTransactionHashesMessageHandler.class); + } + + private void setupInitialSyncPhase(final boolean hasInitialSyncPhase) { + syncState = new SyncState(blockchain, ethPeers, hasInitialSyncPhase, Optional.empty()); + + pool = TransactionPoolFactory.createTransactionPool( schedule, context, ethContext, new NoOpMetricsSystem(), - () -> true, + syncState, new MiningParameters.Builder().minTransactionGasPrice(Wei.ONE).build(), ImmutableTransactionPoolConfiguration.builder() .txPoolMaxSize(1) @@ -101,7 +229,7 @@ public class TransactionPoolFactoryTest { transactionsMessageSender, newPooledTransactionHashesMessageSender); - final EthProtocolManager ethProtocolManager = + ethProtocolManager = new EthProtocolManager( blockchain, BigInteger.ONE, @@ -116,12 +244,5 @@ public class TransactionPoolFactoryTest { true, mock(EthScheduler.class), mock(ForkIdManager.class)); - - final RespondingEthPeer ethPeer = - RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); - assertThat(ethPeer.getEthPeer()).isNotNull(); - assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse(); - ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING); - verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer()); } } diff --git a/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java b/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java index fdd6170949..2d31ef0aee 100644 --- a/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java +++ b/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java @@ -213,7 +213,7 @@ public class RetestethContext { ethContext, retestethClock, metricsSystem, - syncState::isInitialSyncPhaseDone, + syncState, new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), transactionPoolConfiguration);