mirror of https://github.com/hyperledger/besu
Refactoring Rocksdb as a module (#1889)
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>pull/2/head
parent
ccb939fb77
commit
dec01db6f9
@ -0,0 +1,28 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.storage.keyvalue; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.storage.SegmentIdentifier; |
||||
|
||||
public enum KeyValueSegmentIdentifier implements SegmentIdentifier { |
||||
BLOCKCHAIN, |
||||
WORLD_STATE, |
||||
PRIVATE_TRANSACTIONS, |
||||
PRIVATE_STATE, |
||||
PRUNING_STATE; |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return name(); |
||||
} |
||||
} |
@ -0,0 +1,72 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.storage.keyvalue; |
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull; |
||||
import static tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.BLOCKCHAIN; |
||||
import static tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.PRIVATE_STATE; |
||||
import static tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.PRIVATE_TRANSACTIONS; |
||||
import static tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.PRUNING_STATE; |
||||
import static tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.WORLD_STATE; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.MetricsSystem; |
||||
import tech.pegasys.pantheon.plugin.services.PantheonConfiguration; |
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorage; |
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorageFactory; |
||||
import tech.pegasys.pantheon.services.kvstore.LimitedInMemoryKeyValueStorage; |
||||
|
||||
public class KeyValueStorageProviderBuilder { |
||||
|
||||
private static final long DEFAULT_WORLD_STATE_PRE_IMAGE_CACHE_SIZE = 5_000L; |
||||
|
||||
private KeyValueStorageFactory storageFactory; |
||||
private PantheonConfiguration commonConfiguration; |
||||
private MetricsSystem metricsSystem; |
||||
|
||||
public KeyValueStorageProviderBuilder withStorageFactory( |
||||
final KeyValueStorageFactory storageFactory) { |
||||
this.storageFactory = storageFactory; |
||||
return this; |
||||
} |
||||
|
||||
public KeyValueStorageProviderBuilder withCommonConfiguration( |
||||
final PantheonConfiguration commonConfiguration) { |
||||
this.commonConfiguration = commonConfiguration; |
||||
return this; |
||||
} |
||||
|
||||
public KeyValueStorageProviderBuilder withMetricsSystem(final MetricsSystem metricsSystem) { |
||||
this.metricsSystem = metricsSystem; |
||||
return this; |
||||
} |
||||
|
||||
public KeyValueStorageProvider build() { |
||||
checkNotNull(storageFactory, "Cannot build a storage provider without a storage factory."); |
||||
checkNotNull( |
||||
commonConfiguration, |
||||
"Cannot build a storage provider without the plugin common configuration."); |
||||
checkNotNull(metricsSystem, "Cannot build a storage provider without a metrics system."); |
||||
|
||||
final KeyValueStorage worldStatePreImageStorage = |
||||
new LimitedInMemoryKeyValueStorage(DEFAULT_WORLD_STATE_PRE_IMAGE_CACHE_SIZE); |
||||
|
||||
return new KeyValueStorageProvider( |
||||
storageFactory.create(BLOCKCHAIN, commonConfiguration, metricsSystem), |
||||
storageFactory.create(WORLD_STATE, commonConfiguration, metricsSystem), |
||||
worldStatePreImageStorage, |
||||
storageFactory.create(PRIVATE_TRANSACTIONS, commonConfiguration, metricsSystem), |
||||
storageFactory.create(PRIVATE_STATE, commonConfiguration, metricsSystem), |
||||
storageFactory.create(PRUNING_STATE, commonConfiguration, metricsSystem), |
||||
storageFactory.isSegmentIsolationSupported()); |
||||
} |
||||
} |
@ -1,152 +0,0 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.storage.keyvalue; |
||||
|
||||
import static java.util.AbstractMap.SimpleEntry; |
||||
import static java.util.Arrays.asList; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.storage.StorageProvider; |
||||
import tech.pegasys.pantheon.plugin.services.MetricsSystem; |
||||
import tech.pegasys.pantheon.services.kvstore.ColumnarRocksDbKeyValueStorage; |
||||
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage; |
||||
import tech.pegasys.pantheon.services.kvstore.LimitedInMemoryKeyValueStorage; |
||||
import tech.pegasys.pantheon.services.kvstore.RocksDbConfiguration; |
||||
import tech.pegasys.pantheon.services.kvstore.RocksDbKeyValueStorage; |
||||
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorage; |
||||
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorage.Segment; |
||||
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorageAdapter; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Path; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.TreeMap; |
||||
|
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
public class RocksDbStorageProvider { |
||||
public static long DEFAULT_WORLD_STATE_PREIMAGE_CACHE_SIZE = 5_000L; |
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
public static final int DEFAULT_VERSION = 1; |
||||
/** This key is the version and the value is the function used to create or load the database. */ |
||||
private static final TreeMap<Integer, StorageProviderFunction> PROVIDERS_BY_VERSION = |
||||
new TreeMap<>( |
||||
Map.ofEntries( |
||||
new SimpleEntry<>(0, RocksDbStorageProvider::ofUnsegmented), |
||||
new SimpleEntry<>(1, RocksDbStorageProvider::ofSegmented))); |
||||
|
||||
public static StorageProvider create( |
||||
final RocksDbConfiguration rocksDbConfiguration, final MetricsSystem metricsSystem) |
||||
throws IOException { |
||||
return create(rocksDbConfiguration, metricsSystem, DEFAULT_WORLD_STATE_PREIMAGE_CACHE_SIZE); |
||||
} |
||||
|
||||
public static StorageProvider create( |
||||
final RocksDbConfiguration rocksDbConfiguration, |
||||
final MetricsSystem metricsSystem, |
||||
final long worldStatePreimageCacheSize) |
||||
throws IOException { |
||||
|
||||
final Path databaseDir = rocksDbConfiguration.getDatabaseDir(); |
||||
final boolean databaseExists = databaseDir.resolve("IDENTITY").toFile().exists(); |
||||
final int databaseVersion; |
||||
if (databaseExists) { |
||||
databaseVersion = DatabaseMetadata.fromDirectory(databaseDir).getVersion(); |
||||
LOG.info("Existing database detected at {}. Version {}", databaseDir, databaseVersion); |
||||
} else { |
||||
databaseVersion = DEFAULT_VERSION; |
||||
LOG.info( |
||||
"No existing database detected at {}. Using version {}", databaseDir, databaseVersion); |
||||
Files.createDirectories(databaseDir); |
||||
new DatabaseMetadata(databaseVersion).writeToDirectory(databaseDir); |
||||
} |
||||
|
||||
final StorageProviderFunction providerFunction = |
||||
Optional.ofNullable(PROVIDERS_BY_VERSION.get(databaseVersion)) |
||||
.orElseThrow( |
||||
() -> |
||||
new IllegalStateException( |
||||
String.format( |
||||
"Invalid database version %d. Valid versions are: %s. Default version is %d", |
||||
databaseVersion, |
||||
PROVIDERS_BY_VERSION.navigableKeySet().toString(), |
||||
DEFAULT_VERSION))); |
||||
|
||||
return providerFunction.apply(rocksDbConfiguration, metricsSystem, worldStatePreimageCacheSize); |
||||
} |
||||
|
||||
private static StorageProvider ofUnsegmented( |
||||
final RocksDbConfiguration rocksDbConfiguration, |
||||
final MetricsSystem metricsSystem, |
||||
final long worldStatePreimageCacheSize) { |
||||
final KeyValueStorage kv = RocksDbKeyValueStorage.create(rocksDbConfiguration, metricsSystem); |
||||
final KeyValueStorage preimageKv = |
||||
new LimitedInMemoryKeyValueStorage(worldStatePreimageCacheSize); |
||||
return new KeyValueStorageProvider(kv, kv, preimageKv, kv, kv, kv, false); |
||||
} |
||||
|
||||
private static StorageProvider ofSegmented( |
||||
final RocksDbConfiguration rocksDbConfiguration, |
||||
final MetricsSystem metricsSystem, |
||||
final long worldStatePreimageCacheSize) { |
||||
|
||||
final SegmentedKeyValueStorage<?> columnarStorage = |
||||
ColumnarRocksDbKeyValueStorage.create( |
||||
rocksDbConfiguration, asList(RocksDbSegment.values()), metricsSystem); |
||||
final KeyValueStorage preimageStorage = |
||||
new LimitedInMemoryKeyValueStorage(worldStatePreimageCacheSize); |
||||
|
||||
return new KeyValueStorageProvider( |
||||
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.BLOCKCHAIN, columnarStorage), |
||||
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.WORLD_STATE, columnarStorage), |
||||
preimageStorage, |
||||
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRIVATE_TRANSACTIONS, columnarStorage), |
||||
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRIVATE_STATE, columnarStorage), |
||||
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRUNING_STATE, columnarStorage), |
||||
true); |
||||
} |
||||
|
||||
private enum RocksDbSegment implements Segment { |
||||
BLOCKCHAIN((byte) 1), |
||||
WORLD_STATE((byte) 2), |
||||
PRIVATE_TRANSACTIONS((byte) 3), |
||||
PRIVATE_STATE((byte) 4), |
||||
PRUNING_STATE((byte) 5); |
||||
|
||||
private final byte[] id; |
||||
|
||||
RocksDbSegment(final byte... id) { |
||||
this.id = id; |
||||
} |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return name(); |
||||
} |
||||
|
||||
@Override |
||||
public byte[] getId() { |
||||
return id; |
||||
} |
||||
} |
||||
|
||||
private interface StorageProviderFunction { |
||||
StorageProvider apply( |
||||
final RocksDbConfiguration rocksDbConfiguration, |
||||
final MetricsSystem metricsSystem, |
||||
final long worldStatePreimageCacheSize) |
||||
throws IOException; |
||||
} |
||||
} |
@ -1,104 +0,0 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.storage.keyvalue; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy; |
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; |
||||
import tech.pegasys.pantheon.plugin.services.MetricsSystem; |
||||
import tech.pegasys.pantheon.services.kvstore.RocksDbConfiguration; |
||||
|
||||
import java.nio.charset.Charset; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Path; |
||||
|
||||
import org.junit.Rule; |
||||
import org.junit.Test; |
||||
import org.junit.rules.TemporaryFolder; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
@RunWith(MockitoJUnitRunner.class) |
||||
public class RocksDbStorageProviderTest { |
||||
|
||||
@Mock private RocksDbConfiguration rocksDbConfiguration; |
||||
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); |
||||
private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); |
||||
|
||||
@Test |
||||
public void shouldCreateCorrectMetadataFileForLatestVersion() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
when(rocksDbConfiguration.getDatabaseDir()).thenReturn(tempDatabaseDir); |
||||
RocksDbStorageProvider.create(rocksDbConfiguration, metricsSystem); |
||||
assertEquals( |
||||
RocksDbStorageProvider.DEFAULT_VERSION, |
||||
DatabaseMetadata.fromDirectory(rocksDbConfiguration.getDatabaseDir()).getVersion()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldDetectVersion0DatabaseIfNoMetadataFileFound() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
Files.createDirectories(tempDatabaseDir); |
||||
tempDatabaseDir.resolve("IDENTITY").toFile().createNewFile(); |
||||
when(rocksDbConfiguration.getDatabaseDir()).thenReturn(tempDatabaseDir); |
||||
RocksDbStorageProvider.create(rocksDbConfiguration, metricsSystem); |
||||
assertEquals(0, DatabaseMetadata.fromDirectory(tempDatabaseDir).getVersion()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldDetectCorrectVersionIfMetadataFileExists() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
Files.createDirectories(tempDatabaseDir); |
||||
tempDatabaseDir.resolve("IDENTITY").toFile().createNewFile(); |
||||
new DatabaseMetadata(1).writeToDirectory(tempDatabaseDir); |
||||
when(rocksDbConfiguration.getDatabaseDir()).thenReturn(tempDatabaseDir); |
||||
RocksDbStorageProvider.create(rocksDbConfiguration, metricsSystem); |
||||
assertEquals(1, DatabaseMetadata.fromDirectory(tempDatabaseDir).getVersion()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldThrowExceptionWhenVersionNumberIsInvalid() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
Files.createDirectories(tempDatabaseDir); |
||||
tempDatabaseDir.resolve("IDENTITY").toFile().createNewFile(); |
||||
new DatabaseMetadata(-1).writeToDirectory(tempDatabaseDir); |
||||
when(rocksDbConfiguration.getDatabaseDir()).thenReturn(tempDatabaseDir); |
||||
assertThatThrownBy(() -> RocksDbStorageProvider.create(rocksDbConfiguration, metricsSystem)) |
||||
.isInstanceOf(IllegalStateException.class); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldThrowExceptionWhenMetaDataFileIsCorrupted() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
Files.createDirectories(tempDatabaseDir); |
||||
when(rocksDbConfiguration.getDatabaseDir()).thenReturn(tempDatabaseDir); |
||||
tempDatabaseDir.resolve("IDENTITY").toFile().createNewFile(); |
||||
|
||||
final String badVersion = "{\"🦄\":1}"; |
||||
Files.write( |
||||
tempDatabaseDir.resolve(DatabaseMetadata.METADATA_FILENAME), |
||||
badVersion.getBytes(Charset.defaultCharset())); |
||||
assertThatThrownBy(() -> RocksDbStorageProvider.create(rocksDbConfiguration, metricsSystem)) |
||||
.isInstanceOf(IllegalStateException.class); |
||||
|
||||
final String badValue = "{\"version\":\"iomedae\"}"; |
||||
Files.write( |
||||
tempDatabaseDir.resolve(DatabaseMetadata.METADATA_FILENAME), |
||||
badValue.getBytes(Charset.defaultCharset())); |
||||
assertThatThrownBy(() -> RocksDbStorageProvider.create(rocksDbConfiguration, metricsSystem)) |
||||
.isInstanceOf(IllegalStateException.class); |
||||
} |
||||
} |
@ -0,0 +1,38 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.PantheonConfiguration; |
||||
|
||||
import java.net.URI; |
||||
import java.nio.file.Path; |
||||
import java.util.Optional; |
||||
|
||||
public class PantheonConfigurationImpl implements PantheonConfiguration { |
||||
|
||||
private final Path storagePath; |
||||
|
||||
public PantheonConfigurationImpl(final Path storagePath) { |
||||
this.storagePath = storagePath; |
||||
} |
||||
|
||||
@Override |
||||
public Path getStoragePath() { |
||||
return storagePath; |
||||
} |
||||
|
||||
@Override |
||||
public Optional<URI> getEnclaveUrl() { |
||||
return Optional.empty(); |
||||
} |
||||
} |
@ -0,0 +1,63 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.StorageService; |
||||
import tech.pegasys.pantheon.plugin.services.exception.StorageException; |
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorageFactory; |
||||
import tech.pegasys.pantheon.plugin.services.storage.SegmentIdentifier; |
||||
|
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
public class StorageServiceImpl implements StorageService { |
||||
|
||||
private final List<SegmentIdentifier> segments; |
||||
private final Map<String, KeyValueStorageFactory> factories; |
||||
|
||||
public StorageServiceImpl() { |
||||
this.segments = List.of(Segment.values()); |
||||
this.factories = new ConcurrentHashMap<>(); |
||||
} |
||||
|
||||
@Override |
||||
public void registerKeyValueStorage(final KeyValueStorageFactory factory) { |
||||
factories.put(factory.getName(), factory); |
||||
} |
||||
|
||||
@Override |
||||
public List<SegmentIdentifier> getAllSegmentIdentifiers() { |
||||
return segments; |
||||
} |
||||
|
||||
private enum Segment implements SegmentIdentifier { |
||||
BLOCKCHAIN, |
||||
WORLD_STATE, |
||||
PRIVATE_TRANSACTIONS, |
||||
PRIVATE_STATE, |
||||
PRUNING_STATE; |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return name(); |
||||
} |
||||
} |
||||
|
||||
public KeyValueStorageFactory getByName(final String name) { |
||||
return Optional.ofNullable(factories.get(name)) |
||||
.orElseThrow( |
||||
() -> new StorageException("No KeyValueStorageFactory found for key: " + name)); |
||||
} |
||||
} |
@ -1,51 +0,0 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.cli.options; |
||||
|
||||
import tech.pegasys.pantheon.services.kvstore.RocksDbConfiguration; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
|
||||
public class RocksDBOptionsTest |
||||
extends AbstractCLIOptionsTest<RocksDbConfiguration.Builder, RocksDBOptions> { |
||||
|
||||
@Override |
||||
RocksDbConfiguration.Builder createDefaultDomainObject() { |
||||
return RocksDbConfiguration.builder(); |
||||
} |
||||
|
||||
@Override |
||||
RocksDbConfiguration.Builder createCustomizedDomainObject() { |
||||
return RocksDbConfiguration.builder() |
||||
.maxOpenFiles(RocksDbConfiguration.DEFAULT_MAX_OPEN_FILES + 1) |
||||
.cacheCapacity(RocksDbConfiguration.DEFAULT_CACHE_CAPACITY + 1) |
||||
.maxBackgroundCompactions(RocksDbConfiguration.DEFAULT_MAX_BACKGROUND_COMPACTIONS + 1) |
||||
.backgroundThreadCount(RocksDbConfiguration.DEFAULT_BACKGROUND_THREAD_COUNT + 1); |
||||
} |
||||
|
||||
@Override |
||||
RocksDBOptions optionsFromDomainObject(final RocksDbConfiguration.Builder domainObject) { |
||||
return RocksDBOptions.fromConfig(domainObject.build()); |
||||
} |
||||
|
||||
@Override |
||||
RocksDBOptions getOptionsFromPantheonCommand(final TestPantheonCommand command) { |
||||
return command.getRocksDBOptions(); |
||||
} |
||||
|
||||
@Override |
||||
protected List<String> getFieldsToIgnore() { |
||||
return Arrays.asList("databaseDir", "useColumns"); |
||||
} |
||||
} |
@ -0,0 +1,14 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
|
||||
jar { enabled = false } |
@ -0,0 +1,44 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.PantheonConfiguration; |
||||
import tech.pegasys.pantheon.plugin.services.storage.SegmentIdentifier; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration; |
||||
|
||||
import java.nio.file.Path; |
||||
import java.util.List; |
||||
|
||||
import com.google.common.base.Supplier; |
||||
|
||||
@Deprecated |
||||
public class RocksDBKeyValuePrivacyStorageFactory extends RocksDBKeyValueStorageFactory { |
||||
|
||||
private static final String PRIVATE_DATABASE_PATH = "private"; |
||||
|
||||
public RocksDBKeyValuePrivacyStorageFactory( |
||||
final Supplier<RocksDBFactoryConfiguration> configuration, |
||||
final List<SegmentIdentifier> segments) { |
||||
super(configuration, segments); |
||||
} |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return "rocksdb-privacy"; |
||||
} |
||||
|
||||
@Override |
||||
protected Path storagePath(final PantheonConfiguration commonConfiguration) { |
||||
return super.storagePath(commonConfiguration).resolve(PRIVATE_DATABASE_PATH); |
||||
} |
||||
} |
@ -0,0 +1,151 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.MetricsSystem; |
||||
import tech.pegasys.pantheon.plugin.services.PantheonConfiguration; |
||||
import tech.pegasys.pantheon.plugin.services.exception.StorageException; |
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorage; |
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorageFactory; |
||||
import tech.pegasys.pantheon.plugin.services.storage.SegmentIdentifier; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.DatabaseMetadata; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.segmented.RocksDBColumnarKeyValueStorage; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.unsegmented.RocksDBKeyValueStorage; |
||||
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorage; |
||||
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorageAdapter; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Path; |
||||
import java.util.List; |
||||
import java.util.Set; |
||||
|
||||
import com.google.common.base.Supplier; |
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
public class RocksDBKeyValueStorageFactory implements KeyValueStorageFactory { |
||||
|
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
private static final int DEFAULT_VERSION = 1; |
||||
private static final Set<Integer> SUPPORTED_VERSION = Set.of(0, 1); |
||||
private static final String NAME = "rocksdb"; |
||||
|
||||
private boolean isSegmentIsolationSupported; |
||||
private SegmentedKeyValueStorage<?> segmentedStorage; |
||||
private KeyValueStorage unsegmentedStorage; |
||||
|
||||
private final Supplier<RocksDBFactoryConfiguration> configuration; |
||||
private final List<SegmentIdentifier> segments; |
||||
|
||||
public RocksDBKeyValueStorageFactory( |
||||
final Supplier<RocksDBFactoryConfiguration> configuration, |
||||
final List<SegmentIdentifier> segments) { |
||||
this.configuration = configuration; |
||||
this.segments = segments; |
||||
} |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return NAME; |
||||
} |
||||
|
||||
@Override |
||||
public KeyValueStorage create( |
||||
final SegmentIdentifier segment, |
||||
final PantheonConfiguration commonConfiguration, |
||||
final MetricsSystem metricsSystem) |
||||
throws StorageException { |
||||
|
||||
if (requiresInit()) { |
||||
init(commonConfiguration, metricsSystem); |
||||
} |
||||
|
||||
return isSegmentIsolationSupported |
||||
? new SegmentedKeyValueStorageAdapter<>(segment, segmentedStorage) |
||||
: unsegmentedStorage; |
||||
} |
||||
|
||||
@Override |
||||
public boolean isSegmentIsolationSupported() { |
||||
return isSegmentIsolationSupported; |
||||
} |
||||
|
||||
public void close() throws IOException { |
||||
if (segmentedStorage != null) { |
||||
segmentedStorage.close(); |
||||
} |
||||
if (unsegmentedStorage != null) { |
||||
unsegmentedStorage.close(); |
||||
} |
||||
} |
||||
|
||||
protected Path storagePath(final PantheonConfiguration commonConfiguration) { |
||||
return commonConfiguration.getStoragePath(); |
||||
} |
||||
|
||||
private boolean requiresInit() { |
||||
return segmentedStorage == null && unsegmentedStorage == null; |
||||
} |
||||
|
||||
private void init( |
||||
final PantheonConfiguration commonConfiguration, final MetricsSystem metricsSystem) { |
||||
try { |
||||
this.isSegmentIsolationSupported = databaseVersion(commonConfiguration) == DEFAULT_VERSION; |
||||
} catch (final IOException e) { |
||||
LOG.error("Failed to retrieve the RocksDB database meta version: {}", e.getMessage()); |
||||
throw new StorageException(e.getMessage(), e); |
||||
} |
||||
|
||||
final RocksDBConfiguration rocksDBConfiguration = |
||||
RocksDBConfigurationBuilder.from(configuration.get()) |
||||
.databaseDir(storagePath(commonConfiguration)) |
||||
.build(); |
||||
|
||||
if (isSegmentIsolationSupported) { |
||||
this.unsegmentedStorage = null; |
||||
this.segmentedStorage = |
||||
new RocksDBColumnarKeyValueStorage(rocksDBConfiguration, segments, metricsSystem); |
||||
} else { |
||||
this.unsegmentedStorage = new RocksDBKeyValueStorage(rocksDBConfiguration, metricsSystem); |
||||
this.segmentedStorage = null; |
||||
} |
||||
} |
||||
|
||||
private int databaseVersion(final PantheonConfiguration commonConfiguration) throws IOException { |
||||
final Path databaseDir = storagePath(commonConfiguration); |
||||
final boolean databaseExists = databaseDir.resolve("IDENTITY").toFile().exists(); |
||||
final int databaseVersion; |
||||
if (databaseExists) { |
||||
databaseVersion = DatabaseMetadata.fromDirectory(databaseDir).getVersion(); |
||||
LOG.info("Existing database detected at {}. Version {}", databaseDir, databaseVersion); |
||||
} else { |
||||
databaseVersion = DEFAULT_VERSION; |
||||
LOG.info( |
||||
"No existing database detected at {}. Using version {}", databaseDir, databaseVersion); |
||||
Files.createDirectories(databaseDir); |
||||
new DatabaseMetadata(databaseVersion).writeToDirectory(databaseDir); |
||||
} |
||||
|
||||
if (!SUPPORTED_VERSION.contains(databaseVersion)) { |
||||
final String message = "Unsupported RocksDB Metadata version of: " + databaseVersion; |
||||
LOG.error(message); |
||||
throw new StorageException(message); |
||||
} |
||||
|
||||
return databaseVersion; |
||||
} |
||||
} |
@ -0,0 +1,117 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb; |
||||
|
||||
import tech.pegasys.pantheon.plugin.PantheonContext; |
||||
import tech.pegasys.pantheon.plugin.PantheonPlugin; |
||||
import tech.pegasys.pantheon.plugin.services.PicoCLIOptions; |
||||
import tech.pegasys.pantheon.plugin.services.StorageService; |
||||
import tech.pegasys.pantheon.plugin.services.storage.SegmentIdentifier; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration; |
||||
|
||||
import java.io.IOException; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
import com.google.common.base.Supplier; |
||||
import com.google.common.base.Suppliers; |
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
@AutoService(PantheonPlugin.class) |
||||
public class RocksDBPlugin implements PantheonPlugin { |
||||
|
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
private static final String NAME = "rocksdb"; |
||||
|
||||
private final RocksDBCLIOptions options; |
||||
private PantheonContext context; |
||||
private RocksDBKeyValueStorageFactory factory; |
||||
private RocksDBKeyValuePrivacyStorageFactory privacyFactory; |
||||
|
||||
public RocksDBPlugin() { |
||||
this.options = RocksDBCLIOptions.create(); |
||||
} |
||||
|
||||
@Override |
||||
public void register(final PantheonContext context) { |
||||
LOG.info("Registering plugin"); |
||||
this.context = context; |
||||
|
||||
final Optional<PicoCLIOptions> cmdlineOptions = context.getService(PicoCLIOptions.class); |
||||
|
||||
if (cmdlineOptions.isEmpty()) { |
||||
throw new IllegalStateException( |
||||
"Expecting a PicoCLIO options to register CLI options with, but none found."); |
||||
} |
||||
|
||||
cmdlineOptions.get().addPicoCLIOptions(NAME, options); |
||||
createFactoriesAndRegisterWithStorageService(); |
||||
|
||||
LOG.info("Plugin registered."); |
||||
} |
||||
|
||||
@Override |
||||
public void start() { |
||||
LOG.info("Starting plugin."); |
||||
if (factory == null) { |
||||
LOG.debug("Applied configuration: {}", options.toString()); |
||||
createFactoriesAndRegisterWithStorageService(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void stop() { |
||||
LOG.info("Stopping plugin."); |
||||
|
||||
try { |
||||
if (factory != null) { |
||||
factory.close(); |
||||
factory = null; |
||||
} |
||||
} catch (final IOException e) { |
||||
LOG.error("Failed to stop plugin: {}", e.getMessage(), e); |
||||
} |
||||
|
||||
try { |
||||
if (privacyFactory != null) { |
||||
privacyFactory.close(); |
||||
privacyFactory = null; |
||||
} |
||||
} catch (final IOException e) { |
||||
LOG.error("Failed to stop plugin: {}", e.getMessage(), e); |
||||
} |
||||
} |
||||
|
||||
private void createAndRegister(final StorageService service) { |
||||
final List<SegmentIdentifier> segments = service.getAllSegmentIdentifiers(); |
||||
|
||||
final Supplier<RocksDBFactoryConfiguration> configuration = |
||||
Suppliers.memoize(options::toDomainObject); |
||||
factory = new RocksDBKeyValueStorageFactory(configuration, segments); |
||||
privacyFactory = new RocksDBKeyValuePrivacyStorageFactory(configuration, segments); |
||||
|
||||
service.registerKeyValueStorage(factory); |
||||
service.registerKeyValueStorage(privacyFactory); |
||||
} |
||||
|
||||
private void createFactoriesAndRegisterWithStorageService() { |
||||
context |
||||
.getService(StorageService.class) |
||||
.ifPresentOrElse( |
||||
this::createAndRegister, |
||||
() -> LOG.error("Failed to register KeyValueFactory due to missing StorageService.")); |
||||
} |
||||
} |
@ -0,0 +1,64 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration; |
||||
|
||||
import java.nio.file.Path; |
||||
|
||||
public class RocksDBConfiguration { |
||||
|
||||
private final Path databaseDir; |
||||
private final int maxOpenFiles; |
||||
private final String label; |
||||
private final int maxBackgroundCompactions; |
||||
private final int backgroundThreadCount; |
||||
private final long cacheCapacity; |
||||
|
||||
public RocksDBConfiguration( |
||||
final Path databaseDir, |
||||
final int maxOpenFiles, |
||||
final int maxBackgroundCompactions, |
||||
final int backgroundThreadCount, |
||||
final long cacheCapacity, |
||||
final String label) { |
||||
this.maxBackgroundCompactions = maxBackgroundCompactions; |
||||
this.backgroundThreadCount = backgroundThreadCount; |
||||
this.databaseDir = databaseDir; |
||||
this.maxOpenFiles = maxOpenFiles; |
||||
this.cacheCapacity = cacheCapacity; |
||||
this.label = label; |
||||
} |
||||
|
||||
public Path getDatabaseDir() { |
||||
return databaseDir; |
||||
} |
||||
|
||||
public int getMaxOpenFiles() { |
||||
return maxOpenFiles; |
||||
} |
||||
|
||||
public int getMaxBackgroundCompactions() { |
||||
return maxBackgroundCompactions; |
||||
} |
||||
|
||||
public int getBackgroundThreadCount() { |
||||
return backgroundThreadCount; |
||||
} |
||||
|
||||
public long getCacheCapacity() { |
||||
return cacheCapacity; |
||||
} |
||||
|
||||
public String getLabel() { |
||||
return label; |
||||
} |
||||
} |
@ -0,0 +1,78 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration; |
||||
|
||||
import static tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions.DEFAULT_BACKGROUND_THREAD_COUNT; |
||||
import static tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions.DEFAULT_CACHE_CAPACITY; |
||||
import static tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions.DEFAULT_MAX_BACKGROUND_COMPACTIONS; |
||||
import static tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions.DEFAULT_MAX_OPEN_FILES; |
||||
|
||||
import java.nio.file.Path; |
||||
|
||||
public class RocksDBConfigurationBuilder { |
||||
|
||||
private Path databaseDir; |
||||
private String label = "blockchain"; |
||||
private int maxOpenFiles = DEFAULT_MAX_OPEN_FILES; |
||||
private long cacheCapacity = DEFAULT_CACHE_CAPACITY; |
||||
private int maxBackgroundCompactions = DEFAULT_MAX_BACKGROUND_COMPACTIONS; |
||||
private int backgroundThreadCount = DEFAULT_BACKGROUND_THREAD_COUNT; |
||||
|
||||
public RocksDBConfigurationBuilder databaseDir(final Path databaseDir) { |
||||
this.databaseDir = databaseDir; |
||||
return this; |
||||
} |
||||
|
||||
public RocksDBConfigurationBuilder maxOpenFiles(final int maxOpenFiles) { |
||||
this.maxOpenFiles = maxOpenFiles; |
||||
return this; |
||||
} |
||||
|
||||
public RocksDBConfigurationBuilder label(final String label) { |
||||
this.label = label; |
||||
return this; |
||||
} |
||||
|
||||
public RocksDBConfigurationBuilder cacheCapacity(final long cacheCapacity) { |
||||
this.cacheCapacity = cacheCapacity; |
||||
return this; |
||||
} |
||||
|
||||
public RocksDBConfigurationBuilder maxBackgroundCompactions(final int maxBackgroundCompactions) { |
||||
this.maxBackgroundCompactions = maxBackgroundCompactions; |
||||
return this; |
||||
} |
||||
|
||||
public RocksDBConfigurationBuilder backgroundThreadCount(final int backgroundThreadCount) { |
||||
this.backgroundThreadCount = backgroundThreadCount; |
||||
return this; |
||||
} |
||||
|
||||
public static RocksDBConfigurationBuilder from(final RocksDBFactoryConfiguration configuration) { |
||||
return new RocksDBConfigurationBuilder() |
||||
.backgroundThreadCount(configuration.getBackgroundThreadCount()) |
||||
.cacheCapacity(configuration.getCacheCapacity()) |
||||
.maxBackgroundCompactions(configuration.getMaxBackgroundCompactions()) |
||||
.maxOpenFiles(configuration.getMaxOpenFiles()); |
||||
} |
||||
|
||||
public RocksDBConfiguration build() { |
||||
return new RocksDBConfiguration( |
||||
databaseDir, |
||||
maxOpenFiles, |
||||
maxBackgroundCompactions, |
||||
backgroundThreadCount, |
||||
cacheCapacity, |
||||
label); |
||||
} |
||||
} |
@ -0,0 +1,48 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration; |
||||
|
||||
public class RocksDBFactoryConfiguration { |
||||
|
||||
private final int maxOpenFiles; |
||||
private final int maxBackgroundCompactions; |
||||
private final int backgroundThreadCount; |
||||
private final long cacheCapacity; |
||||
|
||||
public RocksDBFactoryConfiguration( |
||||
final int maxOpenFiles, |
||||
final int maxBackgroundCompactions, |
||||
final int backgroundThreadCount, |
||||
final long cacheCapacity) { |
||||
this.maxBackgroundCompactions = maxBackgroundCompactions; |
||||
this.backgroundThreadCount = backgroundThreadCount; |
||||
this.maxOpenFiles = maxOpenFiles; |
||||
this.cacheCapacity = cacheCapacity; |
||||
} |
||||
|
||||
public int getMaxOpenFiles() { |
||||
return maxOpenFiles; |
||||
} |
||||
|
||||
public int getMaxBackgroundCompactions() { |
||||
return maxBackgroundCompactions; |
||||
} |
||||
|
||||
public int getBackgroundThreadCount() { |
||||
return backgroundThreadCount; |
||||
} |
||||
|
||||
public long getCacheCapacity() { |
||||
return cacheCapacity; |
||||
} |
||||
} |
@ -0,0 +1,82 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb.unsegmented; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.exception.StorageException; |
||||
import tech.pegasys.pantheon.plugin.services.metrics.OperationTimer; |
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorageTransaction; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.RocksDBMetrics; |
||||
|
||||
import org.rocksdb.RocksDBException; |
||||
import org.rocksdb.Transaction; |
||||
import org.rocksdb.WriteOptions; |
||||
|
||||
public class RocksDBTransaction implements KeyValueStorageTransaction { |
||||
|
||||
private final RocksDBMetrics metrics; |
||||
private final Transaction innerTx; |
||||
private final WriteOptions options; |
||||
|
||||
RocksDBTransaction( |
||||
final Transaction innerTx, final WriteOptions options, final RocksDBMetrics metrics) { |
||||
this.innerTx = innerTx; |
||||
this.options = options; |
||||
this.metrics = metrics; |
||||
} |
||||
|
||||
@Override |
||||
public void put(final byte[] key, final byte[] value) { |
||||
try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) { |
||||
innerTx.put(key, value); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void remove(final byte[] key) { |
||||
try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) { |
||||
innerTx.delete(key); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void commit() throws StorageException { |
||||
try (final OperationTimer.TimingContext ignored = metrics.getCommitLatency().startTimer()) { |
||||
innerTx.commit(); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void rollback() { |
||||
try { |
||||
innerTx.rollback(); |
||||
metrics.getRollbackCount().inc(); |
||||
} catch (final RocksDBException e) { |
||||
throw new StorageException(e); |
||||
} finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
private void close() { |
||||
innerTx.close(); |
||||
options.close(); |
||||
} |
||||
} |
@ -0,0 +1,115 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions.DEFAULT_BACKGROUND_THREAD_COUNT; |
||||
import static tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions.DEFAULT_CACHE_CAPACITY; |
||||
import static tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions.DEFAULT_MAX_BACKGROUND_COMPACTIONS; |
||||
import static tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions.DEFAULT_MAX_OPEN_FILES; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBCLIOptions; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration; |
||||
|
||||
import org.junit.Test; |
||||
import picocli.CommandLine; |
||||
|
||||
public class RocksDBCLIOptionsTest { |
||||
|
||||
private static final String MAX_OPEN_FILES_FLAG = "--Xrocksdb-max-open-files"; |
||||
private static final String CACHE_CAPACITY_FLAG = "--Xrocksdb-cache-capacity"; |
||||
private static final String MAX_BACKGROUND_COMPACTIONS_FLAG = |
||||
"--Xrocksdb-max-background-compactions"; |
||||
private static final String BACKGROUND_THREAD_COUNT_FLAG = "--Xrocksdb-background-thread-count"; |
||||
|
||||
@Test |
||||
public void defaultValues() { |
||||
final RocksDBCLIOptions options = RocksDBCLIOptions.create(); |
||||
|
||||
new CommandLine(options).parse(); |
||||
|
||||
final RocksDBFactoryConfiguration configuration = options.toDomainObject(); |
||||
assertThat(configuration).isNotNull(); |
||||
assertThat(configuration.getBackgroundThreadCount()).isEqualTo(DEFAULT_BACKGROUND_THREAD_COUNT); |
||||
assertThat(configuration.getCacheCapacity()).isEqualTo(DEFAULT_CACHE_CAPACITY); |
||||
assertThat(configuration.getMaxBackgroundCompactions()) |
||||
.isEqualTo(DEFAULT_MAX_BACKGROUND_COMPACTIONS); |
||||
assertThat(configuration.getMaxOpenFiles()).isEqualTo(DEFAULT_MAX_OPEN_FILES); |
||||
} |
||||
|
||||
@Test |
||||
public void customBackgroundThreadCount() { |
||||
final RocksDBCLIOptions options = RocksDBCLIOptions.create(); |
||||
final int expectedBackgroundThreadCount = 99; |
||||
|
||||
new CommandLine(options) |
||||
.parse(BACKGROUND_THREAD_COUNT_FLAG, "" + expectedBackgroundThreadCount); |
||||
|
||||
final RocksDBFactoryConfiguration configuration = options.toDomainObject(); |
||||
assertThat(configuration).isNotNull(); |
||||
assertThat(configuration.getBackgroundThreadCount()).isEqualTo(expectedBackgroundThreadCount); |
||||
assertThat(configuration.getCacheCapacity()).isEqualTo(DEFAULT_CACHE_CAPACITY); |
||||
assertThat(configuration.getMaxBackgroundCompactions()) |
||||
.isEqualTo(DEFAULT_MAX_BACKGROUND_COMPACTIONS); |
||||
assertThat(configuration.getMaxOpenFiles()).isEqualTo(DEFAULT_MAX_OPEN_FILES); |
||||
} |
||||
|
||||
@Test |
||||
public void customCacheCapacity() { |
||||
final RocksDBCLIOptions options = RocksDBCLIOptions.create(); |
||||
final long expectedCacheCapacity = 400050006000L; |
||||
|
||||
new CommandLine(options).parse(CACHE_CAPACITY_FLAG, "" + expectedCacheCapacity); |
||||
|
||||
final RocksDBFactoryConfiguration configuration = options.toDomainObject(); |
||||
assertThat(configuration).isNotNull(); |
||||
assertThat(configuration.getBackgroundThreadCount()).isEqualTo(DEFAULT_BACKGROUND_THREAD_COUNT); |
||||
assertThat(configuration.getCacheCapacity()).isEqualTo(expectedCacheCapacity); |
||||
assertThat(configuration.getMaxBackgroundCompactions()) |
||||
.isEqualTo(DEFAULT_MAX_BACKGROUND_COMPACTIONS); |
||||
assertThat(configuration.getMaxOpenFiles()).isEqualTo(DEFAULT_MAX_OPEN_FILES); |
||||
} |
||||
|
||||
@Test |
||||
public void customMaxBackgroundCompactions() { |
||||
final RocksDBCLIOptions options = RocksDBCLIOptions.create(); |
||||
final int expectedMaxBackgroundCompactions = 223344; |
||||
|
||||
new CommandLine(options) |
||||
.parse(MAX_BACKGROUND_COMPACTIONS_FLAG, "" + expectedMaxBackgroundCompactions); |
||||
|
||||
final RocksDBFactoryConfiguration configuration = options.toDomainObject(); |
||||
assertThat(configuration).isNotNull(); |
||||
assertThat(configuration.getBackgroundThreadCount()).isEqualTo(DEFAULT_BACKGROUND_THREAD_COUNT); |
||||
assertThat(configuration.getCacheCapacity()).isEqualTo(DEFAULT_CACHE_CAPACITY); |
||||
assertThat(configuration.getMaxBackgroundCompactions()) |
||||
.isEqualTo(expectedMaxBackgroundCompactions); |
||||
assertThat(configuration.getMaxOpenFiles()).isEqualTo(DEFAULT_MAX_OPEN_FILES); |
||||
} |
||||
|
||||
@Test |
||||
public void customMaxOpenFiles() { |
||||
final RocksDBCLIOptions options = RocksDBCLIOptions.create(); |
||||
final int expectedMaxOpenFiles = 65; |
||||
|
||||
new CommandLine(options).parse(MAX_OPEN_FILES_FLAG, "" + expectedMaxOpenFiles); |
||||
|
||||
final RocksDBFactoryConfiguration configuration = options.toDomainObject(); |
||||
assertThat(configuration).isNotNull(); |
||||
assertThat(configuration.getBackgroundThreadCount()).isEqualTo(DEFAULT_BACKGROUND_THREAD_COUNT); |
||||
assertThat(configuration.getCacheCapacity()).isEqualTo(DEFAULT_CACHE_CAPACITY); |
||||
assertThat(configuration.getMaxBackgroundCompactions()) |
||||
.isEqualTo(DEFAULT_MAX_BACKGROUND_COMPACTIONS); |
||||
assertThat(configuration.getMaxOpenFiles()).isEqualTo(expectedMaxOpenFiles); |
||||
} |
||||
} |
@ -0,0 +1,141 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy; |
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertTrue; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import tech.pegasys.pantheon.metrics.ObservableMetricsSystem; |
||||
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; |
||||
import tech.pegasys.pantheon.plugin.services.PantheonConfiguration; |
||||
import tech.pegasys.pantheon.plugin.services.exception.StorageException; |
||||
import tech.pegasys.pantheon.plugin.services.storage.SegmentIdentifier; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.DatabaseMetadata; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration; |
||||
|
||||
import java.nio.charset.Charset; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Path; |
||||
import java.util.List; |
||||
|
||||
import org.junit.Rule; |
||||
import org.junit.Test; |
||||
import org.junit.rules.TemporaryFolder; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
@RunWith(MockitoJUnitRunner.class) |
||||
public class RocksDBKeyValueStorageFactoryTest { |
||||
|
||||
private static final String METADATA_FILENAME = "DATABASE_METADATA.json"; |
||||
private static final int DEFAULT_VERSION = 1; |
||||
|
||||
@Mock private RocksDBFactoryConfiguration rocksDbConfiguration; |
||||
@Mock private PantheonConfiguration commonConfiguration; |
||||
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); |
||||
private final ObservableMetricsSystem metricsSystem = new NoOpMetricsSystem(); |
||||
private final List<SegmentIdentifier> segments = List.of(); |
||||
@Mock private SegmentIdentifier segment; |
||||
|
||||
@Test |
||||
public void shouldCreateCorrectMetadataFileForLatestVersion() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
when(commonConfiguration.getStoragePath()).thenReturn(tempDatabaseDir); |
||||
|
||||
final RocksDBKeyValueStorageFactory storageFactory = |
||||
new RocksDBKeyValueStorageFactory(() -> rocksDbConfiguration, segments); |
||||
|
||||
// Side effect is creation of the Metadata version file
|
||||
storageFactory.create(() -> "block-chain", commonConfiguration, metricsSystem); |
||||
|
||||
assertEquals( |
||||
DEFAULT_VERSION, |
||||
DatabaseMetadata.fromDirectory(commonConfiguration.getStoragePath()).getVersion()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldDetectVersion0DatabaseIfNoMetadataFileFound() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
Files.createDirectories(tempDatabaseDir); |
||||
tempDatabaseDir.resolve("IDENTITY").toFile().createNewFile(); |
||||
when(commonConfiguration.getStoragePath()).thenReturn(tempDatabaseDir); |
||||
|
||||
final RocksDBKeyValueStorageFactory storageFactory = |
||||
new RocksDBKeyValueStorageFactory(() -> rocksDbConfiguration, segments); |
||||
|
||||
storageFactory.create(segment, commonConfiguration, metricsSystem); |
||||
|
||||
assertEquals(0, DatabaseMetadata.fromDirectory(tempDatabaseDir).getVersion()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldDetectCorrectVersionIfMetadataFileExists() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
Files.createDirectories(tempDatabaseDir); |
||||
tempDatabaseDir.resolve("IDENTITY").toFile().createNewFile(); |
||||
new DatabaseMetadata(DEFAULT_VERSION).writeToDirectory(tempDatabaseDir); |
||||
when(commonConfiguration.getStoragePath()).thenReturn(tempDatabaseDir); |
||||
final RocksDBKeyValueStorageFactory storageFactory = |
||||
new RocksDBKeyValueStorageFactory(() -> rocksDbConfiguration, segments); |
||||
|
||||
storageFactory.create(() -> "block-chain", commonConfiguration, metricsSystem); |
||||
|
||||
assertEquals(DEFAULT_VERSION, DatabaseMetadata.fromDirectory(tempDatabaseDir).getVersion()); |
||||
assertTrue(storageFactory.isSegmentIsolationSupported()); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldThrowExceptionWhenVersionNumberIsInvalid() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
Files.createDirectories(tempDatabaseDir); |
||||
tempDatabaseDir.resolve("IDENTITY").toFile().createNewFile(); |
||||
new DatabaseMetadata(-1).writeToDirectory(tempDatabaseDir); |
||||
when(commonConfiguration.getStoragePath()).thenReturn(tempDatabaseDir); |
||||
|
||||
assertThatThrownBy( |
||||
() -> |
||||
new RocksDBKeyValueStorageFactory(() -> rocksDbConfiguration, segments) |
||||
.create(() -> "segment-does-not-matter", commonConfiguration, metricsSystem)) |
||||
.isInstanceOf(StorageException.class); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldThrowExceptionWhenMetaDataFileIsCorrupted() throws Exception { |
||||
final Path tempDatabaseDir = temporaryFolder.newFolder().toPath().resolve("db"); |
||||
Files.createDirectories(tempDatabaseDir); |
||||
when(commonConfiguration.getStoragePath()).thenReturn(tempDatabaseDir); |
||||
tempDatabaseDir.resolve("IDENTITY").toFile().createNewFile(); |
||||
final String badVersion = "{\"🦄\":1}"; |
||||
Files.write( |
||||
tempDatabaseDir.resolve(METADATA_FILENAME), badVersion.getBytes(Charset.defaultCharset())); |
||||
|
||||
assertThatThrownBy( |
||||
() -> |
||||
new RocksDBKeyValueStorageFactory(() -> rocksDbConfiguration, segments) |
||||
.create(() -> "bad-version", commonConfiguration, metricsSystem)) |
||||
.isInstanceOf(IllegalStateException.class); |
||||
|
||||
final String badValue = "{\"version\":\"iomedae\"}"; |
||||
Files.write( |
||||
tempDatabaseDir.resolve(METADATA_FILENAME), badValue.getBytes(Charset.defaultCharset())); |
||||
|
||||
assertThatThrownBy( |
||||
() -> |
||||
new RocksDBKeyValueStorageFactory(() -> rocksDbConfiguration, segments) |
||||
.create(() -> "bad-value", commonConfiguration, metricsSystem)) |
||||
.isInstanceOf(IllegalStateException.class); |
||||
} |
||||
} |
@ -0,0 +1,130 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugin.services.storage.rocksdb.segmented; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.anyString; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import tech.pegasys.pantheon.kvstore.AbstractKeyValueStorageTest; |
||||
import tech.pegasys.pantheon.metrics.ObservableMetricsSystem; |
||||
import tech.pegasys.pantheon.metrics.PantheonMetricCategory; |
||||
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; |
||||
import tech.pegasys.pantheon.plugin.services.metrics.Counter; |
||||
import tech.pegasys.pantheon.plugin.services.metrics.LabelledMetric; |
||||
import tech.pegasys.pantheon.plugin.services.metrics.OperationTimer; |
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorage; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder; |
||||
import tech.pegasys.pantheon.plugin.services.storage.rocksdb.unsegmented.RocksDBKeyValueStorage; |
||||
|
||||
import java.util.function.LongSupplier; |
||||
|
||||
import org.junit.Rule; |
||||
import org.junit.Test; |
||||
import org.junit.rules.TemporaryFolder; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
@RunWith(MockitoJUnitRunner.class) |
||||
public class RocksDBKeyValueStorageTest extends AbstractKeyValueStorageTest { |
||||
|
||||
@Mock private ObservableMetricsSystem metricsSystemMock; |
||||
@Mock private LabelledMetric<OperationTimer> labelledMetricOperationTimerMock; |
||||
@Mock private LabelledMetric<Counter> labelledMetricCounterMock; |
||||
@Mock private OperationTimer operationTimerMock; |
||||
@Rule public final TemporaryFolder folder = new TemporaryFolder(); |
||||
|
||||
@Override |
||||
protected KeyValueStorage createStore() throws Exception { |
||||
return new RocksDBKeyValueStorage(config(), new NoOpMetricsSystem()); |
||||
} |
||||
|
||||
@Test |
||||
public void createStoreMustCreateMetrics() throws Exception { |
||||
// Prepare mocks
|
||||
when(labelledMetricOperationTimerMock.labels(any())).thenReturn(operationTimerMock); |
||||
when(metricsSystemMock.createLabelledTimer( |
||||
eq(PantheonMetricCategory.KVSTORE_ROCKSDB), anyString(), anyString(), any())) |
||||
.thenReturn(labelledMetricOperationTimerMock); |
||||
when(metricsSystemMock.createLabelledCounter( |
||||
eq(PantheonMetricCategory.KVSTORE_ROCKSDB), anyString(), anyString(), any())) |
||||
.thenReturn(labelledMetricCounterMock); |
||||
// Prepare argument captors
|
||||
final ArgumentCaptor<String> labelledTimersMetricsNameArgs = |
||||
ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> labelledTimersHelpArgs = ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> labelledCountersMetricsNameArgs = |
||||
ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> labelledCountersHelpArgs = ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> longGaugesMetricsNameArgs = ArgumentCaptor.forClass(String.class); |
||||
final ArgumentCaptor<String> longGaugesHelpArgs = ArgumentCaptor.forClass(String.class); |
||||
|
||||
// Actual call
|
||||
final KeyValueStorage keyValueStorage = new RocksDBKeyValueStorage(config(), metricsSystemMock); |
||||
|
||||
// Assertions
|
||||
assertThat(keyValueStorage).isNotNull(); |
||||
verify(metricsSystemMock, times(4)) |
||||
.createLabelledTimer( |
||||
eq(PantheonMetricCategory.KVSTORE_ROCKSDB), |
||||
labelledTimersMetricsNameArgs.capture(), |
||||
labelledTimersHelpArgs.capture(), |
||||
any()); |
||||
assertThat(labelledTimersMetricsNameArgs.getAllValues()) |
||||
.containsExactly( |
||||
"read_latency_seconds", |
||||
"remove_latency_seconds", |
||||
"write_latency_seconds", |
||||
"commit_latency_seconds"); |
||||
assertThat(labelledTimersHelpArgs.getAllValues()) |
||||
.containsExactly( |
||||
"Latency for read from RocksDB.", |
||||
"Latency of remove requests from RocksDB.", |
||||
"Latency for write to RocksDB.", |
||||
"Latency for commits to RocksDB."); |
||||
|
||||
verify(metricsSystemMock, times(2)) |
||||
.createLongGauge( |
||||
eq(PantheonMetricCategory.KVSTORE_ROCKSDB), |
||||
longGaugesMetricsNameArgs.capture(), |
||||
longGaugesHelpArgs.capture(), |
||||
any(LongSupplier.class)); |
||||
assertThat(longGaugesMetricsNameArgs.getAllValues()) |
||||
.containsExactly("rocks_db_table_readers_memory_bytes", "rocks_db_files_size_bytes"); |
||||
assertThat(longGaugesHelpArgs.getAllValues()) |
||||
.containsExactly( |
||||
"Estimated memory used for RocksDB index and filter blocks in bytes", |
||||
"Estimated database size in bytes"); |
||||
|
||||
verify(metricsSystemMock) |
||||
.createLabelledCounter( |
||||
eq(PantheonMetricCategory.KVSTORE_ROCKSDB), |
||||
labelledCountersMetricsNameArgs.capture(), |
||||
labelledCountersHelpArgs.capture(), |
||||
any()); |
||||
assertThat(labelledCountersMetricsNameArgs.getValue()).isEqualTo("rollback_count"); |
||||
assertThat(labelledCountersHelpArgs.getValue()) |
||||
.isEqualTo("Number of RocksDB transactions rolled back."); |
||||
} |
||||
|
||||
private RocksDBConfiguration config() throws Exception { |
||||
return new RocksDBConfigurationBuilder().databaseDir(folder.newFolder().toPath()).build(); |
||||
} |
||||
} |
@ -1,126 +0,0 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services.kvstore; |
||||
|
||||
import static com.google.common.base.Preconditions.checkState; |
||||
|
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
|
||||
import java.io.Closeable; |
||||
import java.util.Optional; |
||||
import java.util.function.Predicate; |
||||
|
||||
/** Service provided by pantheon to facilitate persistent data storage. */ |
||||
public interface KeyValueStorage extends Closeable { |
||||
|
||||
void clear(); |
||||
|
||||
default boolean containsKey(final BytesValue key) throws StorageException { |
||||
return get(key).isPresent(); |
||||
} |
||||
|
||||
/** |
||||
* @param key Index into persistent data repository. |
||||
* @return The value persisted at the key index. |
||||
*/ |
||||
Optional<BytesValue> get(BytesValue key) throws StorageException; |
||||
|
||||
long removeUnless(Predicate<BytesValue> inUseCheck); |
||||
|
||||
/** |
||||
* Begins a transaction. Returns a transaction object that can be updated and committed. |
||||
* |
||||
* @return An object representing the transaction. |
||||
*/ |
||||
Transaction startTransaction() throws StorageException; |
||||
|
||||
class StorageException extends RuntimeException { |
||||
public StorageException(final Throwable t) { |
||||
super(t); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Represents a set of changes to be committed atomically. A single transaction is not |
||||
* thread-safe, but multiple transactions can execute concurrently. |
||||
*/ |
||||
interface Transaction { |
||||
|
||||
/** |
||||
* Add the given key-value pair to the set of updates to be committed. |
||||
* |
||||
* @param key The key to set / modify. |
||||
* @param value The value to be set. |
||||
*/ |
||||
void put(BytesValue key, BytesValue value); |
||||
|
||||
/** |
||||
* Schedules the given key to be deleted from storage. |
||||
* |
||||
* @param key The key to delete |
||||
*/ |
||||
void remove(BytesValue key); |
||||
|
||||
/** |
||||
* Atomically commit the set of changes contained in this transaction to the underlying |
||||
* key-value storage from which this transaction was started. After committing, the transaction |
||||
* is no longer usable and will throw exceptions if modifications are attempted. |
||||
*/ |
||||
void commit() throws StorageException; |
||||
|
||||
/** |
||||
* Cancel this transaction. After rolling back, the transaction is no longer usable and will |
||||
* throw exceptions if modifications are attempted. |
||||
*/ |
||||
void rollback(); |
||||
} |
||||
|
||||
abstract class AbstractTransaction implements Transaction { |
||||
|
||||
private boolean active = true; |
||||
|
||||
@Override |
||||
public final void put(final BytesValue key, final BytesValue value) { |
||||
checkState(active, "Cannot invoke put() on a completed transaction."); |
||||
doPut(key, value); |
||||
} |
||||
|
||||
@Override |
||||
public final void remove(final BytesValue key) { |
||||
checkState(active, "Cannot invoke remove() on a completed transaction."); |
||||
doRemove(key); |
||||
} |
||||
|
||||
@Override |
||||
public final void commit() throws StorageException { |
||||
checkState(active, "Cannot commit a completed transaction."); |
||||
active = false; |
||||
doCommit(); |
||||
} |
||||
|
||||
@Override |
||||
public final void rollback() { |
||||
checkState(active, "Cannot rollback a completed transaction."); |
||||
active = false; |
||||
doRollback(); |
||||
} |
||||
|
||||
protected abstract void doPut(BytesValue key, BytesValue value); |
||||
|
||||
protected abstract void doRemove(BytesValue key); |
||||
|
||||
protected abstract void doCommit() throws StorageException; |
||||
|
||||
protected abstract void doRollback(); |
||||
} |
||||
} |
@ -0,0 +1,56 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services.kvstore; |
||||
|
||||
import static com.google.common.base.Preconditions.checkState; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.exception.StorageException; |
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorageTransaction; |
||||
|
||||
public class KeyValueStorageTransactionTransitionValidatorDecorator |
||||
implements KeyValueStorageTransaction { |
||||
|
||||
private final KeyValueStorageTransaction transaction; |
||||
private boolean active = true; |
||||
|
||||
public KeyValueStorageTransactionTransitionValidatorDecorator( |
||||
final KeyValueStorageTransaction toDecorate) { |
||||
this.transaction = toDecorate; |
||||
} |
||||
|
||||
@Override |
||||
public void put(final byte[] key, final byte[] value) { |
||||
checkState(active, "Cannot invoke put() on a completed transaction."); |
||||
transaction.put(key, value); |
||||
} |
||||
|
||||
@Override |
||||
public void remove(final byte[] key) { |
||||
checkState(active, "Cannot invoke remove() on a completed transaction."); |
||||
transaction.remove(key); |
||||
} |
||||
|
||||
@Override |
||||
public final void commit() throws StorageException { |
||||
checkState(active, "Cannot commit a completed transaction."); |
||||
active = false; |
||||
transaction.commit(); |
||||
} |
||||
|
||||
@Override |
||||
public final void rollback() { |
||||
checkState(active, "Cannot rollback a completed transaction."); |
||||
active = false; |
||||
transaction.rollback(); |
||||
} |
||||
} |
@ -1,128 +0,0 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services.kvstore; |
||||
|
||||
import tech.pegasys.pantheon.services.util.RocksDbUtil; |
||||
|
||||
import java.nio.file.Path; |
||||
|
||||
public class RocksDbConfiguration { |
||||
public static final int DEFAULT_MAX_OPEN_FILES = 1024; |
||||
public static final long DEFAULT_CACHE_CAPACITY = 8388608; |
||||
public static final int DEFAULT_MAX_BACKGROUND_COMPACTIONS = 4; |
||||
public static final int DEFAULT_BACKGROUND_THREAD_COUNT = 4; |
||||
|
||||
private final Path databaseDir; |
||||
private final int maxOpenFiles; |
||||
private final String label; |
||||
private final int maxBackgroundCompactions; |
||||
private final int backgroundThreadCount; |
||||
private final long cacheCapacity; |
||||
|
||||
private RocksDbConfiguration( |
||||
final Path databaseDir, |
||||
final int maxOpenFiles, |
||||
final int maxBackgroundCompactions, |
||||
final int backgroundThreadCount, |
||||
final long cacheCapacity, |
||||
final String label) { |
||||
this.maxBackgroundCompactions = maxBackgroundCompactions; |
||||
this.backgroundThreadCount = backgroundThreadCount; |
||||
RocksDbUtil.loadNativeLibrary(); |
||||
this.databaseDir = databaseDir; |
||||
this.maxOpenFiles = maxOpenFiles; |
||||
this.cacheCapacity = cacheCapacity; |
||||
this.label = label; |
||||
} |
||||
|
||||
public static Builder builder() { |
||||
return new Builder(); |
||||
} |
||||
|
||||
public Path getDatabaseDir() { |
||||
return databaseDir; |
||||
} |
||||
|
||||
public int getMaxOpenFiles() { |
||||
return maxOpenFiles; |
||||
} |
||||
|
||||
public int getMaxBackgroundCompactions() { |
||||
return maxBackgroundCompactions; |
||||
} |
||||
|
||||
public int getBackgroundThreadCount() { |
||||
return backgroundThreadCount; |
||||
} |
||||
|
||||
public long getCacheCapacity() { |
||||
return cacheCapacity; |
||||
} |
||||
|
||||
public String getLabel() { |
||||
return label; |
||||
} |
||||
|
||||
public static class Builder { |
||||
|
||||
Path databaseDir; |
||||
String label = "blockchain"; |
||||
|
||||
int maxOpenFiles = DEFAULT_MAX_OPEN_FILES; |
||||
long cacheCapacity = DEFAULT_CACHE_CAPACITY; |
||||
int maxBackgroundCompactions = DEFAULT_MAX_BACKGROUND_COMPACTIONS; |
||||
int backgroundThreadCount = DEFAULT_BACKGROUND_THREAD_COUNT; |
||||
|
||||
private Builder() {} |
||||
|
||||
public Builder databaseDir(final Path databaseDir) { |
||||
this.databaseDir = databaseDir; |
||||
return this; |
||||
} |
||||
|
||||
public Builder maxOpenFiles(final int maxOpenFiles) { |
||||
this.maxOpenFiles = maxOpenFiles; |
||||
return this; |
||||
} |
||||
|
||||
public Builder label(final String label) { |
||||
this.label = label; |
||||
return this; |
||||
} |
||||
|
||||
public Builder cacheCapacity(final long cacheCapacity) { |
||||
this.cacheCapacity = cacheCapacity; |
||||
return this; |
||||
} |
||||
|
||||
public Builder maxBackgroundCompactions(final int maxBackgroundCompactions) { |
||||
this.maxBackgroundCompactions = maxBackgroundCompactions; |
||||
return this; |
||||
} |
||||
|
||||
public Builder backgroundThreadCount(final int backgroundThreadCount) { |
||||
this.backgroundThreadCount = backgroundThreadCount; |
||||
return this; |
||||
} |
||||
|
||||
public RocksDbConfiguration build() { |
||||
return new RocksDbConfiguration( |
||||
databaseDir, |
||||
maxOpenFiles, |
||||
maxBackgroundCompactions, |
||||
backgroundThreadCount, |
||||
cacheCapacity, |
||||
label); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,56 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services.kvstore; |
||||
|
||||
import static com.google.common.base.Preconditions.checkState; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.exception.StorageException; |
||||
import tech.pegasys.pantheon.services.kvstore.SegmentedKeyValueStorage.Transaction; |
||||
|
||||
public class SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<S> |
||||
implements Transaction<S> { |
||||
|
||||
private final Transaction<S> transaction; |
||||
private boolean active = true; |
||||
|
||||
public SegmentedKeyValueStorageTransactionTransitionValidatorDecorator( |
||||
final Transaction<S> toDecorate) { |
||||
this.transaction = toDecorate; |
||||
} |
||||
|
||||
@Override |
||||
public final void put(final S segment, final byte[] key, final byte[] value) { |
||||
checkState(active, "Cannot invoke put() on a completed transaction."); |
||||
transaction.put(segment, key, value); |
||||
} |
||||
|
||||
@Override |
||||
public final void remove(final S segment, final byte[] key) { |
||||
checkState(active, "Cannot invoke remove() on a completed transaction."); |
||||
transaction.remove(segment, key); |
||||
} |
||||
|
||||
@Override |
||||
public final void commit() throws StorageException { |
||||
checkState(active, "Cannot commit a completed transaction."); |
||||
active = false; |
||||
transaction.commit(); |
||||
} |
||||
|
||||
@Override |
||||
public final void rollback() { |
||||
checkState(active, "Cannot rollback a completed transaction."); |
||||
active = false; |
||||
transaction.rollback(); |
||||
} |
||||
} |
@ -1,384 +0,0 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services.kvstore; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage.Transaction; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValues; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.CountDownLatch; |
||||
import java.util.function.Function; |
||||
|
||||
import org.junit.Ignore; |
||||
import org.junit.Test; |
||||
|
||||
@Ignore |
||||
public abstract class AbstractKeyValueStorageTest { |
||||
|
||||
protected abstract KeyValueStorage createStore() throws Exception; |
||||
|
||||
@Test |
||||
public void twoStoresAreIndependent() throws Exception { |
||||
final KeyValueStorage store1 = createStore(); |
||||
final KeyValueStorage store2 = createStore(); |
||||
|
||||
Transaction tx = store1.startTransaction(); |
||||
tx.put(BytesValue.fromHexString("0001"), BytesValue.fromHexString("0FFF")); |
||||
tx.commit(); |
||||
final Optional<BytesValue> result = store2.get(BytesValue.fromHexString("0001")); |
||||
assertThat(result).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void put() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
|
||||
Transaction tx = store.startTransaction(); |
||||
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0ABC")); |
||||
tx.commit(); |
||||
assertThat(store.get(BytesValue.fromHexString("0F"))) |
||||
.contains(BytesValue.fromHexString("0ABC")); |
||||
|
||||
tx = store.startTransaction(); |
||||
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0DEF")); |
||||
tx.commit(); |
||||
assertThat(store.get(BytesValue.fromHexString("0F"))) |
||||
.contains(BytesValue.fromHexString("0DEF")); |
||||
} |
||||
|
||||
@Test |
||||
public void removeUnless() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
Transaction tx = store.startTransaction(); |
||||
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0ABC")); |
||||
tx.put(BytesValue.fromHexString("10"), BytesValue.fromHexString("0ABC")); |
||||
tx.put(BytesValue.fromHexString("11"), BytesValue.fromHexString("0ABC")); |
||||
tx.put(BytesValue.fromHexString("12"), BytesValue.fromHexString("0ABC")); |
||||
tx.commit(); |
||||
store.removeUnless(bv -> bv.toString().contains("1")); |
||||
assertThat(store.containsKey(BytesValue.fromHexString("0F"))).isFalse(); |
||||
assertThat(store.containsKey(BytesValue.fromHexString("10"))).isTrue(); |
||||
assertThat(store.containsKey(BytesValue.fromHexString("11"))).isTrue(); |
||||
assertThat(store.containsKey(BytesValue.fromHexString("12"))).isTrue(); |
||||
} |
||||
|
||||
@Test |
||||
public void clearRemovesAll() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
Transaction tx = store.startTransaction(); |
||||
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0ABC")); |
||||
tx.put(BytesValue.fromHexString("10"), BytesValue.fromHexString("0ABC")); |
||||
tx.put(BytesValue.fromHexString("11"), BytesValue.fromHexString("0ABC")); |
||||
tx.put(BytesValue.fromHexString("12"), BytesValue.fromHexString("0ABC")); |
||||
tx.commit(); |
||||
store.clear(); |
||||
assertThat(store.containsKey(BytesValue.fromHexString("0F"))).isFalse(); |
||||
assertThat(store.containsKey(BytesValue.fromHexString("10"))).isFalse(); |
||||
assertThat(store.containsKey(BytesValue.fromHexString("11"))).isFalse(); |
||||
assertThat(store.containsKey(BytesValue.fromHexString("12"))).isFalse(); |
||||
} |
||||
|
||||
@Test |
||||
public void containsKey() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final BytesValue key = BytesValue.fromHexString("ABCD"); |
||||
|
||||
assertThat(store.containsKey(key)).isFalse(); |
||||
|
||||
final Transaction transaction = store.startTransaction(); |
||||
transaction.put(key, BytesValue.fromHexString("DEFF")); |
||||
transaction.commit(); |
||||
|
||||
assertThat(store.containsKey(key)).isTrue(); |
||||
} |
||||
|
||||
@Test |
||||
public void removeExisting() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
Transaction tx = store.startTransaction(); |
||||
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0ABC")); |
||||
tx.commit(); |
||||
tx = store.startTransaction(); |
||||
tx.remove(BytesValue.fromHexString("0F")); |
||||
tx.commit(); |
||||
assertThat(store.get(BytesValue.fromHexString("0F"))).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void removeExistingSameTransaction() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
Transaction tx = store.startTransaction(); |
||||
tx.put(BytesValue.fromHexString("0F"), BytesValue.fromHexString("0ABC")); |
||||
tx.remove(BytesValue.fromHexString("0F")); |
||||
tx.commit(); |
||||
assertThat(store.get(BytesValue.fromHexString("0F"))).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void removeNonExistent() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
Transaction tx = store.startTransaction(); |
||||
tx.remove(BytesValue.fromHexString("0F")); |
||||
tx.commit(); |
||||
assertThat(store.get(BytesValue.fromHexString("0F"))).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void concurrentUpdate() throws Exception { |
||||
final int keyCount = 1000; |
||||
final KeyValueStorage store = createStore(); |
||||
|
||||
final CountDownLatch finishedLatch = new CountDownLatch(2); |
||||
final Function<BytesValue, Thread> updater = |
||||
(value) -> |
||||
new Thread( |
||||
() -> { |
||||
try { |
||||
for (int i = 0; i < keyCount; i++) { |
||||
Transaction tx = store.startTransaction(); |
||||
tx.put(BytesValues.toMinimalBytes(i), value); |
||||
tx.commit(); |
||||
} |
||||
} finally { |
||||
finishedLatch.countDown(); |
||||
} |
||||
}); |
||||
|
||||
// Run 2 concurrent transactions that write a bunch of values to the same keys
|
||||
final BytesValue a = BytesValue.of(10); |
||||
final BytesValue b = BytesValue.of(20); |
||||
updater.apply(a).start(); |
||||
updater.apply(b).start(); |
||||
|
||||
finishedLatch.await(); |
||||
|
||||
for (int i = 0; i < keyCount; i++) { |
||||
final BytesValue key = BytesValues.toMinimalBytes(i); |
||||
final BytesValue actual = store.get(key).get(); |
||||
assertThat(actual.equals(a) || actual.equals(b)).isTrue(); |
||||
} |
||||
|
||||
store.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionCommit() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
// Add some values
|
||||
Transaction tx = store.startTransaction(); |
||||
tx.put(BytesValue.of(1), BytesValue.of(1)); |
||||
tx.put(BytesValue.of(2), BytesValue.of(2)); |
||||
tx.put(BytesValue.of(3), BytesValue.of(3)); |
||||
tx.commit(); |
||||
|
||||
// Start transaction that adds, modifies, and removes some values
|
||||
tx = store.startTransaction(); |
||||
tx.put(BytesValue.of(2), BytesValue.of(3)); |
||||
tx.put(BytesValue.of(2), BytesValue.of(4)); |
||||
tx.remove(BytesValue.of(3)); |
||||
tx.put(BytesValue.of(4), BytesValue.of(8)); |
||||
|
||||
// Check values before committing have not changed
|
||||
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1)); |
||||
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2)); |
||||
assertThat(store.get(BytesValue.of(3))).contains(BytesValue.of(3)); |
||||
assertThat(store.get(BytesValue.of(4))).isEmpty(); |
||||
|
||||
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1)); |
||||
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2)); |
||||
assertThat(store.get(BytesValue.of(3))).contains(BytesValue.of(3)); |
||||
assertThat(store.get(BytesValue.of(4))).isEmpty(); |
||||
|
||||
tx.commit(); |
||||
|
||||
// Check that values have been updated after commit
|
||||
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1)); |
||||
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(4)); |
||||
assertThat(store.get(BytesValue.of(3))).isEmpty(); |
||||
assertThat(store.get(BytesValue.of(4))).contains(BytesValue.of(8)); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionRollback() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
// Add some values
|
||||
Transaction tx = store.startTransaction(); |
||||
tx.put(BytesValue.of(1), BytesValue.of(1)); |
||||
tx.put(BytesValue.of(2), BytesValue.of(2)); |
||||
tx.put(BytesValue.of(3), BytesValue.of(3)); |
||||
tx.commit(); |
||||
|
||||
// Start transaction that adds, modifies, and removes some values
|
||||
tx = store.startTransaction(); |
||||
tx.put(BytesValue.of(2), BytesValue.of(3)); |
||||
tx.put(BytesValue.of(2), BytesValue.of(4)); |
||||
tx.remove(BytesValue.of(3)); |
||||
tx.put(BytesValue.of(4), BytesValue.of(8)); |
||||
|
||||
// Check values before committing have not changed
|
||||
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1)); |
||||
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2)); |
||||
assertThat(store.get(BytesValue.of(3))).contains(BytesValue.of(3)); |
||||
assertThat(store.get(BytesValue.of(4))).isEmpty(); |
||||
|
||||
tx.rollback(); |
||||
|
||||
// Check that values have not changed after rollback
|
||||
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1)); |
||||
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2)); |
||||
assertThat(store.get(BytesValue.of(3))).contains(BytesValue.of(3)); |
||||
assertThat(store.get(BytesValue.of(4))).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionCommitEmpty() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionRollbackEmpty() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionPutAfterCommit() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
tx.put(BytesValue.of(1), BytesValue.of(1)); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionRemoveAfterCommit() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
tx.remove(BytesValue.of(1)); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionPutAfterRollback() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
tx.put(BytesValue.of(1), BytesValue.of(1)); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionRemoveAfterRollback() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
tx.remove(BytesValue.of(1)); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionCommitAfterRollback() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
tx.commit(); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionCommitTwice() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
tx.commit(); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionRollbackAfterCommit() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
tx.rollback(); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionRollbackTwice() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final Transaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
tx.rollback(); |
||||
} |
||||
|
||||
@Test |
||||
public void twoTransactions() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
|
||||
final Transaction tx1 = store.startTransaction(); |
||||
final Transaction tx2 = store.startTransaction(); |
||||
|
||||
tx1.put(BytesValue.of(1), BytesValue.of(1)); |
||||
tx2.put(BytesValue.of(2), BytesValue.of(2)); |
||||
|
||||
tx1.commit(); |
||||
tx2.commit(); |
||||
|
||||
assertThat(store.get(BytesValue.of(1))).contains(BytesValue.of(1)); |
||||
assertThat(store.get(BytesValue.of(2))).contains(BytesValue.of(2)); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionIsolation() throws Exception { |
||||
final int keyCount = 1000; |
||||
final KeyValueStorage store = createStore(); |
||||
|
||||
final CountDownLatch finishedLatch = new CountDownLatch(2); |
||||
final Function<BytesValue, Thread> txRunner = |
||||
(value) -> |
||||
new Thread( |
||||
() -> { |
||||
final Transaction tx = store.startTransaction(); |
||||
for (int i = 0; i < keyCount; i++) { |
||||
tx.put(BytesValues.toMinimalBytes(i), value); |
||||
} |
||||
try { |
||||
tx.commit(); |
||||
} finally { |
||||
finishedLatch.countDown(); |
||||
} |
||||
}); |
||||
|
||||
// Run 2 concurrent transactions that write a bunch of values to the same keys
|
||||
final BytesValue a = BytesValue.of(10); |
||||
final BytesValue b = BytesValue.of(20); |
||||
txRunner.apply(a).start(); |
||||
txRunner.apply(b).start(); |
||||
|
||||
finishedLatch.await(); |
||||
|
||||
// Check that transaction results are isolated (not interleaved)
|
||||
final BytesValue[] finalValues = new BytesValue[keyCount]; |
||||
final BytesValue[] expectedValues = new BytesValue[keyCount]; |
||||
for (int i = 0; i < keyCount; i++) { |
||||
final BytesValue key = BytesValues.toMinimalBytes(i); |
||||
finalValues[i] = store.get(key).get(); |
||||
} |
||||
Arrays.fill(expectedValues, 0, keyCount, finalValues[0]); |
||||
assertThat(finalValues).containsExactly(expectedValues); |
||||
assertThat(finalValues[0].equals(a) || finalValues[0].equals(b)).isTrue(); |
||||
|
||||
store.close(); |
||||
} |
||||
} |
@ -0,0 +1,398 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.kvstore; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.junit.Assert.assertArrayEquals; |
||||
import static org.junit.Assert.assertTrue; |
||||
|
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorage; |
||||
import tech.pegasys.pantheon.plugin.services.storage.KeyValueStorageTransaction; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValues; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.CountDownLatch; |
||||
import java.util.function.Function; |
||||
|
||||
import org.junit.Ignore; |
||||
import org.junit.Test; |
||||
|
||||
@Ignore |
||||
public abstract class AbstractKeyValueStorageTest { |
||||
|
||||
protected abstract KeyValueStorage createStore() throws Exception; |
||||
|
||||
@Test |
||||
public void twoStoresAreIndependent() throws Exception { |
||||
final KeyValueStorage store1 = createStore(); |
||||
final KeyValueStorage store2 = createStore(); |
||||
|
||||
final KeyValueStorageTransaction tx = store1.startTransaction(); |
||||
final byte[] key = bytesFromHexString("0001"); |
||||
final byte[] value = bytesFromHexString("0FFF"); |
||||
|
||||
tx.put(key, value); |
||||
tx.commit(); |
||||
|
||||
final Optional<byte[]> result = store2.get(key); |
||||
assertThat(result).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void put() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final byte[] key = bytesFromHexString("0F"); |
||||
final byte[] firstValue = bytesFromHexString("0ABC"); |
||||
final byte[] secondValue = bytesFromHexString("0DEF"); |
||||
|
||||
KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.put(key, firstValue); |
||||
tx.commit(); |
||||
assertThat(store.get(key)).contains(firstValue); |
||||
|
||||
tx = store.startTransaction(); |
||||
tx.put(key, secondValue); |
||||
tx.commit(); |
||||
assertThat(store.get(key)).contains(secondValue); |
||||
} |
||||
|
||||
@Test |
||||
public void removeUnless() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.put(bytesFromHexString("0F"), bytesFromHexString("0ABC")); |
||||
tx.put(bytesFromHexString("10"), bytesFromHexString("0ABC")); |
||||
tx.put(bytesFromHexString("11"), bytesFromHexString("0ABC")); |
||||
tx.put(bytesFromHexString("12"), bytesFromHexString("0ABC")); |
||||
tx.commit(); |
||||
store.removeAllKeysUnless(bv -> BytesValue.wrap(bv).toString().contains("1")); |
||||
assertThat(store.containsKey(bytesFromHexString("0F"))).isFalse(); |
||||
assertThat(store.containsKey(bytesFromHexString("10"))).isTrue(); |
||||
assertThat(store.containsKey(bytesFromHexString("11"))).isTrue(); |
||||
assertThat(store.containsKey(bytesFromHexString("12"))).isTrue(); |
||||
} |
||||
|
||||
@Test |
||||
public void containsKey() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final byte[] key = bytesFromHexString("ABCD"); |
||||
final byte[] value = bytesFromHexString("DEFF"); |
||||
|
||||
assertThat(store.containsKey(key)).isFalse(); |
||||
|
||||
final KeyValueStorageTransaction transaction = store.startTransaction(); |
||||
transaction.put(key, value); |
||||
transaction.commit(); |
||||
|
||||
assertThat(store.containsKey(key)).isTrue(); |
||||
} |
||||
|
||||
@Test |
||||
public void removeExisting() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final byte[] key = bytesFromHexString("0F"); |
||||
final byte[] value = bytesFromHexString("0ABC"); |
||||
|
||||
KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.put(key, value); |
||||
tx.commit(); |
||||
|
||||
tx = store.startTransaction(); |
||||
tx.remove(key); |
||||
tx.commit(); |
||||
assertThat(store.get(key)).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void removeExistingSameTransaction() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final byte[] key = bytesFromHexString("0F"); |
||||
final byte[] value = bytesFromHexString("0ABC"); |
||||
|
||||
KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.put(key, value); |
||||
tx.remove(key); |
||||
tx.commit(); |
||||
assertThat(store.get(key)).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void removeNonExistent() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final byte[] key = bytesFromHexString("0F"); |
||||
|
||||
KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.remove(key); |
||||
tx.commit(); |
||||
assertThat(store.get(key)).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void concurrentUpdate() throws Exception { |
||||
final int keyCount = 1000; |
||||
final KeyValueStorage store = createStore(); |
||||
|
||||
final CountDownLatch finishedLatch = new CountDownLatch(2); |
||||
final Function<byte[], Thread> updater = |
||||
(value) -> |
||||
new Thread( |
||||
() -> { |
||||
try { |
||||
for (int i = 0; i < keyCount; i++) { |
||||
KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.put(BytesValues.toMinimalBytes(i).getArrayUnsafe(), value); |
||||
tx.commit(); |
||||
} |
||||
} finally { |
||||
finishedLatch.countDown(); |
||||
} |
||||
}); |
||||
|
||||
// Run 2 concurrent transactions that write a bunch of values to the same keys
|
||||
final byte[] a = BytesValue.of(10).getArrayUnsafe(); |
||||
final byte[] b = BytesValue.of(20).getArrayUnsafe(); |
||||
updater.apply(a).start(); |
||||
updater.apply(b).start(); |
||||
|
||||
finishedLatch.await(); |
||||
|
||||
for (int i = 0; i < keyCount; i++) { |
||||
final byte[] key = BytesValues.toMinimalBytes(i).getArrayUnsafe(); |
||||
final byte[] actual = store.get(key).get(); |
||||
assertTrue(Arrays.equals(actual, a) || Arrays.equals(actual, b)); |
||||
} |
||||
|
||||
store.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionCommit() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
// Add some values
|
||||
KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.put(bytesOf(1), bytesOf(1)); |
||||
tx.put(bytesOf(2), bytesOf(2)); |
||||
tx.put(bytesOf(3), bytesOf(3)); |
||||
tx.commit(); |
||||
|
||||
// Start transaction that adds, modifies, and removes some values
|
||||
tx = store.startTransaction(); |
||||
tx.put(bytesOf(2), bytesOf(3)); |
||||
tx.put(bytesOf(2), bytesOf(4)); |
||||
tx.remove(bytesOf(3)); |
||||
tx.put(bytesOf(4), bytesOf(8)); |
||||
|
||||
// Check values before committing have not changed
|
||||
assertThat(store.get(bytesOf(1))).contains(bytesOf(1)); |
||||
assertThat(store.get(bytesOf(2))).contains(bytesOf(2)); |
||||
assertThat(store.get(bytesOf(3))).contains(bytesOf(3)); |
||||
assertThat(store.get(bytesOf(4))).isEmpty(); |
||||
|
||||
tx.commit(); |
||||
|
||||
// Check that values have been updated after commit
|
||||
assertThat(store.get(bytesOf(1))).contains(bytesOf(1)); |
||||
assertThat(store.get(bytesOf(2))).contains(bytesOf(4)); |
||||
assertThat(store.get(bytesOf(3))).isEmpty(); |
||||
assertThat(store.get(bytesOf(4))).contains(bytesOf(8)); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionRollback() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
// Add some values
|
||||
KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.put(bytesOf(1), bytesOf(1)); |
||||
tx.put(bytesOf(2), bytesOf(2)); |
||||
tx.put(bytesOf(3), bytesOf(3)); |
||||
tx.commit(); |
||||
|
||||
// Start transaction that adds, modifies, and removes some values
|
||||
tx = store.startTransaction(); |
||||
tx.put(bytesOf(2), bytesOf(3)); |
||||
tx.put(bytesOf(2), bytesOf(4)); |
||||
tx.remove(bytesOf(3)); |
||||
tx.put(bytesOf(4), bytesOf(8)); |
||||
|
||||
// Check values before committing have not changed
|
||||
assertThat(store.get(bytesOf(1))).contains(bytesOf(1)); |
||||
assertThat(store.get(bytesOf(2))).contains(bytesOf(2)); |
||||
assertThat(store.get(bytesOf(3))).contains(bytesOf(3)); |
||||
assertThat(store.get(bytesOf(4))).isEmpty(); |
||||
|
||||
tx.rollback(); |
||||
|
||||
// Check that values have not changed after rollback
|
||||
assertThat(store.get(bytesOf(1))).contains(bytesOf(1)); |
||||
assertThat(store.get(bytesOf(2))).contains(bytesOf(2)); |
||||
assertThat(store.get(bytesOf(3))).contains(bytesOf(3)); |
||||
assertThat(store.get(bytesOf(4))).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionCommitEmpty() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionRollbackEmpty() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionPutAfterCommit() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
tx.put(bytesOf(1), bytesOf(1)); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionRemoveAfterCommit() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
tx.remove(bytesOf(1)); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionPutAfterRollback() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
tx.put(bytesOf(1), bytesOf(1)); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionRemoveAfterRollback() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
tx.remove(bytesOf(1)); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionCommitAfterRollback() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
tx.commit(); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionCommitTwice() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
tx.commit(); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionRollbackAfterCommit() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.commit(); |
||||
tx.rollback(); |
||||
} |
||||
|
||||
@Test(expected = IllegalStateException.class) |
||||
public void transactionRollbackTwice() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
tx.rollback(); |
||||
tx.rollback(); |
||||
} |
||||
|
||||
@Test |
||||
public void twoTransactions() throws Exception { |
||||
final KeyValueStorage store = createStore(); |
||||
|
||||
final KeyValueStorageTransaction tx1 = store.startTransaction(); |
||||
final KeyValueStorageTransaction tx2 = store.startTransaction(); |
||||
|
||||
tx1.put(bytesOf(1), bytesOf(1)); |
||||
tx2.put(bytesOf(2), bytesOf(2)); |
||||
|
||||
tx1.commit(); |
||||
tx2.commit(); |
||||
|
||||
assertThat(store.get(bytesOf(1))).contains(bytesOf(1)); |
||||
assertThat(store.get(bytesOf(2))).contains(bytesOf(2)); |
||||
} |
||||
|
||||
@Test |
||||
public void transactionIsolation() throws Exception { |
||||
final int keyCount = 1000; |
||||
final KeyValueStorage store = createStore(); |
||||
|
||||
final CountDownLatch finishedLatch = new CountDownLatch(2); |
||||
final Function<byte[], Thread> txRunner = |
||||
(value) -> |
||||
new Thread( |
||||
() -> { |
||||
final KeyValueStorageTransaction tx = store.startTransaction(); |
||||
for (int i = 0; i < keyCount; i++) { |
||||
tx.put(BytesValues.toMinimalBytes(i).getArrayUnsafe(), value); |
||||
} |
||||
try { |
||||
tx.commit(); |
||||
} finally { |
||||
finishedLatch.countDown(); |
||||
} |
||||
}); |
||||
|
||||
// Run 2 concurrent transactions that write a bunch of values to the same keys
|
||||
final byte[] a = bytesOf(10); |
||||
final byte[] b = bytesOf(20); |
||||
txRunner.apply(a).start(); |
||||
txRunner.apply(b).start(); |
||||
|
||||
finishedLatch.await(); |
||||
|
||||
// Check that transaction results are isolated (not interleaved)
|
||||
final List<byte[]> finalValues = new ArrayList<>(keyCount); |
||||
for (int i = 0; i < keyCount; i++) { |
||||
final byte[] key = BytesValues.toMinimalBytes(i).getArrayUnsafe(); |
||||
finalValues.add(store.get(key).get()); |
||||
} |
||||
|
||||
// Expecting the same value for all entries
|
||||
final byte[] expected = finalValues.get(0); |
||||
for (final byte[] actual : finalValues) { |
||||
assertArrayEquals(expected, actual); |
||||
} |
||||
|
||||
assertTrue(Arrays.equals(expected, a) || Arrays.equals(expected, b)); |
||||
|
||||
store.close(); |
||||
} |
||||
|
||||
/* |
||||
* Used to mimic the wrapping with BytesValue performed in Pantheon |
||||
*/ |
||||
protected byte[] bytesFromHexString(final String hex) { |
||||
return BytesValue.fromHexString(hex).getArrayUnsafe(); |
||||
} |
||||
|
||||
protected byte[] bytesOf(final int... bytes) { |
||||
return BytesValue.of(bytes).getArrayUnsafe(); |
||||
} |
||||
} |
Loading…
Reference in new issue