mirror of https://github.com/hyperledger/besu
remove v0 version of the database (#5698)
Signed-off-by: Karim TAAM <karim.t2am@gmail.com>pull/5701/head
parent
86b2b600a0
commit
9f42a3f261
Binary file not shown.
@ -1,223 +0,0 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.plugin.services.storage.rocksdb.unsegmented; |
||||
|
||||
import static java.util.stream.Collectors.toUnmodifiableSet; |
||||
|
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
import org.hyperledger.besu.plugin.services.exception.StorageException; |
||||
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; |
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; |
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; |
||||
import org.hyperledger.besu.services.kvstore.KeyValueStorageTransactionTransitionValidatorDecorator; |
||||
|
||||
import java.util.Optional; |
||||
import java.util.Set; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
import java.util.function.Predicate; |
||||
import java.util.stream.Stream; |
||||
|
||||
import org.apache.commons.lang3.tuple.Pair; |
||||
import org.apache.tuweni.bytes.Bytes; |
||||
import org.rocksdb.BlockBasedTableConfig; |
||||
import org.rocksdb.LRUCache; |
||||
import org.rocksdb.OptimisticTransactionDB; |
||||
import org.rocksdb.Options; |
||||
import org.rocksdb.ReadOptions; |
||||
import org.rocksdb.RocksDBException; |
||||
import org.rocksdb.RocksIterator; |
||||
import org.rocksdb.Statistics; |
||||
import org.rocksdb.Status; |
||||
import org.rocksdb.WriteOptions; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** The Rocks db key value storage. */ |
||||
public class RocksDBKeyValueStorage implements KeyValueStorage { |
||||
|
||||
static { |
||||
RocksDbUtil.loadNativeLibrary(); |
||||
} |
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyValueStorage.class); |
||||
|
||||
private final Options options; |
||||
private final OptimisticTransactionDB db; |
||||
private final AtomicBoolean closed = new AtomicBoolean(false); |
||||
private final RocksDBMetrics rocksDBMetrics; |
||||
private final WriteOptions tryDeleteOptions = |
||||
new WriteOptions().setNoSlowdown(true).setIgnoreMissingColumnFamilies(true); |
||||
private final ReadOptions readOptions = new ReadOptions().setVerifyChecksums(false); |
||||
|
||||
/** |
||||
* Instantiates a new Rocks db key value storage. |
||||
* |
||||
* @param configuration the configuration |
||||
* @param metricsSystem the metrics system |
||||
* @param rocksDBMetricsFactory the rocks db metrics factory |
||||
*/ |
||||
public RocksDBKeyValueStorage( |
||||
final RocksDBConfiguration configuration, |
||||
final MetricsSystem metricsSystem, |
||||
final RocksDBMetricsFactory rocksDBMetricsFactory) { |
||||
|
||||
try { |
||||
final Statistics stats = new Statistics(); |
||||
options = |
||||
new Options() |
||||
.setCreateIfMissing(true) |
||||
.setMaxOpenFiles(configuration.getMaxOpenFiles()) |
||||
.setTableFormatConfig(createBlockBasedTableConfig(configuration)) |
||||
.setStatistics(stats); |
||||
options.getEnv().setBackgroundThreads(configuration.getBackgroundThreadCount()); |
||||
|
||||
db = OptimisticTransactionDB.open(options, configuration.getDatabaseDir().toString()); |
||||
rocksDBMetrics = rocksDBMetricsFactory.create(metricsSystem, configuration, db, stats); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void clear() throws StorageException { |
||||
throwIfClosed(); |
||||
|
||||
try (final RocksIterator rocksIterator = db.newIterator()) { |
||||
rocksIterator.seekToFirst(); |
||||
if (rocksIterator.isValid()) { |
||||
final byte[] firstKey = rocksIterator.key(); |
||||
rocksIterator.seekToLast(); |
||||
if (rocksIterator.isValid()) { |
||||
final byte[] lastKey = rocksIterator.key(); |
||||
db.deleteRange(firstKey, lastKey); |
||||
db.delete(lastKey); |
||||
} |
||||
} |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public boolean containsKey(final byte[] key) throws StorageException { |
||||
return get(key).isPresent(); |
||||
} |
||||
|
||||
@Override |
||||
public Optional<byte[]> get(final byte[] key) throws StorageException { |
||||
throwIfClosed(); |
||||
|
||||
try (final OperationTimer.TimingContext ignored = |
||||
rocksDBMetrics.getReadLatency().startTimer()) { |
||||
return Optional.ofNullable(db.get(readOptions, key)); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public Set<byte[]> getAllKeysThat(final Predicate<byte[]> returnCondition) { |
||||
return stream() |
||||
.filter(pair -> returnCondition.test(pair.getKey())) |
||||
.map(Pair::getKey) |
||||
.collect(toUnmodifiableSet()); |
||||
} |
||||
|
||||
@Override |
||||
public Stream<Pair<byte[], byte[]>> stream() { |
||||
throwIfClosed(); |
||||
|
||||
final RocksIterator rocksIterator = db.newIterator(); |
||||
rocksIterator.seekToFirst(); |
||||
return RocksDbIterator.create(rocksIterator).toStream(); |
||||
} |
||||
|
||||
@Override |
||||
public Stream<Pair<byte[], byte[]>> streamFromKey(final byte[] startKey) { |
||||
return stream().filter(e -> Bytes.wrap(startKey).compareTo(Bytes.wrap(e.getKey())) <= 0); |
||||
} |
||||
|
||||
@Override |
||||
public Stream<byte[]> streamKeys() { |
||||
throwIfClosed(); |
||||
|
||||
final RocksIterator rocksIterator = db.newIterator(); |
||||
rocksIterator.seekToFirst(); |
||||
return RocksDbIterator.create(rocksIterator).toStreamKeys(); |
||||
} |
||||
|
||||
@Override |
||||
public Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition) { |
||||
return stream() |
||||
.filter(pair -> returnCondition.test(pair.getKey())) |
||||
.map(Pair::getValue) |
||||
.collect(toUnmodifiableSet()); |
||||
} |
||||
|
||||
@Override |
||||
public boolean tryDelete(final byte[] key) { |
||||
throwIfClosed(); |
||||
try { |
||||
db.delete(tryDeleteOptions, key); |
||||
return true; |
||||
} catch (RocksDBException e) { |
||||
if (e.getStatus().getCode() == Status.Code.Incomplete) { |
||||
return false; |
||||
} else { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public KeyValueStorageTransaction startTransaction() throws StorageException { |
||||
throwIfClosed(); |
||||
final WriteOptions options = new WriteOptions(); |
||||
options.setIgnoreMissingColumnFamilies(true); |
||||
return new KeyValueStorageTransactionTransitionValidatorDecorator( |
||||
new RocksDBTransaction(db.beginTransaction(options), options, rocksDBMetrics)); |
||||
} |
||||
|
||||
@Override |
||||
public boolean isClosed() { |
||||
return closed.get(); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
if (closed.compareAndSet(false, true)) { |
||||
tryDeleteOptions.close(); |
||||
options.close(); |
||||
db.close(); |
||||
} |
||||
} |
||||
|
||||
private BlockBasedTableConfig createBlockBasedTableConfig(final RocksDBConfiguration config) { |
||||
final LRUCache cache = new LRUCache(config.getCacheCapacity()); |
||||
return new BlockBasedTableConfig().setBlockCache(cache); |
||||
} |
||||
|
||||
private void throwIfClosed() { |
||||
if (closed.get()) { |
||||
LOG.error("Attempting to use a closed RocksDBKeyValueStorage"); |
||||
throw new IllegalStateException("Storage has been closed"); |
||||
} |
||||
} |
||||
} |
@ -1,112 +0,0 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.plugin.services.storage.rocksdb.unsegmented; |
||||
|
||||
import org.hyperledger.besu.plugin.services.exception.StorageException; |
||||
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; |
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics; |
||||
|
||||
import org.rocksdb.RocksDBException; |
||||
import org.rocksdb.Transaction; |
||||
import org.rocksdb.WriteOptions; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** The RocksDb transaction. */ |
||||
public class RocksDBTransaction implements KeyValueStorageTransaction { |
||||
private static final Logger logger = LoggerFactory.getLogger(RocksDBTransaction.class); |
||||
private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device"; |
||||
|
||||
private final RocksDBMetrics metrics; |
||||
private final Transaction innerTx; |
||||
private final WriteOptions options; |
||||
|
||||
/** |
||||
* Instantiates a new RocksDb transaction. |
||||
* |
||||
* @param innerTx the inner tx |
||||
* @param options the options |
||||
* @param metrics the metrics |
||||
*/ |
||||
RocksDBTransaction( |
||||
final Transaction innerTx, final WriteOptions options, final RocksDBMetrics metrics) { |
||||
this.innerTx = innerTx; |
||||
this.options = options; |
||||
this.metrics = metrics; |
||||
} |
||||
|
||||
@Override |
||||
public void put(final byte[] key, final byte[] value) { |
||||
try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) { |
||||
innerTx.put(key, value); |
||||
} catch (final RocksDBException e) { |
||||
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) { |
||||
logger.error(e.getMessage()); |
||||
System.exit(0); |
||||
} |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void remove(final byte[] key) { |
||||
try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) { |
||||
innerTx.delete(key); |
||||
} catch (final RocksDBException e) { |
||||
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) { |
||||
logger.error(e.getMessage()); |
||||
System.exit(0); |
||||
} |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void commit() throws StorageException { |
||||
try (final OperationTimer.TimingContext ignored = metrics.getCommitLatency().startTimer()) { |
||||
innerTx.commit(); |
||||
} catch (final RocksDBException e) { |
||||
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) { |
||||
logger.error(e.getMessage()); |
||||
System.exit(0); |
||||
} |
||||
throw new StorageException(e); |
||||
} finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void rollback() { |
||||
try { |
||||
innerTx.rollback(); |
||||
metrics.getRollbackCount().inc(); |
||||
} catch (final RocksDBException e) { |
||||
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) { |
||||
logger.error(e.getMessage()); |
||||
System.exit(0); |
||||
} |
||||
throw new StorageException(e); |
||||
} finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
private void close() { |
||||
innerTx.close(); |
||||
options.close(); |
||||
} |
||||
} |
@ -1,136 +0,0 @@ |
||||
/* |
||||
* Copyright Hyperledger Besu Contributors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.anyString; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import org.hyperledger.besu.kvstore.AbstractKeyValueStorageTest; |
||||
import org.hyperledger.besu.metrics.BesuMetricCategory; |
||||
import org.hyperledger.besu.metrics.ObservableMetricsSystem; |
||||
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; |
||||
import org.hyperledger.besu.plugin.services.metrics.Counter; |
||||
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; |
||||
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; |
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder; |
||||
import org.hyperledger.besu.plugin.services.storage.rocksdb.unsegmented.RocksDBKeyValueStorage; |
||||
|
||||
import java.nio.file.Path; |
||||
import java.util.function.LongSupplier; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import org.junit.jupiter.api.extension.ExtendWith; |
||||
import org.junit.jupiter.api.io.TempDir; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.jupiter.MockitoExtension; |
||||
|
||||
@ExtendWith(MockitoExtension.class) |
||||
public class RocksDBKeyValueStorageTest extends AbstractKeyValueStorageTest { |
||||
|
||||
@Mock private ObservableMetricsSystem metricsSystemMock; |
||||
@Mock private LabelledMetric<OperationTimer> labelledMetricOperationTimerMock; |
||||
@Mock private LabelledMetric<Counter> labelledMetricCounterMock; |
||||
@Mock private OperationTimer operationTimerMock; |
||||
@TempDir static Path folder; |
||||
|
||||
@Override |
||||
protected KeyValueStorage createStore() throws Exception { |
||||
return new RocksDBKeyValueStorage( |
||||
config(), new NoOpMetricsSystem(), RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS); |
||||
} |
||||
|
||||
@Test |
||||
public void createStoreMustCreateMetrics() throws Exception { |
||||
// Prepare mocks
|
||||
when(labelledMetricOperationTimerMock.labels(any())).thenReturn(operationTimerMock); |
||||
when(metricsSystemMock.createLabelledTimer( |
||||
eq(BesuMetricCategory.KVSTORE_ROCKSDB), anyString(), anyString(), any())) |
||||
.thenReturn(labelledMetricOperationTimerMock); |
||||
when(metricsSystemMock.createLabelledCounter( |
||||
eq(BesuMetricCategory.KVSTORE_ROCKSDB), anyString(), anyString(), any())) |
||||
.thenReturn(labelledMetricCounterMock); |
||||
// Prepare argument captors
|
||||
final ArgumentCaptor<String> labelledTimersMetricsNameArgs = |
||||
ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> labelledTimersHelpArgs = ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> labelledCountersMetricsNameArgs = |
||||
ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> labelledCountersHelpArgs = ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> longGaugesMetricsNameArgs = ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> longGaugesHelpArgs = ArgumentCaptor.forClass(String.class); |
||||
|
||||
// Actual call
|
||||
final KeyValueStorage keyValueStorage = |
||||
new RocksDBKeyValueStorage( |
||||
config(), metricsSystemMock, RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS); |
||||
|
||||
// Assertions
|
||||
assertThat(keyValueStorage).isNotNull(); |
||||
verify(metricsSystemMock, times(4)) |
||||
.createLabelledTimer( |
||||
eq(BesuMetricCategory.KVSTORE_ROCKSDB), |
||||
labelledTimersMetricsNameArgs.capture(), |
||||
labelledTimersHelpArgs.capture(), |
||||
any()); |
||||
assertThat(labelledTimersMetricsNameArgs.getAllValues()) |
||||
.containsExactly( |
||||
"read_latency_seconds", |
||||
"remove_latency_seconds", |
||||
"write_latency_seconds", |
||||
"commit_latency_seconds"); |
||||
assertThat(labelledTimersHelpArgs.getAllValues()) |
||||
.containsExactly( |
||||
"Latency for read from RocksDB.", |
||||
"Latency of remove requests from RocksDB.", |
||||
"Latency for write to RocksDB.", |
||||
"Latency for commits to RocksDB."); |
||||
|
||||
verify(metricsSystemMock, times(2)) |
||||
.createLongGauge( |
||||
eq(BesuMetricCategory.KVSTORE_ROCKSDB), |
||||
longGaugesMetricsNameArgs.capture(), |
||||
longGaugesHelpArgs.capture(), |
||||
any(LongSupplier.class)); |
||||
assertThat(longGaugesMetricsNameArgs.getAllValues()) |
||||
.containsExactly("rocks_db_table_readers_memory_bytes", "rocks_db_files_size_bytes"); |
||||
assertThat(longGaugesHelpArgs.getAllValues()) |
||||
.containsExactly( |
||||
"Estimated memory used for RocksDB index and filter blocks in bytes", |
||||
"Estimated database size in bytes"); |
||||
|
||||
verify(metricsSystemMock) |
||||
.createLabelledCounter( |
||||
eq(BesuMetricCategory.KVSTORE_ROCKSDB), |
||||
labelledCountersMetricsNameArgs.capture(), |
||||
labelledCountersHelpArgs.capture(), |
||||
any()); |
||||
assertThat(labelledCountersMetricsNameArgs.getValue()).isEqualTo("rollback_count"); |
||||
assertThat(labelledCountersHelpArgs.getValue()) |
||||
.isEqualTo("Number of RocksDB transactions rolled back."); |
||||
} |
||||
|
||||
private RocksDBConfiguration config() throws Exception { |
||||
return new RocksDBConfigurationBuilder().databaseDir(getTempSubFolder(folder)).build(); |
||||
} |
||||
} |
Loading…
Reference in new issue