RocksDB plugin to support OptimisticTransactionDB and TransactionDB (#5328)

* Refactor to make RocksDBColumnarKeyValueStorage abstract and extend it with optimistic and pessimistic implementations. At the moment Forest runs at a concurrency level that does not cope well with OptimisticTransactionDB, and RocksDB ends up raising a lot of Busy statuses when committing RocksDBTransactions.
According to RocksDB, a TransactionDB performs conflict checking for all write operations (Put, Delete and Merge), including writes performed outside a Transaction. This does impact the sync times we see, so it is likely not the final solution for Forest.
Bonsai should continue using OptimisticTransactionDB.
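
For context, a minimal standalone sketch of the behavioural difference, written against the rocksdbjni API (the database paths, keys and class name below are illustrative only, not part of this change): TransactionDB locks keys as they are written, while OptimisticTransactionDB defers all conflict checking to commit time.

import org.rocksdb.*;

public class ConflictCheckingSketch {
  public static void main(final String[] args) throws RocksDBException {
    RocksDB.loadLibrary();

    // Pessimistic: TransactionDB checks conflicts at write time by locking,
    // for writes inside and outside transactions, so commits do not go Busy
    // but every write pays the locking cost (the sync-time impact noted above).
    try (final Options options = new Options().setCreateIfMissing(true);
        final TransactionDB txDb =
            TransactionDB.open(options, new TransactionDBOptions(), "/tmp/pessimistic-db");
        final WriteOptions writeOptions = new WriteOptions();
        final Transaction tx = txDb.beginTransaction(writeOptions)) {
      tx.put("key".getBytes(), "value".getBytes()); // acquires a lock on "key"
      tx.commit();
    }

    // Optimistic: writes are buffered and validated only at commit, so under
    // heavy concurrency the commit itself fails with Status.Code.Busy.
    try (final Options options = new Options().setCreateIfMissing(true);
        final OptimisticTransactionDB optimisticDb =
            OptimisticTransactionDB.open(options, "/tmp/optimistic-db");
        final WriteOptions writeOptions = new WriteOptions();
        final Transaction tx = optimisticDb.beginTransaction(writeOptions)) {
      tx.put("key".getBytes(), "value".getBytes());
      try {
        tx.commit();
      } catch (final RocksDBException e) {
        if (e.getStatus() != null && e.getStatus().getCode() == Status.Code.Busy) {
          tx.rollback(); // a conflicting writer won; the caller must retry
        }
      }
    }
  }
}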

Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>

Gabriel Fukushima 2 years ago committed by GitHub
parent 78f8efff7a
commit d54a1bf27a
  1. CHANGELOG.md (1 line changed)
  2. plugins/rocksdb/build.gradle (1 line changed)
  3. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactory.java (86 lines changed)
  4. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBMetricsFactory.java (4 lines changed)
  5. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDbSegmentIdentifier.java (7 lines changed)
  6. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/OptimisticRocksDBColumnarKeyValueStorage.java (99 lines changed)
  7. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java (153 lines changed)
  8. plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorage.java (91 lines changed)
  9. plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactoryTest.java (2 lines changed)
  10. plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/OptimisticTransactionDBRocksDBColumnarKeyValueStorageTest.java (54 lines changed)
  11. plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorageTest.java (37 lines changed)
  12. plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBKeyValueStorageTest.java (2 lines changed)
  13. plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorageTest.java (54 lines changed)
  14. services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java (32 lines changed)
  15. services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SnappableSegmentedKeyValueStorageAdapter.java (67 lines changed)

@@ -16,6 +16,7 @@
- Reference tests are upgraded to use v12.1 of the ethereum tests [#5343](https://github.com/hyperledger/besu/pull/5343)
- Add new sepolia bootnodes, which should improve peering in the testnet. [#5352](https://github.com/hyperledger/besu/pull/5352)
- Renamed --bonsai-maximum-back-layers-to-load option to --bonsai-historical-block-limit for clarity. Removed --Xbonsai-use-snapshots option as it is no longer functional [#5337](https://github.com/hyperledger/besu/pull/5337)
- Change Forest to use TransactionDB instead of OptimisticTransactionDB [#5328](https://github.com/hyperledger/besu/pull/5328)
### Bug Fixes

@@ -47,6 +47,7 @@ dependencies {
implementation 'io.prometheus:simpleclient'
implementation 'org.apache.tuweni:tuweni-bytes'
implementation 'org.rocksdb:rocksdbjni'
implementation project(path: ':ethereum:core')
testImplementation project(':testutil')

@@ -1,5 +1,5 @@
/*
* Copyright ConsenSys AG.
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -16,6 +16,7 @@ package org.hyperledger.besu.plugin.services.storage.rocksdb;
import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.plugin.services.BesuConfiguration;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
@@ -26,9 +27,12 @@ import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.Databa
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.OptimisticRocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.RocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.TransactionDBRocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.unsegmented.RocksDBKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageAdapter;
import org.hyperledger.besu.services.kvstore.SnappableSegmentedKeyValueStorageAdapter;
import java.io.IOException;
import java.nio.file.Files;
@@ -149,55 +153,71 @@ public class RocksDBKeyValueStorageFactory implements KeyValueStorageFactory {
final BesuConfiguration commonConfiguration,
final MetricsSystem metricsSystem)
throws StorageException {
final boolean isForestStorageFormat =
DataStorageFormat.FOREST.getDatabaseVersion() == commonConfiguration.getDatabaseVersion();
if (requiresInit()) {
init(commonConfiguration);
}
// It's probably a good idea for the creation logic to be entirely dependent on the database
// version. Introducing intermediate booleans that represent database properties and dispatching
// creation logic based on them is error prone.
// creation logic based on them is error-prone.
switch (databaseVersion) {
case 0:
{
segmentedStorage = null;
if (unsegmentedStorage == null) {
unsegmentedStorage =
new RocksDBKeyValueStorage(
rocksDBConfiguration, metricsSystem, rocksDBMetricsFactory);
}
return unsegmentedStorage;
case 0 -> {
segmentedStorage = null;
if (unsegmentedStorage == null) {
unsegmentedStorage =
new RocksDBKeyValueStorage(
rocksDBConfiguration, metricsSystem, rocksDBMetricsFactory);
}
case 1:
case 2:
{
unsegmentedStorage = null;
if (segmentedStorage == null) {
final List<SegmentIdentifier> segmentsForVersion =
segments.stream()
.filter(segmentId -> segmentId.includeInDatabaseVersion(databaseVersion))
.collect(Collectors.toList());
return unsegmentedStorage;
}
case 1, 2 -> {
unsegmentedStorage = null;
if (segmentedStorage == null) {
final List<SegmentIdentifier> segmentsForVersion =
segments.stream()
.filter(segmentId -> segmentId.includeInDatabaseVersion(databaseVersion))
.collect(Collectors.toList());
if (isForestStorageFormat) {
LOG.debug("FOREST mode detected, using TransactionDB.");
segmentedStorage =
new RocksDBColumnarKeyValueStorage(
new TransactionDBRocksDBColumnarKeyValueStorage(
rocksDBConfiguration,
segmentsForVersion,
ignorableSegments,
metricsSystem,
rocksDBMetricsFactory);
} else {
LOG.debug("Using OptimisticTransactionDB.");
segmentedStorage =
new OptimisticRocksDBColumnarKeyValueStorage(
rocksDBConfiguration,
segmentsForVersion,
ignorableSegments,
metricsSystem,
rocksDBMetricsFactory);
}
final RocksDbSegmentIdentifier rocksSegment =
segmentedStorage.getSegmentIdentifierByName(segment);
return new SegmentedKeyValueStorageAdapter<>(
segment, segmentedStorage, () -> segmentedStorage.takeSnapshot(rocksSegment));
}
default:
{
throw new IllegalStateException(
String.format(
"Developer error: A supported database version (%d) was detected but there is no associated creation logic.",
databaseVersion));
final RocksDbSegmentIdentifier rocksSegment =
segmentedStorage.getSegmentIdentifierByName(segment);
if (isForestStorageFormat) {
return new SegmentedKeyValueStorageAdapter<>(segment, segmentedStorage);
} else {
return new SnappableSegmentedKeyValueStorageAdapter<>(
segment,
segmentedStorage,
() ->
((OptimisticRocksDBColumnarKeyValueStorage) segmentedStorage)
.takeSnapshot(rocksSegment));
}
}
default -> throw new IllegalStateException(
String.format(
"Developer error: A supported database version (%d) was detected but there is no associated creation logic.",
databaseVersion));
}
}

@@ -23,7 +23,7 @@ import org.hyperledger.besu.plugin.services.metrics.MetricCategory;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.slf4j.Logger;
@@ -72,7 +72,7 @@ public class RocksDBMetricsFactory {
public RocksDBMetrics create(
final MetricsSystem metricsSystem,
final RocksDBConfiguration rocksDbConfiguration,
final OptimisticTransactionDB db,
final RocksDB db,
final Statistics stats) {
final OperationTimer readLatency =
metricsSystem

@@ -21,13 +21,13 @@ import java.util.concurrent.atomic.AtomicReference;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
/** The RocksDb segment identifier. */
public class RocksDbSegmentIdentifier {
private final OptimisticTransactionDB db;
private final RocksDB db;
private final AtomicReference<ColumnFamilyHandle> reference;
/**
@@ -36,8 +36,7 @@ public class RocksDbSegmentIdentifier {
* @param db the db
* @param columnFamilyHandle the column family handle
*/
public RocksDbSegmentIdentifier(
final OptimisticTransactionDB db, final ColumnFamilyHandle columnFamilyHandle) {
public RocksDbSegmentIdentifier(final RocksDB db, final ColumnFamilyHandle columnFamilyHandle) {
this.db = db;
this.reference = new AtomicReference<>(columnFamilyHandle);
}

@@ -0,0 +1,99 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageTransactionTransitionValidatorDecorator;
import java.util.List;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
/** Optimistic RocksDB Columnar key value storage */
public class OptimisticRocksDBColumnarKeyValueStorage extends RocksDBColumnarKeyValueStorage {
private final OptimisticTransactionDB db;
/**
* Instantiates a new Rocks db columnar key value optimistic storage.
*
* @param configuration the configuration
* @param segments the segments
* @param ignorableSegments the ignorable segments
* @param metricsSystem the metrics system
* @param rocksDBMetricsFactory the rocks db metrics factory
* @throws StorageException the storage exception
*/
public OptimisticRocksDBColumnarKeyValueStorage(
final RocksDBConfiguration configuration,
final List<SegmentIdentifier> segments,
final List<SegmentIdentifier> ignorableSegments,
final MetricsSystem metricsSystem,
final RocksDBMetricsFactory rocksDBMetricsFactory)
throws StorageException {
super(configuration, segments, ignorableSegments, metricsSystem, rocksDBMetricsFactory);
try {
db =
OptimisticTransactionDB.open(
options, configuration.getDatabaseDir().toString(), columnDescriptors, columnHandles);
initMetrics();
initColumnHandler();
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}
@Override
RocksDB getDB() {
return db;
}
/**
* Start a transaction
*
* @return the new transaction started
* @throws StorageException the storage exception
*/
@Override
public Transaction<RocksDbSegmentIdentifier> startTransaction() throws StorageException {
throwIfClosed();
final WriteOptions writeOptions = new WriteOptions();
writeOptions.setIgnoreMissingColumnFamilies(true);
return new SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<>(
new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions));
}
/**
* Take snapshot RocksDb columnar key value snapshot.
*
* @param segment the segment
* @return the RocksDb columnar key value snapshot
* @throws StorageException the storage exception
*/
public RocksDBColumnarKeyValueSnapshot takeSnapshot(final RocksDbSegmentIdentifier segment)
throws StorageException {
throwIfClosed();
return new RocksDBColumnarKeyValueSnapshot(db, segment, metrics);
}
}

@@ -1,5 +1,5 @@
/*
* Copyright ConsenSys AG.
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -28,7 +28,6 @@ import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdenti
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageTransactionTransitionValidatorDecorator;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -54,7 +53,6 @@ import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.LRUCache;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@@ -68,48 +66,51 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** The RocksDb columnar key value storage. */
public class RocksDBColumnarKeyValueStorage
public abstract class RocksDBColumnarKeyValueStorage
implements SegmentedKeyValueStorage<RocksDbSegmentIdentifier> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBColumnarKeyValueStorage.class);
private static final String DEFAULT_COLUMN = "default";
static final String DEFAULT_COLUMN = "default";
private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device";
private static final int ROCKSDB_FORMAT_VERSION = 5;
private static final long ROCKSDB_BLOCK_SIZE = 32768;
private static final long ROCKSDB_BLOCKCACHE_SIZE_HIGH_SPEC = 1_073_741_824L;
private static final long ROCKSDB_MEMTABLE_SIZE_HIGH_SPEC = 1_073_741_824L;
/** RocksDb blockcache size when using the high spec option */
protected static final long ROCKSDB_BLOCKCACHE_SIZE_HIGH_SPEC = 1_073_741_824L;
/** RocksDb memtable size when using the high spec option */
protected static final long ROCKSDB_MEMTABLE_SIZE_HIGH_SPEC = 1_073_741_824L;
static {
RocksDbUtil.loadNativeLibrary();
}
private final DBOptions options;
private final TransactionDBOptions txOptions;
private final OptimisticTransactionDB db;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Map<String, RocksDbSegmentIdentifier> columnHandlesByName;
private final RocksDBMetrics metrics;
private final WriteOptions tryDeleteOptions =
new WriteOptions().setNoSlowdown(true).setIgnoreMissingColumnFamilies(true);
private final ReadOptions readOptions = new ReadOptions().setVerifyChecksums(false);
/**
* Instantiates a new RocksDb columnar key value storage.
*
* @param configuration the configuration
* @param segments the segments
* @param metricsSystem the metrics system
* @param rocksDBMetricsFactory the RocksDb metrics factory
* @throws StorageException the storage exception
*/
public RocksDBColumnarKeyValueStorage(
final RocksDBConfiguration configuration,
final List<SegmentIdentifier> segments,
final MetricsSystem metricsSystem,
final RocksDBMetricsFactory rocksDBMetricsFactory)
throws StorageException {
this(configuration, segments, List.of(), metricsSystem, rocksDBMetricsFactory);
}
private final MetricsSystem metricsSystem;
private final RocksDBMetricsFactory rocksDBMetricsFactory;
private final RocksDBConfiguration configuration;
private Map<Bytes, String> segmentsById;
/** RocksDB DB options */
protected DBOptions options;
/** RocksDb transactionDB options */
protected TransactionDBOptions txOptions;
/** RocksDb statistics */
protected final Statistics stats = new Statistics();
/** RocksDB metrics */
protected RocksDBMetrics metrics;
/** Map of the columns handles by name */
protected Map<String, RocksDbSegmentIdentifier> columnHandlesByName;
/** Column descriptors */
protected List<ColumnFamilyDescriptor> columnDescriptors;
/** Column handles */
protected List<ColumnFamilyHandle> columnHandles;
/** Trimmed segments */
protected List<SegmentIdentifier> trimmedSegments;
/**
* Instantiates a new Rocks db columnar key value storage.
@@ -129,8 +130,13 @@ public class RocksDBColumnarKeyValueStorage
final RocksDBMetricsFactory rocksDBMetricsFactory)
throws StorageException {
try (final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()) {
final List<SegmentIdentifier> trimmedSegments = new ArrayList<>(segments);
this.configuration = configuration;
this.metricsSystem = metricsSystem;
this.rocksDBMetricsFactory = rocksDBMetricsFactory;
try {
final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
trimmedSegments = new ArrayList<>(segments);
final List<byte[]> existingColumnFamilies =
RocksDB.listColumnFamilies(new Options(), configuration.getDatabaseDir().toString());
// Only ignore if not existed currently
@@ -140,7 +146,7 @@ public class RocksDBColumnarKeyValueStorage
existingColumnFamilies.stream()
.noneMatch(existed -> Arrays.equals(existed, ignorableSegment.getId())))
.forEach(trimmedSegments::remove);
final List<ColumnFamilyDescriptor> columnDescriptors =
columnDescriptors =
trimmedSegments.stream()
.map(
segment ->
@@ -184,33 +190,34 @@
}
txOptions = new TransactionDBOptions();
final List<ColumnFamilyHandle> columnHandles = new ArrayList<>(columnDescriptors.size());
db =
OptimisticTransactionDB.open(
options, configuration.getDatabaseDir().toString(), columnDescriptors, columnHandles);
metrics = rocksDBMetricsFactory.create(metricsSystem, configuration, db, stats);
final Map<Bytes, String> segmentsById =
trimmedSegments.stream()
.collect(
Collectors.toMap(
segment -> Bytes.wrap(segment.getId()), SegmentIdentifier::getName));
columnHandles = new ArrayList<>(columnDescriptors.size());
} catch (RocksDBException e) {
throw new StorageException(e);
}
}
final ImmutableMap.Builder<String, RocksDbSegmentIdentifier> builder = ImmutableMap.builder();
void initMetrics() {
metrics = rocksDBMetricsFactory.create(metricsSystem, configuration, getDB(), stats);
}
for (ColumnFamilyHandle columnHandle : columnHandles) {
final String segmentName =
requireNonNullElse(
segmentsById.get(Bytes.wrap(columnHandle.getName())), DEFAULT_COLUMN);
builder.put(segmentName, new RocksDbSegmentIdentifier(db, columnHandle));
}
columnHandlesByName = builder.build();
void initColumnHandler() throws RocksDBException {
} catch (final RocksDBException e) {
throw new StorageException(e);
segmentsById =
trimmedSegments.stream()
.collect(
Collectors.toMap(
segment -> Bytes.wrap(segment.getId()), SegmentIdentifier::getName));
final ImmutableMap.Builder<String, RocksDbSegmentIdentifier> builder = ImmutableMap.builder();
for (ColumnFamilyHandle columnHandle : columnHandles) {
final String segmentName =
requireNonNullElse(segmentsById.get(Bytes.wrap(columnHandle.getName())), DEFAULT_COLUMN);
builder.put(segmentName, new RocksDbSegmentIdentifier(getDB(), columnHandle));
}
columnHandlesByName = builder.build();
}
private BlockBasedTableConfig createBlockBasedTableConfig(final RocksDBConfiguration config) {
BlockBasedTableConfig createBlockBasedTableConfig(final RocksDBConfiguration config) {
if (config.isHighSpec()) return createBlockBasedTableConfigHighSpec();
else return createBlockBasedTableConfigDefault(config);
}
@@ -249,44 +256,22 @@
throwIfClosed();
try (final OperationTimer.TimingContext ignored = metrics.getReadLatency().startTimer()) {
return Optional.ofNullable(db.get(segment.get(), readOptions, key));
return Optional.ofNullable(getDB().get(segment.get(), readOptions, key));
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}
/**
* Take snapshot RocksDb columnar key value snapshot.
*
* @param segment the segment
* @return the RocksDb columnar key value snapshot
* @throws StorageException the storage exception
*/
public RocksDBColumnarKeyValueSnapshot takeSnapshot(final RocksDbSegmentIdentifier segment)
throws StorageException {
throwIfClosed();
return new RocksDBColumnarKeyValueSnapshot(db, segment, metrics);
}
@Override
public Transaction<RocksDbSegmentIdentifier> startTransaction() throws StorageException {
throwIfClosed();
final WriteOptions writeOptions = new WriteOptions();
writeOptions.setIgnoreMissingColumnFamilies(true);
return new SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<>(
new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions));
}
@Override
public Stream<Pair<byte[], byte[]>> stream(final RocksDbSegmentIdentifier segmentHandle) {
final RocksIterator rocksIterator = db.newIterator(segmentHandle.get());
final RocksIterator rocksIterator = getDB().newIterator(segmentHandle.get());
rocksIterator.seekToFirst();
return RocksDbIterator.create(rocksIterator).toStream();
}
@Override
public Stream<byte[]> streamKeys(final RocksDbSegmentIdentifier segmentHandle) {
final RocksIterator rocksIterator = db.newIterator(segmentHandle.get());
final RocksIterator rocksIterator = getDB().newIterator(segmentHandle.get());
rocksIterator.seekToFirst();
return RocksDbIterator.create(rocksIterator).toStreamKeys();
}
@@ -294,7 +279,7 @@
@Override
public boolean tryDelete(final RocksDbSegmentIdentifier segmentHandle, final byte[] key) {
try {
db.delete(segmentHandle.get(), tryDeleteOptions, key);
getDB().delete(segmentHandle.get(), tryDeleteOptions, key);
return true;
} catch (RocksDBException e) {
if (e.getStatus().getCode() == Status.Code.Incomplete) {
@@ -341,18 +326,18 @@
columnHandlesByName.values().stream()
.map(RocksDbSegmentIdentifier::get)
.forEach(ColumnFamilyHandle::close);
db.close();
getDB().close();
}
}
private void throwIfClosed() {
void throwIfClosed() {
if (closed.get()) {
LOG.error("Attempting to use a closed RocksDbKeyValueStorage");
throw new IllegalStateException("Storage has been closed");
}
}
private class RocksDbTransaction implements Transaction<RocksDbSegmentIdentifier> {
class RocksDbTransaction implements Transaction<RocksDbSegmentIdentifier> {
private final org.rocksdb.Transaction innerTx;
private final WriteOptions options;
@@ -430,4 +415,6 @@
options.close();
}
}
abstract RocksDB getDB();
}

@@ -0,0 +1,91 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageTransactionTransitionValidatorDecorator;
import java.util.List;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TransactionDB;
import org.rocksdb.WriteOptions;
/** TransactionDB RocksDB Columnar key value storage */
public class TransactionDBRocksDBColumnarKeyValueStorage extends RocksDBColumnarKeyValueStorage {
private final TransactionDB db;
/**
* The constructor of TransactionDBRocksDBColumnarKeyValueStorage
*
* @param configuration the RocksDB configuration
* @param segments the segments
* @param ignorableSegments the ignorable segments
* @param metricsSystem the metrics system
* @param rocksDBMetricsFactory the rocksdb metrics factory
* @throws StorageException the storage exception
*/
public TransactionDBRocksDBColumnarKeyValueStorage(
final RocksDBConfiguration configuration,
final List<SegmentIdentifier> segments,
final List<SegmentIdentifier> ignorableSegments,
final MetricsSystem metricsSystem,
final RocksDBMetricsFactory rocksDBMetricsFactory)
throws StorageException {
super(configuration, segments, ignorableSegments, metricsSystem, rocksDBMetricsFactory);
try {
db =
TransactionDB.open(
options,
txOptions,
configuration.getDatabaseDir().toString(),
columnDescriptors,
columnHandles);
initMetrics();
initColumnHandler();
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}
@Override
RocksDB getDB() {
return db;
}
/**
* Start a transaction
*
* @return the new transaction started
* @throws StorageException the storage exception
*/
@Override
public Transaction<RocksDbSegmentIdentifier> startTransaction() throws StorageException {
throwIfClosed();
final WriteOptions writeOptions = new WriteOptions();
writeOptions.setIgnoreMissingColumnFamilies(true);
return new SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<>(
new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions));
}
}

@@ -1,5 +1,5 @@
/*
* Copyright ConsenSys AG.
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at

@@ -0,0 +1,54 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
public class OptimisticTransactionDBRocksDBColumnarKeyValueStorageTest
extends RocksDBColumnarKeyValueStorageTest {
@Override
protected SegmentedKeyValueStorage<RocksDbSegmentIdentifier> createSegmentedStore()
throws Exception {
return new OptimisticRocksDBColumnarKeyValueStorage(
new RocksDBConfigurationBuilder().databaseDir(folder.newFolder().toPath()).build(),
Arrays.asList(TestSegment.FOO, TestSegment.BAR),
List.of(),
new NoOpMetricsSystem(),
RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS);
}
@Override
protected SegmentedKeyValueStorage<RocksDbSegmentIdentifier> createSegmentedStore(
final Path path,
final List<SegmentIdentifier> segments,
final List<SegmentIdentifier> ignorableSegments) {
return new OptimisticRocksDBColumnarKeyValueStorage(
new RocksDBConfigurationBuilder().databaseDir(path).build(),
segments,
ignorableSegments,
new NoOpMetricsSystem(),
RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS);
}
}

@@ -1,5 +1,5 @@
/*
* Copyright ConsenSys AG.
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -12,23 +12,19 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb.unsegmented;
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import org.hyperledger.besu.kvstore.AbstractKeyValueStorageTest;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.RocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage.Transaction;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageAdapter;
import org.hyperledger.besu.services.kvstore.SnappableSegmentedKeyValueStorageAdapter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
@@ -43,7 +39,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageTest {
public abstract class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageTest {
@Rule public final TemporaryFolder folder = new TemporaryFolder();
@@ -55,7 +51,7 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT
final SegmentedKeyValueStorage<RocksDbSegmentIdentifier> store = createSegmentedStore();
RocksDbSegmentIdentifier segment = store.getSegmentIdentifierByName(TestSegment.FOO);
KeyValueStorage duplicateSegmentRef =
new SegmentedKeyValueStorageAdapter<>(TestSegment.FOO, store);
new SnappableSegmentedKeyValueStorageAdapter<>(TestSegment.FOO, store);
final Consumer<byte[]> insert =
value -> {
@@ -286,29 +282,16 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT
}
}
private SegmentedKeyValueStorage<RocksDbSegmentIdentifier> createSegmentedStore()
throws Exception {
return new RocksDBColumnarKeyValueStorage(
new RocksDBConfigurationBuilder().databaseDir(folder.newFolder().toPath()).build(),
Arrays.asList(TestSegment.FOO, TestSegment.BAR),
new NoOpMetricsSystem(),
RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS);
}
protected abstract SegmentedKeyValueStorage<RocksDbSegmentIdentifier> createSegmentedStore()
throws Exception;
private SegmentedKeyValueStorage<RocksDbSegmentIdentifier> createSegmentedStore(
protected abstract SegmentedKeyValueStorage<RocksDbSegmentIdentifier> createSegmentedStore(
final Path path,
final List<SegmentIdentifier> segments,
final List<SegmentIdentifier> ignorableSegments) {
return new RocksDBColumnarKeyValueStorage(
new RocksDBConfigurationBuilder().databaseDir(path).build(),
segments,
ignorableSegments,
new NoOpMetricsSystem(),
RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS);
}
final List<SegmentIdentifier> ignorableSegments);
@Override
protected KeyValueStorage createStore() throws Exception {
return new SegmentedKeyValueStorageAdapter<>(TestSegment.FOO, createSegmentedStore());
return new SnappableSegmentedKeyValueStorageAdapter<>(TestSegment.FOO, createSegmentedStore());
}
}

@@ -1,5 +1,5 @@
/*
* Copyright ConsenSys AG.
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at

@@ -0,0 +1,54 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
public class TransactionDBRocksDBColumnarKeyValueStorageTest
extends RocksDBColumnarKeyValueStorageTest {
@Override
protected SegmentedKeyValueStorage<RocksDbSegmentIdentifier> createSegmentedStore()
throws Exception {
return new TransactionDBRocksDBColumnarKeyValueStorage(
new RocksDBConfigurationBuilder().databaseDir(folder.newFolder().toPath()).build(),
Arrays.asList(TestSegment.FOO, TestSegment.BAR),
List.of(),
new NoOpMetricsSystem(),
RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS);
}
@Override
protected SegmentedKeyValueStorage<RocksDbSegmentIdentifier> createSegmentedStore(
final Path path,
final List<SegmentIdentifier> segments,
final List<SegmentIdentifier> ignorableSegments) {
return new TransactionDBRocksDBColumnarKeyValueStorage(
new RocksDBConfigurationBuilder().databaseDir(path).build(),
segments,
ignorableSegments,
new NoOpMetricsSystem(),
RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS);
}
}

@@ -15,16 +15,14 @@
package org.hyperledger.besu.services.kvstore;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SnappableKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
@@ -34,11 +32,10 @@ import org.apache.commons.lang3.tuple.Pair;
*
* @param <S> the type parameter
*/
public class SegmentedKeyValueStorageAdapter<S> implements SnappableKeyValueStorage {
public class SegmentedKeyValueStorageAdapter<S> implements KeyValueStorage {
private final S segmentHandle;
private final SegmentedKeyValueStorage<S> storage;
private final Supplier<SnappedKeyValueStorage> snapshotSupplier;
/**
* Instantiates a new Segmented key value storage adapter.
@@ -48,28 +45,8 @@ public class SegmentedKeyValueStorageAdapter<S> implements SnappableKeyValueStor
*/
public SegmentedKeyValueStorageAdapter(
final SegmentIdentifier segment, final SegmentedKeyValueStorage<S> storage) {
this(
segment,
storage,
() -> {
throw new UnsupportedOperationException("Snapshot not supported");
});
}
/**
* Instantiates a new Segmented key value storage adapter.
*
* @param segment the segment
* @param storage the storage
* @param snapshotSupplier the snapshot supplier
*/
public SegmentedKeyValueStorageAdapter(
final SegmentIdentifier segment,
final SegmentedKeyValueStorage<S> storage,
final Supplier<SnappedKeyValueStorage> snapshotSupplier) {
segmentHandle = storage.getSegmentIdentifierByName(segment);
this.storage = storage;
this.snapshotSupplier = snapshotSupplier;
}
@Override
@@ -143,9 +120,4 @@ public class SegmentedKeyValueStorageAdapter<S> implements SnappableKeyValueStor
}
};
}
@Override
public SnappedKeyValueStorage takeSnapshot() {
return snapshotSupplier.get();
}
}

@@ -0,0 +1,67 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.kvstore;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SnappableKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage;
import java.util.function.Supplier;
/**
* The type Segmented key value storage adapter.
*
* @param <S> the type parameter
*/
public class SnappableSegmentedKeyValueStorageAdapter<S> extends SegmentedKeyValueStorageAdapter<S>
implements SnappableKeyValueStorage {
private final Supplier<SnappedKeyValueStorage> snapshotSupplier;
/**
* Instantiates a new Segmented key value storage adapter.
*
* @param segment the segment
* @param storage the storage
*/
public SnappableSegmentedKeyValueStorageAdapter(
final SegmentIdentifier segment, final SegmentedKeyValueStorage<S> storage) {
this(
segment,
storage,
() -> {
throw new UnsupportedOperationException("Snapshot not supported");
});
}
/**
* Instantiates a new Segmented key value storage adapter.
*
* @param segment the segment
* @param storage the storage
* @param snapshotSupplier the snapshot supplier
*/
public SnappableSegmentedKeyValueStorageAdapter(
final SegmentIdentifier segment,
final SegmentedKeyValueStorage<S> storage,
final Supplier<SnappedKeyValueStorage> snapshotSupplier) {
super(segment, storage);
this.snapshotSupplier = snapshotSupplier;
}
@Override
public SnappedKeyValueStorage takeSnapshot() {
return snapshotSupplier.get();
}
}
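
Taken together with the factory change above, snapshot support is now opt-in at the adapter level. A hedged usage sketch follows (the segment, store and rocksSegment variables are assumed to be in scope, with the same types as in RocksDBKeyValueStorageFactory):

// Forest: plain adapter; takeSnapshot() is no longer part of the exposed type.
final KeyValueStorage forestStorage = new SegmentedKeyValueStorageAdapter<>(segment, store);

// Bonsai: the snappable subclass wires a snapshot supplier through to RocksDB.
final SnappableKeyValueStorage bonsaiStorage =
    new SnappableSegmentedKeyValueStorageAdapter<>(
        segment,
        store,
        () -> ((OptimisticRocksDBColumnarKeyValueStorage) store).takeSnapshot(rocksSegment));

// The two-argument snappable constructor installs a supplier that throws
// UnsupportedOperationException, so takeSnapshot() fails fast where unsupported.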