@ -70,18 +70,16 @@ import org.slf4j.LoggerFactory;
public abstract class RocksDBColumnarKeyValueStorage implements SegmentedKeyValueStorage {
private static final Logger LOG = LoggerFactory . getLogger ( RocksDBColumnarKeyValueStorage . class ) ;
static final String DEFAULT_COLUMN = "default" ;
private static final int ROCKSDB_FORMAT_VERSION = 5 ;
private static final long ROCKSDB_BLOCK_SIZE = 32768 ;
/** RocksDb blockcache size when using the high spec option */
protected static final long ROCKSDB_BLOCKCACHE_SIZE_HIGH_SPEC = 1_073_741_824L ;
/** RocksDb memtable size when using the high spec option */
protected static final long ROCKSDB_MEMTABLE_SIZE_HIGH_SPEC = 1_073_741_824 L;
protected static final long ROCKSDB_MEMTABLE_SIZE_HIGH_SPEC = 536_870_912 L;
/** Max total size of all WAL file, after which a flush is triggered */
protected static final long WAL_MAX_TOTAL_SIZE = 1_073_741_824L ;
/** Expected size of a single WAL file, to determine how many WAL files to keep around */
protected static final long EXPECTED_WAL_FILE_SIZE = 67_108_864L ;
/** RocksDb number of log files to keep on disk */
private static final long NUMBER_OF_LOG_FILES_TO_KEEP = 7 ;
/** RocksDb Time to roll a log file (1 day = 3600 * 24 seconds) */
@ -144,7 +142,6 @@ public abstract class RocksDBColumnarKeyValueStorage implements SegmentedKeyValu
this . rocksDBMetricsFactory = rocksDBMetricsFactory ;
try {
final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions ( ) ;
trimmedSegments = new ArrayList < > ( defaultSegments ) ;
final List < byte [ ] > existingColumnFamilies =
RocksDB . listColumnFamilies ( new Options ( ) , configuration . getDatabaseDir ( ) . toString ( ) ) ;
@ -156,14 +153,9 @@ public abstract class RocksDBColumnarKeyValueStorage implements SegmentedKeyValu
. noneMatch ( existed - > Arrays . equals ( existed , ignorableSegment . getId ( ) ) ) )
. forEach ( trimmedSegments : : remove ) ;
columnDescriptors =
trimmedSegments . stream ( ) . map ( this : : createColumnDescriptor ) . collect ( Collectors . toList ( ) ) ;
columnDescriptors . add (
new ColumnFamilyDescriptor (
DEFAULT_COLUMN . getBytes ( StandardCharsets . UTF_8 ) ,
columnFamilyOptions
. setTtl ( 0 )
. setCompressionType ( CompressionType . LZ4_COMPRESSION )
. setTableFormatConfig ( createBlockBasedTableConfig ( configuration ) ) ) ) ;
trimmedSegments . stream ( )
. map ( segment - > createColumnDescriptor ( segment , configuration ) )
. collect ( Collectors . toList ( ) ) ;
setGlobalOptions ( configuration , stats ) ;
@ -174,6 +166,80 @@ public abstract class RocksDBColumnarKeyValueStorage implements SegmentedKeyValu
}
}
/ * *
* Create a Column Family Descriptor for a given segment It defines basically the different
* options to apply to the corresponding Column Family
*
* @param segment the segment identifier
* @param configuration RocksDB configuration
* @return a column family descriptor
* /
private ColumnFamilyDescriptor createColumnDescriptor (
final SegmentIdentifier segment , final RocksDBConfiguration configuration ) {
BlockBasedTableConfig basedTableConfig = createBlockBasedTableConfig ( segment , configuration ) ;
final var options =
new ColumnFamilyOptions ( )
. setTtl ( 0 )
. setCompressionType ( CompressionType . LZ4_COMPRESSION )
. setTableFormatConfig ( basedTableConfig ) ;
if ( segment . containsStaticData ( ) ) {
options
. setEnableBlobFiles ( true )
. setEnableBlobGarbageCollection ( false )
. setMinBlobSize ( 100 )
. setBlobCompressionType ( CompressionType . LZ4_COMPRESSION ) ;
}
return new ColumnFamilyDescriptor ( segment . getId ( ) , options ) ;
}
/ * * *
* Create a Block Base Table configuration for each segment , depending on the configuration in place
* and the segment itself
*
* @param segment The segment related to the column family
* @param config RocksDB configuration
* @return Block Base Table configuration
* /
private BlockBasedTableConfig createBlockBasedTableConfig (
final SegmentIdentifier segment , final RocksDBConfiguration config ) {
final LRUCache cache =
new LRUCache (
config . isHighSpec ( ) & & segment . isEligibleToHighSpecFlag ( )
? ROCKSDB_BLOCKCACHE_SIZE_HIGH_SPEC
: config . getCacheCapacity ( ) ) ;
return new BlockBasedTableConfig ( )
. setFormatVersion ( ROCKSDB_FORMAT_VERSION )
. setBlockCache ( cache )
. setFilterPolicy ( new BloomFilter ( 10 , false ) )
. setPartitionFilters ( true )
. setCacheIndexAndFilterBlocks ( false )
. setBlockSize ( ROCKSDB_BLOCK_SIZE ) ;
}
/ * * *
* Set Global options ( DBOptions )
*
* @param configuration RocksDB configuration
* @param stats The statistics object
* /
private void setGlobalOptions ( final RocksDBConfiguration configuration , final Statistics stats ) {
options = new DBOptions ( ) ;
options
. setCreateIfMissing ( true )
. setMaxOpenFiles ( configuration . getMaxOpenFiles ( ) )
. setStatistics ( stats )
. setCreateMissingColumnFamilies ( true )
. setLogFileTimeToRoll ( TIME_TO_ROLL_LOG_FILE )
. setKeepLogFileNum ( NUMBER_OF_LOG_FILES_TO_KEEP )
. setEnv ( Env . getDefault ( ) . setBackgroundThreads ( configuration . getBackgroundThreadCount ( ) ) )
. setMaxTotalWalSize ( WAL_MAX_TOTAL_SIZE )
. setRecycleLogFileNum ( WAL_MAX_TOTAL_SIZE / EXPECTED_WAL_FILE_SIZE ) ;
}
/ * *
* Parse RocksDBException and wrap in StorageException
*
@ -219,42 +285,6 @@ public abstract class RocksDBColumnarKeyValueStorage implements SegmentedKeyValu
}
}
private ColumnFamilyDescriptor createColumnDescriptor ( final SegmentIdentifier segment ) {
final var options =
new ColumnFamilyOptions ( )
. setTtl ( 0 )
. setCompressionType ( CompressionType . LZ4_COMPRESSION )
. setTableFormatConfig ( createBlockBasedTableConfig ( configuration ) ) ;
if ( segment . containsStaticData ( ) ) {
options
. setEnableBlobFiles ( true )
. setEnableBlobGarbageCollection ( false )
. setMinBlobSize ( 100 )
. setBlobCompressionType ( CompressionType . LZ4_COMPRESSION ) ;
}
return new ColumnFamilyDescriptor ( segment . getId ( ) , options ) ;
}
private void setGlobalOptions ( final RocksDBConfiguration configuration , final Statistics stats ) {
options = new DBOptions ( ) ;
options
. setCreateIfMissing ( true )
. setMaxOpenFiles ( configuration . getMaxOpenFiles ( ) )
. setMaxTotalWalSize ( WAL_MAX_TOTAL_SIZE )
. setRecycleLogFileNum ( WAL_MAX_TOTAL_SIZE / EXPECTED_WAL_FILE_SIZE )
. setStatistics ( stats )
. setCreateMissingColumnFamilies ( true )
. setLogFileTimeToRoll ( TIME_TO_ROLL_LOG_FILE )
. setKeepLogFileNum ( NUMBER_OF_LOG_FILES_TO_KEEP )
. setEnv ( Env . getDefault ( ) . setBackgroundThreads ( configuration . getBackgroundThreadCount ( ) ) ) ;
if ( configuration . isHighSpec ( ) ) {
options . setDbWriteBufferSize ( ROCKSDB_MEMTABLE_SIZE_HIGH_SPEC ) ;
}
}
void initMetrics ( ) {
metrics = rocksDBMetricsFactory . create ( metricsSystem , configuration , getDB ( ) , stats ) ;
}
@ -287,19 +317,6 @@ public abstract class RocksDBColumnarKeyValueStorage implements SegmentedKeyValu
} ) ) ;
}
BlockBasedTableConfig createBlockBasedTableConfig ( final RocksDBConfiguration config ) {
final LRUCache cache =
new LRUCache (
config . isHighSpec ( ) ? ROCKSDB_BLOCKCACHE_SIZE_HIGH_SPEC : config . getCacheCapacity ( ) ) ;
return new BlockBasedTableConfig ( )
. setFormatVersion ( ROCKSDB_FORMAT_VERSION )
. setBlockCache ( cache )
. setFilterPolicy ( new BloomFilter ( 10 , false ) )
. setPartitionFilters ( true )
. setCacheIndexAndFilterBlocks ( false )
. setBlockSize ( ROCKSDB_BLOCK_SIZE ) ;
}
/ * *
* Safe method to map segment identifier to column handle .
*