|
|
|
@ -12,6 +12,10 @@ |
|
|
|
|
*/ |
|
|
|
|
package tech.pegasys.pantheon.services.kvstore; |
|
|
|
|
|
|
|
|
|
import tech.pegasys.pantheon.metrics.Counter; |
|
|
|
|
import tech.pegasys.pantheon.metrics.MetricCategory; |
|
|
|
|
import tech.pegasys.pantheon.metrics.MetricsSystem; |
|
|
|
|
import tech.pegasys.pantheon.metrics.OperationTimer; |
|
|
|
|
import tech.pegasys.pantheon.util.bytes.BytesValue; |
|
|
|
|
|
|
|
|
|
import java.io.Closeable; |
|
|
|
@ -44,19 +48,47 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable { |
|
|
|
|
private final TransactionDB db; |
|
|
|
|
private final AtomicBoolean closed = new AtomicBoolean(false); |
|
|
|
|
|
|
|
|
|
private final OperationTimer readLatency; |
|
|
|
|
private final OperationTimer removeLatency; |
|
|
|
|
private final OperationTimer writeLatency; |
|
|
|
|
private final OperationTimer commitLatency; |
|
|
|
|
private final Counter rollbackCount; |
|
|
|
|
|
|
|
|
|
static { |
|
|
|
|
RocksDB.loadLibrary(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static KeyValueStorage create(final Path storageDirectory) throws StorageException { |
|
|
|
|
return new RocksDbKeyValueStorage(storageDirectory); |
|
|
|
|
public static KeyValueStorage create( |
|
|
|
|
final Path storageDirectory, final MetricsSystem metricsSystem) throws StorageException { |
|
|
|
|
return new RocksDbKeyValueStorage(storageDirectory, metricsSystem); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private RocksDbKeyValueStorage(final Path storageDirectory) { |
|
|
|
|
private RocksDbKeyValueStorage(final Path storageDirectory, final MetricsSystem metricsSystem) { |
|
|
|
|
try { |
|
|
|
|
options = new Options().setCreateIfMissing(true); |
|
|
|
|
txOptions = new TransactionDBOptions(); |
|
|
|
|
db = TransactionDB.open(options, txOptions, storageDirectory.toString()); |
|
|
|
|
|
|
|
|
|
readLatency = |
|
|
|
|
metricsSystem.createTimer( |
|
|
|
|
MetricCategory.ROCKSDB, "read_latency_seconds", "Latency for read from RocksDB."); |
|
|
|
|
removeLatency = |
|
|
|
|
metricsSystem.createTimer( |
|
|
|
|
MetricCategory.ROCKSDB, |
|
|
|
|
"remove_latency_seconds", |
|
|
|
|
"Latency of remove requests from RocksDB."); |
|
|
|
|
writeLatency = |
|
|
|
|
metricsSystem.createTimer( |
|
|
|
|
MetricCategory.ROCKSDB, "write_latency_seconds", "Latency for write to RocksDB."); |
|
|
|
|
commitLatency = |
|
|
|
|
metricsSystem.createTimer( |
|
|
|
|
MetricCategory.ROCKSDB, "commit_latency_seconds", "Latency for commits to RocksDB."); |
|
|
|
|
|
|
|
|
|
rollbackCount = |
|
|
|
|
metricsSystem.createCounter( |
|
|
|
|
MetricCategory.ROCKSDB, |
|
|
|
|
"rollback_count", |
|
|
|
|
"Number of RocksDB transactions rolled back."); |
|
|
|
|
} catch (final RocksDBException e) { |
|
|
|
|
throw new StorageException(e); |
|
|
|
|
} |
|
|
|
@ -65,7 +97,8 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable { |
|
|
|
|
@Override |
|
|
|
|
public Optional<BytesValue> get(final BytesValue key) throws StorageException { |
|
|
|
|
throwIfClosed(); |
|
|
|
|
try { |
|
|
|
|
|
|
|
|
|
try (final OperationTimer.TimingContext ignored = readLatency.startTimer()) { |
|
|
|
|
return Optional.ofNullable(db.get(key.extractArray())).map(BytesValue::wrap); |
|
|
|
|
} catch (final RocksDBException e) { |
|
|
|
|
throw new StorageException(e); |
|
|
|
@ -143,7 +176,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable { |
|
|
|
|
return entry; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public Stream<Entry> toStream() { |
|
|
|
|
Stream<Entry> toStream() { |
|
|
|
|
final Spliterator<Entry> split = |
|
|
|
|
Spliterators.spliteratorUnknownSize( |
|
|
|
|
this, Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL); |
|
|
|
@ -158,7 +191,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private static class RocksDbTransaction extends AbstractTransaction { |
|
|
|
|
private class RocksDbTransaction extends AbstractTransaction { |
|
|
|
|
private final org.rocksdb.Transaction innerTx; |
|
|
|
|
private final WriteOptions options; |
|
|
|
|
|
|
|
|
@ -169,7 +202,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable { |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
protected void doPut(final BytesValue key, final BytesValue value) { |
|
|
|
|
try { |
|
|
|
|
try (final OperationTimer.TimingContext ignored = writeLatency.startTimer()) { |
|
|
|
|
innerTx.put(key.extractArray(), value.extractArray()); |
|
|
|
|
} catch (final RocksDBException e) { |
|
|
|
|
throw new StorageException(e); |
|
|
|
@ -178,7 +211,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable { |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
protected void doRemove(final BytesValue key) { |
|
|
|
|
try { |
|
|
|
|
try (final OperationTimer.TimingContext ignored = removeLatency.startTimer()) { |
|
|
|
|
innerTx.delete(key.extractArray()); |
|
|
|
|
} catch (final RocksDBException e) { |
|
|
|
|
throw new StorageException(e); |
|
|
|
@ -187,7 +220,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable { |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
protected void doCommit() throws StorageException { |
|
|
|
|
try { |
|
|
|
|
try (final OperationTimer.TimingContext ignored = commitLatency.startTimer()) { |
|
|
|
|
innerTx.commit(); |
|
|
|
|
} catch (final RocksDBException e) { |
|
|
|
|
throw new StorageException(e); |
|
|
|
@ -200,6 +233,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable { |
|
|
|
|
protected void doRollback() { |
|
|
|
|
try { |
|
|
|
|
innerTx.rollback(); |
|
|
|
|
rollbackCount.inc(); |
|
|
|
|
} catch (final RocksDBException e) { |
|
|
|
|
throw new StorageException(e); |
|
|
|
|
} finally { |
|
|
|
|