Avoid to process transactions during initial sync (#4457)

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/4467/head
Fabio Di Fabio 2 years ago committed by GitHub
parent 64bf83cfeb
commit 10e72bb99d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 2
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  3. 2
      besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java
  4. 44
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java
  5. 57
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java
  6. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java
  7. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java
  8. 19
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java
  9. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java
  10. 187
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java
  11. 2
      ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java

@ -3,6 +3,7 @@
## 22.7.5 ## 22.7.5
### Additions and Improvements ### Additions and Improvements
- Avoid sending added block events to transaction pool, and processing incoming transactions during initial sync [#4457](https://github.com/hyperledger/besu/pull/4457)
- When building a new proposal, keep the best block built until now instead of the last one [#4455](https://github.com/hyperledger/besu/pull/4455) - When building a new proposal, keep the best block built until now instead of the last one [#4455](https://github.com/hyperledger/besu/pull/4455)
### Bug Fixes ### Bug Fixes

@ -386,7 +386,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
ethContext, ethContext,
clock, clock,
metricsSystem, metricsSystem,
syncState::isInitialSyncPhaseDone, syncState,
miningParameters, miningParameters,
transactionPoolConfiguration); transactionPoolConfiguration);

@ -147,7 +147,7 @@ public class BesuEventsImplTest {
mockEthContext, mockEthContext,
TestClock.system(ZoneId.systemDefault()), TestClock.system(ZoneId.systemDefault()),
new NoOpMetricsSystem(), new NoOpMetricsSystem(),
syncState::isInitialSyncPhaseDone, syncState,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
txPoolConfig); txPoolConfig);

@ -33,7 +33,6 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -55,21 +54,18 @@ public class NewPooledTransactionHashesMessageProcessor {
private final TransactionPoolConfiguration transactionPoolConfiguration; private final TransactionPoolConfiguration transactionPoolConfiguration;
private final EthContext ethContext; private final EthContext ethContext;
private final MetricsSystem metricsSystem; private final MetricsSystem metricsSystem;
private final Supplier<Boolean> shouldProcessMessages;
public NewPooledTransactionHashesMessageProcessor( public NewPooledTransactionHashesMessageProcessor(
final PeerTransactionTracker transactionTracker, final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool, final TransactionPool transactionPool,
final TransactionPoolConfiguration transactionPoolConfiguration, final TransactionPoolConfiguration transactionPoolConfiguration,
final EthContext ethContext, final EthContext ethContext,
final MetricsSystem metricsSystem, final MetricsSystem metricsSystem) {
final Supplier<Boolean> shouldProcessMessages) {
this.transactionTracker = transactionTracker; this.transactionTracker = transactionTracker;
this.transactionPool = transactionPool; this.transactionPool = transactionPool;
this.transactionPoolConfiguration = transactionPoolConfiguration; this.transactionPoolConfiguration = transactionPoolConfiguration;
this.ethContext = ethContext; this.ethContext = ethContext;
this.metricsSystem = metricsSystem; this.metricsSystem = metricsSystem;
this.shouldProcessMessages = shouldProcessMessages;
this.totalSkippedNewPooledTransactionHashesMessageCounter = this.totalSkippedNewPooledTransactionHashesMessageCounter =
new RunnableCounter( new RunnableCounter(
metricsSystem.createCounter( metricsSystem.createCounter(
@ -110,26 +106,24 @@ public class NewPooledTransactionHashesMessageProcessor {
incomingTransactionHashes::size, incomingTransactionHashes::size,
incomingTransactionHashes::toString); incomingTransactionHashes::toString);
if (shouldProcessMessages.get()) { final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask =
final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask = scheduledTasks.computeIfAbsent(
scheduledTasks.computeIfAbsent( peer,
peer, ethPeer -> {
ethPeer -> { ethContext
ethContext .getScheduler()
.getScheduler() .scheduleFutureTask(
.scheduleFutureTask( new FetcherCreatorTask(peer),
new FetcherCreatorTask(peer), transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod());
transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod());
return new BufferedGetPooledTransactionsFromPeerFetcher(
return new BufferedGetPooledTransactionsFromPeerFetcher( ethContext, peer, transactionPool, transactionTracker, metricsSystem);
ethContext, peer, transactionPool, transactionTracker, metricsSystem); });
});
bufferedTask.addHashes(
bufferedTask.addHashes( incomingTransactionHashes.stream()
incomingTransactionHashes.stream() .filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty())
.filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty()) .collect(Collectors.toList()));
.collect(Collectors.toList()));
}
} catch (final RLPException ex) { } catch (final RLPException ex) {
if (peer != null) { if (peer != null) {
LOG.debug( LOG.debug(

@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65; import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter; import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter; import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter; import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter;
@ -28,9 +29,12 @@ import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.time.Clock; import java.time.Clock;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransactionPoolFactory { public class TransactionPoolFactory {
private static final Logger LOG = LoggerFactory.getLogger(TransactionPoolFactory.class);
public static TransactionPool createTransactionPool( public static TransactionPool createTransactionPool(
final ProtocolSchedule protocolSchedule, final ProtocolSchedule protocolSchedule,
@ -38,7 +42,7 @@ public class TransactionPoolFactory {
final EthContext ethContext, final EthContext ethContext,
final Clock clock, final Clock clock,
final MetricsSystem metricsSystem, final MetricsSystem metricsSystem,
final Supplier<Boolean> shouldProcessTransactions, final SyncState syncState,
final MiningParameters miningParameters, final MiningParameters miningParameters,
final TransactionPoolConfiguration transactionPoolConfiguration) { final TransactionPoolConfiguration transactionPoolConfiguration) {
@ -58,7 +62,7 @@ public class TransactionPoolFactory {
protocolContext, protocolContext,
ethContext, ethContext,
metricsSystem, metricsSystem,
shouldProcessTransactions, syncState,
miningParameters, miningParameters,
transactionPoolConfiguration, transactionPoolConfiguration,
pendingTransactions, pendingTransactions,
@ -72,7 +76,7 @@ public class TransactionPoolFactory {
final ProtocolContext protocolContext, final ProtocolContext protocolContext,
final EthContext ethContext, final EthContext ethContext,
final MetricsSystem metricsSystem, final MetricsSystem metricsSystem,
final Supplier<Boolean> shouldProcessTransactions, final SyncState syncState,
final MiningParameters miningParameters, final MiningParameters miningParameters,
final TransactionPoolConfiguration transactionPoolConfiguration, final TransactionPoolConfiguration transactionPoolConfiguration,
final AbstractPendingTransactionsSorter pendingTransactions, final AbstractPendingTransactionsSorter pendingTransactions,
@ -100,7 +104,7 @@ public class TransactionPoolFactory {
ethContext.getScheduler(), ethContext.getScheduler(),
new TransactionsMessageProcessor(transactionTracker, transactionPool, metricsSystem), new TransactionsMessageProcessor(transactionTracker, transactionPool, metricsSystem),
transactionPoolConfiguration.getTxMessageKeepAliveSeconds()); transactionPoolConfiguration.getTxMessageKeepAliveSeconds());
ethContext.getEthMessages().subscribe(EthPV62.TRANSACTIONS, transactionsMessageHandler);
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler = final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler =
new NewPooledTransactionHashesMessageHandler( new NewPooledTransactionHashesMessageHandler(
ethContext.getScheduler(), ethContext.getScheduler(),
@ -109,16 +113,47 @@ public class TransactionPoolFactory {
transactionPool, transactionPool,
transactionPoolConfiguration, transactionPoolConfiguration,
ethContext, ethContext,
metricsSystem, metricsSystem),
shouldProcessTransactions),
transactionPoolConfiguration.getTxMessageKeepAliveSeconds()); transactionPoolConfiguration.getTxMessageKeepAliveSeconds());
if (syncState.isInitialSyncPhaseDone()) {
enableTransactionPool(
protocolContext,
ethContext,
transactionTracker,
transactionPool,
transactionsMessageHandler,
pooledTransactionsMessageHandler);
} else {
syncState.subscribeCompletionReached(
() -> {
enableTransactionPool(
protocolContext,
ethContext,
transactionTracker,
transactionPool,
transactionsMessageHandler,
pooledTransactionsMessageHandler);
});
}
return transactionPool;
}
private static void enableTransactionPool(
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionsMessageHandler transactionsMessageHandler,
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) {
LOG.info("Enabling transaction pool");
ethContext.getEthPeers().subscribeDisconnect(transactionTracker);
protocolContext.getBlockchain().observeBlockAdded(transactionPool);
ethContext.getEthMessages().subscribe(EthPV62.TRANSACTIONS, transactionsMessageHandler);
ethContext ethContext
.getEthMessages() .getEthMessages()
.subscribe(EthPV65.NEW_POOLED_TRANSACTION_HASHES, pooledTransactionsMessageHandler); .subscribe(EthPV65.NEW_POOLED_TRANSACTION_HASHES, pooledTransactionsMessageHandler);
protocolContext.getBlockchain().observeBlockAdded(transactionPool);
ethContext.getEthPeers().subscribeDisconnect(transactionTracker);
return transactionPool;
} }
private static AbstractPendingTransactionsSorter createPendingTransactionsSorter( private static AbstractPendingTransactionsSorter createPendingTransactionsSorter(

@ -56,6 +56,7 @@ import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage; import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage; import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage; import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory;
@ -1084,7 +1085,7 @@ public final class EthProtocolManagerTest {
ethManager.ethContext(), ethManager.ethContext(),
TestClock.system(ZoneId.systemDefault()), TestClock.system(ZoneId.systemDefault()),
metricsSystem, metricsSystem,
() -> true, new SyncState(blockchain, ethManager.ethContext().getEthPeers()),
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
TransactionPoolConfiguration.DEFAULT); TransactionPoolConfiguration.DEFAULT);

@ -107,7 +107,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
ethContext, ethContext,
TestClock.system(ZoneId.systemDefault()), TestClock.system(ZoneId.systemDefault()),
metricsSystem, metricsSystem,
syncState::isInitialSyncPhaseDone, syncState,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ONE).build(), new MiningParameters.Builder().minTransactionGasPrice(Wei.ONE).build(),
TransactionPoolConfiguration.DEFAULT); TransactionPoolConfiguration.DEFAULT);
ethProtocolManager = ethProtocolManager =

@ -34,7 +34,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage; import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor.FetcherCreatorTask; import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor.FetcherCreatorTask;
import org.hyperledger.besu.metrics.StubMetricsSystem; import org.hyperledger.besu.metrics.StubMetricsSystem;
@ -57,7 +56,6 @@ public class NewPooledTransactionHashesMessageProcessorTest {
@Mock private EthPeer peer1; @Mock private EthPeer peer1;
@Mock private EthContext ethContext; @Mock private EthContext ethContext;
@Mock private EthScheduler ethScheduler; @Mock private EthScheduler ethScheduler;
@Mock private SyncState syncState;
private final BlockDataGenerator generator = new BlockDataGenerator(); private final BlockDataGenerator generator = new BlockDataGenerator();
private final Hash hash1 = generator.transaction().getHash(); private final Hash hash1 = generator.transaction().getHash();
@ -72,15 +70,13 @@ public class NewPooledTransactionHashesMessageProcessorTest {
metricsSystem = new StubMetricsSystem(); metricsSystem = new StubMetricsSystem();
when(transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod()) when(transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod())
.thenReturn(Duration.ofMillis(500)); .thenReturn(Duration.ofMillis(500));
when(syncState.isInitialSyncPhaseDone()).thenReturn(true);
messageHandler = messageHandler =
new NewPooledTransactionHashesMessageProcessor( new NewPooledTransactionHashesMessageProcessor(
transactionTracker, transactionTracker,
transactionPool, transactionPool,
transactionPoolConfiguration, transactionPoolConfiguration,
ethContext, ethContext,
metricsSystem, metricsSystem);
syncState::isInitialSyncPhaseDone);
when(ethContext.getScheduler()).thenReturn(ethScheduler); when(ethContext.getScheduler()).thenReturn(ethScheduler);
} }
@ -187,17 +183,4 @@ public class NewPooledTransactionHashesMessageProcessorTest {
verify(ethScheduler, times(1)) verify(ethScheduler, times(1))
.scheduleFutureTask(any(FetcherCreatorTask.class), any(Duration.class)); .scheduleFutureTask(any(FetcherCreatorTask.class), any(Duration.class));
} }
@Test
public void shouldNotAddTransactionsWhenDisabled() {
when(syncState.isInitialSyncPhaseDone()).thenReturn(false);
messageHandler.processNewPooledTransactionHashesMessage(
peer1,
NewPooledTransactionHashesMessage.create(asList(hash1, hash2, hash3)),
now(),
ofMinutes(1));
verifyNoInteractions(transactionPool);
}
} }

@ -123,6 +123,7 @@ public class TestNode implements Closeable {
final SyncState syncState = mock(SyncState.class); final SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true); when(syncState.isInSync(anyLong())).thenReturn(true);
when(syncState.isInitialSyncPhaseDone()).thenReturn(true);
final EthMessages ethMessages = new EthMessages(); final EthMessages ethMessages = new EthMessages();
@ -144,7 +145,7 @@ public class TestNode implements Closeable {
ethContext, ethContext,
TestClock.system(ZoneId.systemDefault()), TestClock.system(ZoneId.systemDefault()),
metricsSystem, metricsSystem,
syncState::isInitialSyncPhaseDone, syncState,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
TransactionPoolConfiguration.DEFAULT); TransactionPoolConfiguration.DEFAULT);

@ -15,29 +15,31 @@
package org.hyperledger.besu.ethereum.eth.transactions; package org.hyperledger.besu.ethereum.eth.transactions;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.ForkIdManager; import org.hyperledger.besu.ethereum.eth.manager.ForkIdManager;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter; import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
@ -49,47 +51,173 @@ import java.math.BigInteger;
import java.util.Collections; import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@SuppressWarnings("unchecked") @RunWith(MockitoJUnitRunner.class)
public class TransactionPoolFactoryTest { public class TransactionPoolFactoryTest {
@Mock ProtocolSchedule schedule;
@Mock ProtocolContext context;
@Mock MutableBlockchain blockchain;
@Mock EthContext ethContext;
@Mock EthMessages ethMessages;
@Mock EthScheduler ethScheduler;
@Test @Mock GasPricePendingTransactionsSorter pendingTransactions;
public void testDisconnect() { @Mock PeerTransactionTracker peerTransactionTracker;
final ProtocolSchedule schedule = mock(ProtocolSchedule.class); @Mock TransactionsMessageSender transactionsMessageSender;
final ProtocolContext context = mock(ProtocolContext.class);
final MutableBlockchain blockchain = mock(MutableBlockchain.class); @Mock NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
TransactionPool pool;
EthPeers ethPeers;
SyncState syncState;
EthProtocolManager ethProtocolManager;
when(blockchain.getBlockByNumber(anyLong())).thenReturn(Optional.of(mock(Block.class))); @Before
public void setup() {
when(blockchain.getBlockHashByNumber(anyLong())).thenReturn(Optional.of(mock(Hash.class))); when(blockchain.getBlockHashByNumber(anyLong())).thenReturn(Optional.of(mock(Hash.class)));
when(context.getBlockchain()).thenReturn(blockchain); when(context.getBlockchain()).thenReturn(blockchain);
final EthPeers ethPeers = ethPeers =
new EthPeers( new EthPeers(
"ETH", "ETH",
TestClock.fixed(), TestClock.fixed(),
new NoOpMetricsSystem(), new NoOpMetricsSystem(),
25, 25,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE); EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
final EthContext ethContext = mock(EthContext.class); when(ethContext.getEthMessages()).thenReturn(ethMessages);
when(ethContext.getEthMessages()).thenReturn(mock(EthMessages.class));
when(ethContext.getEthPeers()).thenReturn(ethPeers); when(ethContext.getEthPeers()).thenReturn(ethPeers);
final EthScheduler ethScheduler = mock(EthScheduler.class);
when(ethContext.getScheduler()).thenReturn(ethScheduler); when(ethContext.getScheduler()).thenReturn(ethScheduler);
final GasPricePendingTransactionsSorter pendingTransactions = }
mock(GasPricePendingTransactionsSorter.class);
final PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class); @Test
final TransactionsMessageSender transactionsMessageSender = public void disconnectNotInvokedBeforeInitialSyncIsDone() {
mock(TransactionsMessageSender.class); setupInitialSyncPhase(true);
doNothing().when(transactionsMessageSender).sendTransactionsToPeer(any(EthPeer.class)); final RespondingEthPeer ethPeer =
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender = RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build();
mock(NewPooledTransactionHashesMessageSender.class); assertThat(ethPeer.getEthPeer()).isNotNull();
final TransactionPool pool = assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse();
ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING);
verifyNoInteractions(peerTransactionTracker);
}
@Test
public void disconnectInvokedAfterInitialSyncIsDone() {
setupInitialSyncPhase(true);
final RespondingEthPeer ethPeer =
RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build();
assertThat(ethPeer.getEthPeer()).isNotNull();
assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse();
syncState.markInitialSyncPhaseAsDone();
ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING);
verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer());
}
@Test
public void disconnectInvokedIfNoInitialSync() {
setupInitialSyncPhase(false);
final RespondingEthPeer ethPeer =
RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build();
assertThat(ethPeer.getEthPeer()).isNotNull();
assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse();
ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING);
verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer());
}
@Test
public void notRegisteredToBlockAddedEventBeforeInitialSyncIsDone() {
setupInitialSyncPhase(true);
ArgumentCaptor<BlockAddedObserver> blockAddedListeners =
ArgumentCaptor.forClass(BlockAddedObserver.class);
verify(blockchain, atLeastOnce()).observeBlockAdded(blockAddedListeners.capture());
assertThat(blockAddedListeners.getAllValues()).doesNotContain(pool);
}
@Test
public void registeredToBlockAddedEventAfterInitialSyncIsDone() {
setupInitialSyncPhase(true);
syncState.markInitialSyncPhaseAsDone();
ArgumentCaptor<BlockAddedObserver> blockAddedListeners =
ArgumentCaptor.forClass(BlockAddedObserver.class);
verify(blockchain, atLeastOnce()).observeBlockAdded(blockAddedListeners.capture());
assertThat(blockAddedListeners.getAllValues()).contains(pool);
}
@Test
public void registeredToBlockAddedEventIfNoInitialSync() {
setupInitialSyncPhase(false);
ArgumentCaptor<BlockAddedObserver> blockAddedListeners =
ArgumentCaptor.forClass(BlockAddedObserver.class);
verify(blockchain, atLeastOnce()).observeBlockAdded(blockAddedListeners.capture());
assertThat(blockAddedListeners.getAllValues()).contains(pool);
}
@Test
public void incomingTransactionMessageHandlersNotRegisteredBeforeInitialSyncIsDone() {
setupInitialSyncPhase(true);
ArgumentCaptor<EthMessages.MessageCallback> messageHandlers =
ArgumentCaptor.forClass(EthMessages.MessageCallback.class);
verify(ethMessages, atLeast(0)).subscribe(anyInt(), messageHandlers.capture());
assertThat(messageHandlers.getAllValues())
.doesNotHaveAnyElementsOfTypes(
TransactionsMessageHandler.class, NewPooledTransactionHashesMessageHandler.class);
}
@Test
public void incomingTransactionMessageHandlersRegisteredAfterInitialSyncIsDone() {
setupInitialSyncPhase(true);
syncState.markInitialSyncPhaseAsDone();
ArgumentCaptor<EthMessages.MessageCallback> messageHandlers =
ArgumentCaptor.forClass(EthMessages.MessageCallback.class);
verify(ethMessages, atLeast(0)).subscribe(anyInt(), messageHandlers.capture());
assertThat(messageHandlers.getAllValues())
.hasAtLeastOneElementOfType(TransactionsMessageHandler.class);
assertThat(messageHandlers.getAllValues())
.hasAtLeastOneElementOfType(NewPooledTransactionHashesMessageHandler.class);
}
@Test
public void incomingTransactionMessageHandlersRegisteredIfNoInitialSync() {
setupInitialSyncPhase(false);
ArgumentCaptor<EthMessages.MessageCallback> messageHandlers =
ArgumentCaptor.forClass(EthMessages.MessageCallback.class);
verify(ethMessages, atLeast(0)).subscribe(anyInt(), messageHandlers.capture());
assertThat(messageHandlers.getAllValues())
.hasAtLeastOneElementOfType(TransactionsMessageHandler.class);
assertThat(messageHandlers.getAllValues())
.hasAtLeastOneElementOfType(NewPooledTransactionHashesMessageHandler.class);
}
private void setupInitialSyncPhase(final boolean hasInitialSyncPhase) {
syncState = new SyncState(blockchain, ethPeers, hasInitialSyncPhase, Optional.empty());
pool =
TransactionPoolFactory.createTransactionPool( TransactionPoolFactory.createTransactionPool(
schedule, schedule,
context, context,
ethContext, ethContext,
new NoOpMetricsSystem(), new NoOpMetricsSystem(),
() -> true, syncState,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ONE).build(), new MiningParameters.Builder().minTransactionGasPrice(Wei.ONE).build(),
ImmutableTransactionPoolConfiguration.builder() ImmutableTransactionPoolConfiguration.builder()
.txPoolMaxSize(1) .txPoolMaxSize(1)
@ -101,7 +229,7 @@ public class TransactionPoolFactoryTest {
transactionsMessageSender, transactionsMessageSender,
newPooledTransactionHashesMessageSender); newPooledTransactionHashesMessageSender);
final EthProtocolManager ethProtocolManager = ethProtocolManager =
new EthProtocolManager( new EthProtocolManager(
blockchain, blockchain,
BigInteger.ONE, BigInteger.ONE,
@ -116,12 +244,5 @@ public class TransactionPoolFactoryTest {
true, true,
mock(EthScheduler.class), mock(EthScheduler.class),
mock(ForkIdManager.class)); mock(ForkIdManager.class));
final RespondingEthPeer ethPeer =
RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build();
assertThat(ethPeer.getEthPeer()).isNotNull();
assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse();
ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING);
verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer());
} }
} }

@ -213,7 +213,7 @@ public class RetestethContext {
ethContext, ethContext,
retestethClock, retestethClock,
metricsSystem, metricsSystem,
syncState::isInitialSyncPhaseDone, syncState,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(), new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
transactionPoolConfiguration); transactionPoolConfiguration);

Loading…
Cancel
Save