Merge remote-tracking branch 'upstream/main' into tessera_as_internal_process

tessera_as_internal_process
George Tebrean 11 months ago
commit b5ce971d6e
  1. 1
      CHANGELOG.md
  2. 5
      besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java
  3. 3
      ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java
  4. 3
      ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java
  5. 49
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java
  6. 69
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java
  7. 52
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java
  8. 37
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java
  9. 47
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthSchedulerTest.java
  10. 6
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java
  11. 6
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java
  12. 7
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java
  13. 55
      metrics/core/src/main/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplier.java
  14. 36
      metrics/core/src/test/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplierTest.java

@ -15,6 +15,7 @@
- Set Ethereum Classic mainnet activation block for Spiral network upgrade [#6267](https://github.com/hyperledger/besu/pull/6267)
- Add custom genesis file name to config overview if specified [#6297](https://github.com/hyperledger/besu/pull/6297)
- Update Gradle plugins and replace unmaintained License Gradle Plugin with the actively maintained Gradle License Report [#6275](https://github.com/hyperledger/besu/pull/6275)
- Disable transaction handling when the node is not in sync, to avoid unnecessary transaction validation work [#6302](https://github.com/hyperledger/besu/pull/6302)
- Optimize RocksDB WAL files, allows for faster restart and a more linear disk space utilization [#6328](https://github.com/hyperledger/besu/pull/6328)
- Optimize acceptance tests by eliminating docker and executing them as a processes [#5968](https://github.com/hyperledger/besu/pull/5968)

@ -41,7 +41,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
@ -64,6 +63,7 @@ import org.hyperledger.besu.plugin.data.LogWithMetadata;
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
import org.hyperledger.besu.testutil.TestClock;
import java.math.BigInteger;
@ -100,7 +100,6 @@ public class BesuEventsImplTest {
@Mock private EthPeers mockEthPeers;
@Mock private EthContext mockEthContext;
@Mock private EthMessages mockEthMessages;
@Mock private EthScheduler mockEthScheduler;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private TransactionValidatorFactory mockTransactionValidatorFactory;
@ -128,7 +127,7 @@ public class BesuEventsImplTest {
when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
when(mockEthContext.getScheduler()).thenReturn(new DeterministicEthScheduler());
lenient().when(mockEthPeers.streamAvailablePeers()).thenAnswer(z -> Stream.empty());
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
lenient().when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);

@ -18,6 +18,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -107,7 +108,7 @@ public class EthGetFilterChangesIntegrationTest {
blockchain::getChainHeadHeader);
final ProtocolContext protocolContext = executionContext.getProtocolContext();
EthContext ethContext = mock(EthContext.class);
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

@ -18,6 +18,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -107,7 +108,7 @@ public class EthGetFilterChangesIntegrationTest {
blockchain::getChainHeadHeader);
final ProtocolContext protocolContext = executionContext.getProtocolContext();
EthContext ethContext = mock(EthContext.class);
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

@ -23,9 +23,11 @@ import org.hyperledger.besu.util.ExceptionUtils;
import java.time.Duration;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -34,6 +36,8 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@ -295,4 +299,49 @@ public class EthScheduler {
delay,
unit);
}
public <ITEM> OrderedProcessor<ITEM> createOrderedProcessor(final Consumer<ITEM> processor) {
return new OrderedProcessor<>(processor);
}
/**
* This class is a way to execute a set of tasks, one by one, in a strict order, without blocking
* the caller in case there are still previous tasks queued
*
* @param <ITEM> the class of item to be processed
*/
public class OrderedProcessor<ITEM> {
private final Queue<ITEM> blockAddedQueue = new ConcurrentLinkedQueue<>();
private final ReentrantLock blockAddedLock = new ReentrantLock();
private final Consumer<ITEM> processor;
private OrderedProcessor(final Consumer<ITEM> processor) {
this.processor = processor;
}
public void submit(final ITEM item) {
// add the item to the processing queue
blockAddedQueue.add(item);
if (blockAddedLock.hasQueuedThreads()) {
// another thread is already waiting to process the queue with our item, there is no need to
// schedule another thread
LOG.trace(
"Block added event queue is already being processed and an already queued thread is present, nothing to do");
} else {
servicesExecutor.submit(
() -> {
blockAddedLock.lock();
try {
// now that we have the lock, process as many items as possible
for (ITEM i = blockAddedQueue.poll(); i != null; i = blockAddedQueue.poll()) {
processor.accept(i);
}
} finally {
blockAddedLock.unlock();
}
});
}
}
}
}

@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidator;
@ -61,11 +62,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -107,8 +106,7 @@ public class TransactionPool implements BlockAddedObserver {
private volatile OptionalLong subscribeConnectId = OptionalLong.empty();
private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager();
private final Set<Address> localSenders = ConcurrentHashMap.newKeySet();
private final Lock blockAddedLock = new ReentrantLock();
private final Queue<BlockAddedEvent> blockAddedQueue = new ConcurrentLinkedQueue<>();
private final EthScheduler.OrderedProcessor<BlockAddedEvent> blockAddedEventOrderedProcessor;
public TransactionPool(
final Supplier<PendingTransactions> pendingTransactionsSupplier,
@ -130,6 +128,8 @@ public class TransactionPool implements BlockAddedObserver {
pluginTransactionValidatorFactory == null
? null
: pluginTransactionValidatorFactory.create();
this.blockAddedEventOrderedProcessor =
ethContext.getScheduler().createOrderedProcessor(this::processBlockAddedEvent);
initLogForReplay();
}
@ -322,58 +322,29 @@ public class TransactionPool implements BlockAddedObserver {
@Override
public void onBlockAdded(final BlockAddedEvent event) {
if (isPoolEnabled.get()) {
final long started = System.currentTimeMillis();
if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED)
|| event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) {
// add the event to the processing queue
blockAddedQueue.add(event);
// we want to process the added block asynchronously,
// but at the same time we must ensure that blocks are processed in order one at time
ethContext
.getScheduler()
.scheduleServiceTask(
() -> {
while (!blockAddedQueue.isEmpty()) {
if (blockAddedLock.tryLock()) {
// no other thread is processing the queue, so start processing it
try {
BlockAddedEvent e = blockAddedQueue.poll();
// check again since another thread could have stolen our task
if (e != null) {
pendingTransactions.manageBlockAdded(
e.getBlock().getHeader(),
e.getAddedTransactions(),
e.getRemovedTransactions(),
protocolSchedule
.getByBlockHeader(e.getBlock().getHeader())
.getFeeMarket());
reAddTransactions(e.getRemovedTransactions());
LOG.atTrace()
.setMessage("Block added event {} processed in {}ms")
.addArgument(e)
.addArgument(() -> System.currentTimeMillis() - started)
.log();
}
} finally {
blockAddedLock.unlock();
}
} else {
try {
// wait a bit before retrying
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
return null;
});
blockAddedEventOrderedProcessor.submit(event);
}
}
}
private void processBlockAddedEvent(final BlockAddedEvent e) {
final long started = System.currentTimeMillis();
pendingTransactions.manageBlockAdded(
e.getBlock().getHeader(),
e.getAddedTransactions(),
e.getRemovedTransactions(),
protocolSchedule.getByBlockHeader(e.getBlock().getHeader()).getFeeMarket());
reAddTransactions(e.getRemovedTransactions());
LOG.atTrace()
.setMessage("Block added event {} processed in {}ms")
.addArgument(e)
.addArgument(() -> System.currentTimeMillis() - started)
.log();
}
private void reAddTransactions(final List<Transaction> reAddTransactions) {
if (!reAddTransactions.isEmpty()) {
// if adding a blob tx, and it is missing its blob, is a re-org and we should restore the blob

@ -156,24 +156,62 @@ public class TransactionPoolFactory {
@Override
public void onInitialSyncCompleted() {
LOG.info("Enabling transaction handling following initial sync");
transactionTracker.reset();
transactionPool.setEnabled();
transactionsMessageHandler.setEnabled();
pooledTransactionsMessageHandler.setEnabled();
enableTransactionHandling(
transactionTracker,
transactionPool,
transactionsMessageHandler,
pooledTransactionsMessageHandler);
}
@Override
public void onInitialSyncRestart() {
LOG.info("Disabling transaction handling during re-sync");
pooledTransactionsMessageHandler.setDisabled();
transactionsMessageHandler.setDisabled();
transactionPool.setDisabled();
disableTransactionHandling(
transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler);
}
});
syncState.subscribeInSync(
isInSync -> {
if (isInSync != transactionPool.isEnabled()) {
if (isInSync) {
LOG.info("Node is in sync, enabling transaction handling");
enableTransactionHandling(
transactionTracker,
transactionPool,
transactionsMessageHandler,
pooledTransactionsMessageHandler);
} else {
LOG.info("Node out of sync, disabling transaction handling");
disableTransactionHandling(
transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler);
}
}
});
return transactionPool;
}
private static void enableTransactionHandling(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionsMessageHandler transactionsMessageHandler,
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) {
transactionTracker.reset();
transactionPool.setEnabled();
transactionsMessageHandler.setEnabled();
pooledTransactionsMessageHandler.setEnabled();
}
private static void disableTransactionHandling(
final TransactionPool transactionPool,
final TransactionsMessageHandler transactionsMessageHandler,
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) {
transactionPool.setDisabled();
transactionsMessageHandler.setDisabled();
pooledTransactionsMessageHandler.setDisabled();
}
private static void subscribeTransactionHandlers(
final ProtocolContext protocolContext,
final EthContext ethContext,

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.transactions;
import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier;
import org.hyperledger.besu.metrics.RunnableCounter;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
@ -46,6 +47,9 @@ public class TransactionPoolMetrics {
private final LabelledMetric<Counter> expiredMessagesCounter;
private final Map<String, RunnableCounter> expiredMessagesRunnableCounters = new HashMap<>();
private final LabelledMetric<Counter> alreadySeenTransactionsCounter;
private final Map<String, ReplaceableDoubleSupplier> spaceUsedSuppliers = new HashMap<>();
private final Map<String, ReplaceableDoubleSupplier> transactionCountSuppliers = new HashMap<>();
private final Map<String, ReplaceableDoubleSupplier> uniqueSendersSuppliers = new HashMap<>();
public TransactionPoolMetrics(final MetricsSystem metricsSystem) {
this.metricsSystem = metricsSystem;
@ -120,17 +124,44 @@ public class TransactionPoolMetrics {
}
public void initSpaceUsed(final DoubleSupplier spaceUsedSupplier, final String layer) {
spaceUsed.labels(spaceUsedSupplier, layer);
spaceUsedSuppliers.compute(
layer,
(unused, existingSupplier) -> {
if (existingSupplier == null) {
final var newSupplier = new ReplaceableDoubleSupplier(spaceUsedSupplier);
spaceUsed.labels(newSupplier, layer);
return newSupplier;
}
return existingSupplier.replaceDoubleSupplier(spaceUsedSupplier);
});
}
public void initTransactionCount(
final DoubleSupplier transactionCountSupplier, final String layer) {
transactionCount.labels(transactionCountSupplier, layer);
transactionCountSuppliers.compute(
layer,
(unused, existingSupplier) -> {
if (existingSupplier == null) {
final var newSupplier = new ReplaceableDoubleSupplier(transactionCountSupplier);
transactionCount.labels(newSupplier, layer);
return newSupplier;
}
return existingSupplier.replaceDoubleSupplier(transactionCountSupplier);
});
}
public void initUniqueSenderCount(
final DoubleSupplier uniqueSenderCountSupplier, final String layer) {
uniqueSenderCount.labels(uniqueSenderCountSupplier, layer);
uniqueSendersSuppliers.compute(
layer,
(unused, existingSupplier) -> {
if (existingSupplier == null) {
final var newSupplier = new ReplaceableDoubleSupplier(uniqueSenderCountSupplier);
uniqueSenderCount.labels(newSupplier, layer);
return newSupplier;
}
return existingSupplier.replaceDoubleSupplier(uniqueSenderCountSupplier);
});
}
public void initExpiredMessagesCounter(final String message) {

@ -21,16 +21,24 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
import org.hyperledger.besu.testutil.MockExecutorService;
import org.hyperledger.besu.testutil.MockScheduledExecutor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -209,4 +217,43 @@ public class EthSchedulerTest {
assertThat(task.isFailed()).isTrue();
assertThat(task.isCancelled()).isTrue();
}
@Test
public void itemsSubmittedToOrderedProcessorAreProcessedInOrder() throws InterruptedException {
final int numOfItems = 100;
final Random random = new Random();
final EthScheduler realEthScheduler = new EthScheduler(1, 1, 1, new NoOpMetricsSystem());
final List<String> processedStrings = new CopyOnWriteArrayList<>();
final Consumer<String> stringProcessor =
s -> {
processedStrings.add(s);
try {
Thread.sleep(random.nextInt(20));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
final var orderProcessor = realEthScheduler.createOrderedProcessor(stringProcessor);
IntStream.range(0, numOfItems)
.mapToObj(String::valueOf)
.forEach(
s -> {
orderProcessor.submit(s);
try {
Thread.sleep(random.nextInt(20));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
final List<String> expectedStrings = new ArrayList<>(numOfItems);
IntStream.range(0, numOfItems).mapToObj(String::valueOf).forEach(expectedStrings::add);
Awaitility.await().until(() -> processedStrings.size() == numOfItems);
assertThat(processedStrings).containsExactlyElementsOf(expectedStrings);
}
}

@ -166,7 +166,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
RespondingEthPeer.blockchainResponder(
blockchain, protocolContext.getWorldStateArchive(), transactionPool);
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
// Setup data to be requested and expected response
final T requestedData = generateDataToBeRequested();
@ -190,7 +190,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
@Test
public void doesNotCompleteWhenPeersDoNotRespond() {
// Setup a unresponsive peer
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
// Setup data to be requested
final T requestedData = generateDataToBeRequested();
@ -209,7 +209,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
@Test
public void cancel() {
// Setup a unresponsive peer
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
// Setup data to be requested
final T requestedData = generateDataToBeRequested();

@ -58,7 +58,7 @@ public abstract class PeerMessageTaskTest<T>
protocolSchedule,
0.5f);
final RespondingEthPeer respondingEthPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
// Execute task and wait for response
final AtomicReference<T> actualResult = new AtomicReference<>();
@ -109,7 +109,7 @@ public abstract class PeerMessageTaskTest<T>
// Setup a unresponsive peer
final RespondingEthPeer.Responder responder = RespondingEthPeer.emptyResponder();
final RespondingEthPeer respondingEthPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
// Setup data to be requested
final T requestedData = generateDataToBeRequested();
@ -129,7 +129,7 @@ public abstract class PeerMessageTaskTest<T>
peersDoTimeout.set(true);
// Setup a unresponsive peer
final RespondingEthPeer respondingEthPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32);
// Setup data to be requested
final T requestedData = generateDataToBeRequested();

@ -32,7 +32,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@ -100,7 +99,6 @@ import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
@ -233,12 +231,9 @@ public abstract class AbstractTransactionPoolTest {
ethProtocolManager = EthProtocolManagerTestUtil.create();
ethContext = spy(ethProtocolManager.ethContext());
final EthScheduler ethScheduler = mock(EthScheduler.class);
final EthScheduler ethScheduler = spy(ethContext.getScheduler());
syncTaskCapture = ArgumentCaptor.forClass(Runnable.class);
doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture());
doAnswer(invocation -> ((Supplier<Void>) invocation.getArguments()[0]).get())
.when(ethScheduler)
.scheduleServiceTask(any(Supplier.class));
doReturn(ethScheduler).when(ethContext).getScheduler();
peerTransactionTracker = new PeerTransactionTracker();

@ -0,0 +1,55 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.metrics;
import java.util.function.DoubleSupplier;
/**
* This class provides a replaceable double supplier. It allows to replace the current double
* supplier with a new one.
*/
public class ReplaceableDoubleSupplier implements DoubleSupplier {
private DoubleSupplier currentSupplier;
/**
* Constructs a new ReplaceableDoubleSupplier with the given initial supplier.
*
* @param currentSupplier the initial double supplier
*/
public ReplaceableDoubleSupplier(final DoubleSupplier currentSupplier) {
this.currentSupplier = currentSupplier;
}
/**
* Gets a double value from the current supplier.
*
* @return the double value supplied by the current supplier
*/
@Override
public double getAsDouble() {
return currentSupplier.getAsDouble();
}
/**
* Replaces the current double supplier with a new one.
*
* @param newSupplier the new double supplier
* @return this ReplaceableDoubleSupplier
*/
public ReplaceableDoubleSupplier replaceDoubleSupplier(final DoubleSupplier newSupplier) {
currentSupplier = newSupplier;
return this;
}
}

@ -0,0 +1,36 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.metrics;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.Test;
public class ReplaceableDoubleSupplierTest {
@Test
public void shouldWorkAsNormalSupplier() {
final var rds = new ReplaceableDoubleSupplier(() -> 1.0);
assertThat(rds.getAsDouble()).isEqualTo(1.0);
}
@Test
public void shouldReturnValueFromNewSupplierIfReplaced() {
final var rds = new ReplaceableDoubleSupplier(() -> 1.0);
assertThat(rds.getAsDouble()).isEqualTo(1.0);
rds.replaceDoubleSupplier(() -> 2.0);
assertThat(rds.getAsDouble()).isEqualTo(2.0);
}
}
Loading…
Cancel
Save