mirror of https://github.com/hyperledger/besu
[PAN-2871] Columnar rocksdb (#1599)
* Columnated storage to allow for iteration over world state * change MetricsCategory to PantheonMetricsCategory * consistency renaming of kvstores Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>pull/2/head
parent
ee20ed426d
commit
d4a016798e
@ -0,0 +1,326 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services.kvstore; |
||||
|
||||
import tech.pegasys.pantheon.metrics.Counter; |
||||
import tech.pegasys.pantheon.metrics.MetricsSystem; |
||||
import tech.pegasys.pantheon.metrics.OperationTimer; |
||||
import tech.pegasys.pantheon.metrics.PantheonMetricCategory; |
||||
import tech.pegasys.pantheon.metrics.prometheus.PrometheusMetricsSystem; |
||||
import tech.pegasys.pantheon.metrics.rocksdb.RocksDBStats; |
||||
import tech.pegasys.pantheon.services.util.RocksDbUtil; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
|
||||
import java.io.Closeable; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
import java.util.function.Predicate; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import com.google.common.collect.ImmutableMap; |
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
import org.rocksdb.ColumnFamilyDescriptor; |
||||
import org.rocksdb.ColumnFamilyHandle; |
||||
import org.rocksdb.ColumnFamilyOptions; |
||||
import org.rocksdb.DBOptions; |
||||
import org.rocksdb.Env; |
||||
import org.rocksdb.RocksDBException; |
||||
import org.rocksdb.RocksIterator; |
||||
import org.rocksdb.Statistics; |
||||
import org.rocksdb.TransactionDB; |
||||
import org.rocksdb.TransactionDBOptions; |
||||
import org.rocksdb.WriteOptions; |
||||
|
||||
public class ColumnarRocksDbKeyValueStorage |
||||
implements SegmentedKeyValueStorage<ColumnFamilyHandle>, Closeable { |
||||
|
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
private static final String DEFAULT_COLUMN = "default"; |
||||
|
||||
private final DBOptions options; |
||||
private final TransactionDBOptions txOptions; |
||||
private final TransactionDB db; |
||||
private final AtomicBoolean closed = new AtomicBoolean(false); |
||||
|
||||
private final OperationTimer readLatency; |
||||
private final OperationTimer removeLatency; |
||||
private final OperationTimer writeLatency; |
||||
private final OperationTimer commitLatency; |
||||
private final Counter rollbackCount; |
||||
private final Statistics stats; |
||||
private final Map<String, ColumnFamilyHandle> columnHandlesByName; |
||||
|
||||
public static ColumnarRocksDbKeyValueStorage create( |
||||
final RocksDbConfiguration rocksDbConfiguration, |
||||
final List<Segment> segments, |
||||
final MetricsSystem metricsSystem) |
||||
throws StorageException { |
||||
return new ColumnarRocksDbKeyValueStorage(rocksDbConfiguration, segments, metricsSystem); |
||||
} |
||||
|
||||
private ColumnarRocksDbKeyValueStorage( |
||||
final RocksDbConfiguration rocksDbConfiguration, |
||||
final List<Segment> segments, |
||||
final MetricsSystem metricsSystem) { |
||||
RocksDbUtil.loadNativeLibrary(); |
||||
try { |
||||
final List<ColumnFamilyDescriptor> columnDescriptors = |
||||
segments.stream() |
||||
.map(segment -> new ColumnFamilyDescriptor(segment.getId())) |
||||
.collect(Collectors.toList()); |
||||
columnDescriptors.add( |
||||
new ColumnFamilyDescriptor( |
||||
DEFAULT_COLUMN.getBytes(StandardCharsets.UTF_8), |
||||
new ColumnFamilyOptions() |
||||
.setTableFormatConfig(rocksDbConfiguration.getBlockBasedTableConfig()))); |
||||
|
||||
stats = new Statistics(); |
||||
options = |
||||
new DBOptions() |
||||
.setCreateIfMissing(true) |
||||
.setMaxOpenFiles(rocksDbConfiguration.getMaxOpenFiles()) |
||||
.setMaxBackgroundCompactions(rocksDbConfiguration.getMaxBackgroundCompactions()) |
||||
.setStatistics(stats) |
||||
.setCreateMissingColumnFamilies(true) |
||||
.setEnv( |
||||
Env.getDefault() |
||||
.setBackgroundThreads(rocksDbConfiguration.getBackgroundThreadCount())); |
||||
|
||||
txOptions = new TransactionDBOptions(); |
||||
final List<ColumnFamilyHandle> columnHandles = new ArrayList<>(columnDescriptors.size()); |
||||
db = |
||||
TransactionDB.open( |
||||
options, |
||||
txOptions, |
||||
rocksDbConfiguration.getDatabaseDir().toString(), |
||||
columnDescriptors, |
||||
columnHandles); |
||||
|
||||
final Map<BytesValue, String> segmentsById = |
||||
segments.stream() |
||||
.collect( |
||||
Collectors.toMap(segment -> BytesValue.wrap(segment.getId()), Segment::getName)); |
||||
|
||||
final ImmutableMap.Builder<String, ColumnFamilyHandle> builder = ImmutableMap.builder(); |
||||
for (final ColumnFamilyHandle columnHandle : columnHandles) { |
||||
final String segmentName = segmentsById.get(BytesValue.wrap(columnHandle.getName())); |
||||
if (segmentName != null) { |
||||
builder.put(segmentName, columnHandle); |
||||
} else { |
||||
builder.put(DEFAULT_COLUMN, columnHandle); |
||||
} |
||||
} |
||||
columnHandlesByName = builder.build(); |
||||
|
||||
readLatency = |
||||
metricsSystem |
||||
.createLabelledTimer( |
||||
PantheonMetricCategory.KVSTORE_ROCKSDB, |
||||
"read_latency_seconds", |
||||
"Latency for read from RocksDB.", |
||||
"database") |
||||
.labels(rocksDbConfiguration.getLabel()); |
||||
removeLatency = |
||||
metricsSystem |
||||
.createLabelledTimer( |
||||
PantheonMetricCategory.KVSTORE_ROCKSDB, |
||||
"remove_latency_seconds", |
||||
"Latency of remove requests from RocksDB.", |
||||
"database") |
||||
.labels(rocksDbConfiguration.getLabel()); |
||||
writeLatency = |
||||
metricsSystem |
||||
.createLabelledTimer( |
||||
PantheonMetricCategory.KVSTORE_ROCKSDB, |
||||
"write_latency_seconds", |
||||
"Latency for write to RocksDB.", |
||||
"database") |
||||
.labels(rocksDbConfiguration.getLabel()); |
||||
commitLatency = |
||||
metricsSystem |
||||
.createLabelledTimer( |
||||
PantheonMetricCategory.KVSTORE_ROCKSDB, |
||||
"commit_latency_seconds", |
||||
"Latency for commits to RocksDB.", |
||||
"database") |
||||
.labels(rocksDbConfiguration.getLabel()); |
||||
|
||||
if (metricsSystem instanceof PrometheusMetricsSystem) { |
||||
RocksDBStats.registerRocksDBMetrics(stats, (PrometheusMetricsSystem) metricsSystem); |
||||
} |
||||
|
||||
metricsSystem.createLongGauge( |
||||
PantheonMetricCategory.KVSTORE_ROCKSDB, |
||||
"rocks_db_table_readers_memory_bytes", |
||||
"Estimated memory used for RocksDB index and filter blocks in bytes", |
||||
() -> { |
||||
try { |
||||
return db.getLongProperty("rocksdb.estimate-table-readers-mem"); |
||||
} catch (final RocksDBException e) { |
||||
LOG.debug("Failed to get RocksDB metric", e); |
||||
return 0L; |
||||
} |
||||
}); |
||||
|
||||
rollbackCount = |
||||
metricsSystem |
||||
.createLabelledCounter( |
||||
PantheonMetricCategory.KVSTORE_ROCKSDB, |
||||
"rollback_count", |
||||
"Number of RocksDB transactions rolled back.", |
||||
"database") |
||||
.labels(rocksDbConfiguration.getLabel()); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public ColumnFamilyHandle getSegmentIdentifierByName(final Segment segment) { |
||||
return columnHandlesByName.get(segment.getName()); |
||||
} |
||||
|
||||
@Override |
||||
public Optional<BytesValue> get(final ColumnFamilyHandle segment, final BytesValue key) |
||||
throws StorageException { |
||||
throwIfClosed(); |
||||
|
||||
try (final OperationTimer.TimingContext ignored = readLatency.startTimer()) { |
||||
return Optional.ofNullable(db.get(segment, key.getArrayUnsafe())).map(BytesValue::wrap); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public Transaction<ColumnFamilyHandle> startTransaction() throws StorageException { |
||||
throwIfClosed(); |
||||
final WriteOptions options = new WriteOptions(); |
||||
return new RocksDbTransaction(db.beginTransaction(options), options); |
||||
} |
||||
|
||||
@Override |
||||
public long removeUnless( |
||||
final ColumnFamilyHandle segmentHandle, final Predicate<BytesValue> inUseCheck) { |
||||
long removedNodeCounter = 0; |
||||
try (final RocksIterator rocksIterator = db.newIterator(segmentHandle)) { |
||||
rocksIterator.seekToFirst(); |
||||
while (rocksIterator.isValid()) { |
||||
final byte[] key = rocksIterator.key(); |
||||
if (!inUseCheck.test(BytesValue.wrap(key))) { |
||||
removedNodeCounter++; |
||||
db.delete(segmentHandle, key); |
||||
} |
||||
rocksIterator.next(); |
||||
} |
||||
} catch (final RocksDBException e) { |
||||
throw new KeyValueStorage.StorageException(e); |
||||
} |
||||
return removedNodeCounter; |
||||
} |
||||
|
||||
@Override |
||||
public void clear(final ColumnFamilyHandle segmentHandle) { |
||||
try (final RocksIterator rocksIterator = db.newIterator(segmentHandle)) { |
||||
rocksIterator.seekToFirst(); |
||||
if (rocksIterator.isValid()) { |
||||
final byte[] firstKey = rocksIterator.key(); |
||||
rocksIterator.seekToLast(); |
||||
if (rocksIterator.isValid()) { |
||||
db.deleteRange(segmentHandle, firstKey, rocksIterator.key()); |
||||
} |
||||
} |
||||
} catch (final RocksDBException e) { |
||||
throw new KeyValueStorage.StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
if (closed.compareAndSet(false, true)) { |
||||
txOptions.close(); |
||||
options.close(); |
||||
columnHandlesByName.values().forEach(ColumnFamilyHandle::close); |
||||
db.close(); |
||||
} |
||||
} |
||||
|
||||
private void throwIfClosed() { |
||||
if (closed.get()) { |
||||
LOG.error("Attempting to use a closed RocksDbKeyValueStorage"); |
||||
throw new IllegalStateException("Storage has been closed"); |
||||
} |
||||
} |
||||
|
||||
private class RocksDbTransaction extends AbstractTransaction<ColumnFamilyHandle> { |
||||
private final org.rocksdb.Transaction innerTx; |
||||
private final WriteOptions options; |
||||
|
||||
RocksDbTransaction(final org.rocksdb.Transaction innerTx, final WriteOptions options) { |
||||
this.innerTx = innerTx; |
||||
this.options = options; |
||||
} |
||||
|
||||
@Override |
||||
protected void doPut( |
||||
final ColumnFamilyHandle segment, final BytesValue key, final BytesValue value) { |
||||
try (final OperationTimer.TimingContext ignored = writeLatency.startTimer()) { |
||||
innerTx.put(segment, key.getArrayUnsafe(), value.getArrayUnsafe()); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected void doRemove(final ColumnFamilyHandle segment, final BytesValue key) { |
||||
try (final OperationTimer.TimingContext ignored = removeLatency.startTimer()) { |
||||
innerTx.delete(segment, key.getArrayUnsafe()); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected void doCommit() throws StorageException { |
||||
try (final OperationTimer.TimingContext ignored = commitLatency.startTimer()) { |
||||
innerTx.commit(); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected void doRollback() { |
||||
try { |
||||
innerTx.rollback(); |
||||
rollbackCount.inc(); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
private void close() { |
||||
innerTx.close(); |
||||
options.close(); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,143 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services.kvstore; |
||||
|
||||
import static com.google.common.base.Preconditions.checkState; |
||||
|
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
|
||||
import java.io.Closeable; |
||||
import java.util.Optional; |
||||
import java.util.function.Predicate; |
||||
|
||||
/** |
||||
* Service provided by pantheon to facilitate persistent data storage. |
||||
* |
||||
* @param <S> the segment identifier type |
||||
*/ |
||||
public interface SegmentedKeyValueStorage<S> extends Closeable { |
||||
|
||||
S getSegmentIdentifierByName(Segment segment); |
||||
|
||||
/** |
||||
* @param segment the segment |
||||
* @param key Index into persistent data repository. |
||||
* @return The value persisted at the key index. |
||||
*/ |
||||
Optional<BytesValue> get(S segment, BytesValue key) throws StorageException; |
||||
|
||||
default boolean containsKey(final S segment, final BytesValue key) throws StorageException { |
||||
return get(segment, key).isPresent(); |
||||
} |
||||
|
||||
/** |
||||
* Begins a transaction. Returns a transaction object that can be updated and committed. |
||||
* |
||||
* @return An object representing the transaction. |
||||
*/ |
||||
Transaction<S> startTransaction() throws StorageException; |
||||
|
||||
long removeUnless(S segmentHandle, Predicate<BytesValue> inUseCheck); |
||||
|
||||
void clear(S segmentHandle); |
||||
|
||||
class StorageException extends RuntimeException { |
||||
public StorageException(final Throwable t) { |
||||
super(t); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Represents a set of changes to be committed atomically. A single transaction is not |
||||
* thread-safe, but multiple transactions can execute concurrently. |
||||
* |
||||
* @param <S> the segment identifier type |
||||
*/ |
||||
interface Transaction<S> { |
||||
|
||||
/** |
||||
* Add the given key-value pair to the set of updates to be committed. |
||||
* |
||||
* @param segment the database segment |
||||
* @param key The key to set / modify. |
||||
* @param value The value to be set. |
||||
*/ |
||||
void put(S segment, BytesValue key, BytesValue value); |
||||
|
||||
/** |
||||
* Schedules the given key to be deleted from storage. |
||||
* |
||||
* @param segment the database segment |
||||
* @param key The key to delete |
||||
*/ |
||||
void remove(S segment, BytesValue key); |
||||
|
||||
/** |
||||
* Atomically commit the set of changes contained in this transaction to the underlying |
||||
* key-value storage from which this transaction was started. After committing, the transaction |
||||
* is no longer usable and will throw exceptions if modifications are attempted. |
||||
*/ |
||||
void commit() throws StorageException; |
||||
|
||||
/** |
||||
* Cancel this transaction. After rolling back, the transaction is no longer usable and will |
||||
* throw exceptions if modifications are attempted. |
||||
*/ |
||||
void rollback(); |
||||
} |
||||
|
||||
interface Segment { |
||||
String getName(); |
||||
|
||||
byte[] getId(); |
||||
} |
||||
|
||||
abstract class AbstractTransaction<S> implements Transaction<S> { |
||||
|
||||
private boolean active = true; |
||||
|
||||
@Override |
||||
public final void put(final S segment, final BytesValue key, final BytesValue value) { |
||||
checkState(active, "Cannot invoke put() on a completed transaction."); |
||||
doPut(segment, key, value); |
||||
} |
||||
|
||||
@Override |
||||
public final void remove(final S segment, final BytesValue key) { |
||||
checkState(active, "Cannot invoke remove() on a completed transaction."); |
||||
doRemove(segment, key); |
||||
} |
||||
|
||||
@Override |
||||
public final void commit() throws StorageException { |
||||
checkState(active, "Cannot commit a completed transaction."); |
||||
active = false; |
||||
doCommit(); |
||||
} |
||||
|
||||
@Override |
||||
public final void rollback() { |
||||
checkState(active, "Cannot rollback a completed transaction."); |
||||
active = false; |
||||
doRollback(); |
||||
} |
||||
|
||||
protected abstract void doPut(S segment, BytesValue key, BytesValue value); |
||||
|
||||
protected abstract void doRemove(S segment, BytesValue key); |
||||
|
||||
protected abstract void doCommit() throws StorageException; |
||||
|
||||
protected abstract void doRollback(); |
||||
} |
||||
} |
@ -0,0 +1,82 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services.kvstore; |
||||
|
||||
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorage.Segment; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
|
||||
import java.io.IOException; |
||||
import java.util.Optional; |
||||
import java.util.function.Predicate; |
||||
|
||||
public class SegmentedKeyValueStorageAdapter<S> implements KeyValueStorage { |
||||
private final S segmentHandle; |
||||
private final SegmentedKeyValueStorage<S> storage; |
||||
|
||||
public SegmentedKeyValueStorageAdapter( |
||||
final Segment segment, final SegmentedKeyValueStorage<S> storage) { |
||||
this.segmentHandle = storage.getSegmentIdentifierByName(segment); |
||||
this.storage = storage; |
||||
} |
||||
|
||||
@Override |
||||
public Optional<BytesValue> get(final BytesValue key) throws StorageException { |
||||
return storage.get(segmentHandle, key); |
||||
} |
||||
|
||||
@Override |
||||
public boolean containsKey(final BytesValue key) throws StorageException { |
||||
return storage.containsKey(segmentHandle, key); |
||||
} |
||||
|
||||
@Override |
||||
public Transaction startTransaction() throws StorageException { |
||||
final SegmentedKeyValueStorage.Transaction<S> transaction = storage.startTransaction(); |
||||
return new Transaction() { |
||||
@Override |
||||
public void put(final BytesValue key, final BytesValue value) { |
||||
transaction.put(segmentHandle, key, value); |
||||
} |
||||
|
||||
@Override |
||||
public void remove(final BytesValue key) { |
||||
transaction.remove(segmentHandle, key); |
||||
} |
||||
|
||||
@Override |
||||
public void commit() throws StorageException { |
||||
transaction.commit(); |
||||
} |
||||
|
||||
@Override |
||||
public void rollback() { |
||||
transaction.rollback(); |
||||
} |
||||
}; |
||||
} |
||||
|
||||
@Override |
||||
public long removeUnless(final Predicate<BytesValue> inUseCheck) { |
||||
return storage.removeUnless(segmentHandle, inUseCheck); |
||||
} |
||||
|
||||
@Override |
||||
public void clear() { |
||||
storage.clear(segmentHandle); |
||||
} |
||||
|
||||
@Override |
||||
public void close() throws IOException { |
||||
storage.close(); |
||||
} |
||||
} |
@ -0,0 +1,112 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services.kvstore; |
||||
|
||||
import static org.junit.Assert.assertEquals; |
||||
|
||||
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; |
||||
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorage.Segment; |
||||
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorage.Transaction; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.Optional; |
||||
|
||||
import org.junit.Rule; |
||||
import org.junit.Test; |
||||
import org.junit.rules.TemporaryFolder; |
||||
import org.rocksdb.ColumnFamilyHandle; |
||||
|
||||
public class ColumnarRocksDbKeyValueStorageTest extends AbstractKeyValueStorageTest { |
||||
@Rule public final TemporaryFolder folder = new TemporaryFolder(); |
||||
|
||||
@Test |
||||
public void twoSegmentsAreIndependent() throws Exception { |
||||
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore(); |
||||
|
||||
Transaction<ColumnFamilyHandle> tx = store.startTransaction(); |
||||
tx.put( |
||||
store.getSegmentIdentifierByName(TestSegment.BAR), |
||||
BytesValue.fromHexString("0001"), |
||||
BytesValue.fromHexString("0FFF")); |
||||
tx.commit(); |
||||
final Optional<BytesValue> result = |
||||
store.get( |
||||
store.getSegmentIdentifierByName(TestSegment.FOO), BytesValue.fromHexString("0001")); |
||||
assertEquals(Optional.empty(), result); |
||||
} |
||||
|
||||
@Test |
||||
public void canRemoveThroughSegmentIteration() throws Exception { |
||||
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore(); |
||||
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO); |
||||
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR); |
||||
|
||||
Transaction<ColumnFamilyHandle> tx = store.startTransaction(); |
||||
tx.put(fooSegment, BytesValue.of(1), BytesValue.of(1)); |
||||
tx.put(fooSegment, BytesValue.of(2), BytesValue.of(2)); |
||||
tx.put(fooSegment, BytesValue.of(3), BytesValue.of(3)); |
||||
tx.put(barSegment, BytesValue.of(4), BytesValue.of(4)); |
||||
tx.put(barSegment, BytesValue.of(5), BytesValue.of(5)); |
||||
tx.put(barSegment, BytesValue.of(6), BytesValue.of(6)); |
||||
tx.commit(); |
||||
|
||||
final long removedFromFoo = store.removeUnless(fooSegment, x -> x.equals(BytesValue.of(3))); |
||||
final long removedFromBar = store.removeUnless(barSegment, x -> x.equals(BytesValue.of(4))); |
||||
|
||||
assertEquals(2, removedFromFoo); |
||||
assertEquals(2, removedFromBar); |
||||
|
||||
assertEquals(Optional.empty(), store.get(fooSegment, BytesValue.of(1))); |
||||
assertEquals(Optional.empty(), store.get(fooSegment, BytesValue.of(2))); |
||||
assertEquals(Optional.of(BytesValue.of(3)), store.get(fooSegment, BytesValue.of(3))); |
||||
|
||||
assertEquals(Optional.of(BytesValue.of(4)), store.get(barSegment, BytesValue.of(4))); |
||||
assertEquals(Optional.empty(), store.get(barSegment, BytesValue.of(5))); |
||||
assertEquals(Optional.empty(), store.get(barSegment, BytesValue.of(6))); |
||||
} |
||||
|
||||
public enum TestSegment implements Segment { |
||||
FOO(new byte[] {1}), |
||||
BAR(new byte[] {2}); |
||||
|
||||
private final byte[] id; |
||||
|
||||
TestSegment(final byte[] id) { |
||||
this.id = id; |
||||
} |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return name(); |
||||
} |
||||
|
||||
@Override |
||||
public byte[] getId() { |
||||
return id; |
||||
} |
||||
} |
||||
|
||||
private SegmentedKeyValueStorage<ColumnFamilyHandle> createSegmentedStore() throws Exception { |
||||
return ColumnarRocksDbKeyValueStorage.create( |
||||
new RocksDbConfiguration.Builder().databaseDir(folder.newFolder().toPath()).build(), |
||||
Arrays.asList(TestSegment.FOO, TestSegment.BAR), |
||||
new NoOpMetricsSystem()); |
||||
} |
||||
|
||||
@Override |
||||
protected KeyValueStorage createStore() throws Exception { |
||||
return new SegmentedKeyValueStorageAdapter<ColumnFamilyHandle>( |
||||
TestSegment.FOO, createSegmentedStore()); |
||||
} |
||||
} |
Loading…
Reference in new issue