leveldb shard

pull/4066/head
lutty 3 years ago committed by Lutty
parent 7e6b16aec8
commit d6d25be453
  1. 5
      cmd/harmony/default.go
  2. 38
      cmd/harmony/flags.go
  3. 47
      cmd/harmony/flags_test.go
  4. 12
      cmd/harmony/main.go
  5. 7
      internal/configs/harmony/harmony.go
  6. 21
      internal/shardchain/dbfactory.go
  7. 37
      internal/shardchain/leveldb_shard/common.go
  8. 199
      internal/shardchain/leveldb_shard/shard.go
  9. 118
      internal/shardchain/leveldb_shard/shard_batch.go
  10. 22
      internal/shardchain/local_cache/hack.go
  11. 82
      internal/shardchain/local_cache/local_cache_batch.go
  12. 111
      internal/shardchain/local_cache/local_cache_database.go

@ -87,6 +87,11 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
},
},
DNSSync: getDefaultDNSSyncConfig(defNetworkType),
ShardData: harmonyconfig.ShardDataConfig{
EnableShardData: false,
DiskCount: 8,
ShardCount: 4,
},
}
var defaultSysConfig = harmonyconfig.SysConfig{

@ -213,6 +213,12 @@ var (
syncDiscHighFlag,
syncDiscBatchFlag,
}
shardDataFlags = []cli.Flag{
enableShardDataFlag,
diskCountFlag,
shardCountFlag,
}
)
var (
@ -312,6 +318,7 @@ func getRootFlags() []cli.Flag {
flags = append(flags, legacyMiscFlags...)
flags = append(flags, prometheusFlags...)
flags = append(flags, syncFlags...)
flags = append(flags, shardDataFlags...)
return flags
}
@ -1596,3 +1603,34 @@ func applySyncFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
config.Sync.DiscBatch = cli.GetIntFlagValue(cmd, syncDiscBatchFlag)
}
}
// shard data flags
var (
enableShardDataFlag = cli.BoolFlag{
Name: "enable_shard_data",
Usage: "whether use multi-database mode of levelDB",
DefValue: defaultConfig.ShardData.EnableShardData,
}
diskCountFlag = cli.IntFlag{
Name: "disk_count",
Usage: "the count of disks you want to storage block data",
DefValue: defaultConfig.ShardData.DiskCount,
}
shardCountFlag = cli.IntFlag{
Name: "shard_count",
Usage: "the count of shards you want to split in each disk",
DefValue: defaultConfig.ShardData.ShardCount,
}
)
func applyShardDataFlags(cmd *cobra.Command, cfg *harmonyconfig.HarmonyConfig) {
if cli.IsFlagChanged(cmd, enableShardDataFlag) {
cfg.ShardData.EnableShardData = cli.GetBoolFlagValue(cmd, enableShardDataFlag)
}
if cli.IsFlagChanged(cmd, diskCountFlag) {
cfg.ShardData.DiskCount = cli.GetIntFlagValue(cmd, diskCountFlag)
}
if cli.IsFlagChanged(cmd, shardCountFlag) {
cfg.ShardData.ShardCount = cli.GetIntFlagValue(cmd, shardCountFlag)
}
}

@ -140,6 +140,11 @@ func TestHarmonyFlags(t *testing.T) {
Gateway: "https://gateway.harmony.one",
},
Sync: defaultMainnetSyncConfig,
ShardData: harmonyconfig.ShardDataConfig{
EnableShardData: false,
DiskCount: 1,
ShardCount: 1,
},
},
},
}
@ -1233,6 +1238,48 @@ func TestSyncFlags(t *testing.T) {
}
}
func TestShardDataFlags(t *testing.T) {
tests := []struct {
args []string
expConfig harmonyconfig.ShardDataConfig
expErr error
}{
{
args: []string{},
expConfig: defaultConfig.ShardData,
},
{
args: []string{"--sharddata.enable_shard_data",
"--sharddata.disk_count", "8",
"--sharddata.shard_count", "4",
},
expConfig: harmonyconfig.ShardDataConfig{
EnableShardData: true,
DiskCount: 8,
ShardCount: 4,
},
},
}
for i, test := range tests {
ts := newFlagTestSuite(t, shardDataFlags, func(command *cobra.Command, config *harmonyconfig.HarmonyConfig) {
applyShardDataFlags(command, config)
})
hc, err := ts.run(test.args)
if assErr := assertError(err, test.expErr); assErr != nil {
t.Fatalf("Test %v: %v", i, assErr)
}
if err != nil || test.expErr != nil {
continue
}
if !reflect.DeepEqual(hc.ShardData, test.expConfig) {
t.Errorf("Test %v:\n\t%+v\n\t%+v", i, hc.ShardData, test.expConfig)
}
ts.tearDown()
}
}
type flagTestSuite struct {
t *testing.T

@ -231,6 +231,7 @@ func applyRootFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
applyRevertFlags(cmd, config)
applyPrometheusFlags(cmd, config)
applySyncFlags(cmd, config)
applyShardDataFlags(cmd, config)
}
func setupNodeLog(config harmonyconfig.HarmonyConfig) {
@ -655,7 +656,16 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
}
// Current node.
chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
var chainDBFactory shardchain.DBFactory
if hc.ShardData.EnableShardData {
chainDBFactory = &shardchain.LDBShardFactory{
RootDir: nodeConfig.DBDir,
DiskCount: hc.ShardData.DiskCount,
ShardCount: hc.ShardData.ShardCount,
}
} else {
chainDBFactory = &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
}
currentNode := node.New(myHost, currentConsensus, chainDBFactory, blacklist, nodeConfig.ArchiveModes(), &hc)

@ -28,6 +28,7 @@ type HarmonyConfig struct {
Legacy *LegacyConfig `toml:",omitempty"`
Prometheus *PrometheusConfig `toml:",omitempty"`
DNSSync DnsSync
ShardData ShardDataConfig
}
type DnsSync struct {
@ -65,6 +66,12 @@ type GeneralConfig struct {
EnablePruneBeaconChain bool
}
type ShardDataConfig struct {
EnableShardData bool
DiskCount int
ShardCount int
}
type ConsensusConfig struct {
MinPeers int
AggregateSig bool

@ -2,7 +2,10 @@ package shardchain
import (
"fmt"
"github.com/harmony-one/harmony/internal/shardchain/leveldb_shard"
"github.com/harmony-one/harmony/internal/shardchain/local_cache"
"path"
"path/filepath"
"github.com/ethereum/go-ethereum/core/rawdb"
@ -34,3 +37,21 @@ type MemDBFactory struct{}
func (f *MemDBFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
return rawdb.NewMemoryDatabase(), nil
}
// LDBShardFactory is a merged Multi-LDB-backed blockchain database factory.
type LDBShardFactory struct {
RootDir string // directory in which to put shard databases in.
DiskCount int
ShardCount int
}
// NewChainDB returns a new memDB for the blockchain for given shard.
func (f *LDBShardFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
dir := filepath.Join(f.RootDir, fmt.Sprintf("harmony_sharddb_%d", shardID))
shard, err := leveldb_shard.NewLeveldbShard(dir, f.DiskCount, f.ShardCount)
if err != nil {
return nil, err
}
return rawdb.NewDatabase(local_cache.NewLocalCacheDatabase(shard)), nil
}

@ -0,0 +1,37 @@
package leveldb_shard
import (
"hash/crc32"
"sync"
"sync/atomic"
)
func mapDBIndex(key []byte, dbCount uint32) uint32 {
return crc32.ChecksumIEEE(key) % dbCount
}
func parallelRunAndReturnErr(parallelNum int, cb func(index int) error) error {
wg := sync.WaitGroup{}
errAtomic := atomic.Value{}
for i := 0; i < parallelNum; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
err := cb(i)
if err != nil {
errAtomic.Store(err)
}
}(i)
}
wg.Wait()
if err := errAtomic.Load(); err != nil {
return errAtomic.Load().(error)
} else {
return nil
}
}

@ -0,0 +1,199 @@
package leveldb_shard
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
"path/filepath"
"strings"
"sync"
)
type LeveldbShard struct {
dbs []*leveldb.DB
dbCount uint32
}
var shardIdxKey = []byte("__DB_SHARED_INDEX__")
func NewLeveldbShard(savePath string, diskCount int, diskShards int) (shard *LeveldbShard, err error) {
shard = &LeveldbShard{
dbs: make([]*leveldb.DB, diskCount*diskShards),
dbCount: uint32(diskCount * diskShards),
}
// clean when error
defer func() {
if err != nil {
for _, db := range shard.dbs {
if db != nil {
_ = db.Close()
}
}
shard = nil
}
}()
levelDBOptions := &opt.Options{
OpenFilesCacheCapacity: 128,
WriteBuffer: 8 << 20, //8MB, max memory occupyv = 8*2*diskCount*diskShards
BlockCacheCapacity: 16 << 20, //16MB
Filter: filter.NewBloomFilter(8),
DisableSeeksCompaction: true,
}
// async open
wg := sync.WaitGroup{}
for i := 0; i < diskCount; i++ {
for j := 0; j < diskShards; j++ {
shardPath := filepath.Join(savePath, fmt.Sprintf("disk%02d", i), fmt.Sprintf("block%02d", j))
dbIndex := i*diskShards + j
wg.Add(1)
go func() {
defer wg.Done()
ldb, openErr := leveldb.OpenFile(shardPath, levelDBOptions)
if openErr != nil {
err = openErr
return
}
indexByte := make([]byte, 8)
binary.BigEndian.PutUint64(indexByte, uint64(dbIndex))
inDBIndex, getErr := ldb.Get(shardIdxKey, nil)
if getErr != nil {
if getErr == leveldb.ErrNotFound {
putErr := ldb.Put(shardIdxKey, indexByte, nil)
if putErr != nil {
err = putErr
return
}
} else {
err = getErr
return
}
} else if bytes.Compare(indexByte, inDBIndex) != 0 {
err = fmt.Errorf("db shard index error, need %v, got %v", indexByte, inDBIndex)
return
}
shard.dbs[dbIndex] = ldb
}()
}
}
wg.Wait()
return shard, err
}
func (l *LeveldbShard) mapDB(key []byte) *leveldb.DB {
return l.dbs[mapDBIndex(key, l.dbCount)]
}
// Has retrieves if a key is present in the key-value data store.
func (l *LeveldbShard) Has(key []byte) (bool, error) {
return l.mapDB(key).Has(key, nil)
}
// Get retrieves the given key if it's present in the key-value data store.
func (l *LeveldbShard) Get(key []byte) ([]byte, error) {
return l.mapDB(key).Get(key, nil)
}
// Put inserts the given value into the key-value data store.
func (l *LeveldbShard) Put(key []byte, value []byte) error {
return l.mapDB(key).Put(key, value, nil)
}
// Delete removes the key from the key-value data store.
func (l *LeveldbShard) Delete(key []byte) error {
return l.mapDB(key).Delete(key, nil)
}
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called.
func (l *LeveldbShard) NewBatch() ethdb.Batch {
return NewLeveldbShardBatch(l)
}
// NewIterator creates a binary-alphabetical iterator over the entire keyspace
// contained within the key-value database.
func (l *LeveldbShard) NewIterator() ethdb.Iterator {
return l.iterator(nil)
}
// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of
// database content starting at a particular initial key (or after, if it does
// not exist).
func (l *LeveldbShard) NewIteratorWithStart(start []byte) ethdb.Iterator {
return l.iterator(&util.Range{Start: start})
}
// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix.
func (l *LeveldbShard) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
return l.iterator(util.BytesPrefix(prefix))
}
func (l *LeveldbShard) iterator(slice *util.Range) ethdb.Iterator {
iters := make([]iterator.Iterator, l.dbCount)
for i, db := range l.dbs {
iter := db.NewIterator(slice, nil)
iters[i] = iter
}
return iterator.NewMergedIterator(iters, comparer.DefaultComparer, true)
}
// Stat returns a particular internal stat of the database.
func (l *LeveldbShard) Stat(property string) (string, error) {
sb := strings.Builder{}
for i, db := range l.dbs {
getProperty, err := db.GetProperty(property)
if err != nil {
return "", err
}
sb.WriteString(fmt.Sprintf("=== shard %02d ===\n", i))
sb.WriteString(getProperty)
sb.WriteString("\n")
}
return sb.String(), nil
}
// Compact flattens the underlying data store for the given key range. In essence,
// deleted and overwritten versions are discarded, and the data is rearranged to
// reduce the cost of operations needed to access them.
//
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both is nil then it
// will compact entire data store.
func (l *LeveldbShard) Compact(start []byte, limit []byte) (err error) {
return parallelRunAndReturnErr(int(l.dbCount), func(i int) error {
return l.dbs[i].CompactRange(util.Range{Start: start, Limit: limit})
})
}
// Close all the DB
func (l *LeveldbShard) Close() error {
for _, db := range l.dbs {
err := db.Close()
if err != nil {
return err
}
}
return nil
}

@ -0,0 +1,118 @@
package leveldb_shard
import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/syndtr/goleveldb/leveldb"
"runtime"
"sync"
)
var batchesPool = sync.Pool{
New: func() interface{} {
return &leveldb.Batch{}
},
}
type LeveldbShardBatch struct {
shard *LeveldbShard
batches []*leveldb.Batch
batchesCount uint32
}
func NewLeveldbShardBatch(shard *LeveldbShard) *LeveldbShardBatch {
shardBatch := &LeveldbShardBatch{
batches: make([]*leveldb.Batch, shard.dbCount),
batchesCount: shard.dbCount,
shard: shard,
}
for i := uint32(0); i < shard.dbCount; i++ {
shardBatch.batches[i] = batchesPool.Get().(*leveldb.Batch)
}
runtime.SetFinalizer(shardBatch, func(o *LeveldbShardBatch) {
for _, batch := range o.batches {
batch.Reset()
batchesPool.Put(batch)
}
o.batches = nil
})
return shardBatch
}
func (l *LeveldbShardBatch) mapBatch(key []byte) *leveldb.Batch {
return l.batches[mapDBIndex(key, l.batchesCount)]
}
// Put inserts the given value into the key-value data store.
func (l *LeveldbShardBatch) Put(key []byte, value []byte) error {
l.mapBatch(key).Put(key, value)
return nil
}
// Delete removes the key from the key-value data store.
func (l *LeveldbShardBatch) Delete(key []byte) error {
l.mapBatch(key).Delete(key)
return nil
}
// ValueSize retrieves the amount of data queued up for writing.
func (l *LeveldbShardBatch) ValueSize() int {
size := 0
for _, batch := range l.batches {
size += batch.Len()
}
return size
}
// Write flushes any accumulated data to disk.
func (l *LeveldbShardBatch) Write() (err error) {
return parallelRunAndReturnErr(int(l.batchesCount), func(i int) error {
return l.shard.dbs[i].Write(l.batches[i], nil)
})
}
// Reset resets the batch for reuse.
func (l *LeveldbShardBatch) Reset() {
for _, batch := range l.batches {
batch.Reset()
}
}
// Replay replays the batch contents.
func (l *LeveldbShardBatch) Replay(w ethdb.KeyValueWriter) error {
for _, batch := range l.batches {
err := batch.Replay(&replayer{writer: w})
if err != nil {
return err
}
}
return nil
}
// replayer is a small wrapper to implement the correct replay methods.
type replayer struct {
writer ethdb.KeyValueWriter
failure error
}
// Put inserts the given value into the key-value data store.
func (r *replayer) Put(key, value []byte) {
// If the replay already failed, stop executing ops
if r.failure != nil {
return
}
r.failure = r.writer.Put(key, value)
}
// Delete removes the key from the key-value data store.
func (r *replayer) Delete(key []byte) {
// If the replay already failed, stop executing ops
if r.failure != nil {
return
}
r.failure = r.writer.Delete(key)
}

@ -0,0 +1,22 @@
package local_cache
import (
"reflect"
"unsafe"
)
func String(b []byte) (s string) {
if len(b) == 0 {
return ""
}
return *(*string)(unsafe.Pointer(&b))
}
func StringBytes(s string) []byte {
var b []byte
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
hdr.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
hdr.Cap = len(s)
hdr.Len = len(s)
return b
}

@ -0,0 +1,82 @@
package local_cache
import (
"github.com/ethereum/go-ethereum/ethdb"
"sync"
)
type LocalCacheBatch struct {
db *LocalCacheDatabase
lock sync.Mutex
size int
batchWriteKey [][]byte
batchWriteValue [][]byte
batchDeleteKey [][]byte
}
func newLocalCacheBatch(db *LocalCacheDatabase) *LocalCacheBatch {
return &LocalCacheBatch{db: db}
}
func (b *LocalCacheBatch) Put(key []byte, value []byte) error {
b.lock.Lock()
defer b.lock.Unlock()
b.batchWriteKey = append(b.batchWriteKey, key)
b.batchWriteValue = append(b.batchWriteValue, value)
b.size += len(key) + len(value)
return nil
}
func (b *LocalCacheBatch) Delete(key []byte) error {
b.lock.Lock()
defer b.lock.Unlock()
b.batchDeleteKey = append(b.batchDeleteKey, key)
b.size += len(key)
return nil
}
func (b *LocalCacheBatch) ValueSize() int {
return b.size
}
func (b *LocalCacheBatch) Write() error {
b.lock.Lock()
defer b.lock.Unlock()
return b.db.batchWrite(b)
}
func (b *LocalCacheBatch) Reset() {
b.lock.Lock()
defer b.lock.Unlock()
b.batchWriteKey = b.batchWriteKey[:0]
b.batchWriteValue = b.batchWriteValue[:0]
b.batchDeleteKey = b.batchDeleteKey[:0]
b.size = 0
}
func (b *LocalCacheBatch) Replay(w ethdb.KeyValueWriter) error {
if len(b.batchWriteKey) > 0 {
for i, key := range b.batchWriteKey {
err := w.Put(key, b.batchWriteValue[i])
if err != nil {
return err
}
}
}
if len(b.batchDeleteKey) > 0 {
for _, key := range b.batchDeleteKey {
err := w.Delete(key)
if err != nil {
return err
}
}
}
return nil
}

@ -0,0 +1,111 @@
package local_cache
import (
"bytes"
"github.com/allegro/bigcache"
"github.com/ethereum/go-ethereum/ethdb"
"log"
"time"
)
type cacheWrapper struct {
*bigcache.BigCache
}
func (c *cacheWrapper) Put(key []byte, value []byte) error {
return c.BigCache.Set(String(key), value)
}
func (c *cacheWrapper) Delete(key []byte) error {
return c.BigCache.Delete(String(key))
}
type LocalCacheDatabase struct {
ethdb.KeyValueStore
enableReadCache bool
deleteMap map[string]bool
readCache *cacheWrapper
}
func NewLocalCacheDatabase(remoteDB ethdb.KeyValueStore) *LocalCacheDatabase {
config := bigcache.DefaultConfig(10 * time.Minute)
config.HardMaxCacheSize = 512
config.MaxEntriesInWindow = 2000 * 10 * 60
cache, _ := bigcache.NewBigCache(config)
db := &LocalCacheDatabase{
KeyValueStore: remoteDB,
enableReadCache: true,
deleteMap: make(map[string]bool),
readCache: &cacheWrapper{cache},
}
go func() {
for range time.Tick(time.Second) {
log.Printf("cache: %#v %d (%d)", cache.Stats(), cache.Len(), cache.Capacity())
}
}()
return db
}
func (c *LocalCacheDatabase) Has(key []byte) (bool, error) {
return c.KeyValueStore.Has(key)
}
func (c *LocalCacheDatabase) Get(key []byte) (ret []byte, err error) {
if c.enableReadCache {
if bytes.Compare(key, []byte("LastBlock")) != 0 {
strKey := String(key)
ret, err = c.readCache.Get(strKey)
if err == nil {
return ret, nil
}
defer func() {
if err == nil {
_ = c.readCache.Set(strKey, ret)
}
}()
}
}
return c.KeyValueStore.Get(key)
}
func (c *LocalCacheDatabase) Put(key []byte, value []byte) error {
if c.enableReadCache {
_ = c.readCache.Put(key, value)
}
return c.KeyValueStore.Put(key, value)
}
func (c *LocalCacheDatabase) Delete(key []byte) error {
if c.enableReadCache {
_ = c.readCache.Delete(key)
}
return c.KeyValueStore.Delete(key)
}
func (c *LocalCacheDatabase) NewBatch() ethdb.Batch {
return newLocalCacheBatch(c)
}
func (c *LocalCacheDatabase) batchWrite(b *LocalCacheBatch) error {
if c.enableReadCache {
_ = b.Replay(c.readCache)
}
batch := c.KeyValueStore.NewBatch()
err := b.Replay(batch)
if err != nil {
return err
}
return batch.Write()
}
Loading…
Cancel
Save