Merge branch 'main' into 7311-add-peertask-foundation-code

pull/7628/head
Matilda-Clerke 2 months ago committed by GitHub
commit 4f544f4533
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      CHANGELOG.md
  2. 15
      besu/src/main/java/org/hyperledger/besu/controller/IbftBesuControllerBuilder.java
  3. 15
      besu/src/main/java/org/hyperledger/besu/controller/QbftBesuControllerBuilder.java
  4. 5
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftEventQueue.java
  5. 2
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftExecutors.java
  6. 9
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftProcessor.java
  7. 13
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinator.java
  8. 34
      consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/BftEventQueueTest.java
  9. 23
      consensus/common/src/test/java/org/hyperledger/besu/consensus/common/bft/blockcreation/BftMiningCoordinatorTest.java
  10. 14
      ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlobTestFixture.java
  11. 9
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java
  12. 23
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java
  13. 48
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStepTest.java
  14. 508
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java
  15. 585
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTestBase.java
  16. 141
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/BlobV1TransactionPoolTest.java
  17. 48
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java
  18. 15
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java

@ -28,6 +28,9 @@
- Corrects a regression where custom plugin services are not initialized correctly. [#7625](https://github.com/hyperledger/besu/pull/7625)
- Fix for IBFT2 chains using the BONSAI DB format [#7631](https://github.com/hyperledger/besu/pull/7631)
- Fix reading `tx-pool-min-score` option from configuration file [#7623](https://github.com/hyperledger/besu/pull/7623)
- Fix an unhandled PeerTable exception [#7733](https://github.com/hyperledger/besu/issues/7733)
- Fix RocksDBException: Busy leading to MerkleTrieException: Unable to load trie node value [#7745](https://github.com/hyperledger/besu/pull/7745)
- If a BFT validator node is syncing, pause block production until sync has completed [#7657](https://github.com/hyperledger/besu/pull/7657)
## 24.9.1

@ -253,9 +253,18 @@ public class IbftBesuControllerBuilder extends BftBesuControllerBuilder {
.getValue()
.getBlockPeriodSeconds()));
if (syncState.isInitialSyncPhaseDone()) {
ibftMiningCoordinator.enable();
}
syncState.subscribeSyncStatus(
syncStatus -> {
if (syncState.syncTarget().isPresent()) {
// We're syncing so stop doing other stuff
LOG.info("Stopping IBFT mining coordinator while we are syncing");
ibftMiningCoordinator.stop();
} else {
LOG.info("Starting IBFT mining coordinator following sync");
ibftMiningCoordinator.enable();
ibftMiningCoordinator.start();
}
});
syncState.subscribeCompletionReached(
new BesuEvents.InitialSyncCompletionListener() {

@ -301,9 +301,18 @@ public class QbftBesuControllerBuilder extends BftBesuControllerBuilder {
.getEmptyBlockPeriodSeconds());
});
if (syncState.isInitialSyncPhaseDone()) {
miningCoordinator.enable();
}
syncState.subscribeSyncStatus(
syncStatus -> {
if (syncState.syncTarget().isPresent()) {
// We're syncing so stop doing other stuff
LOG.info("Stopping QBFT mining coordinator while we are syncing");
miningCoordinator.stop();
} else {
LOG.info("Starting QBFT mining coordinator following sync");
miningCoordinator.enable();
miningCoordinator.start();
}
});
syncState.subscribeCompletionReached(
new BesuEvents.InitialSyncCompletionListener() {

@ -48,6 +48,11 @@ public class BftEventQueue {
started.set(true);
}
/** Stop the event queue. No events will be queued for processing until it is started. */
public void stop() {
started.set(false);
}
private boolean isStarted() {
return started.get();
}

@ -78,7 +78,7 @@ public class BftExecutors {
/** Start. */
public synchronized void start() {
if (state != State.IDLE) {
if (state != State.IDLE && state != State.STOPPED) {
// Nothing to do
return;
}

@ -44,8 +44,13 @@ public class BftProcessor implements Runnable {
this.eventMultiplexer = eventMultiplexer;
}
/** Indicate to the processor that it can be started */
public synchronized void start() {
shutdown = false;
}
/** Indicate to the processor that it should gracefully stop at its next opportunity */
public void stop() {
public synchronized void stop() {
shutdown = true;
}
@ -67,6 +72,8 @@ public class BftProcessor implements Runnable {
while (!shutdown) {
nextEvent().ifPresent(eventMultiplexer::handleBftEvent);
}
incomingQueue.stop();
} catch (final Throwable t) {
LOG.error("BFT Mining thread has suffered a fatal error, mining has been halted", t);
}

@ -93,7 +93,9 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv
@Override
public void start() {
if (state.compareAndSet(State.IDLE, State.RUNNING)) {
if (state.compareAndSet(State.IDLE, State.RUNNING)
|| state.compareAndSet(State.STOPPED, State.RUNNING)) {
bftProcessor.start();
bftExecutors.start();
blockAddedObserverId = blockchain.observeBlockAdded(this);
eventHandler.start();
@ -110,7 +112,7 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv
try {
bftProcessor.awaitStop();
} catch (final InterruptedException e) {
LOG.debug("Interrupted while waiting for IbftProcessor to stop.", e);
LOG.debug("Interrupted while waiting for BftProcessor to stop.", e);
Thread.currentThread().interrupt();
}
bftExecutors.stop();
@ -135,12 +137,17 @@ public class BftMiningCoordinator implements MiningCoordinator, BlockAddedObserv
@Override
public boolean disable() {
if (state.get() == State.PAUSED
|| state.compareAndSet(State.IDLE, State.PAUSED)
|| state.compareAndSet(State.RUNNING, State.PAUSED)) {
return true;
}
return false;
}
@Override
public boolean isMining() {
return true;
return state.get() == State.RUNNING;
}
@Override

@ -134,6 +134,40 @@ public class BftEventQueueTest {
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
}
@Test
public void supportsStopAndRestart() throws InterruptedException {
final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE);
queue.start();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
final DummyBftEvent dummyMessageEvent = new DummyBftEvent();
final DummyRoundExpiryBftEvent dummyRoundTimerEvent = new DummyRoundExpiryBftEvent();
final DummyNewChainHeadBftEvent dummyNewChainHeadEvent = new DummyNewChainHeadBftEvent();
queue.add(dummyMessageEvent);
queue.add(dummyRoundTimerEvent);
queue.add(dummyNewChainHeadEvent);
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
queue.stop();
queue.add(dummyMessageEvent);
queue.add(dummyRoundTimerEvent);
queue.add(dummyNewChainHeadEvent);
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
queue.start();
queue.add(dummyMessageEvent);
queue.add(dummyRoundTimerEvent);
queue.add(dummyNewChainHeadEvent);
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
}
@Test
public void alwaysAddBlockTimerExpiryEvents() throws InterruptedException {
final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE);

@ -19,6 +19,7 @@ import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import org.hyperledger.besu.consensus.common.bft.BftEventQueue;
import org.hyperledger.besu.consensus.common.bft.BftExecutors;
@ -82,6 +83,28 @@ public class BftMiningCoordinatorTest {
verify(bftProcessor).stop();
}
@Test
public void restartsMiningAfterStop() {
assertThat(bftMiningCoordinator.isMining()).isFalse();
bftMiningCoordinator.stop();
verify(bftProcessor, never()).stop();
bftMiningCoordinator.enable();
bftMiningCoordinator.start();
assertThat(bftMiningCoordinator.isMining()).isTrue();
bftMiningCoordinator.stop();
assertThat(bftMiningCoordinator.isMining()).isFalse();
verify(bftProcessor).stop();
bftMiningCoordinator.start();
assertThat(bftMiningCoordinator.isMining()).isTrue();
// BFT processor should be started once for every time the mining
// coordinator is restarted
verify(bftProcessor, times(2)).start();
}
@Test
public void getsMinTransactionGasPrice() {
final Wei minGasPrice = Wei.of(10);

@ -23,8 +23,6 @@ import org.hyperledger.besu.datatypes.KZGProof;
import org.hyperledger.besu.datatypes.VersionedHash;
import org.hyperledger.besu.evm.precompile.KZGPointEvalPrecompiledContract;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@ -36,6 +34,8 @@ import org.bouncycastle.crypto.digests.SHA256Digest;
public class BlobTestFixture {
private byte byteValue = 0x00;
public BlobTestFixture() {
try {
// optimistically tear down a potential previous loaded trusted setup
@ -58,14 +58,8 @@ public class BlobTestFixture {
;
public BlobTriplet createBlobTriplet() {
byte[] rawMaterial = {};
try (InputStream readme =
BlobTestFixture.class.getResourceAsStream(
"/org/hyperledger/besu/ethereum/core/encoding/BlobDataFixture.bin")) {
rawMaterial = readme.readAllBytes();
} catch (IOException e) {
fail("Failed to read blob file", e);
}
byte[] rawMaterial = new byte[131072];
rawMaterial[0] = byteValue++;
Bytes48 commitment = Bytes48.wrap(CKZG4844JNI.blobToKzgCommitment(rawMaterial));

@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
@ -44,7 +45,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest
private final Bytes location;
protected Bytes data;
protected boolean requiresPersisting = true;
protected AtomicBoolean requiresPersisting = new AtomicBoolean(true);
protected TrieNodeHealingRequest(final Hash nodeHash, final Hash rootHash, final Bytes location) {
super(TRIE_NODE, rootHash);
@ -65,7 +66,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest
return 0;
}
int saved = 0;
if (requiresPersisting) {
if (requiresPersisting.getAndSet(false)) {
checkNotNull(data, "Must set data before node can be persisted.");
saved =
doPersist(
@ -143,7 +144,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest
}
public boolean isRequiresPersisting() {
return requiresPersisting;
return requiresPersisting.get();
}
public Bytes32 getNodeHash() {
@ -173,7 +174,7 @@ public abstract class TrieNodeHealingRequest extends SnapDataRequest
}
public void setRequiresPersisting(final boolean requiresPersisting) {
this.requiresPersisting = requiresPersisting;
this.requiresPersisting.set(requiresPersisting);
}
private boolean nodeIsHashReferencedDescendant(final Node<Bytes> node) {

@ -61,6 +61,7 @@ import java.util.HashMap;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
@ -77,6 +78,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -107,8 +110,10 @@ public class TransactionPool implements BlockAddedObserver {
private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager();
private final Set<Address> localSenders = ConcurrentHashMap.newKeySet();
private final EthScheduler.OrderedProcessor<BlockAddedEvent> blockAddedEventOrderedProcessor;
private final Map<VersionedHash, BlobsWithCommitments.BlobQuad> mapOfBlobsInTransactionPool =
new HashMap<>();
private final ListMultimap<VersionedHash, BlobsWithCommitments.BlobQuad>
mapOfBlobsInTransactionPool =
Multimaps.synchronizedListMultimap(
Multimaps.newListMultimap(new HashMap<>(), () -> new ArrayList<>(1)));
public TransactionPool(
final Supplier<PendingTransactions> pendingTransactionsSupplier,
@ -660,6 +665,7 @@ public class TransactionPool implements BlockAddedObserver {
}
final List<BlobsWithCommitments.BlobQuad> blobQuads =
maybeBlobsWithCommitments.get().getBlobQuads();
blobQuads.forEach(bq -> mapOfBlobsInTransactionPool.put(bq.versionedHash(), bq));
}
@ -672,15 +678,18 @@ public class TransactionPool implements BlockAddedObserver {
}
final List<BlobsWithCommitments.BlobQuad> blobQuads =
maybeBlobsWithCommitments.get().getBlobQuads();
blobQuads.forEach(bq -> mapOfBlobsInTransactionPool.remove(bq.versionedHash()));
blobQuads.forEach(bq -> mapOfBlobsInTransactionPool.remove(bq.versionedHash(), bq));
}
public BlobsWithCommitments.BlobQuad getBlobQuad(final VersionedHash vh) {
BlobsWithCommitments.BlobQuad blobQuad = mapOfBlobsInTransactionPool.get(vh);
if (blobQuad == null) {
blobQuad = cacheForBlobsOfTransactionsAddedToABlock.get(vh);
try {
// returns an empty list if the key is not present, so getFirst() will throw
return mapOfBlobsInTransactionPool.get(vh).getFirst();
} catch (NoSuchElementException e) {
// do nothing
}
return blobQuad;
return cacheForBlobsOfTransactionsAddedToABlock.get(vh);
}
public boolean isEnabled() {

@ -17,6 +17,9 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
@ -25,12 +28,15 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataR
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.AccountTrieNodeHealingRequest;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.patricia.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.ethereum.worldstate.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.services.tasks.Task;
import java.util.Collections;
import java.util.List;
import org.apache.tuweni.bytes.Bytes;
@ -39,9 +45,12 @@ import org.junit.jupiter.api.Test;
public class PersistDataStepTest {
private final WorldStateKeyValueStorage worldStateKeyValueStorage =
spy(
new InMemoryKeyValueStorageProvider()
.createWorldStateStorage(DataStorageConfiguration.DEFAULT_CONFIG));
private final WorldStateStorageCoordinator worldStateStorageCoordinator =
new InMemoryKeyValueStorageProvider()
.createWorldStateStorageCoordinator(DataStorageConfiguration.DEFAULT_CONFIG);
new WorldStateStorageCoordinator(worldStateKeyValueStorage);
private final SnapSyncProcessState snapSyncState = mock(SnapSyncProcessState.class);
private final SnapWorldDownloadState downloadState = mock(SnapWorldDownloadState.class);
@ -67,6 +76,33 @@ public class PersistDataStepTest {
assertDataPersisted(tasks);
}
@Test
public void shouldPersistTrieNodeHealDataOnlyOnce() {
final Bytes stateTrieNode =
Bytes.fromHexString(
"0xe2a0310e2d527612073b26eecdfd717e6a320cf44b4afac2b0732d9fcbe2b7fa0cf602");
final Hash hash = Hash.hash(stateTrieNode);
final Bytes location = Bytes.of(0x02);
final AccountTrieNodeHealingRequest accountTrieNodeDataRequest =
SnapDataRequest.createAccountTrieNodeDataRequest(hash, location, Collections.emptySet());
accountTrieNodeDataRequest.setData(stateTrieNode);
final BonsaiWorldStateKeyValueStorage.Updater updater =
(BonsaiWorldStateKeyValueStorage.Updater) spy(worldStateKeyValueStorage.updater());
when(worldStateKeyValueStorage.updater())
.thenReturn(updater)
.thenReturn(mock(BonsaiWorldStateKeyValueStorage.Updater.class));
List<Task<SnapDataRequest>> result =
persistDataStep.persist(List.of(new StubTask(accountTrieNodeDataRequest)));
persistDataStep.persist(List.of(new StubTask(accountTrieNodeDataRequest)));
verify(updater, times(1)).putAccountStateTrieNode(location, hash, stateTrieNode);
assertDataPersisted(result);
}
@Test
public void shouldSkipPersistDataWhenNoData() {
final List<Task<SnapDataRequest>> tasks = TaskGenerator.createAccountRequest(false);
@ -110,6 +146,14 @@ public class PersistDataStepTest {
.getStrategy(BonsaiWorldStateKeyValueStorage.class)
.getCode(Hash.wrap(data.getCodeHash()), Hash.wrap(data.getAccountHash())))
.isPresent();
} else if (task.getData() instanceof AccountTrieNodeHealingRequest) {
final AccountTrieNodeHealingRequest data =
(AccountTrieNodeHealingRequest) task.getData();
assertThat(
worldStateStorageCoordinator
.getStrategy(BonsaiWorldStateKeyValueStorage.class)
.getTrieNodeUnsafe(data.getLocation()))
.isPresent();
} else {
fail("not expected message");
}

@ -17,7 +17,6 @@ package org.hyperledger.besu.ethereum.eth.transactions;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.mainnet.ValidationResult.valid;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.EXCEEDS_BLOCK_GAS_LIMIT;
@ -29,275 +28,55 @@ import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.TRANSACTION_REPLACEMENT_UNDERPRICED;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.TX_FEECAP_EXCEEDED;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.quality.Strictness.LENIENT;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.crypto.KeyPair;
import org.hyperledger.besu.crypto.SignatureAlgorithmFactory;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlobTestFixture;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderBuilder;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ExecutionContextTestFixture;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.TransactionTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredTransactionPoolBaseFeeTest;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.LegacyTransactionPoolBaseFeeTest;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolScheduleBuilder;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpecAdapters;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidatorFactory;
import org.hyperledger.besu.ethereum.mainnet.ValidationResult;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.evm.internal.EvmConfiguration;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.TransactionPoolValidatorService;
import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidator;
import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidatorFactory;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
import org.hyperledger.besu.util.number.Percentage;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIf;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
@SuppressWarnings("unchecked")
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = LENIENT)
public abstract class AbstractTransactionPoolTest {
protected static final KeyPair KEY_PAIR1 =
SignatureAlgorithmFactory.getInstance().generateKeyPair();
private static final KeyPair KEY_PAIR2 =
SignatureAlgorithmFactory.getInstance().generateKeyPair();
protected static final Wei BASE_FEE_FLOOR = Wei.of(7L);
protected static final Wei DEFAULT_MIN_GAS_PRICE = Wei.of(50L);
protected final EthScheduler ethScheduler = new DeterministicEthScheduler();
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
protected TransactionValidatorFactory transactionValidatorFactory;
@Mock protected PendingTransactionAddedListener listener;
@Mock protected TransactionsMessageSender transactionsMessageSender;
@Mock protected NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
@Mock protected ProtocolSpec protocolSpec;
protected ProtocolSchedule protocolSchedule;
protected final MetricsSystem metricsSystem = new NoOpMetricsSystem();
protected MutableBlockchain blockchain;
protected TransactionBroadcaster transactionBroadcaster;
protected PendingTransactions transactions;
protected final Transaction transaction0 = createTransaction(0);
protected final Transaction transaction1 = createTransaction(1);
protected final Transaction transactionBlob = createBlobTransaction(2);
protected final Transaction transactionOtherSender = createTransaction(1, KEY_PAIR2);
private ExecutionContextTestFixture executionContext;
protected ProtocolContext protocolContext;
protected TransactionPool transactionPool;
protected long blockGasLimit;
protected EthProtocolManager ethProtocolManager;
protected EthContext ethContext;
private PeerTransactionTracker peerTransactionTracker;
private ArgumentCaptor<Runnable> syncTaskCapture;
protected abstract PendingTransactions createPendingTransactions(
final TransactionPoolConfiguration poolConfig,
final BiFunction<PendingTransaction, PendingTransaction, Boolean>
transactionReplacementTester);
protected TransactionTestFixture createBaseTransactionGasPriceMarket(
final int transactionNumber) {
return new TransactionTestFixture()
.nonce(transactionNumber)
.gasLimit(blockGasLimit)
.type(TransactionType.FRONTIER);
}
protected TransactionTestFixture createBaseTransactionBaseFeeMarket(final int nonce) {
return new TransactionTestFixture()
.nonce(nonce)
.gasLimit(blockGasLimit)
.gasPrice(null)
.maxFeePerGas(Optional.of(Wei.of(5000L)))
.maxPriorityFeePerGas(Optional.of(Wei.of(1000L)))
.type(TransactionType.EIP1559);
}
protected abstract ExecutionContextTestFixture createExecutionContextTestFixture();
protected static ExecutionContextTestFixture createExecutionContextTestFixtureBaseFeeMarket() {
final var genesisConfigFile = GenesisConfigFile.fromResource("/txpool-test-genesis.json");
final ProtocolSchedule protocolSchedule =
new ProtocolScheduleBuilder(
genesisConfigFile.getConfigOptions(),
BigInteger.valueOf(1),
ProtocolSpecAdapters.create(0, Function.identity()),
new PrivacyParameters(),
false,
EvmConfiguration.DEFAULT,
MiningParameters.MINING_DISABLED,
new BadBlockManager(),
false,
new NoOpMetricsSystem())
.createProtocolSchedule();
final ExecutionContextTestFixture executionContextTestFixture =
ExecutionContextTestFixture.builder(genesisConfigFile)
.protocolSchedule(protocolSchedule)
.build();
final Block block =
new Block(
new BlockHeaderTestFixture()
.gasLimit(
executionContextTestFixture
.getBlockchain()
.getChainHeadBlock()
.getHeader()
.getGasLimit())
.difficulty(Difficulty.ONE)
.baseFeePerGas(Wei.of(10L))
.parentHash(executionContextTestFixture.getBlockchain().getChainHeadHash())
.number(executionContextTestFixture.getBlockchain().getChainHeadBlockNumber() + 1)
.buildHeader(),
new BlockBody(List.of(), List.of()));
executionContextTestFixture.getBlockchain().appendBlock(block, List.of());
return executionContextTestFixture;
}
protected abstract FeeMarket getFeeMarket();
@BeforeEach
public void setUp() {
executionContext = createExecutionContextTestFixture();
protocolContext = executionContext.getProtocolContext();
blockchain = executionContext.getBlockchain();
when(protocolSpec.getTransactionValidatorFactory()).thenReturn(transactionValidatorFactory);
when(protocolSpec.getFeeMarket()).thenReturn(getFeeMarket());
protocolSchedule = spy(executionContext.getProtocolSchedule());
doReturn(protocolSpec).when(protocolSchedule).getByBlockHeader(any());
blockGasLimit = blockchain.getChainHeadBlock().getHeader().getGasLimit();
ethProtocolManager = EthProtocolManagerTestUtil.create();
ethContext = spy(ethProtocolManager.ethContext());
final EthScheduler ethScheduler = spy(ethContext.getScheduler());
syncTaskCapture = ArgumentCaptor.forClass(Runnable.class);
doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture());
doReturn(ethScheduler).when(ethContext).getScheduler();
peerTransactionTracker = new PeerTransactionTracker(ethContext.getEthPeers());
transactionBroadcaster =
spy(
new TransactionBroadcaster(
ethContext,
peerTransactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender));
transactionPool = createTransactionPool();
blockchain.observeBlockAdded(transactionPool);
}
protected TransactionPool createTransactionPool() {
return createTransactionPool(b -> b.minGasPrice(Wei.of(2)));
}
private TransactionPool createTransactionPool(
final Consumer<ImmutableTransactionPoolConfiguration.Builder> configConsumer) {
final ImmutableTransactionPoolConfiguration.Builder configBuilder =
ImmutableTransactionPoolConfiguration.builder();
configConsumer.accept(configBuilder);
final TransactionPoolConfiguration poolConfig = configBuilder.build();
final TransactionPoolReplacementHandler transactionReplacementHandler =
new TransactionPoolReplacementHandler(
poolConfig.getPriceBump(), poolConfig.getBlobPriceBump());
final BiFunction<PendingTransaction, PendingTransaction, Boolean> transactionReplacementTester =
(t1, t2) ->
transactionReplacementHandler.shouldReplace(
t1, t2, protocolContext.getBlockchain().getChainHeadHeader());
transactions = spy(createPendingTransactions(poolConfig, transactionReplacementTester));
final TransactionPool txPool =
new TransactionPool(
() -> transactions,
protocolSchedule,
protocolContext,
transactionBroadcaster,
ethContext,
new TransactionPoolMetrics(metricsSystem),
poolConfig,
new BlobCache());
txPool.setEnabled();
return txPool;
}
public abstract class AbstractTransactionPoolTest extends AbstractTransactionPoolTestBase {
@ParameterizedTest
@ValueSource(booleans = {true, false})
@ -466,21 +245,21 @@ public abstract class AbstractTransactionPoolTest {
public void shouldReAddBlobTxsWhenReorgHappens() {
givenTransactionIsValid(transaction0);
givenTransactionIsValid(transaction1);
givenTransactionIsValid(transactionBlob);
givenTransactionIsValid(transactionWithBlobs);
addAndAssertRemoteTransactionsValid(transaction0);
addAndAssertRemoteTransactionsValid(transaction1);
addAndAssertRemoteTransactionsValid(transactionBlob);
addAndAssertRemoteTransactionsValid(transactionWithBlobs);
final BlockHeader commonParent = getHeaderForCurrentChainHead();
final Block originalFork1 = appendBlock(Difficulty.of(1000), commonParent, transaction0);
final Block originalFork2 =
appendBlock(Difficulty.of(10), originalFork1.getHeader(), transaction1);
final Block originalFork3 =
appendBlock(Difficulty.of(1), originalFork2.getHeader(), transactionBlob);
appendBlock(Difficulty.of(1), originalFork2.getHeader(), transactionWithBlobs);
assertTransactionNotPending(transaction0);
assertTransactionNotPending(transaction1);
assertTransactionNotPending(transactionBlob);
assertTransactionNotPending(transactionWithBlobs);
final Block reorgFork1 = appendBlock(Difficulty.ONE, commonParent);
verifyChainHeadIs(originalFork3);
@ -493,14 +272,15 @@ public abstract class AbstractTransactionPoolTest {
assertTransactionPending(transaction0);
assertTransactionPending(transaction1);
assertTransactionPending(transactionBlob);
assertTransactionPending(transactionWithBlobs);
Optional<Transaction> maybeBlob = transactions.getTransactionByHash(transactionBlob.getHash());
Optional<Transaction> maybeBlob =
transactions.getTransactionByHash(transactionWithBlobs.getHash());
assertThat(maybeBlob).isPresent();
Transaction restoredBlob = maybeBlob.get();
assertThat(restoredBlob).isEqualTo(transactionBlob);
assertThat(restoredBlob).isEqualTo(transactionWithBlobs);
assertThat(restoredBlob.getBlobsWithCommitments().get().getBlobQuads())
.isEqualTo(transactionBlob.getBlobsWithCommitments().get().getBlobQuads());
.isEqualTo(transactionWithBlobs.getBlobsWithCommitments().get().getBlobQuads());
}
@ParameterizedTest
@ -1281,272 +1061,4 @@ public abstract class AbstractTransactionPoolTest {
.map(PendingTransaction::getTransaction)
.containsExactlyInAnyOrder(transaction1, transaction2a, transaction3);
}
private static TransactionPoolValidatorService getTransactionPoolValidatorServiceReturning(
final String errorMessage) {
return new TransactionPoolValidatorService() {
@Override
public PluginTransactionPoolValidator createTransactionValidator() {
return (transaction, isLocal, hasPriority) -> Optional.ofNullable(errorMessage);
}
@Override
public void registerPluginTransactionValidatorFactory(
final PluginTransactionPoolValidatorFactory pluginTransactionPoolValidatorFactory) {}
};
}
@SuppressWarnings("unused")
private static boolean isBaseFeeMarket(final ExtensionContext extensionContext) {
final Class<?> cz = extensionContext.getTestClass().get();
return cz.equals(LegacyTransactionPoolBaseFeeTest.class)
|| cz.equals(LayeredTransactionPoolBaseFeeTest.class);
}
protected void assertTransactionNotPending(final Transaction transaction) {
assertThat(transactions.getTransactionByHash(transaction.getHash())).isEmpty();
}
protected void addAndAssertRemoteTransactionInvalid(final Transaction tx) {
transactionPool.addRemoteTransactions(List.of(tx));
verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(tx));
assertTransactionNotPending(tx);
}
protected void assertTransactionPending(final Transaction t) {
assertThat(transactions.getTransactionByHash(t.getHash())).contains(t);
}
protected void addAndAssertRemoteTransactionsValid(final Transaction... txs) {
addAndAssertRemoteTransactionsValid(false, txs);
}
protected void addAndAssertRemotePriorityTransactionsValid(final Transaction... txs) {
addAndAssertRemoteTransactionsValid(true, txs);
}
protected void addAndAssertRemoteTransactionsValid(
final boolean hasPriority, final Transaction... txs) {
transactionPool.addRemoteTransactions(List.of(txs));
verify(transactionBroadcaster)
.onTransactionsAdded(
argThat(btxs -> btxs.size() == txs.length && btxs.containsAll(List.of(txs))));
Arrays.stream(txs).forEach(this::assertTransactionPending);
assertThat(transactions.getLocalTransactions()).doesNotContain(txs);
if (hasPriority) {
assertThat(transactions.getPriorityTransactions()).contains(txs);
}
}
protected void addAndAssertTransactionViaApiValid(
final Transaction tx, final boolean disableLocalPriority) {
final ValidationResult<TransactionInvalidReason> result =
transactionPool.addTransactionViaApi(tx);
assertThat(result.isValid()).isTrue();
assertTransactionPending(tx);
verify(transactionBroadcaster).onTransactionsAdded(singletonList(tx));
assertThat(transactions.getLocalTransactions()).contains(tx);
if (disableLocalPriority) {
assertThat(transactions.getPriorityTransactions()).doesNotContain(tx);
} else {
assertThat(transactions.getPriorityTransactions()).contains(tx);
}
}
protected void addAndAssertTransactionViaApiInvalid(
final Transaction tx, final TransactionInvalidReason invalidReason) {
final ValidationResult<TransactionInvalidReason> result =
transactionPool.addTransactionViaApi(tx);
assertThat(result.isValid()).isFalse();
assertThat(result.getInvalidReason()).isEqualTo(invalidReason);
assertTransactionNotPending(tx);
verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(tx));
}
@SuppressWarnings("unchecked")
protected void givenTransactionIsValid(final Transaction transaction) {
when(transactionValidatorFactory
.get()
.validate(eq(transaction), any(Optional.class), any(Optional.class), any()))
.thenReturn(valid());
when(transactionValidatorFactory
.get()
.validateForSender(
eq(transaction), nullable(Account.class), any(TransactionValidationParams.class)))
.thenReturn(valid());
}
protected abstract Block appendBlock(
final Difficulty difficulty,
final BlockHeader parentBlock,
final Transaction... transactionsToAdd);
protected Transaction createTransactionGasPriceMarket(
final int transactionNumber, final Wei maxPrice) {
return createBaseTransaction(transactionNumber).gasPrice(maxPrice).createTransaction(KEY_PAIR1);
}
protected Transaction createTransactionBaseFeeMarket(final int nonce, final Wei maxPrice) {
return createBaseTransaction(nonce)
.maxFeePerGas(Optional.of(maxPrice))
.maxPriorityFeePerGas(Optional.of(maxPrice.divide(5L)))
.createTransaction(KEY_PAIR1);
}
protected abstract TransactionTestFixture createBaseTransaction(final int nonce);
protected Transaction createTransaction(
final int transactionNumber, final Optional<BigInteger> maybeChainId) {
return createBaseTransaction(transactionNumber)
.chainId(maybeChainId)
.createTransaction(KEY_PAIR1);
}
protected abstract Transaction createTransaction(final int nonce, final Wei maxPrice);
protected Transaction createTransaction(final int nonce) {
return createTransaction(nonce, Optional.of(BigInteger.ONE));
}
protected Transaction createTransaction(final int nonce, final KeyPair keyPair) {
return createBaseTransaction(nonce).createTransaction(keyPair);
}
protected void verifyChainHeadIs(final Block forkBlock2) {
assertThat(blockchain.getChainHeadHash()).isEqualTo(forkBlock2.getHash());
}
protected BlockHeader getHeaderForCurrentChainHead() {
return blockchain.getBlockHeader(blockchain.getChainHeadHash()).get();
}
protected void appendBlock(final Transaction... transactionsToAdd) {
appendBlock(Difficulty.ONE, getHeaderForCurrentChainHead(), transactionsToAdd);
}
protected void protocolSupportsTxReplayProtection(
final long chainId, final boolean isSupportedAtCurrentBlock) {
when(protocolSpec.isReplayProtectionSupported()).thenReturn(isSupportedAtCurrentBlock);
when(protocolSchedule.getChainId()).thenReturn(Optional.of(BigInteger.valueOf(chainId)));
}
protected void protocolDoesNotSupportTxReplayProtection() {
when(protocolSchedule.getChainId()).thenReturn(Optional.empty());
}
protected Transaction createTransactionWithoutChainId(final int transactionNumber) {
return createTransaction(transactionNumber, Optional.empty());
}
protected void whenBlockBaseFeeIs(final Wei baseFee) {
final BlockHeader header =
BlockHeaderBuilder.fromHeader(blockchain.getChainHeadHeader())
.baseFee(baseFee)
.blockHeaderFunctions(new MainnetBlockHeaderFunctions())
.parentHash(blockchain.getChainHeadHash())
.buildBlockHeader();
blockchain.appendBlock(new Block(header, BlockBody.empty()), emptyList());
}
protected Transaction createFrontierTransaction(final int transactionNumber, final Wei gasPrice) {
return new TransactionTestFixture()
.nonce(transactionNumber)
.gasPrice(gasPrice)
.gasLimit(blockGasLimit)
.type(TransactionType.FRONTIER)
.createTransaction(KEY_PAIR1);
}
protected Transaction createBlobTransaction(final int nonce) {
return new TransactionTestFixture()
.nonce(nonce)
.gasLimit(blockGasLimit)
.gasPrice(null)
.maxFeePerGas(Optional.of(Wei.of(5000L)))
.maxPriorityFeePerGas(Optional.of(Wei.of(1000L)))
.type(TransactionType.BLOB)
.blobsWithCommitments(Optional.of(new BlobTestFixture().createBlobsWithCommitments(6)))
.createTransaction(KEY_PAIR1);
}
protected int addTxAndGetPendingTxsCount(
final Wei genesisBaseFee,
final Wei minGasPrice,
final Wei lastBlockBaseFee,
final Wei txMaxFeePerGas,
final boolean isLocal,
final boolean hasPriority) {
when(protocolSpec.getFeeMarket()).thenReturn(FeeMarket.london(0, Optional.of(genesisBaseFee)));
whenBlockBaseFeeIs(lastBlockBaseFee);
final Transaction transaction = createTransaction(0, txMaxFeePerGas);
if (hasPriority) {
transactionPool =
createTransactionPool(
b -> b.minGasPrice(minGasPrice).prioritySenders(Set.of(transaction.getSender())));
} else {
transactionPool =
createTransactionPool(b -> b.minGasPrice(minGasPrice).noLocalPriority(true));
}
givenTransactionIsValid(transaction);
if (isLocal) {
transactionPool.addTransactionViaApi(transaction);
} else {
transactionPool.addRemoteTransactions(List.of(transaction));
}
return transactions.size();
}
protected Block appendBlockGasPriceMarket(
final Difficulty difficulty,
final BlockHeader parentBlock,
final Transaction[] transactionsToAdd) {
final List<Transaction> transactionList = asList(transactionsToAdd);
final Block block =
new Block(
new BlockHeaderTestFixture()
.difficulty(difficulty)
.gasLimit(parentBlock.getGasLimit())
.parentHash(parentBlock.getHash())
.number(parentBlock.getNumber() + 1)
.buildHeader(),
new BlockBody(transactionList, emptyList()));
final List<TransactionReceipt> transactionReceipts =
transactionList.stream()
.map(transaction -> new TransactionReceipt(1, 1, emptyList(), Optional.empty()))
.collect(toList());
blockchain.appendBlock(block, transactionReceipts);
return block;
}
protected Block appendBlockBaseFeeMarket(
final Difficulty difficulty,
final BlockHeader parentBlock,
final Transaction[] transactionsToAdd) {
final List<Transaction> transactionList = asList(transactionsToAdd);
final Block block =
new Block(
new BlockHeaderTestFixture()
.baseFeePerGas(Wei.of(10L))
.gasLimit(parentBlock.getGasLimit())
.difficulty(difficulty)
.parentHash(parentBlock.getHash())
.number(parentBlock.getNumber() + 1)
.buildHeader(),
new BlockBody(transactionList, emptyList()));
final List<TransactionReceipt> transactionReceipts =
transactionList.stream()
.map(transaction -> new TransactionReceipt(1, 1, emptyList(), Optional.empty()))
.collect(toList());
blockchain.appendBlock(block, transactionReceipts);
return block;
}
}

@ -0,0 +1,585 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.mainnet.ValidationResult.valid;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.quality.Strictness.LENIENT;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.crypto.KeyPair;
import org.hyperledger.besu.crypto.SignatureAlgorithmFactory;
import org.hyperledger.besu.datatypes.BlobsWithCommitments;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlobTestFixture;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderBuilder;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ExecutionContextTestFixture;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.TransactionTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredTransactionPoolBaseFeeTest;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.LegacyTransactionPoolBaseFeeTest;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolScheduleBuilder;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpecAdapters;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidatorFactory;
import org.hyperledger.besu.ethereum.mainnet.ValidationResult;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.evm.internal.EvmConfiguration;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.TransactionPoolValidatorService;
import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidator;
import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidatorFactory;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
@SuppressWarnings("unchecked")
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = LENIENT)
public abstract class AbstractTransactionPoolTestBase {
protected static final KeyPair KEY_PAIR1 =
SignatureAlgorithmFactory.getInstance().generateKeyPair();
private static final KeyPair KEY_PAIR2 =
SignatureAlgorithmFactory.getInstance().generateKeyPair();
protected static final Wei BASE_FEE_FLOOR = Wei.of(7L);
protected static final Wei DEFAULT_MIN_GAS_PRICE = Wei.of(50L);
protected final EthScheduler ethScheduler = new DeterministicEthScheduler();
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
protected TransactionValidatorFactory transactionValidatorFactory;
@Mock protected PendingTransactionAddedListener listener;
@Mock protected TransactionsMessageSender transactionsMessageSender;
@Mock protected NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
@Mock protected ProtocolSpec protocolSpec;
protected ProtocolSchedule protocolSchedule;
protected final MetricsSystem metricsSystem = new NoOpMetricsSystem();
protected MutableBlockchain blockchain;
protected TransactionBroadcaster transactionBroadcaster;
protected PendingTransactions transactions;
protected final Transaction transaction0 = createTransaction(0);
protected final Transaction transaction1 = createTransaction(1);
protected final Transaction transactionWithBlobs = createBlobTransaction(2);
protected final Transaction transactionWithBlobsReplacement =
createReplacementTransactionWithBlobs(2);
protected final Transaction transactionWithSameBlobs =
createBlobTransactionWithSameBlobs(3, transactionWithBlobs.getBlobsWithCommitments().get());
protected final Transaction transactionWithSameBlobsReplacement =
createReplacementTransactionWithBlobs(3);
protected final Transaction transactionOtherSender = createTransaction(1, KEY_PAIR2);
private ExecutionContextTestFixture executionContext;
protected ProtocolContext protocolContext;
protected TransactionPool transactionPool;
protected long blockGasLimit;
protected EthProtocolManager ethProtocolManager;
protected EthContext ethContext;
protected ArgumentCaptor<Runnable> syncTaskCapture;
protected PeerTransactionTracker peerTransactionTracker;
private BlobTestFixture blobTestFixture;
protected abstract PendingTransactions createPendingTransactions(
final TransactionPoolConfiguration poolConfig,
final BiFunction<PendingTransaction, PendingTransaction, Boolean>
transactionReplacementTester);
protected TransactionTestFixture createBaseTransactionGasPriceMarket(
final int transactionNumber) {
return new TransactionTestFixture()
.nonce(transactionNumber)
.gasLimit(blockGasLimit)
.type(TransactionType.FRONTIER);
}
protected TransactionTestFixture createBaseTransactionBaseFeeMarket(final int nonce) {
return new TransactionTestFixture()
.nonce(nonce)
.gasLimit(blockGasLimit)
.gasPrice(null)
.maxFeePerGas(Optional.of(Wei.of(5000L)))
.maxPriorityFeePerGas(Optional.of(Wei.of(1000L)))
.type(TransactionType.EIP1559);
}
protected abstract ExecutionContextTestFixture createExecutionContextTestFixture();
protected static ExecutionContextTestFixture createExecutionContextTestFixtureBaseFeeMarket() {
final var genesisConfigFile = GenesisConfigFile.fromResource("/txpool-test-genesis.json");
final ProtocolSchedule protocolSchedule =
new ProtocolScheduleBuilder(
genesisConfigFile.getConfigOptions(),
BigInteger.valueOf(1),
ProtocolSpecAdapters.create(0, Function.identity()),
new PrivacyParameters(),
false,
EvmConfiguration.DEFAULT,
MiningParameters.MINING_DISABLED,
new BadBlockManager(),
false,
new NoOpMetricsSystem())
.createProtocolSchedule();
final ExecutionContextTestFixture executionContextTestFixture =
ExecutionContextTestFixture.builder(genesisConfigFile)
.protocolSchedule(protocolSchedule)
.build();
final Block block =
new Block(
new BlockHeaderTestFixture()
.gasLimit(
executionContextTestFixture
.getBlockchain()
.getChainHeadBlock()
.getHeader()
.getGasLimit())
.difficulty(Difficulty.ONE)
.baseFeePerGas(Wei.of(10L))
.parentHash(executionContextTestFixture.getBlockchain().getChainHeadHash())
.number(executionContextTestFixture.getBlockchain().getChainHeadBlockNumber() + 1)
.buildHeader(),
new BlockBody(List.of(), List.of()));
executionContextTestFixture.getBlockchain().appendBlock(block, List.of());
return executionContextTestFixture;
}
protected abstract FeeMarket getFeeMarket();
@BeforeEach
public void setUp() {
executionContext = createExecutionContextTestFixture();
protocolContext = executionContext.getProtocolContext();
blockchain = executionContext.getBlockchain();
when(protocolSpec.getTransactionValidatorFactory()).thenReturn(transactionValidatorFactory);
when(protocolSpec.getFeeMarket()).thenReturn(getFeeMarket());
protocolSchedule = spy(executionContext.getProtocolSchedule());
doReturn(protocolSpec).when(protocolSchedule).getByBlockHeader(any());
blockGasLimit = blockchain.getChainHeadBlock().getHeader().getGasLimit();
ethProtocolManager = EthProtocolManagerTestUtil.create();
ethContext = spy(ethProtocolManager.ethContext());
final EthScheduler ethScheduler = spy(ethContext.getScheduler());
syncTaskCapture = ArgumentCaptor.forClass(Runnable.class);
doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture());
doReturn(ethScheduler).when(ethContext).getScheduler();
peerTransactionTracker = new PeerTransactionTracker(ethContext.getEthPeers());
transactionBroadcaster =
spy(
new TransactionBroadcaster(
ethContext,
peerTransactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender));
transactionPool = createTransactionPool();
blockchain.observeBlockAdded(transactionPool);
}
protected TransactionPool createTransactionPool() {
return createTransactionPool(b -> b.minGasPrice(Wei.of(2)));
}
TransactionPool createTransactionPool(
final Consumer<ImmutableTransactionPoolConfiguration.Builder> configConsumer) {
final ImmutableTransactionPoolConfiguration.Builder configBuilder =
ImmutableTransactionPoolConfiguration.builder();
configConsumer.accept(configBuilder);
final TransactionPoolConfiguration poolConfig = configBuilder.build();
final TransactionPoolReplacementHandler transactionReplacementHandler =
new TransactionPoolReplacementHandler(
poolConfig.getPriceBump(), poolConfig.getBlobPriceBump());
final BiFunction<PendingTransaction, PendingTransaction, Boolean> transactionReplacementTester =
(t1, t2) ->
transactionReplacementHandler.shouldReplace(
t1, t2, protocolContext.getBlockchain().getChainHeadHeader());
transactions = spy(createPendingTransactions(poolConfig, transactionReplacementTester));
final TransactionPool txPool =
new TransactionPool(
() -> transactions,
protocolSchedule,
protocolContext,
transactionBroadcaster,
ethContext,
new TransactionPoolMetrics(metricsSystem),
poolConfig,
new BlobCache());
txPool.setEnabled();
return txPool;
}
static TransactionPoolValidatorService getTransactionPoolValidatorServiceReturning(
final String errorMessage) {
return new TransactionPoolValidatorService() {
@Override
public PluginTransactionPoolValidator createTransactionValidator() {
return (transaction, isLocal, hasPriority) -> Optional.ofNullable(errorMessage);
}
@Override
public void registerPluginTransactionValidatorFactory(
final PluginTransactionPoolValidatorFactory pluginTransactionPoolValidatorFactory) {}
};
}
@SuppressWarnings("unused")
private static boolean isBaseFeeMarket(final ExtensionContext extensionContext) {
final Class<?> cz = extensionContext.getTestClass().get();
return cz.equals(LegacyTransactionPoolBaseFeeTest.class)
|| cz.equals(LayeredTransactionPoolBaseFeeTest.class)
|| cz.equals(BlobV1TransactionPoolTest.class);
}
protected void assertTransactionNotPending(final Transaction transaction) {
assertThat(transactions.getTransactionByHash(transaction.getHash())).isEmpty();
}
protected void addAndAssertRemoteTransactionInvalid(final Transaction tx) {
transactionPool.addRemoteTransactions(List.of(tx));
verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(tx));
assertTransactionNotPending(tx);
}
protected void assertTransactionPending(final Transaction t) {
assertThat(transactions.getTransactionByHash(t.getHash())).contains(t);
}
protected void addAndAssertRemoteTransactionsValid(final Transaction... txs) {
addAndAssertRemoteTransactionsValid(false, txs);
}
protected void addAndAssertRemotePriorityTransactionsValid(final Transaction... txs) {
addAndAssertRemoteTransactionsValid(true, txs);
}
protected void addAndAssertRemoteTransactionsValid(
final boolean hasPriority, final Transaction... txs) {
transactionPool.addRemoteTransactions(List.of(txs));
verify(transactionBroadcaster)
.onTransactionsAdded(
argThat(btxs -> btxs.size() == txs.length && btxs.containsAll(List.of(txs))));
Arrays.stream(txs).forEach(this::assertTransactionPending);
assertThat(transactions.getLocalTransactions()).doesNotContain(txs);
if (hasPriority) {
assertThat(transactions.getPriorityTransactions()).contains(txs);
}
}
protected void addAndAssertTransactionViaApiValid(
final Transaction tx, final boolean disableLocalPriority) {
final ValidationResult<TransactionInvalidReason> result =
transactionPool.addTransactionViaApi(tx);
assertThat(result.isValid()).isTrue();
assertTransactionPending(tx);
verify(transactionBroadcaster).onTransactionsAdded(singletonList(tx));
assertThat(transactions.getLocalTransactions()).contains(tx);
if (disableLocalPriority) {
assertThat(transactions.getPriorityTransactions()).doesNotContain(tx);
} else {
assertThat(transactions.getPriorityTransactions()).contains(tx);
}
}
protected void addAndAssertTransactionViaApiInvalid(
final Transaction tx, final TransactionInvalidReason invalidReason) {
final ValidationResult<TransactionInvalidReason> result =
transactionPool.addTransactionViaApi(tx);
assertThat(result.isValid()).isFalse();
assertThat(result.getInvalidReason()).isEqualTo(invalidReason);
assertTransactionNotPending(tx);
verify(transactionBroadcaster, never()).onTransactionsAdded(singletonList(tx));
}
@SuppressWarnings("unchecked")
protected void givenTransactionIsValid(final Transaction transaction) {
when(transactionValidatorFactory
.get()
.validate(eq(transaction), any(Optional.class), any(Optional.class), any()))
.thenReturn(valid());
when(transactionValidatorFactory
.get()
.validateForSender(
eq(transaction), nullable(Account.class), any(TransactionValidationParams.class)))
.thenReturn(valid());
}
protected abstract Block appendBlock(
final Difficulty difficulty,
final BlockHeader parentBlock,
final Transaction... transactionsToAdd);
protected Transaction createTransactionGasPriceMarket(
final int transactionNumber, final Wei maxPrice) {
return createBaseTransaction(transactionNumber).gasPrice(maxPrice).createTransaction(KEY_PAIR1);
}
protected Transaction createTransactionBaseFeeMarket(final int nonce, final Wei maxPrice) {
return createBaseTransaction(nonce)
.maxFeePerGas(Optional.of(maxPrice))
.maxPriorityFeePerGas(Optional.of(maxPrice.divide(5L)))
.createTransaction(KEY_PAIR1);
}
protected abstract TransactionTestFixture createBaseTransaction(final int nonce);
protected Transaction createTransaction(
final int transactionNumber, final Optional<BigInteger> maybeChainId) {
return createBaseTransaction(transactionNumber)
.chainId(maybeChainId)
.createTransaction(KEY_PAIR1);
}
protected abstract Transaction createTransaction(final int nonce, final Wei maxPrice);
protected Transaction createTransaction(final int nonce) {
return createTransaction(nonce, Optional.of(BigInteger.ONE));
}
protected Transaction createTransaction(final int nonce, final KeyPair keyPair) {
return createBaseTransaction(nonce).createTransaction(keyPair);
}
protected void verifyChainHeadIs(final Block forkBlock2) {
assertThat(blockchain.getChainHeadHash()).isEqualTo(forkBlock2.getHash());
}
protected BlockHeader getHeaderForCurrentChainHead() {
return blockchain.getBlockHeader(blockchain.getChainHeadHash()).get();
}
protected void appendBlock(final Transaction... transactionsToAdd) {
appendBlock(Difficulty.ONE, getHeaderForCurrentChainHead(), transactionsToAdd);
}
protected void protocolSupportsTxReplayProtection(
final long chainId, final boolean isSupportedAtCurrentBlock) {
when(protocolSpec.isReplayProtectionSupported()).thenReturn(isSupportedAtCurrentBlock);
when(protocolSchedule.getChainId()).thenReturn(Optional.of(BigInteger.valueOf(chainId)));
}
protected void protocolDoesNotSupportTxReplayProtection() {
when(protocolSchedule.getChainId()).thenReturn(Optional.empty());
}
protected Transaction createTransactionWithoutChainId(final int transactionNumber) {
return createTransaction(transactionNumber, Optional.empty());
}
protected void whenBlockBaseFeeIs(final Wei baseFee) {
final BlockHeader header =
BlockHeaderBuilder.fromHeader(blockchain.getChainHeadHeader())
.baseFee(baseFee)
.blockHeaderFunctions(new MainnetBlockHeaderFunctions())
.parentHash(blockchain.getChainHeadHash())
.buildBlockHeader();
blockchain.appendBlock(new Block(header, BlockBody.empty()), emptyList());
}
protected Transaction createFrontierTransaction(final int transactionNumber, final Wei gasPrice) {
return new TransactionTestFixture()
.nonce(transactionNumber)
.gasPrice(gasPrice)
.gasLimit(blockGasLimit)
.type(TransactionType.FRONTIER)
.createTransaction(KEY_PAIR1);
}
protected Transaction createBlobTransaction(final int nonce) {
if (blobTestFixture == null) {
blobTestFixture = new BlobTestFixture();
}
return new TransactionTestFixture()
.nonce(nonce)
.gasLimit(blockGasLimit)
.gasPrice(null)
.maxFeePerGas(Optional.of(Wei.of(5000L)))
.maxPriorityFeePerGas(Optional.of(Wei.of(1000L)))
.type(TransactionType.BLOB)
.blobsWithCommitments(Optional.of(blobTestFixture.createBlobsWithCommitments(6)))
.createTransaction(KEY_PAIR1);
}
protected Transaction createBlobTransactionWithSameBlobs(
final int nonce, final BlobsWithCommitments blobsWithCommitments) {
return new TransactionTestFixture()
.nonce(nonce)
.gasLimit(blockGasLimit)
.gasPrice(null)
.maxFeePerGas(Optional.of(Wei.of(5000L)))
.maxPriorityFeePerGas(Optional.of(Wei.of(1000L)))
.type(TransactionType.BLOB)
.blobsWithCommitments(Optional.of(blobsWithCommitments))
.createTransaction(KEY_PAIR1);
}
protected Transaction createReplacementTransactionWithBlobs(final int nonce) {
if (blobTestFixture == null) {
blobTestFixture = new BlobTestFixture();
}
return new TransactionTestFixture()
.nonce(nonce)
.gasLimit(blockGasLimit)
.gasPrice(null)
.maxFeePerGas(Optional.of(Wei.of(5000L * 10)))
.maxPriorityFeePerGas(Optional.of(Wei.of(1000L * 10)))
.maxFeePerBlobGas(Optional.of(Wei.of(5000L)))
.type(TransactionType.BLOB)
.blobsWithCommitments(Optional.of(blobTestFixture.createBlobsWithCommitments(6)))
.createTransaction(KEY_PAIR1);
}
protected int addTxAndGetPendingTxsCount(
final Wei genesisBaseFee,
final Wei minGasPrice,
final Wei lastBlockBaseFee,
final Wei txMaxFeePerGas,
final boolean isLocal,
final boolean hasPriority) {
when(protocolSpec.getFeeMarket()).thenReturn(FeeMarket.london(0, Optional.of(genesisBaseFee)));
whenBlockBaseFeeIs(lastBlockBaseFee);
final Transaction transaction = createTransaction(0, txMaxFeePerGas);
if (hasPriority) {
transactionPool =
createTransactionPool(
b -> b.minGasPrice(minGasPrice).prioritySenders(Set.of(transaction.getSender())));
} else {
transactionPool =
createTransactionPool(b -> b.minGasPrice(minGasPrice).noLocalPriority(true));
}
givenTransactionIsValid(transaction);
if (isLocal) {
transactionPool.addTransactionViaApi(transaction);
} else {
transactionPool.addRemoteTransactions(List.of(transaction));
}
return transactions.size();
}
protected Block appendBlockGasPriceMarket(
final Difficulty difficulty,
final BlockHeader parentBlock,
final Transaction[] transactionsToAdd) {
final List<Transaction> transactionList = asList(transactionsToAdd);
final Block block =
new Block(
new BlockHeaderTestFixture()
.difficulty(difficulty)
.gasLimit(parentBlock.getGasLimit())
.parentHash(parentBlock.getHash())
.number(parentBlock.getNumber() + 1)
.buildHeader(),
new BlockBody(transactionList, emptyList()));
final List<TransactionReceipt> transactionReceipts =
transactionList.stream()
.map(transaction -> new TransactionReceipt(1, 1, emptyList(), Optional.empty()))
.collect(toList());
blockchain.appendBlock(block, transactionReceipts);
return block;
}
protected Block appendBlockBaseFeeMarket(
final Difficulty difficulty,
final BlockHeader parentBlock,
final Transaction[] transactionsToAdd) {
final List<Transaction> transactionList = asList(transactionsToAdd);
final Block block =
new Block(
new BlockHeaderTestFixture()
.baseFeePerGas(Wei.of(10L))
.gasLimit(parentBlock.getGasLimit())
.difficulty(difficulty)
.parentHash(parentBlock.getHash())
.number(parentBlock.getNumber() + 1)
.buildHeader(),
new BlockBody(transactionList, emptyList()));
final List<TransactionReceipt> transactionReceipts =
transactionList.stream()
.map(transaction -> new TransactionReceipt(1, 1, emptyList(), Optional.empty()))
.collect(toList());
blockchain.appendBlock(block, transactionReceipts);
return block;
}
}

@ -0,0 +1,141 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.datatypes.BlobsWithCommitments;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ExecutionContextTestFixture;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionTestFixture;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.testutil.TestClock;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import org.junit.jupiter.api.Test;
public class BlobV1TransactionPoolTest extends AbstractTransactionPoolTestBase {
@Override
protected PendingTransactions createPendingTransactions(
final TransactionPoolConfiguration poolConfig,
final BiFunction<PendingTransaction, PendingTransaction, Boolean>
transactionReplacementTester) {
return new BaseFeePendingTransactionsSorter(
poolConfig,
TestClock.system(ZoneId.systemDefault()),
metricsSystem,
protocolContext.getBlockchain()::getChainHeadHeader);
}
@Override
protected Transaction createTransaction(final int transactionNumber, final Wei maxPrice) {
return createTransactionBaseFeeMarket(transactionNumber, maxPrice);
}
@Override
protected TransactionTestFixture createBaseTransaction(final int transactionNumber) {
return createBaseTransactionBaseFeeMarket(transactionNumber);
}
@Override
protected ExecutionContextTestFixture createExecutionContextTestFixture() {
return createExecutionContextTestFixtureBaseFeeMarket();
}
@Override
protected FeeMarket getFeeMarket() {
return FeeMarket.london(0L, Optional.of(BASE_FEE_FLOOR));
}
@Override
protected Block appendBlock(
final Difficulty difficulty,
final BlockHeader parentBlock,
final Transaction... transactionsToAdd) {
return appendBlockBaseFeeMarket(difficulty, parentBlock, transactionsToAdd);
}
@Test
public void shouldReturnBlobWhenTransactionAddedToPool() {
givenTransactionIsValid(transactionWithBlobs);
addAndAssertRemoteTransactionsValid(transactionWithBlobs);
assertTransactionPending(transactionWithBlobs);
// assert that the blobs are returned from the tx pool
final List<BlobsWithCommitments.BlobQuad> expectedBlobQuads =
transactionWithBlobs.getBlobsWithCommitments().get().getBlobQuads();
expectedBlobQuads.forEach(
bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isEqualTo(bq));
}
@Test
public void shouldNotReturnBlobsWhenAllTxsContainingBlobsHaveBeenReplaced() {
givenTransactionIsValid(transactionWithBlobs);
givenTransactionIsValid(transactionWithBlobsReplacement);
givenTransactionIsValid(transactionWithSameBlobs); // contains same blobs as transactionBlob
givenTransactionIsValid(transactionWithSameBlobsReplacement);
addAndAssertRemoteTransactionsValid(transactionWithBlobs);
assertTransactionPending(transactionWithBlobs);
final List<BlobsWithCommitments.BlobQuad> expectedBlobQuads =
transactionWithBlobs.getBlobsWithCommitments().get().getBlobQuads();
// assert that the blobs are returned from the tx pool
expectedBlobQuads.forEach(
bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isEqualTo(bq));
// add different transaction that contains the same blobs
addAndAssertRemoteTransactionsValid(transactionWithSameBlobs);
assertTransactionPending(transactionWithBlobs);
assertTransactionPending(transactionWithSameBlobs);
// assert that the blobs are still returned from the tx pool
expectedBlobQuads.forEach(
bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isEqualTo(bq));
// replace the second blob transaction with tx with different blobs
addAndAssertRemoteTransactionsValid(transactionWithSameBlobsReplacement);
assertTransactionPending(transactionWithSameBlobsReplacement);
assertTransactionNotPending(transactionWithSameBlobs);
// assert that the blob is still returned from the tx pool
expectedBlobQuads.forEach(
bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isEqualTo(bq));
// replace the first blob transaction with tx with different blobs
addAndAssertRemoteTransactionsValid(transactionWithBlobsReplacement);
assertTransactionPending(transactionWithBlobsReplacement);
assertTransactionNotPending(transactionWithBlobs);
// All txs containing the expected blobs have been replaced,
// so the blobs should no longer be returned from the tx pool
expectedBlobQuads.forEach(
bq -> assertThat(transactionPool.getBlobQuad(bq.versionedHash())).isNull());
}
}

@ -26,16 +26,17 @@ import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.hash.BloomFilter;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.tuweni.bytes.Bytes;
/**
@ -54,10 +55,7 @@ public class PeerTable {
private final Map<Bytes, Integer> distanceCache;
private BloomFilter<Bytes> idBloom;
private int evictionCnt = 0;
private final LinkedHashMapWithMaximumSize<String, Integer> ipAddressCheckMap =
new LinkedHashMapWithMaximumSize<>(DEFAULT_BUCKET_SIZE * N_BUCKETS);
private final CircularFifoQueue<String> invalidIPs =
new CircularFifoQueue<>(DEFAULT_BUCKET_SIZE * N_BUCKETS);
private final Cache<String, Integer> unresponsiveIPs;
/**
* Builds a new peer table, where distance is calculated using the provided nodeId as a baseline.
@ -72,6 +70,11 @@ public class PeerTable {
.toArray(Bucket[]::new);
this.distanceCache = new ConcurrentHashMap<>();
this.maxEntriesCnt = N_BUCKETS * DEFAULT_BUCKET_SIZE;
this.unresponsiveIPs =
CacheBuilder.newBuilder()
.maximumSize(maxEntriesCnt)
.expireAfterWrite(15L, TimeUnit.MINUTES)
.build();
// A bloom filter with 4096 expected insertions of 64-byte keys with a 0.1% false positive
// probability yields a memory footprint of ~7.5kb.
@ -140,7 +143,6 @@ public class PeerTable {
if (!res.isPresent()) {
idBloom.put(id);
distanceCache.put(id, distance);
ipAddressCheckMap.put(getKey(peer.getEndpoint()), peer.getEndpoint().getUdpPort());
return AddResult.added();
}
@ -214,26 +216,12 @@ public class PeerTable {
public boolean isIpAddressInvalid(final Endpoint endpoint) {
final String key = getKey(endpoint);
if (invalidIPs.contains(key)) {
return true;
}
if (ipAddressCheckMap.containsKey(key) && ipAddressCheckMap.get(key) != endpoint.getUdpPort()) {
// This peer has multiple discovery services on the same IP address + TCP port.
invalidIPs.add(key);
for (final Bucket bucket : table) {
bucket.getPeers().stream()
.filter(p -> p.getEndpoint().getHost().equals(endpoint.getHost()))
.forEach(bucket::evict);
}
return true;
} else {
return false;
}
return unresponsiveIPs.getIfPresent(key) != null;
}
public void invalidateIP(final Endpoint endpoint) {
final String key = getKey(endpoint);
invalidIPs.add(key);
unresponsiveIPs.put(key, Integer.MAX_VALUE);
}
private static String getKey(final Endpoint endpoint) {
@ -313,20 +301,6 @@ public class PeerTable {
}
}
private static class LinkedHashMapWithMaximumSize<K, V> extends LinkedHashMap<K, V> {
private final int maxSize;
public LinkedHashMapWithMaximumSize(final int maxSize) {
super(maxSize, 0.75f, false);
this.maxSize = maxSize;
}
@Override
protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
return size() > maxSize;
}
}
static class EvictResult {
public enum EvictOutcome {
EVICTED,

@ -188,15 +188,12 @@ public class PeerTableTest {
@Test
public void ipAddressIsInvalidReturnsTrue() {
final Endpoint endpoint1 = new Endpoint("1.1.1.1", 2, Optional.of(Integer.valueOf(1)));
final Endpoint endpoint2 = new Endpoint("1.1.1.1", 3, Optional.of(Integer.valueOf(1)));
final DiscoveryPeer peer1 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint1);
final DiscoveryPeer peer2 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint2);
final PeerTable table = new PeerTable(Bytes.random(64));
final PeerTable.AddResult addResult1 = table.tryAdd(peer1);
assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome());
table.invalidateIP(endpoint1);
assertThat(table.isIpAddressInvalid(peer2.getEndpoint())).isEqualTo(true);
assertThat(table.isIpAddressInvalid(peer1.getEndpoint())).isEqualTo(true);
}
@Test
@ -216,16 +213,12 @@ public class PeerTableTest {
@Test
public void invalidIPAddressNotAdded() {
final Endpoint endpoint1 = new Endpoint("1.1.1.1", 2, Optional.of(Integer.valueOf(1)));
final Endpoint endpoint2 = new Endpoint("1.1.1.1", 3, Optional.of(Integer.valueOf(1)));
final DiscoveryPeer peer1 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint1);
final DiscoveryPeer peer2 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint2);
final PeerTable table = new PeerTable(Bytes.random(64));
table.invalidateIP(endpoint1);
final PeerTable.AddResult addResult1 = table.tryAdd(peer1);
assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome());
final PeerTable.AddResult addResult2 = table.tryAdd(peer2);
assertThat(addResult2.getOutcome()).isEqualTo(PeerTable.AddResult.invalid().getOutcome());
assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.invalid().getOutcome());
}
@Test

Loading…
Cancel
Save