Improve eth/66 support (#3616)

Currently Besu has a limited support for sending NewPooledTransactionHashes messages, and other aspect related to reduce transactions synchronization traffic, described in the Ethereum Wire Protocol version 66.

Specifically:

    Besu only uses NewPooledTransactionHashes for new local transactions, while it could be extended to any transaction added to the transaction pool
    Besu does not limit the sending of the full transaction messages to a small fraction of the connected peers, and sends the new transaction hashes to all the remaining peers

This PR, extends eth/66 support and does some code refactoring, to remove some reduntant code and rename some classes to identify they are related to the NewPooledTransactionHashes message.

The main changes are:

    Do not have a separate tracker for transaction hashes, since for them we can reuse PeerTransactionTracker, that tracks full transactions exchange history and sending queue with a peer. So PeerPendingTransactionTracker has been removed. --tx-pool-hashes-max-size is now deprecated and has no more effect and it will be removed in a future release.
    When a new peer connects, if it support eth/6[56] then we send all the transaction hashes we have in the pool, otherwise we send the full transactions.
    When new transactions are added to the pool, we send full transactions to peers without eth/6[56] support, or to a small fractions of all peers, and then we send only transaction hashes to the remaining peer that support eth/6[56]. Both transactions and transaction hashes are only sent if not already exchanged with that specific peer.


Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/3629/head
Fabio Di Fabio 3 years ago committed by GitHub
parent 0d182b80ae
commit 6c179ba596
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      CHANGELOG.md
  2. 12
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  3. 2
      besu/src/main/java/org/hyperledger/besu/cli/util/CommandLineUtils.java
  4. 8
      besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
  5. 3
      consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueBlockCreatorTest.java
  6. 3
      consensus/clique/src/test/java/org/hyperledger/besu/consensus/clique/blockcreation/CliqueMinerExecutorTest.java
  7. 1
      consensus/ibft/src/integration-test/java/org/hyperledger/besu/consensus/ibft/support/TestContextBuilder.java
  8. 1
      consensus/ibft/src/test/java/org/hyperledger/besu/consensus/ibft/blockcreation/BftBlockCreatorTest.java
  9. 1
      consensus/ibftlegacy/src/test/java/org/hyperledger/besu/consensus/ibftlegacy/blockcreation/BftBlockCreatorTest.java
  10. 1
      consensus/qbft/src/integration-test/java/org/hyperledger/besu/consensus/qbft/support/TestContextBuilder.java
  11. 15
      ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java
  12. 15
      ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java
  13. 2
      ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/BlockTransactionSelectorTest.java
  14. 4
      ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWBlockCreatorTest.java
  15. 2
      ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/PoWMinerExecutorTest.java
  16. 13
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Transaction.java
  17. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java
  18. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java
  19. 8
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java
  20. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java
  21. 35
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSender.java
  22. 101
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerPendingTransactionTracker.java
  23. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java
  24. 52
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionSender.java
  25. 157
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcaster.java
  26. 48
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java
  27. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolConfiguration.java
  28. 34
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java
  29. 49
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionSender.java
  30. 21
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessor.java
  31. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageSender.java
  32. 35
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java
  33. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/BaseFeePendingTransactionsSorter.java
  34. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/GasPricePendingTransactionsSorter.java
  35. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java
  36. 5
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BaseFeePendingTransactionsTest.java
  37. 5
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/GasPricePendingTransactionsTest.java
  38. 21
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java
  39. 35
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java
  40. 124
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PeerPendingTransactionTrackerTest.java
  41. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingMultiTypesTransactionsTest.java
  42. 67
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/PendingTransactionsSenderTest.java
  43. 272
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionBroadcasterTest.java
  44. 17
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java
  45. 140
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolTest.java
  46. 6
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageProcessorTest.java

@ -3,10 +3,14 @@
## 22.1.3
### Breaking Changes
- Remove the experimental flag for bonsai tries CLI options '--data-storage-format' and '--bonsai-maximum-back-layers-to-load' [#3578](https://github.com/hyperledger/besu/pull/3578)
- Remove the experimental flag for bonsai tries CLI options `--data-storage-format` and `--bonsai-maximum-back-layers-to-load` [#3578](https://github.com/hyperledger/besu/pull/3578)
### Deprecations
- `--tx-pool-hashes-max-size` is now deprecated and has no more effect and it will be removed in a future release.
### Additions and Improvements
- Tune transaction synchronization parameter to adapt to mainnet traffic [#3610](https://github.com/hyperledger/besu/pull/3610)
- Improve eth/66 support [#3616](https://github.com/hyperledger/besu/pull/3616)
## 22.1.2

@ -21,6 +21,7 @@ import static java.util.Collections.singletonList;
import static org.hyperledger.besu.cli.DefaultCommandValues.getDefaultBesuDataPath;
import static org.hyperledger.besu.cli.config.NetworkName.MAINNET;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPENDENCY_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATED_AND_USELESS_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATION_WARNING_MSG;
import static org.hyperledger.besu.config.experimental.MergeConfigOptions.isMergeEnabled;
import static org.hyperledger.besu.controller.BesuController.DATABASE_PATH;
@ -1117,10 +1118,10 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
names = {"--tx-pool-hashes-max-size"},
paramLabel = MANDATORY_INTEGER_FORMAT_HELP,
description =
"Maximum number of pending transaction hashes that will be kept in the transaction pool (default: ${DEFAULT-VALUE})",
"Deprecated, has not effect. Maximum number of pending transaction hashes that will be kept in the transaction pool",
arity = "1")
private final Integer pooledTransactionHashesSize =
TransactionPoolConfiguration.MAX_PENDING_TRANSACTIONS_HASHES;
@SuppressWarnings("unused")
private final Integer pooledTransactionHashesSize = null; // NOSONAR
@Option(
names = {"--tx-pool-retention-hours"},
@ -1826,6 +1827,10 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
"--privacy-onchain-groups-enabled",
"--privacy-flexible-groups-enabled");
}
if (pooledTransactionHashesSize != null) { // NOSONAR
logger.warn(DEPRECATED_AND_USELESS_WARNING_MSG, "--tx-pool-hashes-max-size");
}
}
private void configure() throws Exception {
@ -2651,7 +2656,6 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
return unstableTransactionPoolOptions
.toDomainObject()
.txPoolMaxSize(txPoolMaxSize)
.pooledTransactionHashesSize(pooledTransactionHashesSize)
.pendingTxRetentionPeriod(pendingTxRetentionPeriod)
.priceBump(Percentage.fromInt(priceBump))
.txFeeCap(txFeeCap)

@ -29,6 +29,8 @@ public class CommandLineUtils {
public static final String MULTI_DEPENDENCY_WARNING_MSG =
"{} ignored because none of {} was defined.";
public static final String DEPRECATION_WARNING_MSG = "{} has been deprecated, use {} instead.";
public static final String DEPRECATED_AND_USELESS_WARNING_MSG =
"{} has been deprecated and is now useless, remove it.";
/**
* Check if options are passed that require an option to be true to have any effect and log a

@ -26,6 +26,7 @@ import static org.hyperledger.besu.cli.config.NetworkName.MORDOR;
import static org.hyperledger.besu.cli.config.NetworkName.RINKEBY;
import static org.hyperledger.besu.cli.config.NetworkName.ROPSTEN;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPENDENCY_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATED_AND_USELESS_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATION_WARNING_MSG;
import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.ENGINE;
import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.ETH;
@ -3884,6 +3885,13 @@ public class BesuCommandTest extends CommandTestAbstract {
"--privacy-flexible-groups-enabled");
}
@Test
public void txPoolHashesMaxSizeOptionIsDeprecated() {
parseCommand("--tx-pool-hashes-max-size", "1024");
verify(mockLogger).warn(DEPRECATED_AND_USELESS_WARNING_MSG, "--tx-pool-hashes-max-size");
}
@Test
public void flexiblePrivacyGroupEnabledFlagValueIsSet() {
parseCommand(

@ -133,7 +133,6 @@ public class CliqueBlockCreatorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
@ -169,7 +168,6 @@ public class CliqueBlockCreatorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
@ -207,7 +205,6 @@ public class CliqueBlockCreatorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,

@ -96,7 +96,6 @@ public class CliqueMinerExecutorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
CliqueMinerExecutorTest::mockBlockHeader,
@ -141,7 +140,6 @@ public class CliqueMinerExecutorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
CliqueMinerExecutorTest::mockBlockHeader,
@ -186,7 +184,6 @@ public class CliqueMinerExecutorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
CliqueMinerExecutorTest::mockBlockHeader,

@ -334,7 +334,6 @@ public class TestContextBuilder {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
1,
clock,
metricsSystem,
blockChain::getChainHeadHeader,

@ -118,7 +118,6 @@ public class BftBlockCreatorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,

@ -107,7 +107,6 @@ public class BftBlockCreatorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,

@ -439,7 +439,6 @@ public class TestContextBuilder {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
1,
clock,
metricsSystem,
blockChain::getChainHeadHeader,

@ -49,10 +49,8 @@ import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.PeerPendingTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionBroadcaster;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
@ -75,8 +73,7 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class EthGetFilterChangesIntegrationTest {
@Mock private TransactionBatchAddedListener batchAddedListener;
@Mock private TransactionBatchAddedListener pendingBatchAddedListener;
@Mock private TransactionBroadcaster batchAddedListener;
private MutableBlockchain blockchain;
private final String ETH_METHOD = "eth_getFilterChanges";
private final String JSON_RPC_VERSION = "2.0";
@ -86,7 +83,6 @@ public class EthGetFilterChangesIntegrationTest {
private GasPricePendingTransactionsSorter transactions;
private static final int MAX_TRANSACTIONS = 5;
private static final int MAX_HASHES = 5;
private static final KeyPair keyPair = SignatureAlgorithmFactory.getInstance().generateKeyPair();
private final Transaction transaction = createTransaction(1);
private FilterManager filterManager;
@ -101,16 +97,12 @@ public class EthGetFilterChangesIntegrationTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
MAX_TRANSACTIONS,
MAX_HASHES,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
TransactionPoolConfiguration.DEFAULT_PRICE_BUMP);
final ProtocolContext protocolContext = executionContext.getProtocolContext();
PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class);
PeerPendingTransactionTracker peerPendingTransactionTracker =
mock(PeerPendingTransactionTracker.class);
EthContext ethContext = mock(EthContext.class);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
@ -121,11 +113,8 @@ public class EthGetFilterChangesIntegrationTest {
executionContext.getProtocolSchedule(),
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
TransactionPoolConfiguration.DEFAULT);

@ -49,10 +49,8 @@ import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.PeerPendingTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionBroadcaster;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
@ -75,8 +73,7 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class EthGetFilterChangesIntegrationTest {
@Mock private TransactionBatchAddedListener batchAddedListener;
@Mock private TransactionBatchAddedListener pendingBatchAddedListener;
@Mock private TransactionBroadcaster batchAddedListener;
private MutableBlockchain blockchain;
private final String ETH_METHOD = "eth_getFilterChanges";
private final String JSON_RPC_VERSION = "2.0";
@ -86,7 +83,6 @@ public class EthGetFilterChangesIntegrationTest {
private BaseFeePendingTransactionsSorter transactions;
private static final int MAX_TRANSACTIONS = 5;
private static final int MAX_HASHES = 5;
private static final KeyPair keyPair = SignatureAlgorithmFactory.getInstance().generateKeyPair();
private final Transaction transaction = createTransaction(1);
private FilterManager filterManager;
@ -101,16 +97,12 @@ public class EthGetFilterChangesIntegrationTest {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
MAX_TRANSACTIONS,
MAX_HASHES,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
TransactionPoolConfiguration.DEFAULT_PRICE_BUMP);
final ProtocolContext protocolContext = executionContext.getProtocolContext();
PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class);
PeerPendingTransactionTracker peerPendingTransactionTracker =
mock(PeerPendingTransactionTracker.class);
EthContext ethContext = mock(EthContext.class);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
@ -121,11 +113,8 @@ public class EthGetFilterChangesIntegrationTest {
executionContext.getProtocolSchedule(),
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
TransactionPoolConfiguration.DEFAULT);

@ -84,7 +84,6 @@ public class BlockTransactionSelectorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
BlockTransactionSelectorTest::mockBlockHeader,
@ -337,7 +336,6 @@ public class BlockTransactionSelectorTest {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
() -> {

@ -96,7 +96,6 @@ public class PoWBlockCreatorTest {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader,
@ -158,7 +157,6 @@ public class PoWBlockCreatorTest {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader,
@ -215,7 +213,6 @@ public class PoWBlockCreatorTest {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader,
@ -288,7 +285,6 @@ public class PoWBlockCreatorTest {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader,

@ -45,7 +45,6 @@ public class PoWMinerExecutorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
PoWMinerExecutorTest::mockBlockHeader,
@ -75,7 +74,6 @@ public class PoWMinerExecutorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
PoWMinerExecutorTest::mockBlockHeader,

@ -38,9 +38,11 @@ import org.hyperledger.besu.plugin.data.TransactionType;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
@ -680,6 +682,17 @@ public class Transaction
return GoQuorumPrivateTransactionDetector.isGoQuorumPrivateTransactionV(v.get());
}
/**
* Return the list of transaction hashes extracted from the collection of Transaction passed as
* argument
*
* @param transactions a collection of transactions
* @return the list of transaction hashes
*/
public static List<Hash> toHashList(final Collection<Transaction> transactions) {
return transactions.stream().map(Transaction::getHash).collect(Collectors.toUnmodifiableList());
}
private static Bytes32 computeSenderRecoveryHash(
final TransactionType transactionType,
final long nonce,

@ -515,6 +515,11 @@ public class EthPeer {
return connection.getPeerInfo().getNodeId();
}
public boolean hasSupportForMessage(final int messageCode) {
return getAgreedCapabilities().stream()
.anyMatch(cap -> EthProtocol.get().isValidMessageCode(cap.getVersion(), messageCode));
}
@Override
public String toString() {
return String.format("Peer %s...", nodeId().toString().substring(0, 20));

@ -18,7 +18,7 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConf
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionsMessageProcessor;
import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor;
import java.util.ArrayList;
import java.util.List;
@ -33,11 +33,11 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
private static final int MAX_HASHES = 256;
private final EthPeer peer;
private final PendingTransactionsMessageProcessor processor;
private final NewPooledTransactionHashesMessageProcessor processor;
private final Queue<Hash> txAnnounces;
public BufferedGetPooledTransactionsFromPeerFetcher(
final EthPeer peer, final PendingTransactionsMessageProcessor processor) {
final EthPeer peer, final NewPooledTransactionHashesMessageProcessor processor) {
this.peer = peer;
this.processor = processor;
this.txAnnounces = Queues.synchronizedQueue(EvictingQueue.create(MAX_PENDING_TRANSACTIONS));

@ -24,15 +24,15 @@ import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMess
import java.time.Duration;
import java.time.Instant;
class PendingTransactionsMessageHandler implements EthMessages.MessageCallback {
class NewPooledTransactionHashesMessageHandler implements EthMessages.MessageCallback {
private final PendingTransactionsMessageProcessor transactionsMessageProcessor;
private final NewPooledTransactionHashesMessageProcessor transactionsMessageProcessor;
private final EthScheduler scheduler;
private final Duration txMsgKeepAlive;
public PendingTransactionsMessageHandler(
public NewPooledTransactionHashesMessageHandler(
final EthScheduler scheduler,
final PendingTransactionsMessageProcessor transactionsMessageProcessor,
final NewPooledTransactionHashesMessageProcessor transactionsMessageProcessor,
final int txMsgKeepAliveSeconds) {
this.scheduler = scheduler;
this.transactionsMessageProcessor = transactionsMessageProcessor;

@ -37,18 +37,18 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PendingTransactionsMessageProcessor {
public class NewPooledTransactionHashesMessageProcessor {
private static final int SKIPPED_MESSAGES_LOGGING_THRESHOLD = 1000;
private static final long SYNC_TOLERANCE = 100L;
private static final Logger LOG =
LoggerFactory.getLogger(PendingTransactionsMessageProcessor.class);
LoggerFactory.getLogger(NewPooledTransactionHashesMessageProcessor.class);
private final ConcurrentHashMap<EthPeer, BufferedGetPooledTransactionsFromPeerFetcher>
scheduledTasks;
private final PeerPendingTransactionTracker transactionTracker;
private final PeerTransactionTracker transactionTracker;
private final Counter totalSkippedTransactionsMessageCounter;
private final TransactionPool transactionPool;
private final TransactionPoolConfiguration transactionPoolConfiguration;
@ -56,8 +56,8 @@ public class PendingTransactionsMessageProcessor {
private final MetricsSystem metricsSystem;
private final SyncState syncState;
public PendingTransactionsMessageProcessor(
final PeerPendingTransactionTracker transactionTracker,
public NewPooledTransactionHashesMessageProcessor(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionPoolConfiguration transactionPoolConfiguration,
final Counter metricsCounter,
@ -86,7 +86,7 @@ public class PendingTransactionsMessageProcessor {
final NewPooledTransactionHashesMessage transactionsMessage,
final Instant startedAt,
final Duration keepAlive) {
// Check if message not expired.
// Check if message is not expired.
if (startedAt.plus(keepAlive).isAfter(now())) {
this.processNewPooledTransactionHashesMessage(peer, transactionsMessage);
} else {
@ -99,6 +99,7 @@ public class PendingTransactionsMessageProcessor {
final EthPeer peer, final NewPooledTransactionHashesMessage transactionsMessage) {
try {
final List<Hash> incomingTransactionHashes = transactionsMessage.pendingTransactions();
transactionTracker.markTransactionHashesAsSeen(peer, incomingTransactionHashes);
traceLambda(
LOG,
@ -107,8 +108,6 @@ public class PendingTransactionsMessageProcessor {
incomingTransactionHashes::size,
incomingTransactionHashes::toString);
transactionTracker.markTransactionsHashesAsSeen(
peer, transactionsMessage.pendingTransactions());
if (syncState.isInSync(SYNC_TOLERANCE)) {
final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask =
scheduledTasks.computeIfAbsent(
@ -122,9 +121,8 @@ public class PendingTransactionsMessageProcessor {
return new BufferedGetPooledTransactionsFromPeerFetcher(peer, this);
});
for (final Hash hash : transactionsMessage.pendingTransactions()) {
if (transactionPool.getTransactionByHash(hash).isEmpty()
&& transactionPool.addTransactionHash(hash)) {
for (final Hash hash : incomingTransactionHashes) {
if (transactionPool.getTransactionByHash(hash).isEmpty()) {
bufferedTask.addHash(hash);
}
}

@ -14,50 +14,47 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.hyperledger.besu.ethereum.core.Transaction.toHashList;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import java.util.List;
import java.util.stream.StreamSupport;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class PendingTransactionsMessageSender {
private static final Logger LOG = LoggerFactory.getLogger(PendingTransactionsMessageSender.class);
class NewPooledTransactionHashesMessageSender {
private static final Logger LOG =
LoggerFactory.getLogger(NewPooledTransactionHashesMessageSender.class);
private static final int MAX_TRANSACTIONS_HASHES = 4096;
private final PeerPendingTransactionTracker transactionTracker;
private final PeerTransactionTracker transactionTracker;
public PendingTransactionsMessageSender(final PeerPendingTransactionTracker transactionTracker) {
public NewPooledTransactionHashesMessageSender(final PeerTransactionTracker transactionTracker) {
this.transactionTracker = transactionTracker;
}
public void sendTransactionsToPeers() {
StreamSupport.stream(transactionTracker.getEthPeersWithUnsentTransactions().spliterator(), true)
.parallel()
.forEach(this::sendTransactionsToPeer);
}
private void sendTransactionsToPeer(final EthPeer peer) {
for (final List<Hash> hashes :
public void sendTransactionHashesToPeer(final EthPeer peer) {
for (final List<Transaction> txBatch :
Iterables.partition(
transactionTracker.claimTransactionsToSendToPeer(peer),
TransactionPoolConfiguration.MAX_PENDING_TRANSACTIONS_HASHES)) {
transactionTracker.claimTransactionsToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
try {
List<Hash> txHashes = toHashList(txBatch);
traceLambda(
LOG,
"Sending transaction hashes to peer {}, transaction hashes count {}, list {}",
peer::toString,
hashes::size,
hashes::toString);
txHashes::size,
txHashes::toString);
peer.send(NewPooledTransactionHashesMessage.create(hashes));
} catch (final PeerNotConnected __) {
peer.send(NewPooledTransactionHashesMessage.create(txHashes));
} catch (final PeerNotConnected unused) {
break;
}
}

@ -1,101 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static java.util.Collections.emptySet;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class PeerPendingTransactionTracker implements EthPeer.DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Hash>> transactionsToSend = new ConcurrentHashMap<>();
private final AbstractPendingTransactionsSorter pendingTransactions;
public PeerPendingTransactionTracker(
final AbstractPendingTransactionsSorter pendingTransactions) {
this.pendingTransactions = pendingTransactions;
}
public synchronized void markTransactionsHashesAsSeen(
final EthPeer peer, final Collection<Hash> transactions) {
final Set<Hash> seenTransactionsForPeer = getOrCreateSeenTransactionsForPeer(peer);
transactions.stream().forEach(seenTransactionsForPeer::add);
}
public synchronized void addToPeerSendQueue(final EthPeer peer, final Hash hash) {
if (!hasPeerSeenTransaction(peer, hash)) {
transactionsToSend.computeIfAbsent(peer, key -> createTransactionsSet()).add(hash);
}
}
public Iterable<EthPeer> getEthPeersWithUnsentTransactions() {
return transactionsToSend.keySet();
}
public synchronized Set<Hash> claimTransactionsToSendToPeer(final EthPeer peer) {
final Set<Hash> transactionsToSend = this.transactionsToSend.remove(peer);
if (transactionsToSend != null) {
markTransactionsHashesAsSeen(
peer,
transactionsToSend.stream()
.filter(h -> pendingTransactions.getTransactionByHash(h).isPresent())
.collect(Collectors.toSet()));
return transactionsToSend;
} else {
return emptySet();
}
}
public boolean isPeerSupported(final EthPeer peer, final Capability capability) {
return peer.getAgreedCapabilities().contains(capability);
}
private Set<Hash> getOrCreateSeenTransactionsForPeer(final EthPeer peer) {
return seenTransactions.computeIfAbsent(peer, key -> createTransactionsSet());
}
private boolean hasPeerSeenTransaction(final EthPeer peer, final Hash hash) {
final Set<Hash> seenTransactionsForPeer = seenTransactions.get(peer);
return seenTransactionsForPeer != null && seenTransactionsForPeer.contains(hash);
}
private <T> Set<T> createTransactionsSet() {
return Collections.newSetFromMap(
new LinkedHashMap<T, Boolean>(1 << 4, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
return size() > MAX_TRACKED_SEEN_TRANSACTIONS;
}
});
}
@Override
public void onDisconnect(final EthPeer peer) {
seenTransactions.remove(peer);
transactionsToSend.remove(peer);
}
}

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions;
import static java.util.Collections.emptySet;
import static org.hyperledger.besu.ethereum.core.Transaction.toHashList;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
@ -34,8 +35,13 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
public synchronized void markTransactionsAsSeen(
final EthPeer peer, final Collection<Transaction> transactions) {
markTransactionHashesAsSeen(peer, toHashList(transactions));
}
public synchronized void markTransactionHashesAsSeen(
final EthPeer peer, final Collection<Hash> txHashes) {
final Set<Hash> seenTransactionsForPeer = getOrCreateSeenTransactionsForPeer(peer);
transactions.stream().map(Transaction::getHash).forEach(seenTransactionsForPeer::add);
seenTransactionsForPeer.addAll(txHashes);
}
public synchronized void addToPeerSendQueue(final EthPeer peer, final Transaction transaction) {
@ -62,10 +68,13 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
return seenTransactions.computeIfAbsent(peer, key -> createTransactionsSet());
}
private boolean hasPeerSeenTransaction(final EthPeer peer, final Transaction transaction) {
boolean hasPeerSeenTransaction(final EthPeer peer, final Transaction transaction) {
return hasPeerSeenTransaction(peer, transaction.getHash());
}
boolean hasPeerSeenTransaction(final EthPeer peer, final Hash txHash) {
final Set<Hash> seenTransactionsForPeer = seenTransactions.get(peer);
return seenTransactionsForPeer != null
&& seenTransactionsForPeer.contains(transaction.getHash());
return seenTransactionsForPeer != null && seenTransactionsForPeer.contains(txHash);
}
private <T> Set<T> createTransactionsSet() {

@ -1,52 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
class PendingTransactionSender implements TransactionBatchAddedListener {
private final PeerPendingTransactionTracker transactionTracker;
private final PendingTransactionsMessageSender transactionsMessageSender;
private final EthContext ethContext;
public PendingTransactionSender(
final PeerPendingTransactionTracker transactionTracker,
final PendingTransactionsMessageSender transactionsMessageSender,
final EthContext ethContext) {
this.transactionTracker = transactionTracker;
this.transactionsMessageSender = transactionsMessageSender;
this.ethContext = ethContext;
}
@Override
public void onTransactionsAdded(final Iterable<Transaction> transactions) {
ethContext
.getEthPeers()
.streamAvailablePeers()
.filter(peer -> transactionTracker.isPeerSupported(peer, EthProtocol.ETH65))
.forEach(
peer ->
transactions.forEach(
transaction ->
transactionTracker.addToPeerSendQueue(peer, transaction.getHash())));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(transactionsMessageSender::sendTransactionsToPeers);
}
}

@ -0,0 +1,157 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter.TransactionInfo.toTransactionList;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter.TransactionInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransactionBroadcaster implements TransactionBatchAddedListener {
private static final Logger LOG = LoggerFactory.getLogger(TransactionBroadcaster.class);
private final AbstractPendingTransactionsSorter pendingTransactions;
private final PeerTransactionTracker transactionTracker;
private final TransactionsMessageSender transactionsMessageSender;
private final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
private final EthContext ethContext;
private final int numPeersToSendFullTransactions;
public TransactionBroadcaster(
final EthContext ethContext,
final AbstractPendingTransactionsSorter pendingTransactions,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender) {
this.pendingTransactions = pendingTransactions;
this.transactionTracker = transactionTracker;
this.transactionsMessageSender = transactionsMessageSender;
this.newPooledTransactionHashesMessageSender = newPooledTransactionHashesMessageSender;
this.ethContext = ethContext;
this.numPeersToSendFullTransactions =
(int) Math.ceil(Math.sqrt(ethContext.getEthPeers().getMaxPeers()));
}
public void relayTransactionPoolTo(final EthPeer peer) {
Set<TransactionInfo> pendingTransactionInfo = pendingTransactions.getTransactionInfo();
if (!pendingTransactionInfo.isEmpty()) {
if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) {
sendTransactionHashes(toTransactionList(pendingTransactionInfo), List.of(peer));
} else {
sendFullTransactions(toTransactionList(pendingTransactionInfo), List.of(peer));
}
}
}
@Override
public void onTransactionsAdded(final Iterable<Transaction> transactions) {
final int currPeerCount = ethContext.getEthPeers().peerCount();
if (currPeerCount == 0) {
return;
}
List<EthPeer> peersWithOnlyTransactionSupport = new ArrayList<>(currPeerCount);
List<EthPeer> peersWithTransactionHashesSupport = new ArrayList<>(currPeerCount);
ethContext
.getEthPeers()
.streamAvailablePeers()
.forEach(
peer -> {
if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) {
peersWithTransactionHashesSupport.add(peer);
} else {
peersWithOnlyTransactionSupport.add(peer);
}
});
if (peersWithOnlyTransactionSupport.size() < numPeersToSendFullTransactions) {
final int delta =
Math.min(
numPeersToSendFullTransactions - peersWithOnlyTransactionSupport.size(),
peersWithTransactionHashesSupport.size());
Collections.shuffle(peersWithTransactionHashesSupport);
// move peers from the other list to reach the required size for full transaction peers
movePeersBetweenLists(
peersWithTransactionHashesSupport, peersWithOnlyTransactionSupport, delta);
}
traceLambda(
LOG,
"Sending full transactions to {} peers and transaction hashes to {} peers."
+ " Peers w/o eth/66 {}, peers with eth/66 {}",
peersWithOnlyTransactionSupport::size,
peersWithTransactionHashesSupport::size,
peersWithOnlyTransactionSupport::toString,
peersWithTransactionHashesSupport::toString);
sendFullTransactions(transactions, peersWithOnlyTransactionSupport);
sendTransactionHashes(transactions, peersWithTransactionHashesSupport);
}
private void sendFullTransactions(
final Iterable<Transaction> transactions, final List<EthPeer> fullTransactionPeers) {
fullTransactionPeers.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(() -> transactionsMessageSender.sendTransactionsToPeer(peer));
});
}
private void sendTransactionHashes(
final Iterable<Transaction> transactions, final List<EthPeer> transactionHashPeers) {
transactionHashPeers.stream()
.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() ->
newPooledTransactionHashesMessageSender.sendTransactionHashesToPeer(
peer));
});
}
private void movePeersBetweenLists(
final List<EthPeer> sourceList, final List<EthPeer> destinationList, final int num) {
final int stopIndex = sourceList.size() - num;
for (int i = sourceList.size() - 1; i >= stopIndex; i--) {
destinationList.add(sourceList.remove(i));
}
}
}

@ -28,7 +28,6 @@ import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@ -72,36 +71,27 @@ public class TransactionPool implements BlockAddedObserver {
private final AbstractPendingTransactionsSorter pendingTransactions;
private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext;
private final TransactionBatchAddedListener transactionBatchAddedListener;
private final TransactionBatchAddedListener pendingTransactionBatchAddedListener;
private final TransactionBroadcaster transactionBroadcaster;
private final SyncState syncState;
private final MiningParameters miningParameters;
private final LabelledMetric<Counter> duplicateTransactionCounter;
private final PeerTransactionTracker peerTransactionTracker;
private final PeerPendingTransactionTracker peerPendingTransactionTracker;
private final TransactionPoolConfiguration configuration;
public TransactionPool(
final AbstractPendingTransactionsSorter pendingTransactions,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final TransactionBatchAddedListener transactionBatchAddedListener,
final TransactionBatchAddedListener pendingTransactionBatchAddedListener,
final TransactionBroadcaster transactionBroadcaster,
final SyncState syncState,
final EthContext ethContext,
final PeerTransactionTracker peerTransactionTracker,
final PeerPendingTransactionTracker peerPendingTransactionTracker,
final MiningParameters miningParameters,
final MetricsSystem metricsSystem,
final TransactionPoolConfiguration configuration) {
this.pendingTransactions = pendingTransactions;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.transactionBatchAddedListener = transactionBatchAddedListener;
this.pendingTransactionBatchAddedListener = pendingTransactionBatchAddedListener;
this.transactionBroadcaster = transactionBroadcaster;
this.syncState = syncState;
this.peerTransactionTracker = peerTransactionTracker;
this.peerPendingTransactionTracker = peerPendingTransactionTracker;
this.miningParameters = miningParameters;
this.configuration = configuration;
@ -116,19 +106,7 @@ public class TransactionPool implements BlockAddedObserver {
}
void handleConnect(final EthPeer peer) {
pendingTransactions
.getLocalTransactions()
.forEach(transaction -> peerTransactionTracker.addToPeerSendQueue(peer, transaction));
if (peerPendingTransactionTracker.isPeerSupported(peer, EthProtocol.ETH65)) {
pendingTransactions
.getNewPooledHashes()
.forEach(hash -> peerPendingTransactionTracker.addToPeerSendQueue(peer, hash));
}
}
public boolean addTransactionHash(final Hash transactionHash) {
return pendingTransactions.addTransactionHash(transactionHash);
transactionBroadcaster.relayTransactionPoolTo(peer);
}
public ValidationResult<TransactionInvalidReason> addLocalTransaction(
@ -147,8 +125,7 @@ public class TransactionPool implements BlockAddedObserver {
return ValidationResult.invalid(transactionAddedStatus.getInvalidReason().orElseThrow());
}
final Collection<Transaction> txs = singletonList(transaction);
transactionBatchAddedListener.onTransactionsAdded(txs);
pendingTransactionBatchAddedListener.onTransactionsAdded(txs);
transactionBroadcaster.onTransactionsAdded(txs);
}
return validationResult;
@ -167,9 +144,8 @@ public class TransactionPool implements BlockAddedObserver {
if (!syncState.isInSync(SYNC_TOLERANCE)) {
return;
}
final Set<Transaction> addedTransactions = new HashSet<>();
final Set<Transaction> addedTransactions = new HashSet<>(transactions.size());
for (final Transaction transaction : transactions) {
pendingTransactions.tryEvictTransactionHash(transaction.getHash());
if (pendingTransactions.containsTransaction(transaction.getHash())) {
// We already have this transaction, don't even validate it.
duplicateTransactionCounter.labels(REMOTE).inc();
@ -196,7 +172,7 @@ public class TransactionPool implements BlockAddedObserver {
}
}
if (!addedTransactions.isEmpty()) {
transactionBatchAddedListener.onTransactionsAdded(addedTransactions);
transactionBroadcaster.onTransactionsAdded(addedTransactions);
}
}
@ -328,11 +304,6 @@ public class TransactionPool implements BlockAddedObserver {
return blockchain.getBlockHeader(blockchain.getChainHeadHash()).get();
}
public interface TransactionBatchAddedListener {
void onTransactionsAdded(Iterable<Transaction> transactions);
}
private Wei minTransactionGasPrice(final Transaction transaction) {
final BlockHeader chainHeadBlockHeader = getChainHeadBlockHeader();
return protocolSchedule
@ -340,4 +311,9 @@ public class TransactionPool implements BlockAddedObserver {
.getFeeMarket()
.minTransactionPriceInNextBlock(transaction, chainHeadBlockHeader::getBaseFee);
}
public interface TransactionBatchAddedListener {
void onTransactionsAdded(Iterable<Transaction> transactions);
}
}

@ -40,11 +40,6 @@ public interface TransactionPoolConfiguration {
return MAX_PENDING_TRANSACTIONS;
}
@Value.Default
default int getPooledTransactionHashesSize() {
return MAX_PENDING_TRANSACTIONS_HASHES;
}
@Value.Default
default int getPendingTxRetentionPeriod() {
return DEFAULT_TX_RETENTION_HOURS;

@ -51,10 +51,8 @@ public class TransactionPoolFactory {
final TransactionsMessageSender transactionsMessageSender =
new TransactionsMessageSender(transactionTracker);
final PeerPendingTransactionTracker pendingTransactionTracker =
new PeerPendingTransactionTracker(pendingTransactions);
final PendingTransactionsMessageSender pendingTransactionsMessageSender =
new PendingTransactionsMessageSender(pendingTransactionTracker);
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender =
new NewPooledTransactionHashesMessageSender(transactionTracker);
return createTransactionPool(
protocolSchedule,
@ -67,8 +65,7 @@ public class TransactionPoolFactory {
pendingTransactions,
transactionTracker,
transactionsMessageSender,
pendingTransactionTracker,
pendingTransactionsMessageSender);
newPooledTransactionHashesMessageSender);
}
static TransactionPool createTransactionPool(
@ -82,20 +79,20 @@ public class TransactionPoolFactory {
final AbstractPendingTransactionsSorter pendingTransactions,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final PeerPendingTransactionTracker pendingTransactionTracker,
final PendingTransactionsMessageSender pendingTransactionsMessageSender) {
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender) {
final TransactionPool transactionPool =
new TransactionPool(
pendingTransactions,
protocolSchedule,
protocolContext,
new TransactionSender(transactionTracker, transactionsMessageSender, ethContext),
new PendingTransactionSender(
pendingTransactionTracker, pendingTransactionsMessageSender, ethContext),
new TransactionBroadcaster(
ethContext,
pendingTransactions,
transactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender),
syncState,
ethContext,
transactionTracker,
pendingTransactionTracker,
miningParameters,
metricsSystem,
transactionPoolConfiguration);
@ -111,11 +108,11 @@ public class TransactionPoolFactory {
"Total number of transactions messages skipped by the processor.")),
transactionPoolConfiguration.getTxMessageKeepAliveSeconds());
ethContext.getEthMessages().subscribe(EthPV62.TRANSACTIONS, transactionsMessageHandler);
final PendingTransactionsMessageHandler pooledTransactionsMessageHandler =
new PendingTransactionsMessageHandler(
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler =
new NewPooledTransactionHashesMessageHandler(
ethContext.getScheduler(),
new PendingTransactionsMessageProcessor(
pendingTransactionTracker,
new NewPooledTransactionHashesMessageProcessor(
transactionTracker,
transactionPool,
transactionPoolConfiguration,
metricsSystem.createCounter(
@ -129,7 +126,6 @@ public class TransactionPoolFactory {
ethContext
.getEthMessages()
.subscribe(EthPV65.NEW_POOLED_TRANSACTION_HASHES, pooledTransactionsMessageHandler);
ethContext.getEthPeers().subscribeDisconnect(pendingTransactionTracker);
protocolContext.getBlockchain().observeBlockAdded(transactionPool);
ethContext.getEthPeers().subscribeDisconnect(transactionTracker);
@ -152,7 +148,6 @@ public class TransactionPoolFactory {
return new BaseFeePendingTransactionsSorter(
transactionPoolConfiguration.getPendingTxRetentionPeriod(),
transactionPoolConfiguration.getTxPoolMaxSize(),
transactionPoolConfiguration.getPooledTransactionHashesSize(),
clock,
metricsSystem,
protocolContext.getBlockchain()::getChainHeadHeader,
@ -161,7 +156,6 @@ public class TransactionPoolFactory {
return new GasPricePendingTransactionsSorter(
transactionPoolConfiguration.getPendingTxRetentionPeriod(),
transactionPoolConfiguration.getTxPoolMaxSize(),
transactionPoolConfiguration.getPooledTransactionHashesSize(),
clock,
metricsSystem,
protocolContext.getBlockchain()::getChainHeadHeader,

@ -1,49 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
class TransactionSender implements TransactionBatchAddedListener {
private final PeerTransactionTracker transactionTracker;
private final TransactionsMessageSender transactionsMessageSender;
private final EthContext ethContext;
public TransactionSender(
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final EthContext ethContext) {
this.transactionTracker = transactionTracker;
this.transactionsMessageSender = transactionsMessageSender;
this.ethContext = ethContext;
}
@Override
public void onTransactionsAdded(final Iterable<Transaction> transactions) {
ethContext
.getEthPeers()
.streamAvailablePeers()
.forEach(
peer ->
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction)));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(transactionsMessageSender::sendTransactionsToPeers);
}
}

@ -15,9 +15,9 @@
package org.hyperledger.besu.ethereum.eth.transactions;
import static java.time.Instant.now;
import static org.hyperledger.besu.ethereum.core.Transaction.toHashList;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
@ -28,12 +28,8 @@ import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -77,18 +73,17 @@ class TransactionsMessageProcessor {
private void processTransactionsMessage(
final EthPeer peer, final TransactionsMessage transactionsMessage) {
try {
final List<Transaction> readTransactions = transactionsMessage.transactions();
final List<Transaction> incomingTransactions = transactionsMessage.transactions();
transactionTracker.markTransactionsAsSeen(peer, incomingTransactions);
traceLambda(
LOG,
"Received transactions message from {}, incoming transactions {}, incoming list {}",
peer::toString,
readTransactions::size,
() -> toHashList(readTransactions));
incomingTransactions::size,
() -> toHashList(incomingTransactions));
final Set<Transaction> transactions = Sets.newHashSet(readTransactions);
transactionTracker.markTransactionsAsSeen(peer, transactions);
transactionPool.addRemoteTransactions(transactions);
transactionPool.addRemoteTransactions(incomingTransactions);
} catch (final RLPException ex) {
if (peer != null) {
LOG.debug("Malformed transaction message received, disconnecting: {}", peer, ex);
@ -96,8 +91,4 @@ class TransactionsMessageProcessor {
}
}
}
private List<Hash> toHashList(final Collection<Transaction> txs) {
return txs.stream().map(Transaction::getHash).collect(Collectors.toList());
}
}

@ -14,18 +14,15 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.hyperledger.besu.ethereum.core.Transaction.toHashList;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.LimitedTransactionsMessages;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
@ -46,7 +43,7 @@ class TransactionsMessageSender {
.forEach(this::sendTransactionsToPeer);
}
private void sendTransactionsToPeer(final EthPeer peer) {
void sendTransactionsToPeer(final EthPeer peer) {
final Set<Transaction> allTxToSend = transactionTracker.claimTransactionsToSendToPeer(peer);
while (!allTxToSend.isEmpty()) {
final LimitedTransactionsMessages limitedTransactionsMessages =
@ -69,8 +66,4 @@ class TransactionsMessageSender {
}
}
}
private List<Hash> toHashList(final Collection<Transaction> txs) {
return txs.stream().map(Transaction::getHash).collect(Collectors.toList());
}
}

@ -40,6 +40,7 @@ import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -54,7 +55,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.EvictingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -71,7 +71,6 @@ public abstract class AbstractPendingTransactionsSorter {
protected final int maxTransactionRetentionHours;
protected final Clock clock;
protected final EvictingQueue<Hash> newPooledHashes;
protected final Object lock = new Object();
protected final Map<Hash, TransactionInfo> pendingTransactions = new ConcurrentHashMap<>();
@ -96,7 +95,6 @@ public abstract class AbstractPendingTransactionsSorter {
public AbstractPendingTransactionsSorter(
final int maxTransactionRetentionHours,
final int maxPendingTransactions,
final int maxPooledTransactionHashes,
final Clock clock,
final MetricsSystem metricsSystem,
final Supplier<BlockHeader> chainHeadHeaderSupplier,
@ -104,7 +102,6 @@ public abstract class AbstractPendingTransactionsSorter {
this.maxTransactionRetentionHours = maxTransactionRetentionHours;
this.maxPendingTransactions = maxPendingTransactions;
this.clock = clock;
this.newPooledHashes = EvictingQueue.create(maxPooledTransactionHashes);
this.chainHeadHeaderSupplier = chainHeadHeaderSupplier;
this.transactionReplacementHandler = new TransactionPoolReplacementHandler(priceBump);
final LabelledMetric<Counter> transactionAddedCounter =
@ -163,17 +160,6 @@ public abstract class AbstractPendingTransactionsSorter {
return added;
}
public boolean addTransactionHash(final Hash transactionHash) {
final boolean hashAdded;
synchronized (newPooledHashes) {
hashAdded = newPooledHashes.add(transactionHash);
}
if (hashAdded) {
localTransactionHashesAddedCounter.inc();
}
return hashAdded;
}
@VisibleForTesting
public TransactionAddedStatus addLocalTransaction(final Transaction transaction) {
final TransactionAddedStatus transactionAdded =
@ -338,18 +324,6 @@ public abstract class AbstractPendingTransactionsSorter {
: transactionsForSenderInfo.maybeNextNonce();
}
public void tryEvictTransactionHash(final Hash hash) {
synchronized (newPooledHashes) {
newPooledHashes.remove(hash);
}
}
public List<Hash> getNewPooledHashes() {
synchronized (newPooledHashes) {
return List.copyOf(newPooledHashes);
}
}
public abstract void manageBlockAdded(final Block block);
protected abstract void doRemoveTransaction(
@ -412,6 +386,13 @@ public abstract class AbstractPendingTransactionsSorter {
public Instant getAddedToPoolAt() {
return addedToPoolAt;
}
public static List<Transaction> toTransactionList(
final Collection<TransactionInfo> transactionsInfo) {
return transactionsInfo.stream()
.map(TransactionInfo::getTransaction)
.collect(Collectors.toUnmodifiableList());
}
}
public enum TransactionSelectionResult {

@ -54,7 +54,6 @@ public class BaseFeePendingTransactionsSorter extends AbstractPendingTransaction
public BaseFeePendingTransactionsSorter(
final int maxTransactionRetentionHours,
final int maxPendingTransactions,
final int maxPooledTransactionHashes,
final Clock clock,
final MetricsSystem metricsSystem,
final Supplier<BlockHeader> chainHeadHeaderSupplier,
@ -62,7 +61,6 @@ public class BaseFeePendingTransactionsSorter extends AbstractPendingTransaction
super(
maxTransactionRetentionHours,
maxPendingTransactions,
maxPooledTransactionHashes,
clock,
metricsSystem,
chainHeadHeaderSupplier,
@ -211,7 +209,6 @@ public class BaseFeePendingTransactionsSorter extends AbstractPendingTransaction
}
LOG.trace("Adding {} to pending transactions", transactionInfo);
pendingTransactions.put(transactionInfo.getHash(), transactionInfo);
tryEvictTransactionHash(transactionInfo.getHash());
if (pendingTransactions.size() > maxPendingTransactions) {
final Stream.Builder<TransactionInfo> removalCandidates = Stream.builder();

@ -47,7 +47,6 @@ public class GasPricePendingTransactionsSorter extends AbstractPendingTransactio
public GasPricePendingTransactionsSorter(
final int maxTransactionRetentionHours,
final int maxPendingTransactions,
final int maxPooledTransactionHashes,
final Clock clock,
final MetricsSystem metricsSystem,
final Supplier<BlockHeader> chainHeadHeaderSupplier,
@ -55,7 +54,6 @@ public class GasPricePendingTransactionsSorter extends AbstractPendingTransactio
super(
maxTransactionRetentionHours,
maxPendingTransactions,
maxPooledTransactionHashes,
clock,
metricsSystem,
chainHeadHeaderSupplier,
@ -101,7 +99,6 @@ public class GasPricePendingTransactionsSorter extends AbstractPendingTransactio
}
prioritizedTransactions.add(transactionInfo);
pendingTransactions.put(transactionInfo.getHash(), transactionInfo);
tryEvictTransactionHash(transactionInfo.getHash());
if (pendingTransactions.size() > maxPendingTransactions) {
final TransactionInfo toRemove = prioritizedTransactions.last();

@ -29,7 +29,7 @@ import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionsMessageProcessor;
import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -51,7 +51,7 @@ import org.mockito.junit.MockitoJUnitRunner;
public class BufferedGetPooledTransactionsFromPeerFetcherTest {
@Mock EthPeer ethPeer;
@Mock PendingTransactionsMessageProcessor processor;
@Mock NewPooledTransactionHashesMessageProcessor processor;
@Mock TransactionPool transactionPool;
@Mock EthContext ethContext;
@Mock EthScheduler ethScheduler;

@ -53,7 +53,6 @@ import org.junit.Test;
public class BaseFeePendingTransactionsTest {
private static final int MAX_TRANSACTIONS = 5;
private static final int MAX_TRANSACTION_HASHES = 5;
private static final Supplier<SignatureAlgorithm> SIGNATURE_ALGORITHM =
Suppliers.memoize(SignatureAlgorithmFactory::getInstance);
private static final KeyPair KEYS1 = SIGNATURE_ALGORITHM.get().generateKeyPair();
@ -70,7 +69,6 @@ public class BaseFeePendingTransactionsTest {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
TestClock.fixed(),
metricsSystem,
BaseFeePendingTransactionsTest::mockBlockHeader,
@ -594,7 +592,6 @@ public class BaseFeePendingTransactionsTest {
new BaseFeePendingTransactionsSorter(
maxTransactionRetentionHours,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
clock,
metricsSystem,
BaseFeePendingTransactionsTest::mockBlockHeader,
@ -618,7 +615,6 @@ public class BaseFeePendingTransactionsTest {
new BaseFeePendingTransactionsSorter(
maxTransactionRetentionHours,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
clock,
metricsSystem,
BaseFeePendingTransactionsTest::mockBlockHeader,
@ -638,7 +634,6 @@ public class BaseFeePendingTransactionsTest {
new BaseFeePendingTransactionsSorter(
maxTransactionRetentionHours,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
clock,
metricsSystem,
BaseFeePendingTransactionsTest::mockBlockHeader,

@ -53,7 +53,6 @@ import org.junit.Test;
public class GasPricePendingTransactionsTest {
private static final int MAX_TRANSACTIONS = 5;
private static final int MAX_TRANSACTION_HASHES = 5;
private static final Supplier<SignatureAlgorithm> SIGNATURE_ALGORITHM =
Suppliers.memoize(SignatureAlgorithmFactory::getInstance);
private static final KeyPair KEYS1 = SIGNATURE_ALGORITHM.get().generateKeyPair();
@ -70,7 +69,6 @@ public class GasPricePendingTransactionsTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
TestClock.fixed(),
metricsSystem,
GasPricePendingTransactionsTest::mockBlockHeader,
@ -617,7 +615,6 @@ public class GasPricePendingTransactionsTest {
new GasPricePendingTransactionsSorter(
maxTransactionRetentionHours,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
clock,
metricsSystem,
() -> null,
@ -641,7 +638,6 @@ public class GasPricePendingTransactionsTest {
new GasPricePendingTransactionsSorter(
maxTransactionRetentionHours,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
clock,
metricsSystem,
() -> null,
@ -661,7 +657,6 @@ public class GasPricePendingTransactionsTest {
new GasPricePendingTransactionsSorter(
maxTransactionRetentionHours,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
clock,
metricsSystem,
() -> null,

@ -35,7 +35,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionsMessageProcessor.FetcherCreatorTask;
import org.hyperledger.besu.ethereum.eth.transactions.NewPooledTransactionHashesMessageProcessor.FetcherCreatorTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
@ -51,11 +51,11 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class PendingTransactionsMessageProcessorTest {
public class NewPooledTransactionHashesMessageProcessorTest {
@Mock private TransactionPool transactionPool;
@Mock private TransactionPoolConfiguration transactionPoolConfiguration;
@Mock private PeerPendingTransactionTracker transactionTracker;
@Mock private PeerTransactionTracker transactionTracker;
@Mock private Counter totalSkippedTransactionsMessageCounter;
@Mock private EthPeer peer1;
@Mock private MetricsSystem metricsSystem;
@ -63,7 +63,7 @@ public class PendingTransactionsMessageProcessorTest {
@Mock private EthContext ethContext;
@Mock private EthScheduler ethScheduler;
private PendingTransactionsMessageProcessor messageHandler;
private NewPooledTransactionHashesMessageProcessor messageHandler;
private final BlockDataGenerator generator = new BlockDataGenerator();
private final Hash hash1 = generator.transaction().getHash();
@ -75,7 +75,7 @@ public class PendingTransactionsMessageProcessorTest {
when(transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod())
.thenReturn(Duration.ofMillis(500));
messageHandler =
new PendingTransactionsMessageProcessor(
new NewPooledTransactionHashesMessageProcessor(
transactionTracker,
transactionPool,
transactionPoolConfiguration,
@ -95,7 +95,7 @@ public class PendingTransactionsMessageProcessorTest {
ofMinutes(1));
verify(transactionTracker)
.markTransactionsHashesAsSeen(peer1, Arrays.asList(hash1, hash2, hash3));
.markTransactionHashesAsSeen(peer1, Arrays.asList(hash1, hash2, hash3));
verifyNoMoreInteractions(transactionTracker);
}
@ -109,9 +109,6 @@ public class PendingTransactionsMessageProcessorTest {
now(),
ofMinutes(1));
verify(transactionPool).addTransactionHash(hash1);
verify(transactionPool).addTransactionHash(hash2);
verify(transactionPool).addTransactionHash(hash3);
verify(transactionPool).getTransactionByHash(hash1);
verify(transactionPool).getTransactionByHash(hash2);
verify(transactionPool).getTransactionByHash(hash3);
@ -133,7 +130,7 @@ public class PendingTransactionsMessageProcessorTest {
now(),
ofMinutes(1));
verify(transactionPool).addTransactionHash(hash3);
// verify(transactionPool).addTransactionHash(hash3);
verify(transactionPool).getTransactionByHash(hash1);
verify(transactionPool).getTransactionByHash(hash2);
verify(transactionPool).getTransactionByHash(hash3);
@ -182,7 +179,6 @@ public class PendingTransactionsMessageProcessorTest {
final EthScheduler ethScheduler = mock(EthScheduler.class);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
when(transactionPool.addTransactionHash(hash1)).thenReturn(true);
messageHandler.processNewPooledTransactionHashesMessage(
peer1, NewPooledTransactionHashesMessage.create(asList(hash1, hash2)), now(), ofMinutes(1));
@ -195,9 +191,6 @@ public class PendingTransactionsMessageProcessorTest {
public void shouldNotScheduleGetPooledTransactionsTaskTwice() {
when(syncState.isInSync(anyLong())).thenReturn(true);
when(transactionPool.addTransactionHash(hash1)).thenReturn(true);
when(transactionPool.addTransactionHash(hash2)).thenReturn(true);
messageHandler.processNewPooledTransactionHashesMessage(
peer1,
NewPooledTransactionHashesMessage.create(Collections.singletonList(hash1)),

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.transactions;
import static com.google.common.collect.Sets.newHashSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.core.Transaction.toHashList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
@ -50,20 +51,20 @@ import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
@RunWith(Parameterized.class)
public class PendingTransactionsMessageSenderTest {
public class NewPooledTransactionHashesMessageSenderTest {
private final EthPeer peer1 = mock(EthPeer.class);
private final EthPeer peer2 = mock(EthPeer.class);
private final BlockDataGenerator generator = new BlockDataGenerator();
private final Hash transaction1 = generator.transaction().getHash();
private final Hash transaction2 = generator.transaction().getHash();
private final Hash transaction3 = generator.transaction().getHash();
private final Transaction transaction1 = generator.transaction();
private final Transaction transaction2 = generator.transaction();
private final Transaction transaction3 = generator.transaction();
@Parameterized.Parameter public AbstractPendingTransactionsSorter pendingTransactions;
private PeerPendingTransactionTracker transactionTracker;
private PendingTransactionsMessageSender messageSender;
private PeerTransactionTracker transactionTracker;
private NewPooledTransactionHashesMessageSender messageSender;
@Parameterized.Parameters
public static Collection<Object[]> data() {
@ -76,8 +77,8 @@ public class PendingTransactionsMessageSenderTest {
@Before
public void setUp() {
transactionTracker = new PeerPendingTransactionTracker(pendingTransactions);
messageSender = new PendingTransactionsMessageSender(transactionTracker);
transactionTracker = new PeerTransactionTracker();
messageSender = new NewPooledTransactionHashesMessageSender(transactionTracker);
Transaction tx = mock(Transaction.class);
when(pendingTransactions.getTransactionByHash(any())).thenReturn(Optional.of(tx));
}
@ -89,7 +90,7 @@ public class PendingTransactionsMessageSenderTest {
transactionTracker.addToPeerSendQueue(peer1, transaction2);
transactionTracker.addToPeerSendQueue(peer2, transaction3);
messageSender.sendTransactionsToPeers();
List.of(peer1, peer2).forEach(messageSender::sendTransactionHashesToPeer);
verify(peer1).send(transactionsMessageContaining(transaction1, transaction2));
verify(peer2).send(transactionsMessageContaining(transaction3));
@ -98,20 +99,20 @@ public class PendingTransactionsMessageSenderTest {
@Test
public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
final Set<Hash> transactions =
generator.transactions(6000).stream().map(Transaction::getHash).collect(Collectors.toSet());
final Set<Transaction> transactions =
generator.transactions(6000).stream().collect(Collectors.toSet());
transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));
messageSender.sendTransactionsToPeers();
messageSender.sendTransactionHashesToPeer(peer1);
final ArgumentCaptor<MessageData> messageDataArgumentCaptor =
ArgumentCaptor.forClass(MessageData.class);
verify(peer1, times(2)).send(messageDataArgumentCaptor.capture());
final List<MessageData> sentMessages = messageDataArgumentCaptor.getAllValues();
assertThat(sentMessages).hasSize(2);
assertThat(sentMessages)
.hasSize(2)
.allMatch(message -> message.getCode() == EthPV65.NEW_POOLED_TRANSACTION_HASHES);
final Set<Hash> firstBatch = getTransactionsFromMessage(sentMessages.get(0));
final Set<Hash> secondBatch = getTransactionsFromMessage(sentMessages.get(1));
@ -124,14 +125,16 @@ public class PendingTransactionsMessageSenderTest {
.hasSizeBetween(
expectedSecondBatchSize - toleranceDelta, expectedSecondBatchSize + toleranceDelta);
assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(transactions);
assertThat(Sets.union(firstBatch, secondBatch))
.containsExactlyInAnyOrderElementsOf(toHashList(transactions));
}
private MessageData transactionsMessageContaining(final Hash... transactions) {
private MessageData transactionsMessageContaining(final Transaction... transactions) {
return argThat(
message -> {
final Set<Hash> actualSentTransactions = getTransactionsFromMessage(message);
final Set<Hash> expectedTransactions = newHashSet(transactions);
final Set<Hash> expectedTransactions =
newHashSet(toHashList(Arrays.asList(transactions)));
return message.getCode() == EthPV65.NEW_POOLED_TRANSACTION_HASHES
&& actualSentTransactions.equals(expectedTransactions);
});

@ -1,124 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import com.google.common.collect.ImmutableSet;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class PeerPendingTransactionTrackerTest {
@Parameterized.Parameter public AbstractPendingTransactionsSorter pendingTransactions;
private final EthPeer ethPeer1 = mock(EthPeer.class);
private final EthPeer ethPeer2 = mock(EthPeer.class);
private final BlockDataGenerator generator = new BlockDataGenerator();
private PeerPendingTransactionTracker tracker;
private final Hash hash1 = generator.transaction().getHash();
private final Hash hash2 = generator.transaction().getHash();
private final Hash hash3 = generator.transaction().getHash();
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
{mock(GasPricePendingTransactionsSorter.class)},
{mock(BaseFeePendingTransactionsSorter.class)}
});
}
@Before
public void setUp() {
tracker = new PeerPendingTransactionTracker(pendingTransactions);
Transaction tx = mock(Transaction.class);
when(pendingTransactions.getTransactionByHash(any())).thenReturn(Optional.of(tx));
}
@Test
public void shouldTrackTransactionsToSendToPeer() {
tracker.addToPeerSendQueue(ethPeer1, hash1);
tracker.addToPeerSendQueue(ethPeer1, hash2);
tracker.addToPeerSendQueue(ethPeer2, hash3);
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer1, ethPeer2);
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(hash1, hash2);
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3);
}
@Test
public void shouldExcludeAlreadySeenTransactionsFromTransactionsToSend() {
tracker.markTransactionsHashesAsSeen(ethPeer1, ImmutableSet.of(hash2));
tracker.addToPeerSendQueue(ethPeer1, hash1);
tracker.addToPeerSendQueue(ethPeer1, hash2);
tracker.addToPeerSendQueue(ethPeer2, hash3);
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer1, ethPeer2);
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(hash1);
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3);
}
@Test
public void shouldExcludeAlreadySeenTransactionsAsACollectionFromTransactionsToSend() {
tracker.markTransactionsHashesAsSeen(ethPeer1, ImmutableSet.of(hash1, hash2));
tracker.addToPeerSendQueue(ethPeer1, hash1);
tracker.addToPeerSendQueue(ethPeer1, hash2);
tracker.addToPeerSendQueue(ethPeer2, hash3);
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer2);
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).isEmpty();
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3);
}
@Test
public void shouldClearDataWhenPeerDisconnects() {
tracker.markTransactionsHashesAsSeen(ethPeer1, ImmutableSet.of(hash3));
tracker.addToPeerSendQueue(ethPeer1, hash2);
tracker.addToPeerSendQueue(ethPeer2, hash3);
tracker.onDisconnect(ethPeer1);
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer2);
// Should have cleared data that ethPeer1 has already seen transaction1
tracker.addToPeerSendQueue(ethPeer1, hash1);
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer1, ethPeer2);
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(hash1);
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3);
}
}

@ -42,7 +42,6 @@ import org.junit.Test;
public class PendingMultiTypesTransactionsTest {
private static final int MAX_TRANSACTIONS = 5;
private static final int MAX_TRANSACTION_HASHES = 5;
private static final Supplier<SignatureAlgorithm> SIGNATURE_ALGORITHM =
Suppliers.memoize(SignatureAlgorithmFactory::getInstance)::get;
private static final KeyPair KEYS1 = SIGNATURE_ALGORITHM.get().generateKeyPair();
@ -59,7 +58,6 @@ public class PendingMultiTypesTransactionsTest {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
TestClock.fixed(),
metricsSystem,
() -> mockBlockHeader(Wei.of(7L)),

@ -1,67 +0,0 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import java.util.Arrays;
import java.util.Collections;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.Test;
public class PendingTransactionsSenderTest {
@Test
public void testSendEth65PeersOnly() {
PeerPendingTransactionTracker peerPendingTransactionTracker =
mock(PeerPendingTransactionTracker.class);
PendingTransactionsMessageSender pendingTransactionsMessageSender =
mock(PendingTransactionsMessageSender.class);
EthContext ethContext = mock(EthContext.class);
EthScheduler ethScheduler = mock(EthScheduler.class);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
PendingTransactionSender sender =
new PendingTransactionSender(
peerPendingTransactionTracker, pendingTransactionsMessageSender, ethContext);
EthPeer peer1 = mock(EthPeer.class);
EthPeer peer2 = mock(EthPeer.class);
Transaction tx = mock(Transaction.class);
Hash hash = Hash.wrap(Bytes32.random());
when(tx.getHash()).thenReturn(hash);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
when(ethPeers.streamAvailablePeers()).thenReturn(Arrays.asList(peer1, peer2).stream());
when(peerPendingTransactionTracker.isPeerSupported(peer1, EthProtocol.ETH65)).thenReturn(true);
when(peerPendingTransactionTracker.isPeerSupported(peer2, EthProtocol.ETH65)).thenReturn(false);
sender.onTransactionsAdded(Collections.singleton(tx));
verify(peerPendingTransactionTracker, times(1)).addToPeerSendQueue(peer1, hash);
verify(peerPendingTransactionTracker, never()).addToPeerSendQueue(peer2, hash);
}
}

@ -0,0 +1,272 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter.TransactionInfo.toTransactionList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter.TransactionInfo;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TransactionBroadcasterTest {
@Mock private EthContext ethContext;
@Mock private EthPeers ethPeers;
@Mock private EthScheduler ethScheduler;
@Mock private AbstractPendingTransactionsSorter pendingTransactions;
@Mock private PeerTransactionTracker transactionTracker;
@Mock private TransactionsMessageSender transactionsMessageSender;
@Mock private NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
private final EthPeer ethPeerNoEth66 = mock(EthPeer.class);
private final EthPeer ethPeerWithEth66 = mock(EthPeer.class);
private final EthPeer ethPeerNoEth66_2 = mock(EthPeer.class);
private final EthPeer ethPeerWithEth66_2 = mock(EthPeer.class);
private final EthPeer ethPeerWithEth66_3 = mock(EthPeer.class);
private final BlockDataGenerator generator = new BlockDataGenerator();
private TransactionBroadcaster txBroadcaster;
private ArgumentCaptor<Runnable> sendTaskCapture;
@Before
public void setUp() {
when(ethPeerNoEth66.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES))
.thenReturn(Boolean.FALSE);
when(ethPeerNoEth66_2.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES))
.thenReturn(Boolean.FALSE);
when(ethPeerWithEth66.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES))
.thenReturn(Boolean.TRUE);
when(ethPeerWithEth66_2.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES))
.thenReturn(Boolean.TRUE);
when(ethPeerWithEth66_3.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES))
.thenReturn(Boolean.TRUE);
sendTaskCapture = ArgumentCaptor.forClass(Runnable.class);
doNothing().when(ethScheduler).scheduleSyncWorkerTask(sendTaskCapture.capture());
when(ethPeers.getMaxPeers()).thenReturn(4);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
txBroadcaster =
new TransactionBroadcaster(
ethContext,
pendingTransactions,
transactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender);
}
@Test
public void doNotRelayTransactionsWhenPoolIsEmpty() {
setupTransactionPool(0, 0);
txBroadcaster.relayTransactionPoolTo(ethPeerNoEth66);
txBroadcaster.relayTransactionPoolTo(ethPeerWithEth66);
verifyNothingSent();
}
@Test
public void relayFullTransactionsFromPoolWhenPeerDoesNotSupportEth66() {
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));
txBroadcaster.relayTransactionPoolTo(ethPeerNoEth66);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth66, txs);
sendTaskCapture.getValue().run();
verify(transactionsMessageSender).sendTransactionsToPeer(ethPeerNoEth66);
verifyNoInteractions(newPooledTransactionHashesMessageSender);
}
@Test
public void relayTransactionHashesFromPoolWhenPeerSupportEth66() {
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));
txBroadcaster.relayTransactionPoolTo(ethPeerWithEth66);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66, txs);
sendTaskCapture.getValue().run();
verify(newPooledTransactionHashesMessageSender).sendTransactionHashesToPeer(ethPeerWithEth66);
verifyNoInteractions(transactionsMessageSender);
}
@Test
public void onTransactionsAddedWithNoPeersDoesNothing() {
when(ethPeers.peerCount()).thenReturn(0);
txBroadcaster.onTransactionsAdded(toTransactionList(setupTransactionPool(1, 1)));
verifyNothingSent();
}
@Test
public void onTransactionsAddedWithOnlyNonEth66PeersSendFullTransactions() {
when(ethPeers.peerCount()).thenReturn(2);
when(ethPeers.streamAvailablePeers()).thenReturn(Stream.of(ethPeerNoEth66, ethPeerNoEth66_2));
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));
txBroadcaster.onTransactionsAdded(txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth66, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth66_2, txs);
sendTaskCapture.getAllValues().forEach(Runnable::run);
verify(transactionsMessageSender).sendTransactionsToPeer(ethPeerNoEth66);
verify(transactionsMessageSender).sendTransactionsToPeer(ethPeerNoEth66_2);
verifyNoInteractions(newPooledTransactionHashesMessageSender);
}
@Test
public void onTransactionsAddedWithOnlyFewEth66PeersSendFullTransactions() {
when(ethPeers.peerCount()).thenReturn(2);
when(ethPeers.streamAvailablePeers())
.thenReturn(Stream.of(ethPeerWithEth66, ethPeerWithEth66_2));
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));
txBroadcaster.onTransactionsAdded(txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66_2, txs);
sendTaskCapture.getAllValues().forEach(Runnable::run);
verify(transactionsMessageSender, times(2)).sendTransactionsToPeer(any(EthPeer.class));
verifyNoInteractions(newPooledTransactionHashesMessageSender);
}
@Test
public void onTransactionsAddedWithOnlyEth66PeersSendFullTransactionsAndTransactionHashes() {
when(ethPeers.peerCount()).thenReturn(3);
when(ethPeers.streamAvailablePeers())
.thenReturn(Stream.of(ethPeerWithEth66, ethPeerWithEth66_2, ethPeerWithEth66_3));
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));
txBroadcaster.onTransactionsAdded(txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66_2, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66_3, txs);
sendTaskCapture.getAllValues().forEach(Runnable::run);
verify(transactionsMessageSender, times(2)).sendTransactionsToPeer(any(EthPeer.class));
verify(newPooledTransactionHashesMessageSender).sendTransactionHashesToPeer(any(EthPeer.class));
}
@Test
public void onTransactionsAddedWithMixedPeersSendFullTransactionsAndTransactionHashes() {
List<EthPeer> eth66Peers = List.of(ethPeerWithEth66, ethPeerWithEth66_2);
when(ethPeers.peerCount()).thenReturn(3);
when(ethPeers.streamAvailablePeers())
.thenReturn(Stream.concat(eth66Peers.stream(), Stream.of(ethPeerNoEth66)));
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));
txBroadcaster.onTransactionsAdded(txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth66_2, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth66, txs);
sendTaskCapture.getAllValues().forEach(Runnable::run);
ArgumentCaptor<EthPeer> capPeerFullTransactions = ArgumentCaptor.forClass(EthPeer.class);
verify(transactionsMessageSender, times(2))
.sendTransactionsToPeer(capPeerFullTransactions.capture());
List<EthPeer> fullTransactionPeers = new ArrayList<>(capPeerFullTransactions.getAllValues());
assertThat(fullTransactionPeers.remove(ethPeerNoEth66)).isTrue();
assertThat(fullTransactionPeers).hasSize(1).first().isIn(eth66Peers);
ArgumentCaptor<EthPeer> capPeerTransactionHashes = ArgumentCaptor.forClass(EthPeer.class);
verify(newPooledTransactionHashesMessageSender)
.sendTransactionHashesToPeer(capPeerTransactionHashes.capture());
assertThat(capPeerTransactionHashes.getValue()).isIn(eth66Peers);
}
private void verifyNothingSent() {
verifyNoInteractions(
transactionTracker, transactionsMessageSender, newPooledTransactionHashesMessageSender);
}
private Set<TransactionInfo> setupTransactionPool(
final int numLocalTransactions, final int numRemoteTransactions) {
Set<TransactionInfo> txInfo = createTransactionInfoList(numLocalTransactions, true);
txInfo.addAll(createTransactionInfoList(numRemoteTransactions, false));
when(pendingTransactions.getTransactionInfo()).thenReturn(txInfo);
return txInfo;
}
private Set<TransactionInfo> createTransactionInfoList(final int num, final boolean local) {
return IntStream.range(0, num)
.mapToObj(unused -> generator.transaction())
.map(tx -> new TransactionInfo(tx, local, Instant.now()))
.collect(Collectors.toSet());
}
private void verifyTransactionAddedToPeerSendingQueue(
final EthPeer peer, final Collection<Transaction> transactions) {
ArgumentCaptor<Transaction> trackedTransactions = ArgumentCaptor.forClass(Transaction.class);
verify(transactionTracker, times(transactions.size()))
.addToPeerSendQueue(eq(peer), trackedTransactions.capture());
assertThat(trackedTransactions.getAllValues())
.containsExactlyInAnyOrderElementsOf(transactions);
}
}

@ -15,7 +15,9 @@
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -30,6 +32,7 @@ import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
@ -65,16 +68,17 @@ public class TransactionPoolFactoryTest {
final EthContext ethContext = mock(EthContext.class);
when(ethContext.getEthMessages()).thenReturn(mock(EthMessages.class));
when(ethContext.getEthPeers()).thenReturn(ethPeers);
final EthScheduler ethScheduler = mock(EthScheduler.class);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
final SyncState state = mock(SyncState.class);
final GasPricePendingTransactionsSorter pendingTransactions =
mock(GasPricePendingTransactionsSorter.class);
final PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class);
final TransactionsMessageSender transactionsMessageSender =
mock(TransactionsMessageSender.class);
final PeerPendingTransactionTracker peerPendingTransactionTracker =
mock(PeerPendingTransactionTracker.class);
final PendingTransactionsMessageSender pendingTransactionsMessageSender =
mock(PendingTransactionsMessageSender.class);
doNothing().when(transactionsMessageSender).sendTransactionsToPeer(any(EthPeer.class));
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender =
mock(NewPooledTransactionHashesMessageSender.class);
final TransactionPool pool =
TransactionPoolFactory.createTransactionPool(
schedule,
@ -87,7 +91,6 @@ public class TransactionPoolFactoryTest {
1,
1,
1,
1,
TransactionPoolConfiguration.DEFAULT_PRICE_BUMP,
TransactionPoolConfiguration.ETH65_TRX_ANNOUNCED_BUFFERING_PERIOD,
TransactionPoolConfiguration.DEFAULT_RPC_TX_FEE_CAP,
@ -95,8 +98,7 @@ public class TransactionPoolFactoryTest {
pendingTransactions,
peerTransactionTracker,
transactionsMessageSender,
peerPendingTransactionTracker,
pendingTransactionsMessageSender);
newPooledTransactionHashesMessageSender);
final EthProtocolManager ethProtocolManager =
new EthProtocolManager(
@ -119,6 +121,5 @@ public class TransactionPoolFactoryTest {
assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse();
ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING);
verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer());
verify(peerPendingTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer());
}
}

@ -28,9 +28,10 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -51,13 +52,14 @@ import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.TransactionTestFixture;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter;
import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionValidator;
@ -92,15 +94,14 @@ import org.mockito.junit.MockitoJUnitRunner;
public class TransactionPoolTest {
private static final int MAX_TRANSACTIONS = 5;
private static final int MAX_TRANSACTION_HASHES = 5;
private static final KeyPair KEY_PAIR1 =
SignatureAlgorithmFactory.getInstance().generateKeyPair();
@Mock private MainnetTransactionValidator transactionValidator;
@Mock private PendingTransactionListener listener;
@Mock private TransactionPool.TransactionBatchAddedListener batchAddedListener;
@Mock private TransactionPool.TransactionBatchAddedListener pendingBatchAddedListener;
@Mock private MiningParameters miningParameters;
@Mock private TransactionsMessageSender transactionsMessageSender;
@Mock private NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
@SuppressWarnings("unchecked")
@Mock
@ -112,6 +113,7 @@ public class TransactionPoolTest {
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private MutableBlockchain blockchain;
private TransactionBroadcaster transactionBroadcaster;
private GasPricePendingTransactionsSorter transactions;
private final Transaction transaction1 = createTransaction(1);
@ -124,7 +126,7 @@ public class TransactionPoolTest {
private EthContext ethContext;
private EthPeers ethPeers;
private PeerTransactionTracker peerTransactionTracker;
private PeerPendingTransactionTracker peerPendingTransactionTracker;
private ArgumentCaptor<Runnable> syncTaskCapture;
@Before
public void setUp() {
@ -133,7 +135,6 @@ public class TransactionPoolTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
MAX_TRANSACTIONS,
MAX_TRANSACTION_HASHES,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
@ -145,10 +146,25 @@ public class TransactionPoolTest {
syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);
ethContext = mock(EthContext.class);
final EthScheduler ethScheduler = mock(EthScheduler.class);
syncTaskCapture = ArgumentCaptor.forClass(Runnable.class);
doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture());
when(ethContext.getScheduler()).thenReturn(ethScheduler);
ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
peerTransactionTracker = mock(PeerTransactionTracker.class);
peerPendingTransactionTracker = mock(PeerPendingTransactionTracker.class);
peerTransactionTracker = new PeerTransactionTracker();
transactionBroadcaster =
spy(
new TransactionBroadcaster(
ethContext,
transactions,
peerTransactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender));
transactionPool = createTransactionPool();
blockchain.observeBlockAdded(transactionPool);
when(miningParameters.getMinTransactionGasPrice()).thenReturn(Wei.of(2));
@ -169,12 +185,9 @@ public class TransactionPoolTest {
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
transactionBroadcaster,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
miningParameters,
metricsSystem,
config);
@ -440,7 +453,7 @@ public class TransactionPoolTest {
assertTransactionNotPending(transaction1);
assertTransactionPending(transaction2);
verify(batchAddedListener).onTransactionsAdded(singleton(transaction2));
verify(transactionBroadcaster).onTransactionsAdded(singleton(transaction2));
}
@Test
@ -455,7 +468,7 @@ public class TransactionPoolTest {
assertTransactionNotPending(transaction1);
assertTransactionPending(transaction2);
verify(batchAddedListener).onTransactionsAdded(singleton(transaction2));
verify(transactionBroadcaster).onTransactionsAdded(singleton(transaction2));
verify(transactionValidator).validate(eq(transaction1), any(Optional.class), any());
verify(transactionValidator)
.validateForSender(eq(transaction1), eq(null), any(TransactionValidationParams.class));
@ -529,12 +542,9 @@ public class TransactionPoolTest {
pendingTransactions,
protocolSchedule,
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
transactionBroadcaster,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
TransactionPoolConfiguration.DEFAULT);
@ -544,7 +554,6 @@ public class TransactionPoolTest {
transactionPool.addRemoteTransactions(singletonList(transaction1));
verify(pendingTransactions).containsTransaction(transaction1.getHash());
verify(pendingTransactions).tryEvictTransactionHash(transaction1.getHash());
verifyNoInteractions(transactionValidator);
verifyNoMoreInteractions(pendingTransactions);
}
@ -570,8 +579,8 @@ public class TransactionPoolTest {
transactionPool.addRemoteTransactions(singletonList(transaction2));
assertTransactionPending(transaction1);
verify(batchAddedListener).onTransactionsAdded(singleton(transaction1));
verify(batchAddedListener, never()).onTransactionsAdded(singleton(transaction2));
verify(transactionBroadcaster).onTransactionsAdded(singleton(transaction1));
verify(transactionBroadcaster, never()).onTransactionsAdded(singleton(transaction2));
}
@Test
@ -595,8 +604,8 @@ public class TransactionPoolTest {
transactionPool.addLocalTransaction(transaction2);
assertTransactionPending(transaction1);
verify(batchAddedListener).onTransactionsAdded(singletonList(transaction1));
verify(batchAddedListener, never()).onTransactionsAdded(singletonList(transaction2));
verify(transactionBroadcaster).onTransactionsAdded(singletonList(transaction1));
verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(transaction2));
}
@Test
@ -611,7 +620,7 @@ public class TransactionPoolTest {
.isEqualTo(ValidationResult.invalid(EXCEEDS_BLOCK_GAS_LIMIT));
assertTransactionNotPending(transaction1);
verifyNoInteractions(batchAddedListener);
verifyNoInteractions(transactionBroadcaster);
}
@Test
@ -625,29 +634,37 @@ public class TransactionPoolTest {
transactionPool.addRemoteTransactions(singleton(transaction1));
assertTransactionNotPending(transaction1);
verifyNoInteractions(batchAddedListener);
verifyNoInteractions(transactionBroadcaster);
}
@Test
public void shouldNotNotifyBatchListenerIfNoTransactionsAreAdded() {
transactionPool.addRemoteTransactions(emptyList());
verifyNoInteractions(batchAddedListener);
verifyNoInteractions(transactionBroadcaster);
}
@Test
public void shouldSendPooledTransactionHashesIfPeerSupportsEth65() {
EthPeer peer = mock(EthPeer.class);
when(peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)).thenReturn(true);
givenTransactionIsValid(transaction1);
transactionPool.addLocalTransaction(transaction1);
transactionPool.handleConnect(peer);
syncTaskCapture.getValue().run();
verify(newPooledTransactionHashesMessageSender).sendTransactionHashesToPeer(peer);
}
@Test
public void shouldNotNotifyPeerForPendingTransactionsIfItDoesntSupportEth65() {
public void shouldSendFullTransactionsIfPeerDoesNotSupportEth65() {
EthPeer peer = mock(EthPeer.class);
EthPeer validPeer = mock(EthPeer.class);
when(peerPendingTransactionTracker.isPeerSupported(peer, EthProtocol.ETH65)).thenReturn(false);
when(peerPendingTransactionTracker.isPeerSupported(validPeer, EthProtocol.ETH65))
.thenReturn(true);
when(peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)).thenReturn(false);
transactionPool.addTransactionHash(transaction1.getHash());
givenTransactionIsValid(transaction1);
transactionPool.addLocalTransaction(transaction1);
transactionPool.handleConnect(peer);
verify(peerPendingTransactionTracker, never()).addToPeerSendQueue(peer, transaction1.getHash());
transactionPool.handleConnect(validPeer);
verify(peerPendingTransactionTracker, times(1))
.addToPeerSendQueue(validPeer, transaction1.getHash());
syncTaskCapture.getValue().run();
verify(transactionsMessageSender).sendTransactionsToPeer(peer);
}
@Test
@ -668,12 +685,9 @@ public class TransactionPoolTest {
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
transactionBroadcaster,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
TransactionPoolConfiguration.DEFAULT);
@ -688,7 +702,7 @@ public class TransactionPoolTest {
assertTransactionNotPending(transaction1);
assertTransactionNotPending(transaction2);
assertTransactionNotPending(transaction3);
verifyNoInteractions(batchAddedListener);
verifyNoInteractions(transactionBroadcaster);
}
@Test
@ -718,21 +732,17 @@ public class TransactionPoolTest {
}
@Test
public void shouldSendOnlyLocalTransactionToNewlyConnectedPeer() {
public void shouldSendFullTransactionPoolToNewlyConnectedPeer() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthContext ethContext = ethProtocolManager.ethContext();
PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker();
TransactionPool transactionPool =
new TransactionPool(
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
transactionBroadcaster,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
TransactionPoolConfiguration.DEFAULT);
@ -756,7 +766,7 @@ public class TransactionPoolTest {
Set<Transaction> transactionsToSendToPeer =
peerTransactionTracker.claimTransactionsToSendToPeer(peer.getEthPeer());
assertThat(transactionsToSendToPeer).containsExactly(transactionLocal);
assertThat(transactionsToSendToPeer).contains(transactionLocal, transactionRemote);
}
@Test
@ -782,19 +792,15 @@ public class TransactionPoolTest {
public void shouldIgnoreFeeCapIfSetZero() {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final EthContext ethContext = ethProtocolManager.ethContext();
final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker();
final Wei twoEthers = Wei.fromEth(2);
final TransactionPool transactionPool =
new TransactionPool(
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
transactionBroadcaster,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
ImmutableTransactionPoolConfiguration.builder().txFeeCap(Wei.ZERO).build());
@ -820,18 +826,14 @@ public class TransactionPoolTest {
public void shouldIgnoreEIP1559TransactionWhenNotAllowed() {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final EthContext ethContext = ethProtocolManager.ethContext();
final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker();
final TransactionPool transactionPool =
new TransactionPool(
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
transactionBroadcaster,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
ImmutableTransactionPoolConfiguration.builder().txFeeCap(Wei.ONE).build());
@ -857,18 +859,14 @@ public class TransactionPoolTest {
public void shouldIgnoreEIP1559TransactionBeforeTheFork() {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final EthContext ethContext = ethProtocolManager.ethContext();
final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker();
final TransactionPool transactionPool =
new TransactionPool(
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
transactionBroadcaster,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
ImmutableTransactionPoolConfiguration.builder().txFeeCap(Wei.ONE).build());
@ -896,19 +894,15 @@ public class TransactionPoolTest {
public void shouldRejectLocalTransactionIfFeeCapExceeded() {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final EthContext ethContext = ethProtocolManager.ethContext();
final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker();
final Wei twoEthers = Wei.fromEth(2);
TransactionPool transactionPool =
new TransactionPool(
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
transactionBroadcaster,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
ImmutableTransactionPoolConfiguration.builder().txFeeCap(twoEthers).build());
@ -936,7 +930,6 @@ public class TransactionPoolTest {
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
final EthContext ethContext = ethProtocolManager.ethContext();
final PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker();
final Wei twoEthers = Wei.fromEth(2);
final TransactionPool transactionPool =
@ -944,12 +937,9 @@ public class TransactionPoolTest {
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
transactionBroadcaster,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
ImmutableTransactionPoolConfiguration.builder().txFeeCap(twoEthers).build());
@ -1103,7 +1093,7 @@ public class TransactionPoolTest {
private void assertRemoteTransactionValid(final Transaction tx) {
transactionPool.addRemoteTransactions(List.of(tx));
verify(batchAddedListener).onTransactionsAdded(singleton(tx));
verify(transactionBroadcaster).onTransactionsAdded(singleton(tx));
assertTransactionPending(tx);
}
}

@ -27,7 +27,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import com.google.common.collect.ImmutableSet;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
@ -57,7 +56,7 @@ public class TransactionsMessageProcessorTest {
ofMinutes(1));
verify(transactionTracker)
.markTransactionsAsSeen(peer1, ImmutableSet.of(transaction1, transaction2, transaction3));
.markTransactionsAsSeen(peer1, asList(transaction1, transaction2, transaction3));
}
@Test
@ -67,8 +66,7 @@ public class TransactionsMessageProcessorTest {
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now(),
ofMinutes(1));
verify(transactionPool)
.addRemoteTransactions(ImmutableSet.of(transaction1, transaction2, transaction3));
verify(transactionPool).addRemoteTransactions(asList(transaction1, transaction2, transaction3));
}
@Test

Loading…
Cancel
Save