diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go
index 1ceeb3f4c..21bb5d512 100644
--- a/cmd/harmony/default.go
+++ b/cmd/harmony/default.go
@@ -87,6 +87,11 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
 		},
 	},
 	DNSSync: getDefaultDNSSyncConfig(defNetworkType),
+	ShardData: harmonyconfig.ShardDataConfig{
+		EnableShardData: false,
+		DiskCount:       8,
+		ShardCount:      4,
+	},
 }
 
 var defaultSysConfig = harmonyconfig.SysConfig{
diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go
index 72b4ca3e6..4fd5d42ce 100644
--- a/cmd/harmony/flags.go
+++ b/cmd/harmony/flags.go
@@ -213,6 +213,12 @@ var (
 		syncDiscHighFlag,
 		syncDiscBatchFlag,
 	}
+
+	shardDataFlags = []cli.Flag{
+		enableShardDataFlag,
+		diskCountFlag,
+		shardCountFlag,
+	}
 )
 
 var (
@@ -312,6 +318,7 @@ func getRootFlags() []cli.Flag {
 	flags = append(flags, legacyMiscFlags...)
 	flags = append(flags, prometheusFlags...)
 	flags = append(flags, syncFlags...)
+	flags = append(flags, shardDataFlags...)
 
 	return flags
 }
@@ -1596,3 +1603,34 @@ func applySyncFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
 		config.Sync.DiscBatch = cli.GetIntFlagValue(cmd, syncDiscBatchFlag)
 	}
 }
+
+// shard data flags
+var (
+	enableShardDataFlag = cli.BoolFlag{
+		Name:     "sharddata.enable_shard_data",
+		Usage:    "whether to use the multi-database (sharded LevelDB) mode",
+		DefValue: defaultConfig.ShardData.EnableShardData,
+	}
+	diskCountFlag = cli.IntFlag{
+		Name:     "sharddata.disk_count",
+		Usage:    "number of disks used to store the block data",
+		DefValue: defaultConfig.ShardData.DiskCount,
+	}
+	shardCountFlag = cli.IntFlag{
+		Name:     "sharddata.shard_count",
+		Usage:    "number of shards the data is split into on each disk",
+		DefValue: defaultConfig.ShardData.ShardCount,
+	}
+)
+
+func applyShardDataFlags(cmd *cobra.Command, cfg *harmonyconfig.HarmonyConfig) {
+	if cli.IsFlagChanged(cmd, enableShardDataFlag) {
+		cfg.ShardData.EnableShardData = cli.GetBoolFlagValue(cmd, enableShardDataFlag)
+	}
+	if cli.IsFlagChanged(cmd, diskCountFlag) {
+		cfg.ShardData.DiskCount = cli.GetIntFlagValue(cmd, diskCountFlag)
+	}
+	if cli.IsFlagChanged(cmd, shardCountFlag) {
+		cfg.ShardData.ShardCount = cli.GetIntFlagValue(cmd, shardCountFlag)
+	}
+}
diff --git a/cmd/harmony/flags_test.go b/cmd/harmony/flags_test.go
index a662ed524..7699eab6d 100644
--- a/cmd/harmony/flags_test.go
+++ b/cmd/harmony/flags_test.go
@@ -140,6 +140,11 @@ func TestHarmonyFlags(t *testing.T) {
 					Gateway: "https://gateway.harmony.one",
 				},
 				Sync: defaultMainnetSyncConfig,
+				ShardData: harmonyconfig.ShardDataConfig{
+					EnableShardData: false,
+					DiskCount:       8,
+					ShardCount:      4,
+				},
 			},
 		},
 	}
@@ -1233,6 +1238,48 @@ func TestSyncFlags(t *testing.T) {
 	}
 }
 
+func TestShardDataFlags(t *testing.T) {
+	tests := []struct {
+		args      []string
+		expConfig harmonyconfig.ShardDataConfig
+		expErr    error
+	}{
+		{
+			args:      []string{},
+			expConfig: defaultConfig.ShardData,
+		},
+		{
+			args: []string{"--sharddata.enable_shard_data",
+				"--sharddata.disk_count", "8",
+				"--sharddata.shard_count", "4",
+			},
+			expConfig: harmonyconfig.ShardDataConfig{
+				EnableShardData: true,
+				DiskCount:       8,
+				ShardCount:      4,
+			},
+		},
+	}
+	for i, test := range tests {
+		ts := newFlagTestSuite(t, shardDataFlags, func(command *cobra.Command, config *harmonyconfig.HarmonyConfig) {
+			applyShardDataFlags(command, config)
+		})
+		hc, err := ts.run(test.args)
+
+		if assErr := assertError(err, test.expErr); assErr != nil {
+			t.Fatalf("Test %v: %v", i, assErr)
+		}
+		if err != nil || test.expErr != nil {
+			continue
+		}
+		if !reflect.DeepEqual(hc.ShardData, test.expConfig) {
+			t.Errorf("Test %v:\n\t%+v\n\t%+v", i, hc.ShardData, test.expConfig)
+		}
+
+		ts.tearDown()
+	}
+}
+
 type flagTestSuite struct {
 	t *testing.T
diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go
index a3fdb1dcd..e94ea6c22 100644
--- a/cmd/harmony/main.go
+++ b/cmd/harmony/main.go
@@ -231,6 +231,7 @@ func applyRootFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
 	applyRevertFlags(cmd, config)
 	applyPrometheusFlags(cmd, config)
 	applySyncFlags(cmd, config)
+	applyShardDataFlags(cmd, config)
 }
 
 func setupNodeLog(config harmonyconfig.HarmonyConfig) {
@@ -655,7 +656,16 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
 	}
 
 	// Current node.
-	chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
+	var chainDBFactory shardchain.DBFactory
+	if hc.ShardData.EnableShardData {
+		chainDBFactory = &shardchain.LDBShardFactory{
+			RootDir:    nodeConfig.DBDir,
+			DiskCount:  hc.ShardData.DiskCount,
+			ShardCount: hc.ShardData.ShardCount,
+		}
+	} else {
+		chainDBFactory = &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
+	}
 
 	currentNode := node.New(myHost, currentConsensus, chainDBFactory, blacklist, nodeConfig.ArchiveModes(), &hc)
diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go
index a73004d71..691562501 100644
--- a/internal/configs/harmony/harmony.go
+++ b/internal/configs/harmony/harmony.go
@@ -28,6 +28,7 @@ type HarmonyConfig struct {
 	Legacy     *LegacyConfig     `toml:",omitempty"`
 	Prometheus *PrometheusConfig `toml:",omitempty"`
 	DNSSync    DnsSync
+	ShardData  ShardDataConfig
 }
 
 type DnsSync struct {
@@ -65,6 +66,12 @@ type GeneralConfig struct {
 	EnablePruneBeaconChain bool
 }
 
+type ShardDataConfig struct {
+	EnableShardData bool
+	DiskCount       int
+	ShardCount      int
+}
+
 type ConsensusConfig struct {
 	MinPeers     int
 	AggregateSig bool
diff --git a/internal/shardchain/dbfactory.go b/internal/shardchain/dbfactory.go
index f845d974a..20334f88a 100644
--- a/internal/shardchain/dbfactory.go
+++ b/internal/shardchain/dbfactory.go
@@ -2,7 +2,10 @@ package shardchain
 
 import (
 	"fmt"
+	"github.com/harmony-one/harmony/internal/shardchain/leveldb_shard"
+	"github.com/harmony-one/harmony/internal/shardchain/local_cache"
 	"path"
+	"path/filepath"
 
 	"github.com/ethereum/go-ethereum/core/rawdb"
@@ -34,3 +37,21 @@ type MemDBFactory struct{}
 func (f *MemDBFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
 	return rawdb.NewMemoryDatabase(), nil
 }
+
+// LDBShardFactory is a sharded, multi-disk LevelDB-backed blockchain database factory.
+type LDBShardFactory struct {
+	RootDir    string // directory in which to put the shard databases
+	DiskCount  int
+	ShardCount int
+}
+
+// NewChainDB returns a new sharded LevelDB, wrapped in a local read cache, for the blockchain of the given shard.
+func (f *LDBShardFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
+	dir := filepath.Join(f.RootDir, fmt.Sprintf("harmony_sharddb_%d", shardID))
+	shard, err := leveldb_shard.NewLeveldbShard(dir, f.DiskCount, f.ShardCount)
+	if err != nil {
+		return nil, err
+	}
+
+	return rawdb.NewDatabase(local_cache.NewLocalCacheDatabase(shard)), nil
+}
diff --git a/internal/shardchain/leveldb_shard/common.go b/internal/shardchain/leveldb_shard/common.go
new file mode 100644
index 000000000..2b852da8c
--- /dev/null
+++ b/internal/shardchain/leveldb_shard/common.go
@@ -0,0 +1,37 @@
+package leveldb_shard
+
+import (
+	"hash/crc32"
+	"sync"
+	"sync/atomic"
+)
+
+func mapDBIndex(key []byte, dbCount uint32) uint32 {
+	return crc32.ChecksumIEEE(key) % dbCount
+}
+
+func parallelRunAndReturnErr(parallelNum int, cb func(index int) error) error {
+	wg := sync.WaitGroup{}
+	errAtomic := atomic.Value{}
+
+	for i := 0; i < parallelNum; i++ {
+		wg.Add(1)
+
+		go func(i int) {
+			defer wg.Done()
+
+			err := cb(i)
+			if err != nil {
+				errAtomic.Store(err)
+			}
+		}(i)
+	}
+
+	wg.Wait()
+
+	if err := errAtomic.Load(); err != nil {
+		return errAtomic.Load().(error)
+	} else {
+		return nil
+	}
+}
diff --git a/internal/shardchain/leveldb_shard/shard.go b/internal/shardchain/leveldb_shard/shard.go
new file mode 100644
index 000000000..4d8ddd3e2
--- /dev/null
+++ b/internal/shardchain/leveldb_shard/shard.go
@@ -0,0 +1,199 @@
+package leveldb_shard
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/comparer"
+	"github.com/syndtr/goleveldb/leveldb/filter"
+	"github.com/syndtr/goleveldb/leveldb/iterator"
+	"github.com/syndtr/goleveldb/leveldb/opt"
+	"github.com/syndtr/goleveldb/leveldb/util"
+	"path/filepath"
+	"strings"
+	"sync"
+)
+
+type LeveldbShard struct {
+	dbs     []*leveldb.DB
+	dbCount uint32
+}
+
+var shardIdxKey = []byte("__DB_SHARED_INDEX__")
+
+func NewLeveldbShard(savePath string, diskCount int, diskShards int) (shard *LeveldbShard, err error) {
+	shard = &LeveldbShard{
+		dbs:     make([]*leveldb.DB, diskCount*diskShards),
+		dbCount: uint32(diskCount * diskShards),
+	}
+
+	// close any already-opened sub-databases and reset the shard on error
+	defer func() {
+		if err != nil {
+			for _, db := range shard.dbs {
+				if db != nil {
+					_ = db.Close()
+				}
+			}
+
+			shard = nil
+		}
+	}()
+
+	levelDBOptions := &opt.Options{
+		OpenFilesCacheCapacity: 128,
+		WriteBuffer:            8 << 20,  // 8MB; max memory occupied ~ 8MB * 2 * diskCount * diskShards
+		BlockCacheCapacity:     16 << 20, // 16MB
+		Filter:                 filter.NewBloomFilter(8),
+		DisableSeeksCompaction: true,
+	}
+
+	// open all sub-databases concurrently
+	wg := sync.WaitGroup{}
+	for i := 0; i < diskCount; i++ {
+		for j := 0; j < diskShards; j++ {
+			shardPath := filepath.Join(savePath, fmt.Sprintf("disk%02d", i), fmt.Sprintf("block%02d", j))
+			dbIndex := i*diskShards + j
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+
+				ldb, openErr := leveldb.OpenFile(shardPath, levelDBOptions)
+				if openErr != nil {
+					err = openErr
+					return
+				}
+
+				indexByte := make([]byte, 8)
+				binary.BigEndian.PutUint64(indexByte, uint64(dbIndex))
+				inDBIndex, getErr := ldb.Get(shardIdxKey, nil)
+				if getErr != nil {
+					if getErr == leveldb.ErrNotFound {
+						putErr := ldb.Put(shardIdxKey, indexByte, nil)
+						if putErr != nil {
+							err = putErr
+							return
+						}
+					} else {
+						err = getErr
+						return
+					}
+				} else if bytes.Compare(indexByte, inDBIndex) != 0 {
+					err = fmt.Errorf("db shard index error, need %v, got %v", indexByte, inDBIndex)
+					return
+				}
+
+				shard.dbs[dbIndex] = ldb
+			}()
+		}
+	}
+
+	wg.Wait()
+
+	return shard, err
+}
+
+func (l *LeveldbShard) mapDB(key []byte) *leveldb.DB {
+	return l.dbs[mapDBIndex(key, l.dbCount)]
+}
+
+// Has retrieves if a key is present in the key-value data store.
+func (l *LeveldbShard) Has(key []byte) (bool, error) {
+	return l.mapDB(key).Has(key, nil)
+}
+
+// Get retrieves the given key if it's present in the key-value data store.
+func (l *LeveldbShard) Get(key []byte) ([]byte, error) {
+	return l.mapDB(key).Get(key, nil)
+}
+
+// Put inserts the given value into the key-value data store.
+func (l *LeveldbShard) Put(key []byte, value []byte) error {
+	return l.mapDB(key).Put(key, value, nil)
+}
+
+// Delete removes the key from the key-value data store.
+func (l *LeveldbShard) Delete(key []byte) error {
+	return l.mapDB(key).Delete(key, nil)
+}
+
+// NewBatch creates a write-only database that buffers changes to its host db
+// until a final write is called.
+func (l *LeveldbShard) NewBatch() ethdb.Batch {
+	return NewLeveldbShardBatch(l)
+}
+
+// NewIterator creates a binary-alphabetical iterator over the entire keyspace
+// contained within the key-value database.
+func (l *LeveldbShard) NewIterator() ethdb.Iterator {
+	return l.iterator(nil)
+}
+
+// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of
+// database content starting at a particular initial key (or after, if it does
+// not exist).
+func (l *LeveldbShard) NewIteratorWithStart(start []byte) ethdb.Iterator {
+	return l.iterator(&util.Range{Start: start})
+}
+
+// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
+// of database content with a particular key prefix.
+func (l *LeveldbShard) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
+	return l.iterator(util.BytesPrefix(prefix))
+}
+
+func (l *LeveldbShard) iterator(slice *util.Range) ethdb.Iterator {
+	iters := make([]iterator.Iterator, l.dbCount)
+
+	for i, db := range l.dbs {
+		iter := db.NewIterator(slice, nil)
+		iters[i] = iter
+	}
+
+	return iterator.NewMergedIterator(iters, comparer.DefaultComparer, true)
+}
+
+// Stat returns a particular internal stat of the database.
+func (l *LeveldbShard) Stat(property string) (string, error) {
+	sb := strings.Builder{}
+
+	for i, db := range l.dbs {
+		getProperty, err := db.GetProperty(property)
+		if err != nil {
+			return "", err
+		}
+
+		sb.WriteString(fmt.Sprintf("=== shard %02d ===\n", i))
+		sb.WriteString(getProperty)
+		sb.WriteString("\n")
+	}
+
+	return sb.String(), nil
+}
+
+// Compact flattens the underlying data store for the given key range. In essence,
+// deleted and overwritten versions are discarded, and the data is rearranged to
+// reduce the cost of operations needed to access them.
+//
+// A nil start is treated as a key before all keys in the data store; a nil limit
+// is treated as a key after all keys in the data store. If both are nil, the
+// entire data store will be compacted.
+func (l *LeveldbShard) Compact(start []byte, limit []byte) (err error) {
+	return parallelRunAndReturnErr(int(l.dbCount), func(i int) error {
+		return l.dbs[i].CompactRange(util.Range{Start: start, Limit: limit})
+	})
+}
+
+// Close closes all the underlying databases.
+func (l *LeveldbShard) Close() error {
+	for _, db := range l.dbs {
+		err := db.Close()
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/internal/shardchain/leveldb_shard/shard_batch.go b/internal/shardchain/leveldb_shard/shard_batch.go
new file mode 100644
index 000000000..4ca81218e
--- /dev/null
+++ b/internal/shardchain/leveldb_shard/shard_batch.go
@@ -0,0 +1,118 @@
+package leveldb_shard
+
+import (
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/syndtr/goleveldb/leveldb"
+	"runtime"
+	"sync"
+)
+
+var batchesPool = sync.Pool{
+	New: func() interface{} {
+		return &leveldb.Batch{}
+	},
+}
+
+type LeveldbShardBatch struct {
+	shard        *LeveldbShard
+	batches      []*leveldb.Batch
+	batchesCount uint32
+}
+
+func NewLeveldbShardBatch(shard *LeveldbShard) *LeveldbShardBatch {
+	shardBatch := &LeveldbShardBatch{
+		batches:      make([]*leveldb.Batch, shard.dbCount),
+		batchesCount: shard.dbCount,
+		shard:        shard,
+	}
+
+	for i := uint32(0); i < shard.dbCount; i++ {
+		shardBatch.batches[i] = batchesPool.Get().(*leveldb.Batch)
+	}
+
+	runtime.SetFinalizer(shardBatch, func(o *LeveldbShardBatch) {
+		for _, batch := range o.batches {
+			batch.Reset()
+			batchesPool.Put(batch)
+		}
+
+		o.batches = nil
+	})
+
+	return shardBatch
+}
+
+func (l *LeveldbShardBatch) mapBatch(key []byte) *leveldb.Batch {
+	return l.batches[mapDBIndex(key, l.batchesCount)]
+}
+
+// Put inserts the given value into the key-value data store.
+func (l *LeveldbShardBatch) Put(key []byte, value []byte) error {
+	l.mapBatch(key).Put(key, value)
+	return nil
+}
+
+// Delete removes the key from the key-value data store.
+func (l *LeveldbShardBatch) Delete(key []byte) error {
+	l.mapBatch(key).Delete(key)
+	return nil
+}
+
+// ValueSize retrieves the amount of data queued up for writing (approximated by the number of queued operations).
+func (l *LeveldbShardBatch) ValueSize() int {
+	size := 0
+	for _, batch := range l.batches {
+		size += batch.Len()
+	}
+	return size
+}
+
+// Write flushes any accumulated data to disk.
+func (l *LeveldbShardBatch) Write() (err error) {
+	return parallelRunAndReturnErr(int(l.batchesCount), func(i int) error {
+		return l.shard.dbs[i].Write(l.batches[i], nil)
+	})
+}
+
+// Reset resets the batch for reuse.
+func (l *LeveldbShardBatch) Reset() {
+	for _, batch := range l.batches {
+		batch.Reset()
+	}
+}
+
+// Replay replays the batch contents.
+func (l *LeveldbShardBatch) Replay(w ethdb.KeyValueWriter) error {
+	for _, batch := range l.batches {
+		err := batch.Replay(&replayer{writer: w})
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// replayer is a small wrapper to implement the correct replay methods.
+type replayer struct {
+	writer  ethdb.KeyValueWriter
+	failure error
+}
+
+// Put inserts the given value into the key-value data store.
+func (r *replayer) Put(key, value []byte) {
+	// If the replay already failed, stop executing ops
+	if r.failure != nil {
+		return
+	}
+	r.failure = r.writer.Put(key, value)
+}
+
+// Delete removes the key from the key-value data store.
+func (r *replayer) Delete(key []byte) {
+	// If the replay already failed, stop executing ops
+	if r.failure != nil {
+		return
+	}
+	r.failure = r.writer.Delete(key)
+}
diff --git a/internal/shardchain/local_cache/hack.go b/internal/shardchain/local_cache/hack.go
new file mode 100644
index 000000000..9d102cf19
--- /dev/null
+++ b/internal/shardchain/local_cache/hack.go
@@ -0,0 +1,22 @@
+package local_cache
+
+import (
+	"reflect"
+	"unsafe"
+)
+
+func String(b []byte) (s string) { // zero-copy []byte -> string; b must not be modified afterwards
+	if len(b) == 0 {
+		return ""
+	}
+	return *(*string)(unsafe.Pointer(&b))
+}
+
+func StringBytes(s string) []byte { // zero-copy string -> []byte; the result must not be modified
+	var b []byte
+	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	hdr.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
+	hdr.Cap = len(s)
+	hdr.Len = len(s)
+	return b
+}
diff --git a/internal/shardchain/local_cache/local_cache_batch.go b/internal/shardchain/local_cache/local_cache_batch.go
new file mode 100644
index 000000000..f144faa0e
--- /dev/null
+++ b/internal/shardchain/local_cache/local_cache_batch.go
@@ -0,0 +1,82 @@
+package local_cache
+
+import (
+	"github.com/ethereum/go-ethereum/ethdb"
+	"sync"
+)
+
+type LocalCacheBatch struct {
+	db   *LocalCacheDatabase
+	lock sync.Mutex
+
+	size            int
+	batchWriteKey   [][]byte
+	batchWriteValue [][]byte
+	batchDeleteKey  [][]byte
+}
+
+func newLocalCacheBatch(db *LocalCacheDatabase) *LocalCacheBatch {
+	return &LocalCacheBatch{db: db}
+}
+
+func (b *LocalCacheBatch) Put(key []byte, value []byte) error {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	b.batchWriteKey = append(b.batchWriteKey, key)
+	b.batchWriteValue = append(b.batchWriteValue, value)
+	b.size += len(key) + len(value)
+	return nil
+}
+
+func (b *LocalCacheBatch) Delete(key []byte) error {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	b.batchDeleteKey = append(b.batchDeleteKey, key)
+	b.size += len(key)
+	return nil
+}
+
+func (b *LocalCacheBatch) ValueSize() int {
+	return b.size
+}
+
+func (b *LocalCacheBatch) Write() error {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	return b.db.batchWrite(b)
+}
+
+func (b *LocalCacheBatch) Reset() {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	b.batchWriteKey = b.batchWriteKey[:0]
+	b.batchWriteValue = b.batchWriteValue[:0]
+	b.batchDeleteKey = b.batchDeleteKey[:0]
+	b.size = 0
+}
+
+func (b *LocalCacheBatch) Replay(w ethdb.KeyValueWriter) error {
+	if len(b.batchWriteKey) > 0 {
+		for i, key := range b.batchWriteKey {
+			err := w.Put(key, b.batchWriteValue[i])
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	if len(b.batchDeleteKey) > 0 {
+		for _, key := range b.batchDeleteKey {
+			err := w.Delete(key)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/internal/shardchain/local_cache/local_cache_database.go b/internal/shardchain/local_cache/local_cache_database.go
new file mode 100644
index 000000000..e877fee4f
--- /dev/null
+++ b/internal/shardchain/local_cache/local_cache_database.go
@@ -0,0 +1,111 @@
+package local_cache
+
+import (
+	"bytes"
+	"github.com/allegro/bigcache"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"log"
+	"time"
+)
+
+type cacheWrapper struct {
+	*bigcache.BigCache
+}
+
+func (c *cacheWrapper) Put(key []byte, value []byte) error {
+	return c.BigCache.Set(String(key), value)
+}
+
+func (c *cacheWrapper) Delete(key []byte) error {
+	return c.BigCache.Delete(String(key))
+}
+
+type LocalCacheDatabase struct {
+	ethdb.KeyValueStore
+
+	enableReadCache bool
+
+	deleteMap map[string]bool
+	readCache *cacheWrapper
+}
+
+func NewLocalCacheDatabase(remoteDB ethdb.KeyValueStore) *LocalCacheDatabase {
+	config := bigcache.DefaultConfig(10 * time.Minute)
+	config.HardMaxCacheSize = 512
+	config.MaxEntriesInWindow = 2000 * 10 * 60
+	cache, _ := bigcache.NewBigCache(config)
+
+	db := &LocalCacheDatabase{
+		KeyValueStore: remoteDB,
+
+		enableReadCache: true,
+		deleteMap:       make(map[string]bool),
+		readCache:       &cacheWrapper{cache},
+	}
+
+	go func() {
+		for range time.Tick(time.Second) {
+			log.Printf("cache: %#v %d (%d)", cache.Stats(), cache.Len(), cache.Capacity())
+		}
+	}()
+
+	return db
+}
+
+func (c *LocalCacheDatabase) Has(key []byte) (bool, error) {
+	return c.KeyValueStore.Has(key)
+}
+
+func (c *LocalCacheDatabase) Get(key []byte) (ret []byte, err error) {
+	if c.enableReadCache {
+		if bytes.Compare(key, []byte("LastBlock")) != 0 {
+			strKey := String(key)
+			ret, err = c.readCache.Get(strKey)
+			if err == nil {
+				return ret, nil
+			}
+
+			defer func() {
+				if err == nil {
+					_ = c.readCache.Set(strKey, ret)
+				}
+			}()
+		}
+	}
+
+	return c.KeyValueStore.Get(key)
+}
+
+func (c *LocalCacheDatabase) Put(key []byte, value []byte) error {
+	if c.enableReadCache {
+		_ = c.readCache.Put(key, value)
+	}
+
+	return c.KeyValueStore.Put(key, value)
+}
+
+func (c *LocalCacheDatabase) Delete(key []byte) error {
+	if c.enableReadCache {
+		_ = c.readCache.Delete(key)
+	}
+
+	return c.KeyValueStore.Delete(key)
+}
+
+func (c *LocalCacheDatabase) NewBatch() ethdb.Batch {
+	return newLocalCacheBatch(c)
+}
+
+func (c *LocalCacheDatabase) batchWrite(b *LocalCacheBatch) error {
+	if c.enableReadCache {
+		_ = b.Replay(c.readCache)
+	}
+
+	batch := c.KeyValueStore.NewBatch()
+	err := b.Replay(batch)
+	if err != nil {
+		return err
+	}
+
+	return batch.Write()
+}
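
Note for reviewers (not part of the patch): below is a minimal, stdlib-only sketch of how this change routes keys to the diskCount x diskShards sub-databases and where each sub-database lives on disk. It mirrors the CRC32 scheme of mapDBIndex in common.go and the directory layout from dbfactory.go and shard.go; the sample keys and the shard-0 root directory are arbitrary values chosen only for illustration.

package main

import (
	"fmt"
	"hash/crc32"
	"path/filepath"
)

func main() {
	// Mirrors the defaults added in cmd/harmony/default.go.
	const diskCount, diskShards = 8, 4
	dbCount := uint32(diskCount * diskShards)

	// Arbitrary example keys; any []byte key is routed the same way.
	for _, key := range []string{"LastBlock", "example-header-key", "example-body-key"} {
		// Same routing as mapDBIndex: crc32(key) % (diskCount*diskShards).
		idx := crc32.ChecksumIEEE([]byte(key)) % dbCount
		// NewLeveldbShard assigns dbIndex = disk*diskShards + shardOnDisk, so invert that here.
		disk, shardOnDisk := int(idx)/diskShards, int(idx)%diskShards
		// Directory layout: <RootDir>/harmony_sharddb_<shardID>/diskNN/blockNN.
		dir := filepath.Join("harmony_sharddb_0", fmt.Sprintf("disk%02d", disk), fmt.Sprintf("block%02d", shardOnDisk))
		fmt.Printf("key %-20q -> sub-db #%02d (%s)\n", key, idx, dir)
	}
}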