wrap rocksdb segmenthandles in atomicreference to ensure we do not reference closed handles (#3734)

Signed-off-by: garyschulte <garyschulte@gmail.com>
pull/3753/head
garyschulte 3 years ago committed by GitHub
parent 1693db9849
commit 23f5e2e933
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 72
      plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java
  2. 32
      plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/unsegmented/RocksDBColumnarKeyValueStorageTest.java
  3. 5
      services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorage.java
  4. 21
      services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedKeyValueStorageAdapter.java

@ -31,16 +31,17 @@ import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageTransaction
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableMap;
import org.apache.tuweni.bytes.Bytes;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyDescriptor;
@ -62,19 +63,19 @@ import org.slf4j.LoggerFactory;
public class RocksDBColumnarKeyValueStorage
implements SegmentedKeyValueStorage<ColumnFamilyHandle> {
static {
RocksDbUtil.loadNativeLibrary();
}
private static final Logger LOG = LoggerFactory.getLogger(RocksDBColumnarKeyValueStorage.class);
private static final String DEFAULT_COLUMN = "default";
private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device";
static {
RocksDbUtil.loadNativeLibrary();
}
private final DBOptions options;
private final TransactionDBOptions txOptions;
private final TransactionDB db;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Map<String, ColumnFamilyHandle> columnHandlesByName;
private final Map<String, AtomicReference<ColumnFamilyHandle>> columnHandlesByName;
private final RocksDBMetrics metrics;
private final WriteOptions tryDeleteOptions = new WriteOptions().setNoSlowdown(true);
@ -127,14 +128,17 @@ public class RocksDBColumnarKeyValueStorage
Collectors.toMap(
segment -> Bytes.wrap(segment.getId()), SegmentIdentifier::getName));
columnHandlesByName = new HashMap<>();
final ImmutableMap.Builder<String, AtomicReference<ColumnFamilyHandle>> builder =
ImmutableMap.builder();
for (ColumnFamilyHandle columnHandle : columnHandles) {
final String segmentName =
requireNonNullElse(
segmentsById.get(Bytes.wrap(columnHandle.getName())), DEFAULT_COLUMN);
columnHandlesByName.put(segmentName, columnHandle);
builder.put(segmentName, new AtomicReference<>(columnHandle));
}
columnHandlesByName = builder.build();
} catch (final RocksDBException e) {
throw new StorageException(e);
}
@ -146,7 +150,8 @@ public class RocksDBColumnarKeyValueStorage
}
@Override
public ColumnFamilyHandle getSegmentIdentifierByName(final SegmentIdentifier segment) {
public AtomicReference<ColumnFamilyHandle> getSegmentIdentifierByName(
final SegmentIdentifier segment) {
return columnHandlesByName.get(segment.getName());
}
@ -198,30 +203,27 @@ public class RocksDBColumnarKeyValueStorage
}
@Override
public ColumnFamilyHandle clear(final ColumnFamilyHandle segmentHandle) {
try {
var entry =
columnHandlesByName.entrySet().stream()
.filter(e -> e.getValue().equals(segmentHandle))
.findAny();
if (entry.isPresent()) {
String segmentName = entry.get().getKey();
ColumnFamilyDescriptor descriptor =
new ColumnFamilyDescriptor(
segmentHandle.getName(), segmentHandle.getDescriptor().getOptions());
db.dropColumnFamily(segmentHandle);
segmentHandle.close();
ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor);
columnHandlesByName.put(segmentName, newHandle);
return newHandle;
}
return segmentHandle;
} catch (final RocksDBException e) {
throw new StorageException(e);
public void clear(final ColumnFamilyHandle segmentHandle) {
var entry =
columnHandlesByName.values().stream().filter(e -> e.get().equals(segmentHandle)).findAny();
if (entry.isPresent()) {
AtomicReference<ColumnFamilyHandle> segmentHandleRef = entry.get();
segmentHandleRef.getAndUpdate(
oldHandle -> {
try {
ColumnFamilyDescriptor descriptor =
new ColumnFamilyDescriptor(
segmentHandle.getName(), segmentHandle.getDescriptor().getOptions());
db.dropColumnFamily(oldHandle);
ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor);
segmentHandle.close();
return newHandle;
} catch (final RocksDBException e) {
throw new StorageException(e);
}
});
}
}
@ -231,7 +233,9 @@ public class RocksDBColumnarKeyValueStorage
txOptions.close();
options.close();
tryDeleteOptions.close();
columnHandlesByName.values().forEach(ColumnFamilyHandle::close);
columnHandlesByName.values().stream()
.map(AtomicReference::get)
.forEach(ColumnFamilyHandle::close);
db.close();
}
}

@ -31,8 +31,8 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Rule;
import org.junit.Test;
@ -49,7 +49,10 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT
final byte[] val1 = bytesFromHexString("0FFF");
final byte[] val2 = bytesFromHexString("1337");
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
Supplier<ColumnFamilyHandle> segment = () -> store.getSegmentIdentifierByName(TestSegment.FOO);
AtomicReference<ColumnFamilyHandle> segment = store.getSegmentIdentifierByName(TestSegment.FOO);
KeyValueStorage duplicateSegmentRef =
new SegmentedKeyValueStorageAdapter<>(TestSegment.FOO, store);
final Consumer<byte[]> insert =
value -> {
final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
@ -59,18 +62,18 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT
// insert val:
insert.accept(val1);
final Optional<byte[]> result = store.get(segment.get(), key);
assertThat(result.orElse(null)).isEqualTo(val1);
assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val1);
assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val1);
// clear and assert empty:
store.clear(segment.get());
final Optional<byte[]> truncResult = store.get(segment.get(), key);
assertThat(truncResult).isEmpty();
assertThat(store.get(segment.get(), key)).isEmpty();
assertThat(duplicateSegmentRef.get(key)).isEmpty();
// insert into empty:
insert.accept(val2);
final Optional<byte[]> nextResult = store.get(segment.get(), key);
assertThat(nextResult.orElse(null)).isEqualTo(val2);
assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val2);
assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val2);
store.close();
}
@ -81,13 +84,14 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT
final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
tx.put(
store.getSegmentIdentifierByName(TestSegment.BAR),
store.getSegmentIdentifierByName(TestSegment.BAR).get(),
bytesFromHexString("0001"),
bytesFromHexString("0FFF"));
tx.commit();
final Optional<byte[]> result =
store.get(store.getSegmentIdentifierByName(TestSegment.FOO), bytesFromHexString("0001"));
store.get(
store.getSegmentIdentifierByName(TestSegment.FOO).get(), bytesFromHexString("0001"));
assertThat(result).isEmpty();
@ -100,8 +104,8 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT
// properly
for (int i = 0; i < 50; i++) {
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO);
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR);
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get();
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get();
final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
tx.put(fooSegment, bytesOf(1), bytesOf(1));
@ -144,8 +148,8 @@ public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageT
@Test
public void canGetThroughSegmentIteration() throws Exception {
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO);
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR);
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get();
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get();
final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
tx.put(fooSegment, bytesOf(1), bytesOf(1));

@ -20,6 +20,7 @@ import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import java.io.Closeable;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Stream;
@ -30,7 +31,7 @@ import java.util.stream.Stream;
*/
public interface SegmentedKeyValueStorage<S> extends Closeable {
S getSegmentIdentifierByName(SegmentIdentifier segment);
AtomicReference<S> getSegmentIdentifierByName(SegmentIdentifier segment);
/**
* Get the value from the associated segment and key.
@ -74,7 +75,7 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
Set<byte[]> getAllKeysThat(S segmentHandle, Predicate<byte[]> returnCondition);
S clear(S segmentHandle);
void clear(S segmentHandle);
/**
* Represents a set of changes to be committed atomically. A single transaction is not

@ -22,47 +22,48 @@ import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Stream;
public class SegmentedKeyValueStorageAdapter<S> implements KeyValueStorage {
private S segmentHandle;
private final AtomicReference<S> segmentHandle;
private final SegmentedKeyValueStorage<S> storage;
public SegmentedKeyValueStorageAdapter(
final SegmentIdentifier segment, final SegmentedKeyValueStorage<S> storage) {
this.segmentHandle = storage.getSegmentIdentifierByName(segment);
segmentHandle = storage.getSegmentIdentifierByName(segment);
this.storage = storage;
}
@Override
public void clear() {
segmentHandle = storage.clear(segmentHandle);
storage.clear(segmentHandle.get());
}
@Override
public boolean containsKey(final byte[] key) throws StorageException {
return storage.containsKey(segmentHandle, key);
return storage.containsKey(segmentHandle.get(), key);
}
@Override
public Optional<byte[]> get(final byte[] key) throws StorageException {
return storage.get(segmentHandle, key);
return storage.get(segmentHandle.get(), key);
}
@Override
public Set<byte[]> getAllKeysThat(final Predicate<byte[]> returnCondition) {
return storage.getAllKeysThat(segmentHandle, returnCondition);
return storage.getAllKeysThat(segmentHandle.get(), returnCondition);
}
@Override
public Stream<byte[]> streamKeys() {
return storage.streamKeys(segmentHandle);
return storage.streamKeys(segmentHandle.get());
}
@Override
public boolean tryDelete(final byte[] key) {
return storage.tryDelete(segmentHandle, key);
return storage.tryDelete(segmentHandle.get(), key);
}
@Override
@ -77,12 +78,12 @@ public class SegmentedKeyValueStorageAdapter<S> implements KeyValueStorage {
@Override
public void put(final byte[] key, final byte[] value) {
transaction.put(segmentHandle, key, value);
transaction.put(segmentHandle.get(), key, value);
}
@Override
public void remove(final byte[] key) {
transaction.remove(segmentHandle, key);
transaction.remove(segmentHandle.get(), key);
}
@Override

Loading…
Cancel
Save