Add --X-trie-log subcommand (#6303)

* Add x-trie-log subcommand for one-off trie log backlog prune

Signed-off-by: Simon Dudley <simon.dudley@consensys.net>
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>

---------

Signed-off-by: Simon Dudley <simon.dudley@consensys.net>
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
Co-authored-by: Simon Dudley <simon.dudley@consensys.net>
pull/6372/head
Gabriel Fukushima 11 months ago committed by GitHub
parent 428177f514
commit e51e042906
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/RocksDbUsageHelper.java
  2. 6
      besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/StorageSubCommand.java
  3. 361
      besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelper.java
  4. 147
      besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogSubCommand.java
  5. 18
      besu/src/main/java/org/hyperledger/besu/controller/BesuController.java
  6. 3
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  7. 265
      besu/src/test/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelperTest.java
  8. 2
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/storage/BonsaiWorldStateKeyValueStorage.java
  9. 17
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/TrieLogPruner.java

@ -34,17 +34,17 @@ public class RocksDbUsageHelper {
final RocksDB rocksdb, final ColumnFamilyHandle cfHandle, final PrintWriter out)
throws RocksDBException, NumberFormatException {
final String size = rocksdb.getProperty(cfHandle, "rocksdb.estimate-live-data-size");
final String numberOfKeys = rocksdb.getProperty(cfHandle, "rocksdb.estimate-num-keys");
boolean emptyColumnFamily = false;
if (!size.isEmpty() && !size.isBlank()) {
if (!size.isBlank() && !numberOfKeys.isBlank()) {
try {
final long sizeLong = Long.parseLong(size);
final long numberOfKeysLong = Long.parseLong(numberOfKeys);
final String totalSstFilesSize =
rocksdb.getProperty(cfHandle, "rocksdb.total-sst-files-size");
final long totalSstFilesSizeLong =
!totalSstFilesSize.isEmpty() && !totalSstFilesSize.isBlank()
? Long.parseLong(totalSstFilesSize)
: 0;
if (sizeLong == 0) {
!totalSstFilesSize.isBlank() ? Long.parseLong(totalSstFilesSize) : 0;
if (sizeLong == 0 && numberOfKeysLong == 0) {
emptyColumnFamily = true;
}

@ -45,7 +45,11 @@ import picocli.CommandLine.Spec;
description = "This command provides storage related actions.",
mixinStandardHelpOptions = true,
versionProvider = VersionProvider.class,
subcommands = {StorageSubCommand.RevertVariablesStorage.class, RocksDbSubCommand.class})
subcommands = {
StorageSubCommand.RevertVariablesStorage.class,
RocksDbSubCommand.class,
TrieLogSubCommand.class
})
public class StorageSubCommand implements Runnable {
/** The constant COMMAND_NAME. */

@ -0,0 +1,361 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.cli.subcommands.storage;
import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.controller.BesuController.DATABASE_PATH;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Helper class for counting and pruning trie logs */
public class TrieLogHelper {
private static final String TRIE_LOG_FILE = "trieLogsToRetain";
private static final long BATCH_SIZE = 20_000;
private static final int ROCKSDB_MAX_INSERTS_PER_TRANSACTION = 1000;
private static final Logger LOG = LoggerFactory.getLogger(TrieLogHelper.class);
static void prune(
final DataStorageConfiguration config,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final MutableBlockchain blockchain,
final Path dataDirectoryPath) {
final String batchFileNameBase =
dataDirectoryPath.resolve(DATABASE_PATH).resolve(TRIE_LOG_FILE).toString();
validatePruneConfiguration(config);
final long layersToRetain = config.getUnstable().getBonsaiTrieLogRetentionThreshold();
final long chainHeight = blockchain.getChainHeadBlockNumber();
final long lastBlockNumberToRetainTrieLogsFor = chainHeight - layersToRetain + 1;
if (!validPruneRequirements(blockchain, chainHeight, lastBlockNumberToRetainTrieLogsFor)) {
return;
}
final long numberOfBatches = calculateNumberofBatches(layersToRetain);
processTrieLogBatches(
rootWorldStateStorage,
blockchain,
chainHeight,
lastBlockNumberToRetainTrieLogsFor,
numberOfBatches,
batchFileNameBase);
if (rootWorldStateStorage.streamTrieLogKeys(layersToRetain).count() == layersToRetain) {
deleteFiles(batchFileNameBase, numberOfBatches);
LOG.info("Prune ran successfully. Enjoy some disk space back! \uD83D\uDE80");
} else {
LOG.error("Prune failed. Re-run the subcommand to load the trie logs from file.");
}
}
private static void processTrieLogBatches(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final MutableBlockchain blockchain,
final long chainHeight,
final long lastBlockNumberToRetainTrieLogsFor,
final long numberOfBatches,
final String batchFileNameBase) {
for (long batchNumber = 1; batchNumber <= numberOfBatches; batchNumber++) {
final long firstBlockOfBatch = chainHeight - ((batchNumber - 1) * BATCH_SIZE);
final long lastBlockOfBatch =
Math.max(chainHeight - (batchNumber * BATCH_SIZE), lastBlockNumberToRetainTrieLogsFor);
final List<Hash> trieLogKeys =
getTrieLogKeysForBlocks(blockchain, firstBlockOfBatch, lastBlockOfBatch);
saveTrieLogBatches(batchFileNameBase, rootWorldStateStorage, batchNumber, trieLogKeys);
}
LOG.info("Clear trie logs...");
rootWorldStateStorage.clearTrieLog();
for (long batchNumber = 1; batchNumber <= numberOfBatches; batchNumber++) {
restoreTrieLogBatches(rootWorldStateStorage, batchNumber, batchFileNameBase);
}
}
private static void saveTrieLogBatches(
final String batchFileNameBase,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final long batchNumber,
final List<Hash> trieLogKeys) {
LOG.info("Saving trie logs to retain in file (batch {})...", batchNumber);
try {
saveTrieLogsInFile(trieLogKeys, rootWorldStateStorage, batchNumber, batchFileNameBase);
} catch (IOException e) {
LOG.error("Error saving trie logs to file: {}", e.getMessage());
throw new RuntimeException(e);
}
}
private static void restoreTrieLogBatches(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final long batchNumber,
final String batchFileNameBase) {
try {
LOG.info("Restoring trie logs retained from batch {}...", batchNumber);
recreateTrieLogs(rootWorldStateStorage, batchNumber, batchFileNameBase);
} catch (IOException e) {
LOG.error("Error recreating trie logs from batch {}: {}", batchNumber, e.getMessage());
throw new RuntimeException(e);
}
}
private static void deleteFiles(final String batchFileNameBase, final long numberOfBatches) {
LOG.info("Deleting files...");
for (long batchNumber = 1; batchNumber <= numberOfBatches; batchNumber++) {
File file = new File(batchFileNameBase + "-" + batchNumber);
if (file.exists()) {
file.delete();
}
}
}
private static List<Hash> getTrieLogKeysForBlocks(
final MutableBlockchain blockchain,
final long firstBlockOfBatch,
final long lastBlockOfBatch) {
final List<Hash> trieLogKeys = new ArrayList<>();
for (long i = firstBlockOfBatch; i >= lastBlockOfBatch; i--) {
final Optional<BlockHeader> header = blockchain.getBlockHeader(i);
header.ifPresentOrElse(
blockHeader -> trieLogKeys.add(blockHeader.getHash()),
() -> LOG.error("Error retrieving block"));
}
return trieLogKeys;
}
private static long calculateNumberofBatches(final long layersToRetain) {
return layersToRetain / BATCH_SIZE + ((layersToRetain % BATCH_SIZE == 0) ? 0 : 1);
}
private static boolean validPruneRequirements(
final MutableBlockchain blockchain,
final long chainHeight,
final long lastBlockNumberToRetainTrieLogsFor) {
if (lastBlockNumberToRetainTrieLogsFor < 0) {
throw new IllegalArgumentException(
"Trying to retain more trie logs than chain length ("
+ chainHeight
+ "), skipping pruning");
}
final Optional<Hash> finalizedBlockHash = blockchain.getFinalized();
if (finalizedBlockHash.isEmpty()) {
throw new RuntimeException("No finalized block present, can't safely run trie log prune");
} else {
final Hash finalizedHash = finalizedBlockHash.get();
final Optional<BlockHeader> finalizedBlockHeader = blockchain.getBlockHeader(finalizedHash);
if (finalizedBlockHeader.isPresent()
&& finalizedBlockHeader.get().getNumber() < lastBlockNumberToRetainTrieLogsFor) {
throw new IllegalArgumentException(
"Trying to prune more layers than the finalized block height, skipping pruning");
}
}
return true;
}
private static void recreateTrieLogs(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final long batchNumber,
final String batchFileNameBase)
throws IOException {
// process in chunk to avoid OOM
IdentityHashMap<byte[], byte[]> trieLogsToRetain =
readTrieLogsFromFile(batchFileNameBase, batchNumber);
final int chunkSize = ROCKSDB_MAX_INSERTS_PER_TRANSACTION;
List<byte[]> keys = new ArrayList<>(trieLogsToRetain.keySet());
for (int startIndex = 0; startIndex < keys.size(); startIndex += chunkSize) {
processTransactionChunk(startIndex, chunkSize, keys, trieLogsToRetain, rootWorldStateStorage);
}
}
private static void processTransactionChunk(
final int startIndex,
final int chunkSize,
final List<byte[]> keys,
final IdentityHashMap<byte[], byte[]> trieLogsToRetain,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage) {
var updater = rootWorldStateStorage.updater();
int endIndex = Math.min(startIndex + chunkSize, keys.size());
for (int i = startIndex; i < endIndex; i++) {
byte[] key = keys.get(i);
byte[] value = trieLogsToRetain.get(key);
updater.getTrieLogStorageTransaction().put(key, value);
LOG.info("Key({}): {}", i, Bytes32.wrap(key).toShortHexString());
}
updater.getTrieLogStorageTransaction().commit();
}
private static void validatePruneConfiguration(final DataStorageConfiguration config) {
checkArgument(
config.getUnstable().getBonsaiTrieLogRetentionThreshold()
>= config.getBonsaiMaxLayersToLoad(),
String.format(
"--Xbonsai-trie-log-retention-threshold minimum value is %d",
config.getBonsaiMaxLayersToLoad()));
checkArgument(
config.getUnstable().getBonsaiTrieLogPruningLimit() > 0,
String.format(
"--Xbonsai-trie-log-pruning-limit=%d must be greater than 0",
config.getUnstable().getBonsaiTrieLogPruningLimit()));
checkArgument(
config.getUnstable().getBonsaiTrieLogPruningLimit()
> config.getUnstable().getBonsaiTrieLogRetentionThreshold(),
String.format(
"--Xbonsai-trie-log-pruning-limit=%d must greater than --Xbonsai-trie-log-retention-threshold=%d",
config.getUnstable().getBonsaiTrieLogPruningLimit(),
config.getUnstable().getBonsaiTrieLogRetentionThreshold()));
}
private static void saveTrieLogsInFile(
final List<Hash> trieLogsKeys,
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final long batchNumber,
final String batchFileNameBase)
throws IOException {
File file = new File(batchFileNameBase + "-" + batchNumber);
if (file.exists()) {
LOG.error("File already exists, skipping file creation");
return;
}
try (FileOutputStream fos = new FileOutputStream(file)) {
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(getTrieLogs(trieLogsKeys, rootWorldStateStorage));
} catch (IOException e) {
LOG.error(e.getMessage());
throw new RuntimeException(e);
}
}
@SuppressWarnings("unchecked")
private static IdentityHashMap<byte[], byte[]> readTrieLogsFromFile(
final String batchFileNameBase, final long batchNumber) {
IdentityHashMap<byte[], byte[]> trieLogs;
try (FileInputStream fis = new FileInputStream(batchFileNameBase + "-" + batchNumber);
ObjectInputStream ois = new ObjectInputStream(fis)) {
trieLogs = (IdentityHashMap<byte[], byte[]>) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
LOG.error(e.getMessage());
throw new RuntimeException(e);
}
return trieLogs;
}
private static IdentityHashMap<byte[], byte[]> getTrieLogs(
final List<Hash> trieLogKeys, final BonsaiWorldStateKeyValueStorage rootWorldStateStorage) {
IdentityHashMap<byte[], byte[]> trieLogsToRetain = new IdentityHashMap<>();
LOG.info("Obtaining trielogs from db, this may take a few minutes...");
trieLogKeys.forEach(
hash ->
rootWorldStateStorage
.getTrieLog(hash)
.ifPresent(trieLog -> trieLogsToRetain.put(hash.toArrayUnsafe(), trieLog)));
return trieLogsToRetain;
}
static TrieLogCount getCount(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final int limit,
final Blockchain blockchain) {
final AtomicInteger total = new AtomicInteger();
final AtomicInteger canonicalCount = new AtomicInteger();
final AtomicInteger forkCount = new AtomicInteger();
final AtomicInteger orphanCount = new AtomicInteger();
rootWorldStateStorage
.streamTrieLogKeys(limit)
.map(Bytes32::wrap)
.map(Hash::wrap)
.forEach(
hash -> {
total.getAndIncrement();
blockchain
.getBlockHeader(hash)
.ifPresentOrElse(
(header) -> {
long number = header.getNumber();
final Optional<BlockHeader> headerByNumber =
blockchain.getBlockHeader(number);
if (headerByNumber.isPresent()
&& headerByNumber.get().getHash().equals(hash)) {
canonicalCount.getAndIncrement();
} else {
forkCount.getAndIncrement();
}
},
orphanCount::getAndIncrement);
});
return new TrieLogCount(total.get(), canonicalCount.get(), forkCount.get(), orphanCount.get());
}
static void printCount(final PrintWriter out, final TrieLogCount count) {
out.printf(
"trieLog count: %s\n - canonical count: %s\n - fork count: %s\n - orphaned count: %s\n",
count.total, count.canonicalCount, count.forkCount, count.orphanCount);
}
record TrieLogCount(int total, int canonicalCount, int forkCount, int orphanCount) {}
}

@ -0,0 +1,147 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.cli.subcommands.storage;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.cli.util.VersionProvider;
import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.ParentCommand;
/** The Trie Log subcommand. */
@Command(
name = "x-trie-log",
description = "Manipulate trie logs",
mixinStandardHelpOptions = true,
versionProvider = VersionProvider.class,
subcommands = {TrieLogSubCommand.CountTrieLog.class, TrieLogSubCommand.PruneTrieLog.class})
public class TrieLogSubCommand implements Runnable {
@SuppressWarnings("UnusedVariable")
@ParentCommand
private static StorageSubCommand parentCommand;
@SuppressWarnings("unused")
@CommandLine.Spec
private CommandLine.Model.CommandSpec spec; // Picocli injects reference to command spec
@Override
public void run() {
final PrintWriter out = spec.commandLine().getOut();
spec.commandLine().usage(out);
}
private static BesuController createBesuController() {
return parentCommand.parentCommand.buildController();
}
@Command(
name = "count",
description = "This command counts all the trie logs",
mixinStandardHelpOptions = true,
versionProvider = VersionProvider.class)
static class CountTrieLog implements Runnable {
@SuppressWarnings("unused")
@ParentCommand
private TrieLogSubCommand parentCommand;
@SuppressWarnings("unused")
@CommandLine.Spec
private CommandLine.Model.CommandSpec spec; // Picocli injects reference to command spec
@Override
public void run() {
TrieLogContext context = getTrieLogContext();
final PrintWriter out = spec.commandLine().getOut();
out.println("Counting trie logs...");
TrieLogHelper.printCount(
out,
TrieLogHelper.getCount(
context.rootWorldStateStorage, Integer.MAX_VALUE, context.blockchain));
}
}
@Command(
name = "prune",
description =
"This command prunes all trie log layers below the retention threshold, including orphaned trie logs.",
mixinStandardHelpOptions = true,
versionProvider = VersionProvider.class)
static class PruneTrieLog implements Runnable {
@SuppressWarnings("unused")
@ParentCommand
private TrieLogSubCommand parentCommand;
@SuppressWarnings("unused")
@CommandLine.Spec
private CommandLine.Model.CommandSpec spec; // Picocli injects reference to command spec
@Override
public void run() {
TrieLogContext context = getTrieLogContext();
final Path dataDirectoryPath =
Paths.get(
TrieLogSubCommand.parentCommand.parentCommand.dataDir().toAbsolutePath().toString());
TrieLogHelper.prune(
context.config(),
context.rootWorldStateStorage(),
context.blockchain(),
dataDirectoryPath);
}
}
record TrieLogContext(
DataStorageConfiguration config,
BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
MutableBlockchain blockchain) {}
private static TrieLogContext getTrieLogContext() {
Configurator.setLevel(LoggerFactory.getLogger(TrieLogPruner.class).getName(), Level.DEBUG);
checkNotNull(parentCommand);
BesuController besuController = createBesuController();
final DataStorageConfiguration config = besuController.getDataStorageConfiguration();
checkArgument(
DataStorageFormat.BONSAI.equals(config.getDataStorageFormat()),
"Subcommand only works with data-storage-format=BONSAI");
final StorageProvider storageProvider = besuController.getStorageProvider();
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage =
(BonsaiWorldStateKeyValueStorage)
storageProvider.createWorldStateStorage(DataStorageFormat.BONSAI);
final MutableBlockchain blockchain = besuController.getProtocolContext().getBlockchain();
return new TrieLogContext(config, rootWorldStateStorage, blockchain);
}
}

@ -37,6 +37,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import java.io.Closeable;
import java.io.IOException;
@ -77,6 +78,7 @@ public class BesuController implements java.io.Closeable {
private final SyncState syncState;
private final EthPeers ethPeers;
private final StorageProvider storageProvider;
private final DataStorageConfiguration dataStorageConfiguration;
/**
* Instantiates a new Besu controller.
@ -96,6 +98,9 @@ public class BesuController implements java.io.Closeable {
* @param nodeKey the node key
* @param closeables the closeables
* @param additionalPluginServices the additional plugin services
* @param ethPeers the eth peers
* @param storageProvider the storage provider
* @param dataStorageConfiguration the data storage configuration
*/
BesuController(
final ProtocolSchedule protocolSchedule,
@ -114,7 +119,8 @@ public class BesuController implements java.io.Closeable {
final List<Closeable> closeables,
final PluginServiceFactory additionalPluginServices,
final EthPeers ethPeers,
final StorageProvider storageProvider) {
final StorageProvider storageProvider,
final DataStorageConfiguration dataStorageConfiguration) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethProtocolManager = ethProtocolManager;
@ -132,6 +138,7 @@ public class BesuController implements java.io.Closeable {
this.additionalPluginServices = additionalPluginServices;
this.ethPeers = ethPeers;
this.storageProvider = storageProvider;
this.dataStorageConfiguration = dataStorageConfiguration;
}
/**
@ -293,6 +300,15 @@ public class BesuController implements java.io.Closeable {
return additionalPluginServices;
}
/**
* Gets data storage configuration.
*
* @return the data storage configuration
*/
public DataStorageConfiguration getDataStorageConfiguration() {
return dataStorageConfiguration;
}
/** The type Builder. */
public static class Builder {

@ -803,7 +803,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
closeables,
additionalPluginServices,
ethPeers,
storageProvider);
storageProvider,
dataStorageConfiguration);
}
/**

@ -0,0 +1,265 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.cli.subcommands.storage;
import static org.hyperledger.besu.ethereum.worldstate.DataStorageFormat.BONSAI;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.ethereum.worldstate.ImmutableDataStorageConfiguration;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class TrieLogHelperTest {
private static final StorageProvider storageProvider = new InMemoryKeyValueStorageProvider();
private static BonsaiWorldStateKeyValueStorage inMemoryWorldState;
@Mock private MutableBlockchain blockchain;
@TempDir static Path dataDir;
Path test;
static BlockHeader blockHeader1;
static BlockHeader blockHeader2;
static BlockHeader blockHeader3;
static BlockHeader blockHeader4;
static BlockHeader blockHeader5;
@BeforeAll
public static void setup() throws IOException {
blockHeader1 = new BlockHeaderTestFixture().number(1).buildHeader();
blockHeader2 = new BlockHeaderTestFixture().number(2).buildHeader();
blockHeader3 = new BlockHeaderTestFixture().number(3).buildHeader();
blockHeader4 = new BlockHeaderTestFixture().number(4).buildHeader();
blockHeader5 = new BlockHeaderTestFixture().number(5).buildHeader();
inMemoryWorldState =
new BonsaiWorldStateKeyValueStorage(storageProvider, new NoOpMetricsSystem());
var updater = inMemoryWorldState.updater();
updater
.getTrieLogStorageTransaction()
.put(blockHeader1.getHash().toArrayUnsafe(), Bytes.fromHexString("0x01").toArrayUnsafe());
updater
.getTrieLogStorageTransaction()
.put(blockHeader2.getHash().toArrayUnsafe(), Bytes.fromHexString("0x02").toArrayUnsafe());
updater
.getTrieLogStorageTransaction()
.put(blockHeader3.getHash().toArrayUnsafe(), Bytes.fromHexString("0x03").toArrayUnsafe());
updater
.getTrieLogStorageTransaction()
.put(blockHeader4.getHash().toArrayUnsafe(), Bytes.fromHexString("0x04").toArrayUnsafe());
updater
.getTrieLogStorageTransaction()
.put(blockHeader5.getHash().toArrayUnsafe(), Bytes.fromHexString("0x05").toArrayUnsafe());
updater.getTrieLogStorageTransaction().commit();
}
@BeforeEach
void createDirectory() throws IOException {
Files.createDirectories(dataDir.resolve("database"));
}
@AfterEach
void deleteDirectory() throws IOException {
Files.deleteIfExists(dataDir.resolve("database"));
}
void mockBlockchainBase() {
when(blockchain.getChainHeadBlockNumber()).thenReturn(5L);
when(blockchain.getFinalized()).thenReturn(Optional.of(blockHeader3.getBlockHash()));
when(blockchain.getBlockHeader(any(Hash.class))).thenReturn(Optional.of(blockHeader3));
}
@Test
public void prune() {
DataStorageConfiguration dataStorageConfiguration =
ImmutableDataStorageConfiguration.builder()
.dataStorageFormat(BONSAI)
.bonsaiMaxLayersToLoad(2L)
.unstable(
ImmutableDataStorageConfiguration.Unstable.builder()
.bonsaiTrieLogRetentionThreshold(3)
.build()
.withBonsaiTrieLogRetentionThreshold(3))
.build();
mockBlockchainBase();
when(blockchain.getBlockHeader(5)).thenReturn(Optional.of(blockHeader5));
when(blockchain.getBlockHeader(4)).thenReturn(Optional.of(blockHeader4));
when(blockchain.getBlockHeader(3)).thenReturn(Optional.of(blockHeader3));
// assert trie logs that will be pruned exist before prune call
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader1.getHash()).get(),
Bytes.fromHexString("0x01").toArrayUnsafe());
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader2.getHash()).get(),
Bytes.fromHexString("0x02").toArrayUnsafe());
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader3.getHash()).get(),
Bytes.fromHexString("0x03").toArrayUnsafe());
TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir);
// assert pruned trie logs are not in the DB
assertEquals(inMemoryWorldState.getTrieLog(blockHeader1.getHash()), Optional.empty());
assertEquals(inMemoryWorldState.getTrieLog(blockHeader2.getHash()), Optional.empty());
// assert retained trie logs are in the DB
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader3.getHash()).get(),
Bytes.fromHexString("0x03").toArrayUnsafe());
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader4.getHash()).get(),
Bytes.fromHexString("0x04").toArrayUnsafe());
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader5.getHash()).get(),
Bytes.fromHexString("0x05").toArrayUnsafe());
}
@Test
public void cantPruneIfNoFinalizedIsFound() {
DataStorageConfiguration dataStorageConfiguration =
ImmutableDataStorageConfiguration.builder()
.dataStorageFormat(BONSAI)
.bonsaiMaxLayersToLoad(2L)
.unstable(
ImmutableDataStorageConfiguration.Unstable.builder()
.bonsaiTrieLogRetentionThreshold(2)
.build()
.withBonsaiTrieLogRetentionThreshold(2))
.build();
when(blockchain.getChainHeadBlockNumber()).thenReturn(5L);
when(blockchain.getFinalized()).thenReturn(Optional.empty());
assertThrows(
RuntimeException.class,
() ->
TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir));
}
@Test
public void cantPruneIfUserRetainsMoreLayerThanExistingChainLength() {
DataStorageConfiguration dataStorageConfiguration =
ImmutableDataStorageConfiguration.builder()
.dataStorageFormat(BONSAI)
.bonsaiMaxLayersToLoad(2L)
.unstable(
ImmutableDataStorageConfiguration.Unstable.builder()
.bonsaiTrieLogRetentionThreshold(10)
.build()
.withBonsaiTrieLogRetentionThreshold(10))
.build();
when(blockchain.getChainHeadBlockNumber()).thenReturn(5L);
assertThrows(
IllegalArgumentException.class,
() ->
TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir));
}
@Test
public void cantPruneIfUserRequiredFurtherThanFinalized() {
DataStorageConfiguration dataStorageConfiguration =
ImmutableDataStorageConfiguration.builder()
.dataStorageFormat(BONSAI)
.bonsaiMaxLayersToLoad(2L)
.unstable(
ImmutableDataStorageConfiguration.Unstable.builder()
.bonsaiTrieLogRetentionThreshold(2)
.build()
.withBonsaiTrieLogRetentionThreshold(2))
.build();
mockBlockchainBase();
assertThrows(
IllegalArgumentException.class,
() ->
TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir));
}
@Test
public void exceptionWhileSavingFileStopsPruneProcess() throws IOException {
Files.delete(dataDir.resolve("database"));
DataStorageConfiguration dataStorageConfiguration =
ImmutableDataStorageConfiguration.builder()
.dataStorageFormat(BONSAI)
.bonsaiMaxLayersToLoad(2L)
.unstable(
ImmutableDataStorageConfiguration.Unstable.builder()
.bonsaiTrieLogRetentionThreshold(2)
.build()
.withBonsaiTrieLogRetentionThreshold(2))
.build();
assertThrows(
RuntimeException.class,
() ->
TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir));
// assert all trie logs are still in the DB
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader1.getHash()).get(),
Bytes.fromHexString("0x01").toArrayUnsafe());
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader2.getHash()).get(),
Bytes.fromHexString("0x02").toArrayUnsafe());
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader3.getHash()).get(),
Bytes.fromHexString("0x03").toArrayUnsafe());
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader4.getHash()).get(),
Bytes.fromHexString("0x04").toArrayUnsafe());
assertArrayEquals(
inMemoryWorldState.getTrieLog(blockHeader5.getHash()).get(),
Bytes.fromHexString("0x05").toArrayUnsafe());
}
}

@ -204,7 +204,7 @@ public class BonsaiWorldStateKeyValueStorage implements WorldStateStorage, AutoC
return trieLogStorage.get(blockHash.toArrayUnsafe());
}
public Stream<byte[]> streamTrieLogKeys(final int limit) {
public Stream<byte[]> streamTrieLogKeys(final long limit) {
return trieLogStorage.streamKeys().limit(limit);
}

@ -61,33 +61,37 @@ public class TrieLogPruner {
this.requireFinalizedBlock = requireFinalizedBlock;
}
public void initialize() {
preloadQueue();
public int initialize() {
return preloadQueue();
}
private void preloadQueue() {
private int preloadQueue() {
LOG.atInfo()
.setMessage("Loading first {} trie logs from database...")
.addArgument(loadingLimit)
.log();
try (final Stream<byte[]> trieLogKeys = rootWorldStateStorage.streamTrieLogKeys(loadingLimit)) {
final AtomicLong count = new AtomicLong();
final AtomicLong orphansPruned = new AtomicLong();
trieLogKeys.forEach(
blockHashAsBytes -> {
final Hash blockHash = Hash.wrap(Bytes32.wrap(blockHashAsBytes));
final Optional<BlockHeader> header = blockchain.getBlockHeader(blockHash);
if (header.isPresent()) {
trieLogBlocksAndForksByDescendingBlockNumber.put(header.get().getNumber(), blockHash);
addToPruneQueue(header.get().getNumber(), blockHash);
count.getAndIncrement();
} else {
// prune orphaned blocks (sometimes created during block production)
rootWorldStateStorage.pruneTrieLog(blockHash);
orphansPruned.getAndIncrement();
}
});
LOG.atDebug().log("Pruned {} orphaned trie logs from database...", orphansPruned.intValue());
LOG.atInfo().log("Loaded {} trie logs from database", count);
pruneFromQueue();
return pruneFromQueue() + orphansPruned.intValue();
} catch (Exception e) {
LOG.error("Error loading trie logs from database, nothing pruned", e);
return 0;
}
}
@ -176,8 +180,9 @@ public class TrieLogPruner {
}
@Override
public void initialize() {
public int initialize() {
// no-op
return -1;
}
@Override

Loading…
Cancel
Save