Trie log prune async using TrieLogEvent (#6394)

* Change TrieLogPruner to implement a trieLogObserver
* Remove TrieLogPruner from TrieLogManager
* Remove NoOpTrieLogPruner now it is conditionally added as an observer
* Prune async using EthScheduler.servicesExecutor

---------
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
Signed-off-by: Simon Dudley <simon.dudley@consensys.net>
Co-authored-by: Simon Dudley <simon.dudley@consensys.net>
pull/6453/head
Gabriel Fukushima 9 months ago committed by GitHub
parent 1c1f538534
commit 3fc3fb1225
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 60
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 10
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/BonsaiWorldStateProvider.java
  3. 2
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/NoOpTrieLogManager.java
  4. 7
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/TrieLogManager.java
  5. 47
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/TrieLogPruner.java
  6. 4
      ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/InMemoryKeyValueStorageProvider.java
  7. 4
      ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/bonsai/AbstractIsolationTests.java
  8. 4
      ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/bonsai/BonsaiWorldStateArchiveTest.java
  9. 5
      ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/TrieLogManagerTests.java
  10. 68
      ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/TrieLogPrunerTest.java
  11. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java

@ -83,6 +83,7 @@ import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;
import org.hyperledger.besu.ethereum.trie.bonsai.BonsaiWorldStateProvider;
import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedMerkleTrieLoader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogManager;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.trie.forest.ForestWorldStateArchive;
import org.hyperledger.besu.ethereum.trie.forest.pruner.MarkSweepPruner;
@ -781,6 +782,15 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final JsonRpcMethods additionalJsonRpcMethodFactory =
createAdditionalJsonRpcMethodFactory(protocolContext);
if (dataStorageConfiguration.getUnstable().getBonsaiTrieLogPruningEnabled()
&& DataStorageFormat.BONSAI.equals(dataStorageConfiguration.getDataStorageFormat())) {
final TrieLogManager trieLogManager =
((BonsaiWorldStateProvider) worldStateArchive).getTrieLogManager();
final TrieLogPruner trieLogPruner =
createTrieLogPruner(worldStateStorage, blockchain, scheduler);
trieLogManager.subscribe(trieLogPruner);
}
final List<Closeable> closeables = new ArrayList<>();
closeables.add(protocolContext.getWorldStateArchive());
closeables.add(storageProvider);
@ -809,6 +819,26 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
dataStorageConfiguration);
}
private TrieLogPruner createTrieLogPruner(
final WorldStateStorage worldStateStorage,
final Blockchain blockchain,
final EthScheduler scheduler) {
final GenesisConfigOptions genesisConfigOptions = configOptionsSupplier.get();
final boolean isProofOfStake = genesisConfigOptions.getTerminalTotalDifficulty().isPresent();
final TrieLogPruner trieLogPruner =
new TrieLogPruner(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
scheduler::executeServiceTask,
dataStorageConfiguration.getUnstable().getBonsaiTrieLogRetentionThreshold(),
dataStorageConfiguration.getUnstable().getBonsaiTrieLogPruningLimit(),
isProofOfStake);
trieLogPruner.initialize();
return trieLogPruner;
}
/**
* Create synchronizer synchronizer.
*
@ -1069,29 +1099,13 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final Blockchain blockchain,
final CachedMerkleTrieLoader cachedMerkleTrieLoader) {
return switch (dataStorageConfiguration.getDataStorageFormat()) {
case BONSAI -> {
final GenesisConfigOptions genesisConfigOptions = configOptionsSupplier.get();
final boolean isProofOfStake =
genesisConfigOptions.getTerminalTotalDifficulty().isPresent();
final TrieLogPruner trieLogPruner =
dataStorageConfiguration.getUnstable().getBonsaiTrieLogPruningEnabled()
? new TrieLogPruner(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
dataStorageConfiguration.getUnstable().getBonsaiTrieLogRetentionThreshold(),
dataStorageConfiguration.getUnstable().getBonsaiTrieLogPruningLimit(),
isProofOfStake)
: TrieLogPruner.noOpTrieLogPruner();
trieLogPruner.initialize();
yield new BonsaiWorldStateProvider(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
Optional.of(dataStorageConfiguration.getBonsaiMaxLayersToLoad()),
cachedMerkleTrieLoader,
besuComponent.map(BesuComponent::getBesuPluginContext).orElse(null),
evmConfiguration,
trieLogPruner);
}
case BONSAI -> new BonsaiWorldStateProvider(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
Optional.of(dataStorageConfiguration.getBonsaiMaxLayersToLoad()),
cachedMerkleTrieLoader,
besuComponent.map(BesuComponent::getBesuPluginContext).orElse(null),
evmConfiguration);
case FOREST -> {
final WorldStatePreimageStorage preimageStorage =
storageProvider.createWorldStatePreimageStorage();

@ -31,7 +31,6 @@ import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedMerkleTrieLoader;
import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedWorldStorageManager;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogManager;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.trie.bonsai.worldview.BonsaiWorldState;
import org.hyperledger.besu.ethereum.trie.bonsai.worldview.BonsaiWorldStateUpdateAccumulator;
import org.hyperledger.besu.ethereum.trie.patricia.StoredMerklePatriciaTrie;
@ -73,18 +72,13 @@ public class BonsaiWorldStateProvider implements WorldStateArchive {
final Optional<Long> maxLayersToLoad,
final CachedMerkleTrieLoader cachedMerkleTrieLoader,
final BesuContext pluginContext,
final EvmConfiguration evmConfiguration,
final TrieLogPruner trieLogPruner) {
final EvmConfiguration evmConfiguration) {
this.cachedWorldStorageManager = new CachedWorldStorageManager(this, worldStateStorage);
// TODO: de-dup constructors
this.trieLogManager =
new TrieLogManager(
blockchain,
worldStateStorage,
maxLayersToLoad.orElse(RETAINED_LAYERS),
pluginContext,
trieLogPruner);
blockchain, worldStateStorage, maxLayersToLoad.orElse(RETAINED_LAYERS), pluginContext);
this.blockchain = blockchain;
this.worldStateStorage = worldStateStorage;
this.cachedMerkleTrieLoader = cachedMerkleTrieLoader;

@ -25,7 +25,7 @@ import java.util.Optional;
public class NoOpTrieLogManager extends TrieLogManager {
public NoOpTrieLogManager() {
super(null, null, 0, null, TrieLogPruner.noOpTrieLogPruner());
super(null, null, 0, null);
}
@Override

@ -47,19 +47,16 @@ public class TrieLogManager {
protected final Subscribers<TrieLogEvent.TrieLogObserver> trieLogObservers = Subscribers.create();
protected final TrieLogFactory trieLogFactory;
private final TrieLogPruner trieLogPruner;
public TrieLogManager(
final Blockchain blockchain,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final long maxLayersToLoad,
final BesuContext pluginContext,
final TrieLogPruner trieLogPruner) {
final BesuContext pluginContext) {
this.blockchain = blockchain;
this.rootWorldStateStorage = worldStateStorage;
this.maxLayersToLoad = maxLayersToLoad;
this.trieLogFactory = setupTrieLogFactory(pluginContext);
this.trieLogPruner = trieLogPruner;
}
public synchronized void saveTrieLog(
@ -85,8 +82,6 @@ public class TrieLogManager {
} finally {
if (success) {
stateUpdater.commit();
trieLogPruner.addToPruneQueue(forBlockHeader.getNumber(), forBlockHeader.getBlockHash());
trieLogPruner.pruneFromQueue();
} else {
stateUpdater.rollback();
}

@ -20,10 +20,12 @@ import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.plugin.services.trielogs.TrieLogEvent;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
import com.google.common.collect.ArrayListMultimap;
@ -33,7 +35,7 @@ import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TrieLogPruner {
public class TrieLogPruner implements TrieLogEvent.TrieLogObserver {
private static final Logger LOG = LoggerFactory.getLogger(TrieLogPruner.class);
@ -41,6 +43,7 @@ public class TrieLogPruner {
private final int loadingLimit;
private final BonsaiWorldStateKeyValueStorage rootWorldStateStorage;
private final Blockchain blockchain;
private final Consumer<Runnable> executeAsync;
private final long numBlocksToRetain;
private final boolean requireFinalizedBlock;
@ -50,11 +53,13 @@ public class TrieLogPruner {
public TrieLogPruner(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final Blockchain blockchain,
final Consumer<Runnable> executeAsync,
final long numBlocksToRetain,
final int pruningLimit,
final boolean requireFinalizedBlock) {
this.rootWorldStateStorage = rootWorldStateStorage;
this.blockchain = blockchain;
this.executeAsync = executeAsync;
this.numBlocksToRetain = numBlocksToRetain;
this.pruningLimit = pruningLimit;
this.loadingLimit = pruningLimit; // same as pruningLimit for now
@ -166,34 +171,18 @@ public class TrieLogPruner {
return wasPruned.size();
}
public static TrieLogPruner noOpTrieLogPruner() {
return new NoOpTrieLogPruner(null, null, 0, 0);
}
public static class NoOpTrieLogPruner extends TrieLogPruner {
private NoOpTrieLogPruner(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final Blockchain blockchain,
final long numBlocksToRetain,
final int pruningLimit) {
super(rootWorldStateStorage, blockchain, numBlocksToRetain, pruningLimit, true);
}
@Override
public int initialize() {
// no-op
return -1;
}
@Override
void addToPruneQueue(final long blockNumber, final Hash blockHash) {
// no-op
}
@Override
int pruneFromQueue() {
// no-op
return -1;
@Override
public void onTrieLogAdded(final TrieLogEvent event) {
if (TrieLogEvent.Type.ADDED.equals(event.getType())) {
final Hash blockHash = event.layer().getBlockHash();
final Optional<Long> blockNumber = event.layer().getBlockNumber();
blockNumber.ifPresent(
blockNum ->
executeAsync.accept(
() -> {
addToPruneQueue(blockNum, blockHash);
pruneFromQueue();
}));
}
}
}

@ -30,7 +30,6 @@ import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValue
import org.hyperledger.besu.ethereum.trie.bonsai.BonsaiWorldStateProvider;
import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedMerkleTrieLoader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.trie.forest.ForestWorldStateArchive;
import org.hyperledger.besu.ethereum.trie.forest.storage.ForestWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.forest.worldview.ForestMutableWorldState;
@ -113,8 +112,7 @@ public class InMemoryKeyValueStorageProvider extends KeyValueStorageProvider {
Optional.empty(),
cachedMerkleTrieLoader,
null,
evmConfiguration,
TrieLogPruner.noOpTrieLogPruner());
evmConfiguration);
}
public static MutableWorldState createInMemoryWorldState() {

@ -67,7 +67,6 @@ import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStorageProviderBuilder;
import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedMerkleTrieLoader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.ImmutableDataStorageConfiguration;
@ -163,8 +162,7 @@ public abstract class AbstractIsolationTests {
Optional.of(16L),
new CachedMerkleTrieLoader(new NoOpMetricsSystem()),
null,
EvmConfiguration.DEFAULT,
TrieLogPruner.noOpTrieLogPruner());
EvmConfiguration.DEFAULT);
var ws = archive.getMutable();
genesisState.writeStateTo(ws);
protocolContext = new ProtocolContext(blockchain, archive, null, Optional.empty());

@ -42,7 +42,6 @@ import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValu
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogFactoryImpl;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogLayer;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogManager;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.trie.bonsai.worldview.BonsaiWorldState;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.evm.internal.EvmConfiguration;
@ -127,8 +126,7 @@ class BonsaiWorldStateArchiveTest {
Optional.of(512L),
new CachedMerkleTrieLoader(new NoOpMetricsSystem()),
null,
EvmConfiguration.DEFAULT,
TrieLogPruner.noOpTrieLogPruner());
EvmConfiguration.DEFAULT);
final BlockHeader blockHeader = blockBuilder.number(0).buildHeader();
final BlockHeader chainHead = blockBuilder.number(512).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(chainHead);

@ -16,7 +16,6 @@
package org.hyperledger.besu.ethereum.trie.bonsai.trielog;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner.noOpTrieLogPruner;
import static org.mockito.Mockito.spy;
import org.hyperledger.besu.datatypes.Hash;
@ -57,9 +56,7 @@ class TrieLogManagerTests {
@BeforeEach
public void setup() {
trieLogManager =
new TrieLogManager(
blockchain, bonsaiWorldStateKeyValueStorage, 512, null, noOpTrieLogPruner());
trieLogManager = new TrieLogManager(blockchain, bonsaiWorldStateKeyValueStorage, 512, null);
}
@Test

@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.logging.log4j.Level;
@ -44,6 +45,7 @@ public class TrieLogPrunerTest {
private BonsaiWorldStateKeyValueStorage worldState;
private Blockchain blockchain;
private final Consumer<Runnable> executeAsync = Runnable::run;
@SuppressWarnings("BannedMethod")
@BeforeEach
@ -67,7 +69,8 @@ public class TrieLogPrunerTest {
when(blockchain.getBlockHeader(header2.getBlockHash())).thenReturn(Optional.empty());
// When
TrieLogPruner trieLogPruner = new TrieLogPruner(worldState, blockchain, 3, loadingLimit, false);
TrieLogPruner trieLogPruner =
new TrieLogPruner(worldState, blockchain, executeAsync, 3, loadingLimit, false);
trieLogPruner.initialize();
// Then
@ -86,7 +89,8 @@ public class TrieLogPrunerTest {
when(worldState.pruneTrieLog(any(Hash.class))).thenReturn(true);
// requireFinalizedBlock = false means this is not a PoS chain
TrieLogPruner trieLogPruner =
new TrieLogPruner(worldState, blockchain, blocksToRetain, pruningWindowSize, false);
new TrieLogPruner(
worldState, blockchain, executeAsync, blocksToRetain, pruningWindowSize, false);
trieLogPruner.addToPruneQueue(0, key(0)); // older block outside prune window
trieLogPruner.addToPruneQueue(1, key(1)); // block inside the prune window
@ -194,7 +198,8 @@ public class TrieLogPrunerTest {
final int pruningWindowSize = (int) chainHeight;
when(blockchain.getChainHeadBlockNumber()).thenReturn(chainHeight);
TrieLogPruner trieLogPruner =
new TrieLogPruner(worldState, blockchain, blocksToRetain, pruningWindowSize, true);
new TrieLogPruner(
worldState, blockchain, executeAsync, blocksToRetain, pruningWindowSize, true);
trieLogPruner.addToPruneQueue(1, key(1));
trieLogPruner.addToPruneQueue(2, key(2));
@ -228,6 +233,46 @@ public class TrieLogPrunerTest {
assertThat(trieLogPruner.pruneFromQueue()).isEqualTo(1);
}
@Test
public void onTrieLogAdded_should_prune() {
// Given
final TriggerableConsumer triggerableConsumer = new TriggerableConsumer();
TrieLogPruner trieLogPruner =
new TrieLogPruner(worldState, blockchain, triggerableConsumer, 0, 1, false);
assertThat(trieLogPruner.pruneFromQueue()).isEqualTo(0);
final TrieLogLayer layer = new TrieLogLayer();
layer.setBlockNumber(1L);
layer.setBlockHash(key(1));
when(blockchain.getChainHeadBlockNumber()).thenReturn(1L);
// When
trieLogPruner.onTrieLogAdded(new TrieLogAddedEvent(layer));
verify(worldState, never()).pruneTrieLog(key(1));
triggerableConsumer.run();
// Then
verify(worldState, times(1)).pruneTrieLog(key(1));
}
@Test
public void onTrieLogAdded_should_not_prune_when_no_blockNumber() {
// Given
TrieLogPruner trieLogPruner =
new TrieLogPruner(worldState, blockchain, executeAsync, 0, 1, false);
assertThat(trieLogPruner.pruneFromQueue()).isEqualTo(0);
final TrieLogLayer layer = new TrieLogLayer();
layer.setBlockHash(key(1));
when(blockchain.getChainHeadBlockNumber()).thenReturn(1L);
// When
trieLogPruner.onTrieLogAdded(new TrieLogAddedEvent(layer));
// Then
verify(worldState, never()).pruneTrieLog(key(1));
}
private TrieLogPruner setupPrunerAndFinalizedBlock(
final long configuredRetainHeight, final long finalizedBlockHeight) {
final long chainHeight = 5;
@ -241,7 +286,8 @@ public class TrieLogPrunerTest {
.thenReturn(Optional.of(finalizedHeader));
when(blockchain.getChainHeadBlockNumber()).thenReturn(chainHeight);
TrieLogPruner trieLogPruner =
new TrieLogPruner(worldState, blockchain, blocksToRetain, pruningWindowSize, true);
new TrieLogPruner(
worldState, blockchain, executeAsync, blocksToRetain, pruningWindowSize, true);
trieLogPruner.addToPruneQueue(1, key(1));
trieLogPruner.addToPruneQueue(2, key(2));
@ -255,4 +301,18 @@ public class TrieLogPrunerTest {
private Hash key(final int k) {
return Hash.hash(Bytes.of(k));
}
private static class TriggerableConsumer implements Consumer<Runnable> {
private Runnable runnable;
@Override
public void accept(final Runnable runnable) {
this.runnable = runnable;
}
public void run() {
runnable.run();
}
}
}

@ -142,8 +142,8 @@ public class EthScheduler {
txWorkerExecutor.execute(command);
}
public <T> CompletableFuture<T> scheduleServiceTask(final Supplier<T> task) {
return CompletableFuture.supplyAsync(task, servicesExecutor);
public void executeServiceTask(final Runnable command) {
servicesExecutor.execute(command);
}
public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {

Loading…
Cancel
Save