From 11a62a072050270b76ef01fec77504f3099f275c Mon Sep 17 00:00:00 2001 From: Stefan Pingel <16143240+pinges@users.noreply.github.com> Date: Wed, 9 Oct 2024 13:36:51 +1000 Subject: [PATCH 1/4] Fix unhandled exception (#7743) * clean up and use thread safe cache Signed-off-by: stefan.pingel@consensys.net --- CHANGELOG.md | 1 + .../p2p/discovery/internal/PeerTable.java | 48 +++++-------------- .../p2p/discovery/internal/PeerTableTest.java | 15 ++---- 3 files changed, 16 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bcbfa51b7..592b445bc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ - Corrects a regression where custom plugin services are not initialized correctly. [#7625](https://github.com/hyperledger/besu/pull/7625) - Fix for IBFT2 chains using the BONSAI DB format [#7631](https://github.com/hyperledger/besu/pull/7631) - Fix reading `tx-pool-min-score` option from configuration file [#7623](https://github.com/hyperledger/besu/pull/7623) +- Fix an undhandled exception. [#7733](https://github.com/hyperledger/besu/issues/7733) ## 24.9.1 diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java index 9f58ab1af9..33bbed0f09 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java @@ -26,16 +26,17 @@ import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import java.util.Arrays; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.hash.BloomFilter; -import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.tuweni.bytes.Bytes; /** @@ -54,10 +55,7 @@ public class PeerTable { private final Map distanceCache; private BloomFilter idBloom; private int evictionCnt = 0; - private final LinkedHashMapWithMaximumSize ipAddressCheckMap = - new LinkedHashMapWithMaximumSize<>(DEFAULT_BUCKET_SIZE * N_BUCKETS); - private final CircularFifoQueue invalidIPs = - new CircularFifoQueue<>(DEFAULT_BUCKET_SIZE * N_BUCKETS); + private final Cache unresponsiveIPs; /** * Builds a new peer table, where distance is calculated using the provided nodeId as a baseline. @@ -72,6 +70,11 @@ public class PeerTable { .toArray(Bucket[]::new); this.distanceCache = new ConcurrentHashMap<>(); this.maxEntriesCnt = N_BUCKETS * DEFAULT_BUCKET_SIZE; + this.unresponsiveIPs = + CacheBuilder.newBuilder() + .maximumSize(maxEntriesCnt) + .expireAfterWrite(15L, TimeUnit.MINUTES) + .build(); // A bloom filter with 4096 expected insertions of 64-byte keys with a 0.1% false positive // probability yields a memory footprint of ~7.5kb. @@ -140,7 +143,6 @@ public class PeerTable { if (!res.isPresent()) { idBloom.put(id); distanceCache.put(id, distance); - ipAddressCheckMap.put(getKey(peer.getEndpoint()), peer.getEndpoint().getUdpPort()); return AddResult.added(); } @@ -214,26 +216,12 @@ public class PeerTable { public boolean isIpAddressInvalid(final Endpoint endpoint) { final String key = getKey(endpoint); - if (invalidIPs.contains(key)) { - return true; - } - if (ipAddressCheckMap.containsKey(key) && ipAddressCheckMap.get(key) != endpoint.getUdpPort()) { - // This peer has multiple discovery services on the same IP address + TCP port. - invalidIPs.add(key); - for (final Bucket bucket : table) { - bucket.getPeers().stream() - .filter(p -> p.getEndpoint().getHost().equals(endpoint.getHost())) - .forEach(bucket::evict); - } - return true; - } else { - return false; - } + return unresponsiveIPs.getIfPresent(key) != null; } public void invalidateIP(final Endpoint endpoint) { final String key = getKey(endpoint); - invalidIPs.add(key); + unresponsiveIPs.put(key, Integer.MAX_VALUE); } private static String getKey(final Endpoint endpoint) { @@ -313,20 +301,6 @@ public class PeerTable { } } - private static class LinkedHashMapWithMaximumSize extends LinkedHashMap { - private final int maxSize; - - public LinkedHashMapWithMaximumSize(final int maxSize) { - super(maxSize, 0.75f, false); - this.maxSize = maxSize; - } - - @Override - protected boolean removeEldestEntry(final Map.Entry eldest) { - return size() > maxSize; - } - } - static class EvictResult { public enum EvictOutcome { EVICTED, diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java index 331711a07f..5d2fc4a43a 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java @@ -188,15 +188,12 @@ public class PeerTableTest { @Test public void ipAddressIsInvalidReturnsTrue() { final Endpoint endpoint1 = new Endpoint("1.1.1.1", 2, Optional.of(Integer.valueOf(1))); - final Endpoint endpoint2 = new Endpoint("1.1.1.1", 3, Optional.of(Integer.valueOf(1))); final DiscoveryPeer peer1 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint1); - final DiscoveryPeer peer2 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint2); final PeerTable table = new PeerTable(Bytes.random(64)); - final PeerTable.AddResult addResult1 = table.tryAdd(peer1); - assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome()); + table.invalidateIP(endpoint1); - assertThat(table.isIpAddressInvalid(peer2.getEndpoint())).isEqualTo(true); + assertThat(table.isIpAddressInvalid(peer1.getEndpoint())).isEqualTo(true); } @Test @@ -216,16 +213,12 @@ public class PeerTableTest { @Test public void invalidIPAddressNotAdded() { final Endpoint endpoint1 = new Endpoint("1.1.1.1", 2, Optional.of(Integer.valueOf(1))); - final Endpoint endpoint2 = new Endpoint("1.1.1.1", 3, Optional.of(Integer.valueOf(1))); final DiscoveryPeer peer1 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint1); - final DiscoveryPeer peer2 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint2); final PeerTable table = new PeerTable(Bytes.random(64)); + table.invalidateIP(endpoint1); final PeerTable.AddResult addResult1 = table.tryAdd(peer1); - assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome()); - - final PeerTable.AddResult addResult2 = table.tryAdd(peer2); - assertThat(addResult2.getOutcome()).isEqualTo(PeerTable.AddResult.invalid().getOutcome()); + assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.invalid().getOutcome()); } @Test From 0cd51065d1a470c446df2cc7f1a0033cfeed170d Mon Sep 17 00:00:00 2001 From: Stefan Pingel <16143240+pinges@users.noreply.github.com> Date: Wed, 9 Oct 2024 17:33:17 +1000 Subject: [PATCH 2/4] Keep track of blobs that are part of multiple transactions (#7723) * keep track of blobs that are part of multiple transactions Signed-off-by: stefan.pingel@consensys.net Signed-off-by: Stefan Pingel <16143240+pinges@users.noreply.github.com> --- .../besu/ethereum/core/BlobTestFixture.java | 14 +- .../eth/transactions/TransactionPool.java | 23 +- .../AbstractTransactionPoolTest.java | 508 +-------------- .../AbstractTransactionPoolTestBase.java | 585 ++++++++++++++++++ .../BlobV1TransactionPoolTest.java | 141 +++++ 5 files changed, 756 insertions(+), 515 deletions(-) create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTestBase.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BlobV1TransactionPoolTest.java diff --git a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlobTestFixture.java b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlobTestFixture.java index 21e8630c84..9c3f62126f 100644 --- a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlobTestFixture.java +++ b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlobTestFixture.java @@ -23,8 +23,6 @@ import org.hyperledger.besu.datatypes.KZGProof; import org.hyperledger.besu.datatypes.VersionedHash; import org.hyperledger.besu.evm.precompile.KZGPointEvalPrecompiledContract; -import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; @@ -36,6 +34,8 @@ import org.bouncycastle.crypto.digests.SHA256Digest; public class BlobTestFixture { + private byte byteValue = 0x00; + public BlobTestFixture() { try { // optimistically tear down a potential previous loaded trusted setup @@ -58,14 +58,8 @@ public class BlobTestFixture { ; public BlobTriplet createBlobTriplet() { - byte[] rawMaterial = {}; - try (InputStream readme = - BlobTestFixture.class.getResourceAsStream( - "/org/hyperledger/besu/ethereum/core/encoding/BlobDataFixture.bin")) { - rawMaterial = readme.readAllBytes(); - } catch (IOException e) { - fail("Failed to read blob file", e); - } + byte[] rawMaterial = new byte[131072]; + rawMaterial[0] = byteValue++; Bytes48 commitment = Bytes48.wrap(CKZG4844JNI.blobToKzgCommitment(rawMaterial)); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index 7bbf1abe3b..f3ce1b4ebd 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -61,6 +61,7 @@ import java.util.HashMap; import java.util.IntSummaryStatistics; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; @@ -77,6 +78,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Multimaps; import org.apache.tuweni.bytes.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,8 +110,10 @@ public class TransactionPool implements BlockAddedObserver { private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager(); private final Set
localSenders = ConcurrentHashMap.newKeySet(); private final EthScheduler.OrderedProcessor blockAddedEventOrderedProcessor; - private final Map mapOfBlobsInTransactionPool = - new HashMap<>(); + private final ListMultimap + mapOfBlobsInTransactionPool = + Multimaps.synchronizedListMultimap( + Multimaps.newListMultimap(new HashMap<>(), () -> new ArrayList<>(1))); public TransactionPool( final Supplier pendingTransactionsSupplier, @@ -660,6 +665,7 @@ public class TransactionPool implements BlockAddedObserver { } final List blobQuads = maybeBlobsWithCommitments.get().getBlobQuads(); + blobQuads.forEach(bq -> mapOfBlobsInTransactionPool.put(bq.versionedHash(), bq)); } @@ -672,15 +678,18 @@ public class TransactionPool implements BlockAddedObserver { } final List blobQuads = maybeBlobsWithCommitments.get().getBlobQuads(); - blobQuads.forEach(bq -> mapOfBlobsInTransactionPool.remove(bq.versionedHash())); + + blobQuads.forEach(bq -> mapOfBlobsInTransactionPool.remove(bq.versionedHash(), bq)); } public BlobsWithCommitments.BlobQuad getBlobQuad(final VersionedHash vh) { - BlobsWithCommitments.BlobQuad blobQuad = mapOfBlobsInTransactionPool.get(vh); - if (blobQuad == null) { - blobQuad = cacheForBlobsOfTransactionsAddedToABlock.get(vh); + try { + // returns an empty list if the key is not present, so getFirst() will throw + return mapOfBlobsInTransactionPool.get(vh).getFirst(); + } catch (NoSuchElementException e) { + // do nothing } - return blobQuad; + return cacheForBlobsOfTransactionsAddedToABlock.get(vh); } public boolean isEnabled() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java index e5ec06fd35..656751bed1 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java @@ -17,7 +17,6 @@ package org.hyperledger.besu.ethereum.eth.transactions; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.hyperledger.besu.ethereum.mainnet.ValidationResult.valid; import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.EXCEEDS_BLOCK_GAS_LIMIT; @@ -29,275 +28,55 @@ import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.TRANSACTION_REPLACEMENT_UNDERPRICED; import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.TX_FEECAP_EXCEEDED; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.nullable; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.mockito.quality.Strictness.LENIENT; -import org.hyperledger.besu.config.GenesisConfigFile; -import org.hyperledger.besu.crypto.KeyPair; -import org.hyperledger.besu.crypto.SignatureAlgorithmFactory; import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.datatypes.Wei; -import org.hyperledger.besu.ethereum.ProtocolContext; -import org.hyperledger.besu.ethereum.chain.BadBlockManager; -import org.hyperledger.besu.ethereum.chain.MutableBlockchain; -import org.hyperledger.besu.ethereum.core.BlobTestFixture; import org.hyperledger.besu.ethereum.core.Block; -import org.hyperledger.besu.ethereum.core.BlockBody; import org.hyperledger.besu.ethereum.core.BlockHeader; -import org.hyperledger.besu.ethereum.core.BlockHeaderBuilder; -import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.Difficulty; -import org.hyperledger.besu.ethereum.core.ExecutionContextTestFixture; -import org.hyperledger.besu.ethereum.core.MiningParameters; -import org.hyperledger.besu.ethereum.core.PrivacyParameters; import org.hyperledger.besu.ethereum.core.Transaction; -import org.hyperledger.besu.ethereum.core.TransactionReceipt; -import org.hyperledger.besu.ethereum.core.TransactionTestFixture; -import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; -import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.messages.EthPV65; -import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredTransactionPoolBaseFeeTest; -import org.hyperledger.besu.ethereum.eth.transactions.sorter.LegacyTransactionPoolBaseFeeTest; -import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; -import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; -import org.hyperledger.besu.ethereum.mainnet.ProtocolScheduleBuilder; -import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; -import org.hyperledger.besu.ethereum.mainnet.ProtocolSpecAdapters; import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams; -import org.hyperledger.besu.ethereum.mainnet.TransactionValidatorFactory; import org.hyperledger.besu.ethereum.mainnet.ValidationResult; import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason; -import org.hyperledger.besu.evm.account.Account; -import org.hyperledger.besu.evm.internal.EvmConfiguration; -import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; -import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.TransactionPoolValidatorService; -import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidator; -import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidatorFactory; -import org.hyperledger.besu.testutil.DeterministicEthScheduler; import org.hyperledger.besu.util.number.Percentage; -import java.math.BigInteger; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledIf; import org.junit.jupiter.api.condition.EnabledIf; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.Answers; import org.mockito.ArgumentCaptor; -import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @SuppressWarnings("unchecked") @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = LENIENT) -public abstract class AbstractTransactionPoolTest { - - protected static final KeyPair KEY_PAIR1 = - SignatureAlgorithmFactory.getInstance().generateKeyPair(); - private static final KeyPair KEY_PAIR2 = - SignatureAlgorithmFactory.getInstance().generateKeyPair(); - protected static final Wei BASE_FEE_FLOOR = Wei.of(7L); - protected static final Wei DEFAULT_MIN_GAS_PRICE = Wei.of(50L); - - protected final EthScheduler ethScheduler = new DeterministicEthScheduler(); - - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - protected TransactionValidatorFactory transactionValidatorFactory; - - @Mock protected PendingTransactionAddedListener listener; - - @Mock protected TransactionsMessageSender transactionsMessageSender; - @Mock protected NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender; - @Mock protected ProtocolSpec protocolSpec; - - protected ProtocolSchedule protocolSchedule; - - protected final MetricsSystem metricsSystem = new NoOpMetricsSystem(); - protected MutableBlockchain blockchain; - protected TransactionBroadcaster transactionBroadcaster; - - protected PendingTransactions transactions; - protected final Transaction transaction0 = createTransaction(0); - protected final Transaction transaction1 = createTransaction(1); - protected final Transaction transactionBlob = createBlobTransaction(2); - - protected final Transaction transactionOtherSender = createTransaction(1, KEY_PAIR2); - private ExecutionContextTestFixture executionContext; - protected ProtocolContext protocolContext; - protected TransactionPool transactionPool; - protected long blockGasLimit; - protected EthProtocolManager ethProtocolManager; - protected EthContext ethContext; - private PeerTransactionTracker peerTransactionTracker; - private ArgumentCaptor syncTaskCapture; - - protected abstract PendingTransactions createPendingTransactions( - final TransactionPoolConfiguration poolConfig, - final BiFunction - transactionReplacementTester); - - protected TransactionTestFixture createBaseTransactionGasPriceMarket( - final int transactionNumber) { - return new TransactionTestFixture() - .nonce(transactionNumber) - .gasLimit(blockGasLimit) - .type(TransactionType.FRONTIER); - } - - protected TransactionTestFixture createBaseTransactionBaseFeeMarket(final int nonce) { - return new TransactionTestFixture() - .nonce(nonce) - .gasLimit(blockGasLimit) - .gasPrice(null) - .maxFeePerGas(Optional.of(Wei.of(5000L))) - .maxPriorityFeePerGas(Optional.of(Wei.of(1000L))) - .type(TransactionType.EIP1559); - } - - protected abstract ExecutionContextTestFixture createExecutionContextTestFixture(); - - protected static ExecutionContextTestFixture createExecutionContextTestFixtureBaseFeeMarket() { - final var genesisConfigFile = GenesisConfigFile.fromResource("/txpool-test-genesis.json"); - final ProtocolSchedule protocolSchedule = - new ProtocolScheduleBuilder( - genesisConfigFile.getConfigOptions(), - BigInteger.valueOf(1), - ProtocolSpecAdapters.create(0, Function.identity()), - new PrivacyParameters(), - false, - EvmConfiguration.DEFAULT, - MiningParameters.MINING_DISABLED, - new BadBlockManager(), - false, - new NoOpMetricsSystem()) - .createProtocolSchedule(); - final ExecutionContextTestFixture executionContextTestFixture = - ExecutionContextTestFixture.builder(genesisConfigFile) - .protocolSchedule(protocolSchedule) - .build(); - - final Block block = - new Block( - new BlockHeaderTestFixture() - .gasLimit( - executionContextTestFixture - .getBlockchain() - .getChainHeadBlock() - .getHeader() - .getGasLimit()) - .difficulty(Difficulty.ONE) - .baseFeePerGas(Wei.of(10L)) - .parentHash(executionContextTestFixture.getBlockchain().getChainHeadHash()) - .number(executionContextTestFixture.getBlockchain().getChainHeadBlockNumber() + 1) - .buildHeader(), - new BlockBody(List.of(), List.of())); - executionContextTestFixture.getBlockchain().appendBlock(block, List.of()); - - return executionContextTestFixture; - } - - protected abstract FeeMarket getFeeMarket(); - - @BeforeEach - public void setUp() { - executionContext = createExecutionContextTestFixture(); - protocolContext = executionContext.getProtocolContext(); - blockchain = executionContext.getBlockchain(); - when(protocolSpec.getTransactionValidatorFactory()).thenReturn(transactionValidatorFactory); - when(protocolSpec.getFeeMarket()).thenReturn(getFeeMarket()); - protocolSchedule = spy(executionContext.getProtocolSchedule()); - doReturn(protocolSpec).when(protocolSchedule).getByBlockHeader(any()); - blockGasLimit = blockchain.getChainHeadBlock().getHeader().getGasLimit(); - ethProtocolManager = EthProtocolManagerTestUtil.create(); - ethContext = spy(ethProtocolManager.ethContext()); - - final EthScheduler ethScheduler = spy(ethContext.getScheduler()); - syncTaskCapture = ArgumentCaptor.forClass(Runnable.class); - doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture()); - doReturn(ethScheduler).when(ethContext).getScheduler(); - - peerTransactionTracker = new PeerTransactionTracker(ethContext.getEthPeers()); - transactionBroadcaster = - spy( - new TransactionBroadcaster( - ethContext, - peerTransactionTracker, - transactionsMessageSender, - newPooledTransactionHashesMessageSender)); - - transactionPool = createTransactionPool(); - blockchain.observeBlockAdded(transactionPool); - } - - protected TransactionPool createTransactionPool() { - return createTransactionPool(b -> b.minGasPrice(Wei.of(2))); - } - - private TransactionPool createTransactionPool( - final Consumer configConsumer) { - final ImmutableTransactionPoolConfiguration.Builder configBuilder = - ImmutableTransactionPoolConfiguration.builder(); - configConsumer.accept(configBuilder); - final TransactionPoolConfiguration poolConfig = configBuilder.build(); - - final TransactionPoolReplacementHandler transactionReplacementHandler = - new TransactionPoolReplacementHandler( - poolConfig.getPriceBump(), poolConfig.getBlobPriceBump()); - - final BiFunction transactionReplacementTester = - (t1, t2) -> - transactionReplacementHandler.shouldReplace( - t1, t2, protocolContext.getBlockchain().getChainHeadHeader()); - - transactions = spy(createPendingTransactions(poolConfig, transactionReplacementTester)); - - final TransactionPool txPool = - new TransactionPool( - () -> transactions, - protocolSchedule, - protocolContext, - transactionBroadcaster, - ethContext, - new TransactionPoolMetrics(metricsSystem), - poolConfig, - new BlobCache()); - txPool.setEnabled(); - return txPool; - } +public abstract class AbstractTransactionPoolTest extends AbstractTransactionPoolTestBase { @ParameterizedTest @ValueSource(booleans = {true, false}) @@ -466,21 +245,21 @@ public abstract class AbstractTransactionPoolTest { public void shouldReAddBlobTxsWhenReorgHappens() { givenTransactionIsValid(transaction0); givenTransactionIsValid(transaction1); - givenTransactionIsValid(transactionBlob); + givenTransactionIsValid(transactionWithBlobs); addAndAssertRemoteTransactionsValid(transaction0); addAndAssertRemoteTransactionsValid(transaction1); - addAndAssertRemoteTransactionsValid(transactionBlob); + addAndAssertRemoteTransactionsValid(transactionWithBlobs); final BlockHeader commonParent = getHeaderForCurrentChainHead(); final Block originalFork1 = appendBlock(Difficulty.of(1000), commonParent, transaction0); final Block originalFork2 = appendBlock(Difficulty.of(10), originalFork1.getHeader(), transaction1); final Block originalFork3 = - appendBlock(Difficulty.of(1), originalFork2.getHeader(), transactionBlob); + appendBlock(Difficulty.of(1), originalFork2.getHeader(), transactionWithBlobs); assertTransactionNotPending(transaction0); assertTransactionNotPending(transaction1); - assertTransactionNotPending(transactionBlob); + assertTransactionNotPending(transactionWithBlobs); final Block reorgFork1 = appendBlock(Difficulty.ONE, commonParent); verifyChainHeadIs(originalFork3); @@ -493,14 +272,15 @@ public abstract class AbstractTransactionPoolTest { assertTransactionPending(transaction0); assertTransactionPending(transaction1); - assertTransactionPending(transactionBlob); + assertTransactionPending(transactionWithBlobs); - Optional maybeBlob = transactions.getTransactionByHash(transactionBlob.getHash()); + Optional maybeBlob = + transactions.getTransactionByHash(transactionWithBlobs.getHash()); assertThat(maybeBlob).isPresent(); Transaction restoredBlob = maybeBlob.get(); - assertThat(restoredBlob).isEqualTo(transactionBlob); + assertThat(restoredBlob).isEqualTo(transactionWithBlobs); assertThat(restoredBlob.getBlobsWithCommitments().get().getBlobQuads()) - .isEqualTo(transactionBlob.getBlobsWithCommitments().get().getBlobQuads()); + .isEqualTo(transactionWithBlobs.getBlobsWithCommitments().get().getBlobQuads()); } @ParameterizedTest @@ -1281,272 +1061,4 @@ public abstract class AbstractTransactionPoolTest { .map(PendingTransaction::getTransaction) .containsExactlyInAnyOrder(transaction1, transaction2a, transaction3); } - - private static TransactionPoolValidatorService getTransactionPoolValidatorServiceReturning( - final String errorMessage) { - return new TransactionPoolValidatorService() { - @Override - public PluginTransactionPoolValidator createTransactionValidator() { - return (transaction, isLocal, hasPriority) -> Optional.ofNullable(errorMessage); - } - - @Override - public void registerPluginTransactionValidatorFactory( - final PluginTransactionPoolValidatorFactory pluginTransactionPoolValidatorFactory) {} - }; - } - - @SuppressWarnings("unused") - private static boolean isBaseFeeMarket(final ExtensionContext extensionContext) { - final Class cz = extensionContext.getTestClass().get(); - - return cz.equals(LegacyTransactionPoolBaseFeeTest.class) - || cz.equals(LayeredTransactionPoolBaseFeeTest.class); - } - - protected void assertTransactionNotPending(final Transaction transaction) { - assertThat(transactions.getTransactionByHash(transaction.getHash())).isEmpty(); - } - - protected void addAndAssertRemoteTransactionInvalid(final Transaction tx) { - transactionPool.addRemoteTransactions(List.of(tx)); - - verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(tx)); - assertTransactionNotPending(tx); - } - - protected void assertTransactionPending(final Transaction t) { - assertThat(transactions.getTransactionByHash(t.getHash())).contains(t); - } - - protected void addAndAssertRemoteTransactionsValid(final Transaction... txs) { - addAndAssertRemoteTransactionsValid(false, txs); - } - - protected void addAndAssertRemotePriorityTransactionsValid(final Transaction... txs) { - addAndAssertRemoteTransactionsValid(true, txs); - } - - protected void addAndAssertRemoteTransactionsValid( - final boolean hasPriority, final Transaction... txs) { - transactionPool.addRemoteTransactions(List.of(txs)); - - verify(transactionBroadcaster) - .onTransactionsAdded( - argThat(btxs -> btxs.size() == txs.length && btxs.containsAll(List.of(txs)))); - Arrays.stream(txs).forEach(this::assertTransactionPending); - assertThat(transactions.getLocalTransactions()).doesNotContain(txs); - if (hasPriority) { - assertThat(transactions.getPriorityTransactions()).contains(txs); - } - } - - protected void addAndAssertTransactionViaApiValid( - final Transaction tx, final boolean disableLocalPriority) { - final ValidationResult result = - transactionPool.addTransactionViaApi(tx); - - assertThat(result.isValid()).isTrue(); - assertTransactionPending(tx); - verify(transactionBroadcaster).onTransactionsAdded(singletonList(tx)); - assertThat(transactions.getLocalTransactions()).contains(tx); - if (disableLocalPriority) { - assertThat(transactions.getPriorityTransactions()).doesNotContain(tx); - } else { - assertThat(transactions.getPriorityTransactions()).contains(tx); - } - } - - protected void addAndAssertTransactionViaApiInvalid( - final Transaction tx, final TransactionInvalidReason invalidReason) { - final ValidationResult result = - transactionPool.addTransactionViaApi(tx); - - assertThat(result.isValid()).isFalse(); - assertThat(result.getInvalidReason()).isEqualTo(invalidReason); - assertTransactionNotPending(tx); - verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(tx)); - } - - @SuppressWarnings("unchecked") - protected void givenTransactionIsValid(final Transaction transaction) { - when(transactionValidatorFactory - .get() - .validate(eq(transaction), any(Optional.class), any(Optional.class), any())) - .thenReturn(valid()); - when(transactionValidatorFactory - .get() - .validateForSender( - eq(transaction), nullable(Account.class), any(TransactionValidationParams.class))) - .thenReturn(valid()); - } - - protected abstract Block appendBlock( - final Difficulty difficulty, - final BlockHeader parentBlock, - final Transaction... transactionsToAdd); - - protected Transaction createTransactionGasPriceMarket( - final int transactionNumber, final Wei maxPrice) { - return createBaseTransaction(transactionNumber).gasPrice(maxPrice).createTransaction(KEY_PAIR1); - } - - protected Transaction createTransactionBaseFeeMarket(final int nonce, final Wei maxPrice) { - return createBaseTransaction(nonce) - .maxFeePerGas(Optional.of(maxPrice)) - .maxPriorityFeePerGas(Optional.of(maxPrice.divide(5L))) - .createTransaction(KEY_PAIR1); - } - - protected abstract TransactionTestFixture createBaseTransaction(final int nonce); - - protected Transaction createTransaction( - final int transactionNumber, final Optional maybeChainId) { - return createBaseTransaction(transactionNumber) - .chainId(maybeChainId) - .createTransaction(KEY_PAIR1); - } - - protected abstract Transaction createTransaction(final int nonce, final Wei maxPrice); - - protected Transaction createTransaction(final int nonce) { - return createTransaction(nonce, Optional.of(BigInteger.ONE)); - } - - protected Transaction createTransaction(final int nonce, final KeyPair keyPair) { - return createBaseTransaction(nonce).createTransaction(keyPair); - } - - protected void verifyChainHeadIs(final Block forkBlock2) { - assertThat(blockchain.getChainHeadHash()).isEqualTo(forkBlock2.getHash()); - } - - protected BlockHeader getHeaderForCurrentChainHead() { - return blockchain.getBlockHeader(blockchain.getChainHeadHash()).get(); - } - - protected void appendBlock(final Transaction... transactionsToAdd) { - appendBlock(Difficulty.ONE, getHeaderForCurrentChainHead(), transactionsToAdd); - } - - protected void protocolSupportsTxReplayProtection( - final long chainId, final boolean isSupportedAtCurrentBlock) { - when(protocolSpec.isReplayProtectionSupported()).thenReturn(isSupportedAtCurrentBlock); - when(protocolSchedule.getChainId()).thenReturn(Optional.of(BigInteger.valueOf(chainId))); - } - - protected void protocolDoesNotSupportTxReplayProtection() { - when(protocolSchedule.getChainId()).thenReturn(Optional.empty()); - } - - protected Transaction createTransactionWithoutChainId(final int transactionNumber) { - return createTransaction(transactionNumber, Optional.empty()); - } - - protected void whenBlockBaseFeeIs(final Wei baseFee) { - final BlockHeader header = - BlockHeaderBuilder.fromHeader(blockchain.getChainHeadHeader()) - .baseFee(baseFee) - .blockHeaderFunctions(new MainnetBlockHeaderFunctions()) - .parentHash(blockchain.getChainHeadHash()) - .buildBlockHeader(); - blockchain.appendBlock(new Block(header, BlockBody.empty()), emptyList()); - } - - protected Transaction createFrontierTransaction(final int transactionNumber, final Wei gasPrice) { - return new TransactionTestFixture() - .nonce(transactionNumber) - .gasPrice(gasPrice) - .gasLimit(blockGasLimit) - .type(TransactionType.FRONTIER) - .createTransaction(KEY_PAIR1); - } - - protected Transaction createBlobTransaction(final int nonce) { - return new TransactionTestFixture() - .nonce(nonce) - .gasLimit(blockGasLimit) - .gasPrice(null) - .maxFeePerGas(Optional.of(Wei.of(5000L))) - .maxPriorityFeePerGas(Optional.of(Wei.of(1000L))) - .type(TransactionType.BLOB) - .blobsWithCommitments(Optional.of(new BlobTestFixture().createBlobsWithCommitments(6))) - .createTransaction(KEY_PAIR1); - } - - protected int addTxAndGetPendingTxsCount( - final Wei genesisBaseFee, - final Wei minGasPrice, - final Wei lastBlockBaseFee, - final Wei txMaxFeePerGas, - final boolean isLocal, - final boolean hasPriority) { - when(protocolSpec.getFeeMarket()).thenReturn(FeeMarket.london(0, Optional.of(genesisBaseFee))); - whenBlockBaseFeeIs(lastBlockBaseFee); - - final Transaction transaction = createTransaction(0, txMaxFeePerGas); - if (hasPriority) { - transactionPool = - createTransactionPool( - b -> b.minGasPrice(minGasPrice).prioritySenders(Set.of(transaction.getSender()))); - } else { - transactionPool = - createTransactionPool(b -> b.minGasPrice(minGasPrice).noLocalPriority(true)); - } - - givenTransactionIsValid(transaction); - - if (isLocal) { - transactionPool.addTransactionViaApi(transaction); - } else { - transactionPool.addRemoteTransactions(List.of(transaction)); - } - - return transactions.size(); - } - - protected Block appendBlockGasPriceMarket( - final Difficulty difficulty, - final BlockHeader parentBlock, - final Transaction[] transactionsToAdd) { - final List transactionList = asList(transactionsToAdd); - final Block block = - new Block( - new BlockHeaderTestFixture() - .difficulty(difficulty) - .gasLimit(parentBlock.getGasLimit()) - .parentHash(parentBlock.getHash()) - .number(parentBlock.getNumber() + 1) - .buildHeader(), - new BlockBody(transactionList, emptyList())); - final List transactionReceipts = - transactionList.stream() - .map(transaction -> new TransactionReceipt(1, 1, emptyList(), Optional.empty())) - .collect(toList()); - blockchain.appendBlock(block, transactionReceipts); - return block; - } - - protected Block appendBlockBaseFeeMarket( - final Difficulty difficulty, - final BlockHeader parentBlock, - final Transaction[] transactionsToAdd) { - final List transactionList = asList(transactionsToAdd); - final Block block = - new Block( - new BlockHeaderTestFixture() - .baseFeePerGas(Wei.of(10L)) - .gasLimit(parentBlock.getGasLimit()) - .difficulty(difficulty) - .parentHash(parentBlock.getHash()) - .number(parentBlock.getNumber() + 1) - .buildHeader(), - new BlockBody(transactionList, emptyList())); - final List transactionReceipts = - transactionList.stream() - .map(transaction -> new TransactionReceipt(1, 1, emptyList(), Optional.empty())) - .collect(toList()); - blockchain.appendBlock(block, transactionReceipts); - return block; - } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTestBase.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTestBase.java new file mode 100644 index 0000000000..c3ca24a4e1 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTestBase.java @@ -0,0 +1,585 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.transactions; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.ethereum.mainnet.ValidationResult.valid; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.quality.Strictness.LENIENT; + +import org.hyperledger.besu.config.GenesisConfigFile; +import org.hyperledger.besu.crypto.KeyPair; +import org.hyperledger.besu.crypto.SignatureAlgorithmFactory; +import org.hyperledger.besu.datatypes.BlobsWithCommitments; +import org.hyperledger.besu.datatypes.TransactionType; +import org.hyperledger.besu.datatypes.Wei; +import org.hyperledger.besu.ethereum.ProtocolContext; +import org.hyperledger.besu.ethereum.chain.BadBlockManager; +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.BlobTestFixture; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockHeaderBuilder; +import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; +import org.hyperledger.besu.ethereum.core.Difficulty; +import org.hyperledger.besu.ethereum.core.ExecutionContextTestFixture; +import org.hyperledger.besu.ethereum.core.MiningParameters; +import org.hyperledger.besu.ethereum.core.PrivacyParameters; +import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.core.TransactionTestFixture; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; +import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredTransactionPoolBaseFeeTest; +import org.hyperledger.besu.ethereum.eth.transactions.sorter.LegacyTransactionPoolBaseFeeTest; +import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolScheduleBuilder; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpecAdapters; +import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams; +import org.hyperledger.besu.ethereum.mainnet.TransactionValidatorFactory; +import org.hyperledger.besu.ethereum.mainnet.ValidationResult; +import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; +import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason; +import org.hyperledger.besu.evm.account.Account; +import org.hyperledger.besu.evm.internal.EvmConfiguration; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.TransactionPoolValidatorService; +import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidator; +import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidatorFactory; +import org.hyperledger.besu.testutil.DeterministicEthScheduler; + +import java.math.BigInteger; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; + +@SuppressWarnings("unchecked") +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = LENIENT) +public abstract class AbstractTransactionPoolTestBase { + + protected static final KeyPair KEY_PAIR1 = + SignatureAlgorithmFactory.getInstance().generateKeyPair(); + private static final KeyPair KEY_PAIR2 = + SignatureAlgorithmFactory.getInstance().generateKeyPair(); + protected static final Wei BASE_FEE_FLOOR = Wei.of(7L); + protected static final Wei DEFAULT_MIN_GAS_PRICE = Wei.of(50L); + + protected final EthScheduler ethScheduler = new DeterministicEthScheduler(); + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + protected TransactionValidatorFactory transactionValidatorFactory; + + @Mock protected PendingTransactionAddedListener listener; + + @Mock protected TransactionsMessageSender transactionsMessageSender; + @Mock protected NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender; + @Mock protected ProtocolSpec protocolSpec; + + protected ProtocolSchedule protocolSchedule; + + protected final MetricsSystem metricsSystem = new NoOpMetricsSystem(); + protected MutableBlockchain blockchain; + protected TransactionBroadcaster transactionBroadcaster; + + protected PendingTransactions transactions; + protected final Transaction transaction0 = createTransaction(0); + protected final Transaction transaction1 = createTransaction(1); + protected final Transaction transactionWithBlobs = createBlobTransaction(2); + protected final Transaction transactionWithBlobsReplacement = + createReplacementTransactionWithBlobs(2); + protected final Transaction transactionWithSameBlobs = + createBlobTransactionWithSameBlobs(3, transactionWithBlobs.getBlobsWithCommitments().get()); + protected final Transaction transactionWithSameBlobsReplacement = + createReplacementTransactionWithBlobs(3); + + protected final Transaction transactionOtherSender = createTransaction(1, KEY_PAIR2); + private ExecutionContextTestFixture executionContext; + protected ProtocolContext protocolContext; + protected TransactionPool transactionPool; + protected long blockGasLimit; + protected EthProtocolManager ethProtocolManager; + protected EthContext ethContext; + protected ArgumentCaptor syncTaskCapture; + protected PeerTransactionTracker peerTransactionTracker; + private BlobTestFixture blobTestFixture; + + protected abstract PendingTransactions createPendingTransactions( + final TransactionPoolConfiguration poolConfig, + final BiFunction + transactionReplacementTester); + + protected TransactionTestFixture createBaseTransactionGasPriceMarket( + final int transactionNumber) { + return new TransactionTestFixture() + .nonce(transactionNumber) + .gasLimit(blockGasLimit) + .type(TransactionType.FRONTIER); + } + + protected TransactionTestFixture createBaseTransactionBaseFeeMarket(final int nonce) { + return new TransactionTestFixture() + .nonce(nonce) + .gasLimit(blockGasLimit) + .gasPrice(null) + .maxFeePerGas(Optional.of(Wei.of(5000L))) + .maxPriorityFeePerGas(Optional.of(Wei.of(1000L))) + .type(TransactionType.EIP1559); + } + + protected abstract ExecutionContextTestFixture createExecutionContextTestFixture(); + + protected static ExecutionContextTestFixture createExecutionContextTestFixtureBaseFeeMarket() { + final var genesisConfigFile = GenesisConfigFile.fromResource("/txpool-test-genesis.json"); + final ProtocolSchedule protocolSchedule = + new ProtocolScheduleBuilder( + genesisConfigFile.getConfigOptions(), + BigInteger.valueOf(1), + ProtocolSpecAdapters.create(0, Function.identity()), + new PrivacyParameters(), + false, + EvmConfiguration.DEFAULT, + MiningParameters.MINING_DISABLED, + new BadBlockManager(), + false, + new NoOpMetricsSystem()) + .createProtocolSchedule(); + final ExecutionContextTestFixture executionContextTestFixture = + ExecutionContextTestFixture.builder(genesisConfigFile) + .protocolSchedule(protocolSchedule) + .build(); + + final Block block = + new Block( + new BlockHeaderTestFixture() + .gasLimit( + executionContextTestFixture + .getBlockchain() + .getChainHeadBlock() + .getHeader() + .getGasLimit()) + .difficulty(Difficulty.ONE) + .baseFeePerGas(Wei.of(10L)) + .parentHash(executionContextTestFixture.getBlockchain().getChainHeadHash()) + .number(executionContextTestFixture.getBlockchain().getChainHeadBlockNumber() + 1) + .buildHeader(), + new BlockBody(List.of(), List.of())); + executionContextTestFixture.getBlockchain().appendBlock(block, List.of()); + + return executionContextTestFixture; + } + + protected abstract FeeMarket getFeeMarket(); + + @BeforeEach + public void setUp() { + executionContext = createExecutionContextTestFixture(); + protocolContext = executionContext.getProtocolContext(); + blockchain = executionContext.getBlockchain(); + when(protocolSpec.getTransactionValidatorFactory()).thenReturn(transactionValidatorFactory); + when(protocolSpec.getFeeMarket()).thenReturn(getFeeMarket()); + protocolSchedule = spy(executionContext.getProtocolSchedule()); + doReturn(protocolSpec).when(protocolSchedule).getByBlockHeader(any()); + blockGasLimit = blockchain.getChainHeadBlock().getHeader().getGasLimit(); + ethProtocolManager = EthProtocolManagerTestUtil.create(); + ethContext = spy(ethProtocolManager.ethContext()); + + final EthScheduler ethScheduler = spy(ethContext.getScheduler()); + syncTaskCapture = ArgumentCaptor.forClass(Runnable.class); + doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture()); + doReturn(ethScheduler).when(ethContext).getScheduler(); + + peerTransactionTracker = new PeerTransactionTracker(ethContext.getEthPeers()); + transactionBroadcaster = + spy( + new TransactionBroadcaster( + ethContext, + peerTransactionTracker, + transactionsMessageSender, + newPooledTransactionHashesMessageSender)); + + transactionPool = createTransactionPool(); + blockchain.observeBlockAdded(transactionPool); + } + + protected TransactionPool createTransactionPool() { + return createTransactionPool(b -> b.minGasPrice(Wei.of(2))); + } + + TransactionPool createTransactionPool( + final Consumer configConsumer) { + final ImmutableTransactionPoolConfiguration.Builder configBuilder = + ImmutableTransactionPoolConfiguration.builder(); + configConsumer.accept(configBuilder); + final TransactionPoolConfiguration poolConfig = configBuilder.build(); + + final TransactionPoolReplacementHandler transactionReplacementHandler = + new TransactionPoolReplacementHandler( + poolConfig.getPriceBump(), poolConfig.getBlobPriceBump()); + + final BiFunction transactionReplacementTester = + (t1, t2) -> + transactionReplacementHandler.shouldReplace( + t1, t2, protocolContext.getBlockchain().getChainHeadHeader()); + + transactions = spy(createPendingTransactions(poolConfig, transactionReplacementTester)); + + final TransactionPool txPool = + new TransactionPool( + () -> transactions, + protocolSchedule, + protocolContext, + transactionBroadcaster, + ethContext, + new TransactionPoolMetrics(metricsSystem), + poolConfig, + new BlobCache()); + txPool.setEnabled(); + return txPool; + } + + static TransactionPoolValidatorService getTransactionPoolValidatorServiceReturning( + final String errorMessage) { + return new TransactionPoolValidatorService() { + @Override + public PluginTransactionPoolValidator createTransactionValidator() { + return (transaction, isLocal, hasPriority) -> Optional.ofNullable(errorMessage); + } + + @Override + public void registerPluginTransactionValidatorFactory( + final PluginTransactionPoolValidatorFactory pluginTransactionPoolValidatorFactory) {} + }; + } + + @SuppressWarnings("unused") + private static boolean isBaseFeeMarket(final ExtensionContext extensionContext) { + final Class cz = extensionContext.getTestClass().get(); + + return cz.equals(LegacyTransactionPoolBaseFeeTest.class) + || cz.equals(LayeredTransactionPoolBaseFeeTest.class) + || cz.equals(BlobV1TransactionPoolTest.class); + } + + protected void assertTransactionNotPending(final Transaction transaction) { + assertThat(transactions.getTransactionByHash(transaction.getHash())).isEmpty(); + } + + protected void addAndAssertRemoteTransactionInvalid(final Transaction tx) { + transactionPool.addRemoteTransactions(List.of(tx)); + + verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(tx)); + assertTransactionNotPending(tx); + } + + protected void assertTransactionPending(final Transaction t) { + assertThat(transactions.getTransactionByHash(t.getHash())).contains(t); + } + + protected void addAndAssertRemoteTransactionsValid(final Transaction... txs) { + addAndAssertRemoteTransactionsValid(false, txs); + } + + protected void addAndAssertRemotePriorityTransactionsValid(final Transaction... txs) { + addAndAssertRemoteTransactionsValid(true, txs); + } + + protected void addAndAssertRemoteTransactionsValid( + final boolean hasPriority, final Transaction... txs) { + transactionPool.addRemoteTransactions(List.of(txs)); + + verify(transactionBroadcaster) + .onTransactionsAdded( + argThat(btxs -> btxs.size() == txs.length && btxs.containsAll(List.of(txs)))); + Arrays.stream(txs).forEach(this::assertTransactionPending); + assertThat(transactions.getLocalTransactions()).doesNotContain(txs); + if (hasPriority) { + assertThat(transactions.getPriorityTransactions()).contains(txs); + } + } + + protected void addAndAssertTransactionViaApiValid( + final Transaction tx, final boolean disableLocalPriority) { + final ValidationResult result = + transactionPool.addTransactionViaApi(tx); + + assertThat(result.isValid()).isTrue(); + assertTransactionPending(tx); + verify(transactionBroadcaster).onTransactionsAdded(singletonList(tx)); + assertThat(transactions.getLocalTransactions()).contains(tx); + if (disableLocalPriority) { + assertThat(transactions.getPriorityTransactions()).doesNotContain(tx); + } else { + assertThat(transactions.getPriorityTransactions()).contains(tx); + } + } + + protected void addAndAssertTransactionViaApiInvalid( + final Transaction tx, final TransactionInvalidReason invalidReason) { + final ValidationResult result = + transactionPool.addTransactionViaApi(tx); + + assertThat(result.isValid()).isFalse(); + assertThat(result.getInvalidReason()).isEqualTo(invalidReason); + assertTransactionNotPending(tx); + verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(tx)); + } + + @SuppressWarnings("unchecked") + protected void givenTransactionIsValid(final Transaction transaction) { + when(transactionValidatorFactory + .get() + .validate(eq(transaction), any(Optional.class), any(Optional.class), any())) + .thenReturn(valid()); + when(transactionValidatorFactory + .get() + .validateForSender( + eq(transaction), nullable(Account.class), any(TransactionValidationParams.class))) + .thenReturn(valid()); + } + + protected abstract Block appendBlock( + final Difficulty difficulty, + final BlockHeader parentBlock, + final Transaction... transactionsToAdd); + + protected Transaction createTransactionGasPriceMarket( + final int transactionNumber, final Wei maxPrice) { + return createBaseTransaction(transactionNumber).gasPrice(maxPrice).createTransaction(KEY_PAIR1); + } + + protected Transaction createTransactionBaseFeeMarket(final int nonce, final Wei maxPrice) { + return createBaseTransaction(nonce) + .maxFeePerGas(Optional.of(maxPrice)) + .maxPriorityFeePerGas(Optional.of(maxPrice.divide(5L))) + .createTransaction(KEY_PAIR1); + } + + protected abstract TransactionTestFixture createBaseTransaction(final int nonce); + + protected Transaction createTransaction( + final int transactionNumber, final Optional maybeChainId) { + return createBaseTransaction(transactionNumber) + .chainId(maybeChainId) + .createTransaction(KEY_PAIR1); + } + + protected abstract Transaction createTransaction(final int nonce, final Wei maxPrice); + + protected Transaction createTransaction(final int nonce) { + return createTransaction(nonce, Optional.of(BigInteger.ONE)); + } + + protected Transaction createTransaction(final int nonce, final KeyPair keyPair) { + return createBaseTransaction(nonce).createTransaction(keyPair); + } + + protected void verifyChainHeadIs(final Block forkBlock2) { + assertThat(blockchain.getChainHeadHash()).isEqualTo(forkBlock2.getHash()); + } + + protected BlockHeader getHeaderForCurrentChainHead() { + return blockchain.getBlockHeader(blockchain.getChainHeadHash()).get(); + } + + protected void appendBlock(final Transaction... transactionsToAdd) { + appendBlock(Difficulty.ONE, getHeaderForCurrentChainHead(), transactionsToAdd); + } + + protected void protocolSupportsTxReplayProtection( + final long chainId, final boolean isSupportedAtCurrentBlock) { + when(protocolSpec.isReplayProtectionSupported()).thenReturn(isSupportedAtCurrentBlock); + when(protocolSchedule.getChainId()).thenReturn(Optional.of(BigInteger.valueOf(chainId))); + } + + protected void protocolDoesNotSupportTxReplayProtection() { + when(protocolSchedule.getChainId()).thenReturn(Optional.empty()); + } + + protected Transaction createTransactionWithoutChainId(final int transactionNumber) { + return createTransaction(transactionNumber, Optional.empty()); + } + + protected void whenBlockBaseFeeIs(final Wei baseFee) { + final BlockHeader header = + BlockHeaderBuilder.fromHeader(blockchain.getChainHeadHeader()) + .baseFee(baseFee) + .blockHeaderFunctions(new MainnetBlockHeaderFunctions()) + .parentHash(blockchain.getChainHeadHash()) + .buildBlockHeader(); + blockchain.appendBlock(new Block(header, BlockBody.empty()), emptyList()); + } + + protected Transaction createFrontierTransaction(final int transactionNumber, final Wei gasPrice) { + return new TransactionTestFixture() + .nonce(transactionNumber) + .gasPrice(gasPrice) + .gasLimit(blockGasLimit) + .type(TransactionType.FRONTIER) + .createTransaction(KEY_PAIR1); + } + + protected Transaction createBlobTransaction(final int nonce) { + if (blobTestFixture == null) { + blobTestFixture = new BlobTestFixture(); + } + return new TransactionTestFixture() + .nonce(nonce) + .gasLimit(blockGasLimit) + .gasPrice(null) + .maxFeePerGas(Optional.of(Wei.of(5000L))) + .maxPriorityFeePerGas(Optional.of(Wei.of(1000L))) + .type(TransactionType.BLOB) + .blobsWithCommitments(Optional.of(blobTestFixture.createBlobsWithCommitments(6))) + .createTransaction(KEY_PAIR1); + } + + protected Transaction createBlobTransactionWithSameBlobs( + final int nonce, final BlobsWithCommitments blobsWithCommitments) { + return new TransactionTestFixture() + .nonce(nonce) + .gasLimit(blockGasLimit) + .gasPrice(null) + .maxFeePerGas(Optional.of(Wei.of(5000L))) + .maxPriorityFeePerGas(Optional.of(Wei.of(1000L))) + .type(TransactionType.BLOB) + .blobsWithCommitments(Optional.of(blobsWithCommitments)) + .createTransaction(KEY_PAIR1); + } + + protected Transaction createReplacementTransactionWithBlobs(final int nonce) { + if (blobTestFixture == null) { + blobTestFixture = new BlobTestFixture(); + } + return new TransactionTestFixture() + .nonce(nonce) + .gasLimit(blockGasLimit) + .gasPrice(null) + .maxFeePerGas(Optional.of(Wei.of(5000L * 10))) + .maxPriorityFeePerGas(Optional.of(Wei.of(1000L * 10))) + .maxFeePerBlobGas(Optional.of(Wei.of(5000L))) + .type(TransactionType.BLOB) + .blobsWithCommitments(Optional.of(blobTestFixture.createBlobsWithCommitments(6))) + .createTransaction(KEY_PAIR1); + } + + protected int addTxAndGetPendingTxsCount( + final Wei genesisBaseFee, + final Wei minGasPrice, + final Wei lastBlockBaseFee, + final Wei txMaxFeePerGas, + final boolean isLocal, + final boolean hasPriority) { + when(protocolSpec.getFeeMarket()).thenReturn(FeeMarket.london(0, Optional.of(genesisBaseFee))); + whenBlockBaseFeeIs(lastBlockBaseFee); + + final Transaction transaction = createTransaction(0, txMaxFeePerGas); + if (hasPriority) { + transactionPool = + createTransactionPool( + b -> b.minGasPrice(minGasPrice).prioritySenders(Set.of(transaction.getSender()))); + } else { + transactionPool = + createTransactionPool(b -> b.minGasPrice(minGasPrice).noLocalPriority(true)); + } + + givenTransactionIsValid(transaction); + + if (isLocal) { + transactionPool.addTransactionViaApi(transaction); + } else { + transactionPool.addRemoteTransactions(List.of(transaction)); + } + + return transactions.size(); + } + + protected Block appendBlockGasPriceMarket( + final Difficulty difficulty, + final BlockHeader parentBlock, + final Transaction[] transactionsToAdd) { + final List transactionList = asList(transactionsToAdd); + final Block block = + new Block( + new BlockHeaderTestFixture() + .difficulty(difficulty) + .gasLimit(parentBlock.getGasLimit()) + .parentHash(parentBlock.getHash()) + .number(parentBlock.getNumber() + 1) + .buildHeader(), + new BlockBody(transactionList, emptyList())); + final List transactionReceipts = + transactionList.stream() + .map(transaction -> new TransactionReceipt(1, 1, emptyList(), Optional.empty())) + .collect(toList()); + blockchain.appendBlock(block, transactionReceipts); + return block; + } + + protected Block appendBlockBaseFeeMarket( + final Difficulty difficulty, + final BlockHeader parentBlock, + final Transaction[] transactionsToAdd) { + final List transactionList = asList(transactionsToAdd); + final Block block = + new Block( + new BlockHeaderTestFixture() + .baseFeePerGas(Wei.of(10L)) + .gasLimit(parentBlock.getGasLimit()) + .difficulty(difficulty) + .parentHash(parentBlock.getHash()) + .number(parentBlock.getNumber() + 1) + .buildHeader(), + new BlockBody(transactionList, emptyList())); + final List transactionReceipts = + transactionList.stream() + .map(transaction -> new TransactionReceipt(1, 1, emptyList(), Optional.empty())) + .collect(toList()); + blockchain.appendBlock(block, transactionReceipts); + return block; + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BlobV1TransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BlobV1TransactionPoolTest.java new file mode 100644 index 0000000000..aa81215a66 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BlobV1TransactionPoolTest.java @@ -0,0 +1,141 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.transactions; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.hyperledger.besu.datatypes.BlobsWithCommitments; +import org.hyperledger.besu.datatypes.Wei; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.Difficulty; +import org.hyperledger.besu.ethereum.core.ExecutionContextTestFixture; +import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.core.TransactionTestFixture; +import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter; +import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; +import org.hyperledger.besu.testutil.TestClock; + +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.function.BiFunction; + +import org.junit.jupiter.api.Test; + +public class BlobV1TransactionPoolTest extends AbstractTransactionPoolTestBase { + + @Override + protected PendingTransactions createPendingTransactions( + final TransactionPoolConfiguration poolConfig, + final BiFunction + transactionReplacementTester) { + + return new BaseFeePendingTransactionsSorter( + poolConfig, + TestClock.system(ZoneId.systemDefault()), + metricsSystem, + protocolContext.getBlockchain()::getChainHeadHeader); + } + + @Override + protected Transaction createTransaction(final int transactionNumber, final Wei maxPrice) { + return createTransactionBaseFeeMarket(transactionNumber, maxPrice); + } + + @Override + protected TransactionTestFixture createBaseTransaction(final int transactionNumber) { + return createBaseTransactionBaseFeeMarket(transactionNumber); + } + + @Override + protected ExecutionContextTestFixture createExecutionContextTestFixture() { + return createExecutionContextTestFixtureBaseFeeMarket(); + } + + @Override + protected FeeMarket getFeeMarket() { + return FeeMarket.london(0L, Optional.of(BASE_FEE_FLOOR)); + } + + @Override + protected Block appendBlock( + final Difficulty difficulty, + final BlockHeader parentBlock, + final Transaction... transactionsToAdd) { + return appendBlockBaseFeeMarket(difficulty, parentBlock, transactionsToAdd); + } + + @Test + public void shouldReturnBlobWhenTransactionAddedToPool() { + givenTransactionIsValid(transactionWithBlobs); + + addAndAssertRemoteTransactionsValid(transactionWithBlobs); + + assertTransactionPending(transactionWithBlobs); + // assert that the blobs are returned from the tx pool + final List expectedBlobQuads = + transactionWithBlobs.getBlobsWithCommitments().get().getBlobQuads(); + + expectedBlobQuads.forEach( + bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isEqualTo(bq)); + } + + @Test + public void shouldNotReturnBlobsWhenAllTxsContainingBlobsHaveBeenReplaced() { + givenTransactionIsValid(transactionWithBlobs); + givenTransactionIsValid(transactionWithBlobsReplacement); + givenTransactionIsValid(transactionWithSameBlobs); // contains same blobs as transactionBlob + givenTransactionIsValid(transactionWithSameBlobsReplacement); + + addAndAssertRemoteTransactionsValid(transactionWithBlobs); + assertTransactionPending(transactionWithBlobs); + + final List expectedBlobQuads = + transactionWithBlobs.getBlobsWithCommitments().get().getBlobQuads(); + + // assert that the blobs are returned from the tx pool + expectedBlobQuads.forEach( + bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isEqualTo(bq)); + + // add different transaction that contains the same blobs + addAndAssertRemoteTransactionsValid(transactionWithSameBlobs); + + assertTransactionPending(transactionWithBlobs); + assertTransactionPending(transactionWithSameBlobs); + // assert that the blobs are still returned from the tx pool + expectedBlobQuads.forEach( + bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isEqualTo(bq)); + + // replace the second blob transaction with tx with different blobs + addAndAssertRemoteTransactionsValid(transactionWithSameBlobsReplacement); + assertTransactionPending(transactionWithSameBlobsReplacement); + assertTransactionNotPending(transactionWithSameBlobs); + + // assert that the blob is still returned from the tx pool + expectedBlobQuads.forEach( + bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isEqualTo(bq)); + + // replace the first blob transaction with tx with different blobs + addAndAssertRemoteTransactionsValid(transactionWithBlobsReplacement); + assertTransactionPending(transactionWithBlobsReplacement); + assertTransactionNotPending(transactionWithBlobs); + + // All txs containing the expected blobs have been replaced, + // so the blobs should no longer be returned from the tx pool + expectedBlobQuads.forEach( + bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isNull()); + } +} From e4c1b5991c2c060080a1c9e349170e889070b792 Mon Sep 17 00:00:00 2001 From: Simon Dudley Date: Wed, 9 Oct 2024 20:01:54 +1000 Subject: [PATCH 3/4] Fix RocksDBException: Busy during snapsync (#7746) Signed-off-by: Karim Taam Signed-off-by: Simon Dudley Co-authored-by: Karim Taam --- CHANGELOG.md | 3 +- .../request/heal/TrieNodeHealingRequest.java | 9 ++-- .../sync/snapsync/PersistDataStepTest.java | 48 ++++++++++++++++++- 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 592b445bc8..a9e5beb331 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,8 @@ - Corrects a regression where custom plugin services are not initialized correctly. [#7625](https://github.com/hyperledger/besu/pull/7625) - Fix for IBFT2 chains using the BONSAI DB format [#7631](https://github.com/hyperledger/besu/pull/7631) - Fix reading `tx-pool-min-score` option from configuration file [#7623](https://github.com/hyperledger/besu/pull/7623) -- Fix an undhandled exception. [#7733](https://github.com/hyperledger/besu/issues/7733) +- Fix an unhandled PeerTable exception [#7733](https://github.com/hyperledger/besu/issues/7733) +- Fix RocksDBException: Busy leading to MerkleTrieException: Unable to load trie node value [#7745](https://github.com/hyperledger/besu/pull/7745) ## 24.9.1 diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java index 7e206232c5..e622108725 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; @@ -44,7 +45,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest private final Bytes location; protected Bytes data; - protected boolean requiresPersisting = true; + protected AtomicBoolean requiresPersisting = new AtomicBoolean(true); protected TrieNodeHealingRequest(final Hash nodeHash, final Hash rootHash, final Bytes location) { super(TRIE_NODE, rootHash); @@ -65,7 +66,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest return 0; } int saved = 0; - if (requiresPersisting) { + if (requiresPersisting.getAndSet(false)) { checkNotNull(data, "Must set data before node can be persisted."); saved = doPersist( @@ -143,7 +144,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest } public boolean isRequiresPersisting() { - return requiresPersisting; + return requiresPersisting.get(); } public Bytes32 getNodeHash() { @@ -173,7 +174,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest } public void setRequiresPersisting(final boolean requiresPersisting) { - this.requiresPersisting = requiresPersisting; + this.requiresPersisting.set(requiresPersisting); } private boolean nodeIsHashReferencedDescendant(final Node node) { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStepTest.java index 0364dc7581..766995b094 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStepTest.java @@ -17,6 +17,9 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.hyperledger.besu.datatypes.Hash; @@ -25,12 +28,15 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataR import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.AccountTrieNodeHealingRequest; import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.trie.patricia.StoredMerklePatriciaTrie; import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; +import org.hyperledger.besu.ethereum.worldstate.WorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; import org.hyperledger.besu.services.tasks.Task; +import java.util.Collections; import java.util.List; import org.apache.tuweni.bytes.Bytes; @@ -39,9 +45,12 @@ import org.junit.jupiter.api.Test; public class PersistDataStepTest { + private final WorldStateKeyValueStorage worldStateKeyValueStorage = + spy( + new InMemoryKeyValueStorageProvider() + .createWorldStateStorage(DataStorageConfiguration.DEFAULT_CONFIG)); private final WorldStateStorageCoordinator worldStateStorageCoordinator = - new InMemoryKeyValueStorageProvider() - .createWorldStateStorageCoordinator(DataStorageConfiguration.DEFAULT_CONFIG); + new WorldStateStorageCoordinator(worldStateKeyValueStorage); private final SnapSyncProcessState snapSyncState = mock(SnapSyncProcessState.class); private final SnapWorldDownloadState downloadState = mock(SnapWorldDownloadState.class); @@ -67,6 +76,33 @@ public class PersistDataStepTest { assertDataPersisted(tasks); } + @Test + public void shouldPersistTrieNodeHealDataOnlyOnce() { + + final Bytes stateTrieNode = + Bytes.fromHexString( + "0xe2a0310e2d527612073b26eecdfd717e6a320cf44b4afac2b0732d9fcbe2b7fa0cf602"); + final Hash hash = Hash.hash(stateTrieNode); + final Bytes location = Bytes.of(0x02); + final AccountTrieNodeHealingRequest accountTrieNodeDataRequest = + SnapDataRequest.createAccountTrieNodeDataRequest(hash, location, Collections.emptySet()); + accountTrieNodeDataRequest.setData(stateTrieNode); + + final BonsaiWorldStateKeyValueStorage.Updater updater = + (BonsaiWorldStateKeyValueStorage.Updater) spy(worldStateKeyValueStorage.updater()); + when(worldStateKeyValueStorage.updater()) + .thenReturn(updater) + .thenReturn(mock(BonsaiWorldStateKeyValueStorage.Updater.class)); + + List> result = + persistDataStep.persist(List.of(new StubTask(accountTrieNodeDataRequest))); + + persistDataStep.persist(List.of(new StubTask(accountTrieNodeDataRequest))); + + verify(updater, times(1)).putAccountStateTrieNode(location, hash, stateTrieNode); + assertDataPersisted(result); + } + @Test public void shouldSkipPersistDataWhenNoData() { final List> tasks = TaskGenerator.createAccountRequest(false); @@ -110,6 +146,14 @@ public class PersistDataStepTest { .getStrategy(BonsaiWorldStateKeyValueStorage.class) .getCode(Hash.wrap(data.getCodeHash()), Hash.wrap(data.getAccountHash()))) .isPresent(); + } else if (task.getData() instanceof AccountTrieNodeHealingRequest) { + final AccountTrieNodeHealingRequest data = + (AccountTrieNodeHealingRequest) task.getData(); + assertThat( + worldStateStorageCoordinator + .getStrategy(BonsaiWorldStateKeyValueStorage.class) + .getTrieNodeUnsafe(data.getLocation())) + .isPresent(); } else { fail("not expected message"); } From 03a0cfad4b0b931ce85177517f92a278f5a10b46 Mon Sep 17 00:00:00 2001 From: Matt Whitehead Date: Wed, 9 Oct 2024 12:07:20 +0100 Subject: [PATCH 4/4] Support BFT mining coordinator being temporarily stopped while syncing (#7657) * Support BFT mining coordinator being temporarily stopped while syncing happens Signed-off-by: Matthew Whitehead * Apply same change to IbftBesuControllerBuilder Signed-off-by: Matthew Whitehead * Add changelog entry Signed-off-by: Matthew Whitehead * Add event queue start/stop test Signed-off-by: Matthew Whitehead * Add BFT mining coordinator tests Signed-off-by: Matthew Whitehead * Typo Signed-off-by: Matthew Whitehead * Update consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java Co-authored-by: Sally MacFarlane Signed-off-by: Matt Whitehead * Update consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java Co-authored-by: Sally MacFarlane Signed-off-by: Matt Whitehead --------- Signed-off-by: Matthew Whitehead Signed-off-by: Matt Whitehead Signed-off-by: Matt Whitehead Co-authored-by: Sally MacFarlane --- CHANGELOG.md | 1 + .../controller/IbftBesuControllerBuilder.java | 15 ++++++-- .../controller/QbftBesuControllerBuilder.java | 15 ++++++-- .../consensus/common/bft/BftEventQueue.java | 5 +++ .../consensus/common/bft/BftExecutors.java | 2 +- .../consensus/common/bft/BftProcessor.java | 9 ++++- .../blockcreation/BftMiningCoordinator.java | 13 +++++-- .../common/bft/BftEventQueueTest.java | 34 +++++++++++++++++++ .../BftMiningCoordinatorTest.java | 23 +++++++++++++ 9 files changed, 106 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9e5beb331..762ae66b16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - Fix reading `tx-pool-min-score` option from configuration file [#7623](https://github.com/hyperledger/besu/pull/7623) - Fix an unhandled PeerTable exception [#7733](https://github.com/hyperledger/besu/issues/7733) - Fix RocksDBException: Busy leading to MerkleTrieException: Unable to load trie node value [#7745](https://github.com/hyperledger/besu/pull/7745) +- If a BFT validator node is syncing, pause block production until sync has completed [#7657](https://github.com/hyperledger/besu/pull/7657) ## 24.9.1 diff --git a/besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java index 58412029fc..738dcfc596 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java @@ -253,9 +253,18 @@ public class IbftBesuControllerBuilder extends BftBesuControllerBuilder { .getValue() .getBlockPeriodSeconds())); - if (syncState.isInitialSyncPhaseDone()) { - ibftMiningCoordinator.enable(); - } + syncState.subscribeSyncStatus( + syncStatus -> { + if (syncState.syncTarget().isPresent()) { + // We're syncing so stop doing other stuff + LOG.info("Stopping IBFT mining coordinator while we are syncing"); + ibftMiningCoordinator.stop(); + } else { + LOG.info("Starting IBFT mining coordinator following sync"); + ibftMiningCoordinator.enable(); + ibftMiningCoordinator.start(); + } + }); syncState.subscribeCompletionReached( new BesuEvents.InitialSyncCompletionListener() { diff --git a/besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java index 60eac17996..498435e4af 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java @@ -301,9 +301,18 @@ public class QbftBesuControllerBuilder extends BftBesuControllerBuilder { .getEmptyBlockPeriodSeconds()); }); - if (syncState.isInitialSyncPhaseDone()) { - miningCoordinator.enable(); - } + syncState.subscribeSyncStatus( + syncStatus -> { + if (syncState.syncTarget().isPresent()) { + // We're syncing so stop doing other stuff + LOG.info("Stopping QBFT mining coordinator while we are syncing"); + miningCoordinator.stop(); + } else { + LOG.info("Starting QBFT mining coordinator following sync"); + miningCoordinator.enable(); + miningCoordinator.start(); + } + }); syncState.subscribeCompletionReached( new BesuEvents.InitialSyncCompletionListener() { diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java index 4221fd1fac..8f6190c4cf 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java @@ -48,6 +48,11 @@ public class BftEventQueue { started.set(true); } + /** Stop the event queue. No events will be queued for processing until it is started. */ + public void stop() { + started.set(false); + } + private boolean isStarted() { return started.get(); } diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java index 30038add3d..709d65faaa 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java @@ -78,7 +78,7 @@ public class BftExecutors { /** Start. */ public synchronized void start() { - if (state != State.IDLE) { + if (state != State.IDLE && state != State.STOPPED) { // Nothing to do return; } diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java index 93df77ea77..81be83b469 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java @@ -44,8 +44,13 @@ public class BftProcessor implements Runnable { this.eventMultiplexer = eventMultiplexer; } + /** Indicate to the processor that it can be started */ + public synchronized void start() { + shutdown = false; + } + /** Indicate to the processor that it should gracefully stop at its next opportunity */ - public void stop() { + public synchronized void stop() { shutdown = true; } @@ -67,6 +72,8 @@ public class BftProcessor implements Runnable { while (!shutdown) { nextEvent().ifPresent(eventMultiplexer::handleBftEvent); } + + incomingQueue.stop(); } catch (final Throwable t) { LOG.error("BFT Mining thread has suffered a fatal error, mining has been halted", t); } diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java index 83cd61beb3..795a064b6b 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java @@ -93,7 +93,9 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv @Override public void start() { - if (state.compareAndSet(State.IDLE, State.RUNNING)) { + if (state.compareAndSet(State.IDLE, State.RUNNING) + || state.compareAndSet(State.STOPPED, State.RUNNING)) { + bftProcessor.start(); bftExecutors.start(); blockAddedObserverId = blockchain.observeBlockAdded(this); eventHandler.start(); @@ -110,7 +112,7 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv try { bftProcessor.awaitStop(); } catch (final InterruptedException e) { - LOG.debug("Interrupted while waiting for IbftProcessor to stop.", e); + LOG.debug("Interrupted while waiting for BftProcessor to stop.", e); Thread.currentThread().interrupt(); } bftExecutors.stop(); @@ -135,12 +137,17 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv @Override public boolean disable() { + if (state.get() == State.PAUSED + || state.compareAndSet(State.IDLE, State.PAUSED) + || state.compareAndSet(State.RUNNING, State.PAUSED)) { + return true; + } return false; } @Override public boolean isMining() { - return true; + return state.get() == State.RUNNING; } @Override diff --git a/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java b/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java index b39bef3a43..6fbf701bc1 100644 --- a/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java +++ b/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java @@ -134,6 +134,40 @@ public class BftEventQueueTest { assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); } + @Test + public void supportsStopAndRestart() throws InterruptedException { + final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE); + queue.start(); + + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); + final DummyBftEvent dummyMessageEvent = new DummyBftEvent(); + final DummyRoundExpiryBftEvent dummyRoundTimerEvent = new DummyRoundExpiryBftEvent(); + final DummyNewChainHeadBftEvent dummyNewChainHeadEvent = new DummyNewChainHeadBftEvent(); + + queue.add(dummyMessageEvent); + queue.add(dummyRoundTimerEvent); + queue.add(dummyNewChainHeadEvent); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); + + queue.stop(); + queue.add(dummyMessageEvent); + queue.add(dummyRoundTimerEvent); + queue.add(dummyNewChainHeadEvent); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); + + queue.start(); + queue.add(dummyMessageEvent); + queue.add(dummyRoundTimerEvent); + queue.add(dummyNewChainHeadEvent); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull(); + assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull(); + } + @Test public void alwaysAddBlockTimerExpiryEvents() throws InterruptedException { final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE); diff --git a/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java b/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java index cb0dffba9b..328f9fd7b7 100644 --- a/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java +++ b/consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; import org.hyperledger.besu.consensus.common.bft.BftEventQueue; import org.hyperledger.besu.consensus.common.bft.BftExecutors; @@ -82,6 +83,28 @@ public class BftMiningCoordinatorTest { verify(bftProcessor).stop(); } + @Test + public void restartsMiningAfterStop() { + assertThat(bftMiningCoordinator.isMining()).isFalse(); + bftMiningCoordinator.stop(); + verify(bftProcessor, never()).stop(); + + bftMiningCoordinator.enable(); + bftMiningCoordinator.start(); + assertThat(bftMiningCoordinator.isMining()).isTrue(); + + bftMiningCoordinator.stop(); + assertThat(bftMiningCoordinator.isMining()).isFalse(); + verify(bftProcessor).stop(); + + bftMiningCoordinator.start(); + assertThat(bftMiningCoordinator.isMining()).isTrue(); + + // BFT processor should be started once for every time the mining + // coordinator is restarted + verify(bftProcessor, times(2)).start(); + } + @Test public void getsMinTransactionGasPrice() { final Wei minGasPrice = Wei.of(10);