Merge pull request #4066 from LuttyYang/leveldb_shard

leveldb shard
Leo Chen 3 years ago committed by GitHub
commit 7644a366fe
Changed files:
  1. cmd/harmony/config_migrations_test.go (7 lines changed)
  2. cmd/harmony/config_test.go (7 lines changed)
  3. cmd/harmony/default.go (7 lines changed)
  4. cmd/harmony/flags.go (56 lines changed)
  5. cmd/harmony/flags_test.go (53 lines changed)
  6. cmd/harmony/main.go (14 lines changed)
  7. go.mod (2 lines changed)
  8. internal/configs/harmony/harmony.go (9 lines changed)
  9. internal/shardchain/dbfactory.go (35 lines changed)
  10. internal/shardchain/leveldb_shard/common.go (37 lines changed)
  11. internal/shardchain/leveldb_shard/shard.go (200 lines changed)
  12. internal/shardchain/leveldb_shard/shard_batch.go (119 lines changed)
  13. internal/shardchain/local_cache/hack.go (22 lines changed)
  14. internal/shardchain/local_cache/local_cache_batch.go (83 lines changed)
  15. internal/shardchain/local_cache/local_cache_database.go (119 lines changed)
  16. rosetta/infra/Dockerfile (6 lines changed)
  17. rosetta/infra/rclone.conf (8 lines changed)
  18. rosetta/infra/run.sh (5 lines changed)

@ -284,6 +284,13 @@ Version = "1.0.4"
LegacyServer = true
MinPeers = 6
[ShardData]
EnableShardData = false
DiskCount = 8
ShardCount = 4
CacheTime = 10
CacheSize = 512
[TxPool]
BlacklistFile = "./.hmy/blacklist.txt"

@ -96,6 +96,13 @@ Version = "1.0.4"
LegacyServer = true
MinPeers = 6
[ShardData]
EnableShardData = false
DiskCount = 8
ShardCount = 4
CacheTime = 10
CacheSize = 512
[WS]
Enabled = true
IP = "127.0.0.1"

@ -88,6 +88,13 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
},
},
DNSSync: getDefaultDNSSyncConfig(defNetworkType),
ShardData: harmonyconfig.ShardDataConfig{
EnableShardData: false,
DiskCount: 8,
ShardCount: 4,
CacheTime: 10,
CacheSize: 512,
},
}
var defaultSysConfig = harmonyconfig.SysConfig{

@ -214,6 +214,14 @@ var (
syncDiscHighFlag,
syncDiscBatchFlag,
}
shardDataFlags = []cli.Flag{
enableShardDataFlag,
diskCountFlag,
shardCountFlag,
cacheTimeFlag,
cacheSizeFlag,
}
)
var (
@ -313,6 +321,7 @@ func getRootFlags() []cli.Flag {
flags = append(flags, legacyMiscFlags...)
flags = append(flags, prometheusFlags...)
flags = append(flags, syncFlags...)
flags = append(flags, shardDataFlags...)
return flags
}
@ -1608,3 +1617,50 @@ func applySyncFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
config.Sync.DiscBatch = cli.GetIntFlagValue(cmd, syncDiscBatchFlag)
}
}
// shard data flags
var (
enableShardDataFlag = cli.BoolFlag{
Name: "sharddata.enable",
Usage: "whether use multi-database mode of levelDB",
DefValue: defaultConfig.ShardData.EnableShardData,
}
diskCountFlag = cli.IntFlag{
Name: "sharddata.disk_count",
Usage: "the count of disks you want to storage block data",
DefValue: defaultConfig.ShardData.DiskCount,
}
shardCountFlag = cli.IntFlag{
Name: "sharddata.shard_count",
Usage: "the count of shards you want to split in each disk",
DefValue: defaultConfig.ShardData.ShardCount,
}
cacheTimeFlag = cli.IntFlag{
Name: "sharddata.cache_time",
Usage: "local cache save time (minute)",
DefValue: defaultConfig.ShardData.CacheTime,
}
cacheSizeFlag = cli.IntFlag{
Name: "sharddata.cache_size",
Usage: "local cache storage size (MB)",
DefValue: defaultConfig.ShardData.CacheSize,
}
)
func applyShardDataFlags(cmd *cobra.Command, cfg *harmonyconfig.HarmonyConfig) {
if cli.IsFlagChanged(cmd, enableShardDataFlag) {
cfg.ShardData.EnableShardData = cli.GetBoolFlagValue(cmd, enableShardDataFlag)
}
if cli.IsFlagChanged(cmd, diskCountFlag) {
cfg.ShardData.DiskCount = cli.GetIntFlagValue(cmd, diskCountFlag)
}
if cli.IsFlagChanged(cmd, shardCountFlag) {
cfg.ShardData.ShardCount = cli.GetIntFlagValue(cmd, shardCountFlag)
}
if cli.IsFlagChanged(cmd, cacheTimeFlag) {
cfg.ShardData.CacheTime = cli.GetIntFlagValue(cmd, cacheTimeFlag)
}
if cli.IsFlagChanged(cmd, cacheSizeFlag) {
cfg.ShardData.CacheSize = cli.GetIntFlagValue(cmd, cacheSizeFlag)
}
}

@ -141,6 +141,13 @@ func TestHarmonyFlags(t *testing.T) {
Gateway: "https://gateway.harmony.one",
},
Sync: defaultMainnetSyncConfig,
ShardData: harmonyconfig.ShardDataConfig{
EnableShardData: false,
DiskCount: 8,
ShardCount: 4,
CacheTime: 10,
CacheSize: 512,
},
},
},
}
@ -1245,6 +1252,52 @@ func TestSyncFlags(t *testing.T) {
}
}
func TestShardDataFlags(t *testing.T) {
tests := []struct {
args []string
expConfig harmonyconfig.ShardDataConfig
expErr error
}{
{
args: []string{},
expConfig: defaultConfig.ShardData,
},
{
args: []string{"--sharddata.enable",
"--sharddata.disk_count", "8",
"--sharddata.shard_count", "4",
"--sharddata.cache_time", "10",
"--sharddata.cache_size", "512",
},
expConfig: harmonyconfig.ShardDataConfig{
EnableShardData: true,
DiskCount: 8,
ShardCount: 4,
CacheTime: 10,
CacheSize: 512,
},
},
}
for i, test := range tests {
ts := newFlagTestSuite(t, shardDataFlags, func(command *cobra.Command, config *harmonyconfig.HarmonyConfig) {
applyShardDataFlags(command, config)
})
hc, err := ts.run(test.args)
if assErr := assertError(err, test.expErr); assErr != nil {
t.Fatalf("Test %v: %v", i, assErr)
}
if err != nil || test.expErr != nil {
continue
}
if !reflect.DeepEqual(hc.ShardData, test.expConfig) {
t.Errorf("Test %v:\n\t%+v\n\t%+v", i, hc.ShardData, test.expConfig)
}
ts.tearDown()
}
}
type flagTestSuite struct {
t *testing.T

@ -231,6 +231,7 @@ func applyRootFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
applyRevertFlags(cmd, config)
applyPrometheusFlags(cmd, config)
applySyncFlags(cmd, config)
applyShardDataFlags(cmd, config)
}
func setupNodeLog(config harmonyconfig.HarmonyConfig) {
@ -655,7 +656,18 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
}
// Current node.
chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
var chainDBFactory shardchain.DBFactory
if hc.ShardData.EnableShardData {
chainDBFactory = &shardchain.LDBShardFactory{
RootDir: nodeConfig.DBDir,
DiskCount: hc.ShardData.DiskCount,
ShardCount: hc.ShardData.ShardCount,
CacheTime: hc.ShardData.CacheTime,
CacheSize: hc.ShardData.CacheSize,
}
} else {
chainDBFactory = &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
}
currentNode := node.New(myHost, currentConsensus, chainDBFactory, blacklist, nodeConfig.ArchiveModes(), &hc)

@ -5,7 +5,7 @@ go 1.16
require (
github.com/VictoriaMetrics/fastcache v1.5.7 // indirect
github.com/Workiva/go-datastructures v1.0.50
github.com/allegro/bigcache v1.2.1 // indirect
github.com/allegro/bigcache v1.2.1
github.com/aristanetworks/goarista v0.0.0-20190607111240-52c2a7864a08 // indirect
github.com/aws/aws-sdk-go v1.30.1
github.com/beevik/ntp v0.3.0

@ -28,6 +28,7 @@ type HarmonyConfig struct {
Legacy *LegacyConfig `toml:",omitempty"`
Prometheus *PrometheusConfig `toml:",omitempty"`
DNSSync DnsSync
ShardData ShardDataConfig
}
type DnsSync struct {
@ -65,6 +66,14 @@ type GeneralConfig struct {
EnablePruneBeaconChain bool
}
type ShardDataConfig struct {
EnableShardData bool
DiskCount int
ShardCount int
CacheTime int
CacheSize int
}
type ConsensusConfig struct {
MinPeers int
AggregateSig bool

@ -3,12 +3,22 @@ package shardchain
import (
"fmt"
"path"
"path/filepath"
"time"
"github.com/harmony-one/harmony/internal/shardchain/leveldb_shard"
"github.com/harmony-one/harmony/internal/shardchain/local_cache"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
)
const (
LDBDirPrefix = "harmony_db"
LDBShardDirPrefix = "harmony_sharddb"
)
// DBFactory is a blockchain database factory.
type DBFactory interface {
// NewChainDB returns a new database for the blockchain for
@ -23,7 +33,7 @@ type LDBFactory struct {
// NewChainDB returns a new LDB for the blockchain for given shard.
func (f *LDBFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
dir := path.Join(f.RootDir, fmt.Sprintf("harmony_db_%d", shardID))
dir := path.Join(f.RootDir, fmt.Sprintf("%s_%d", LDBDirPrefix, shardID))
return rawdb.NewLevelDBDatabase(dir, 256, 1024, "")
}
@ -34,3 +44,26 @@ type MemDBFactory struct{}
func (f *MemDBFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
return rawdb.NewMemoryDatabase(), nil
}
// LDBShardFactory is a sharded, multi-LevelDB-backed blockchain database factory.
type LDBShardFactory struct {
RootDir string // directory in which to put the shard databases
DiskCount int
ShardCount int
CacheTime int
CacheSize int
}
// NewChainDB returns a new sharded, cache-fronted LevelDB database for the given shard.
func (f *LDBShardFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
dir := filepath.Join(f.RootDir, fmt.Sprintf("%s_%d", LDBShardDirPrefix, shardID))
shard, err := leveldb_shard.NewLeveldbShard(dir, f.DiskCount, f.ShardCount)
if err != nil {
return nil, err
}
return rawdb.NewDatabase(local_cache.NewLocalCacheDatabase(shard, local_cache.CacheConfig{
CacheTime: time.Duration(f.CacheTime) * time.Minute,
CacheSize: f.CacheSize,
})), nil
}
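
For orientation, here is a minimal sketch (not part of the diff) of how the new factory is meant to be constructed and used. The root directory is illustrative, the values mirror the defaults added to the config, and the import path is the one introduced in this PR.

```go
package main

import (
	"log"

	"github.com/harmony-one/harmony/internal/shardchain"
)

func main() {
	// 8 disks x 4 shards = 32 LevelDB instances, fronted by a 512 MB read cache
	// whose entries live for 10 minutes.
	factory := &shardchain.LDBShardFactory{
		RootDir:    "./data", // illustrative path
		DiskCount:  8,
		ShardCount: 4,
		CacheTime:  10,  // minutes
		CacheSize:  512, // MB
	}

	// For shard 0 this opens ./data/harmony_sharddb_0/disk00/block00 ... disk07/block03
	// and wraps the sharded store in the local read cache.
	db, err := factory.NewChainDB(0)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```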

@ -0,0 +1,37 @@
package leveldb_shard
import (
"hash/crc32"
"sync"
"sync/atomic"
)
func mapDBIndex(key []byte, dbCount uint32) uint32 {
return crc32.ChecksumIEEE(key) % dbCount
}
func parallelRunAndReturnErr(parallelNum int, cb func(index int) error) error {
wg := sync.WaitGroup{}
errAtomic := atomic.Value{}
for i := 0; i < parallelNum; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
err := cb(i)
if err != nil {
errAtomic.Store(err)
}
}(i)
}
wg.Wait()
if err := errAtomic.Load(); err != nil {
return err.(error)
}
return nil
}
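
To make the routing concrete, this small, self-contained sketch reproduces mapDBIndex and prints which sub-database (and which disk/block directory, following the layout used in shard.go) a few sample keys would land in; the keys and counts are arbitrary.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// Same routing rule as mapDBIndex above: CRC32 of the key, modulo the DB count.
func mapDBIndex(key []byte, dbCount uint32) uint32 {
	return crc32.ChecksumIEEE(key) % dbCount
}

func main() {
	const diskCount, diskShards = 8, 4
	dbCount := uint32(diskCount * diskShards) // 32 sub-databases in total

	for _, key := range [][]byte{[]byte("LastBlock"), []byte("h123"), []byte("receipt-42")} {
		idx := mapDBIndex(key, dbCount)
		// dbIndex = disk*diskShards + shard, matching the disk%02d/block%02d layout in shard.go
		fmt.Printf("%-12q -> db %2d (disk%02d/block%02d)\n", key, idx, idx/diskShards, idx%diskShards)
	}
}
```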

@ -0,0 +1,200 @@
package leveldb_shard
import (
"bytes"
"encoding/binary"
"fmt"
"path/filepath"
"strings"
"sync"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
type LeveldbShard struct {
dbs []*leveldb.DB
dbCount uint32
}
var shardIdxKey = []byte("__DB_SHARED_INDEX__")
func NewLeveldbShard(savePath string, diskCount int, diskShards int) (shard *LeveldbShard, err error) {
shard = &LeveldbShard{
dbs: make([]*leveldb.DB, diskCount*diskShards),
dbCount: uint32(diskCount * diskShards),
}
// clean up on error
defer func() {
if err != nil {
for _, db := range shard.dbs {
if db != nil {
_ = db.Close()
}
}
shard = nil
}
}()
levelDBOptions := &opt.Options{
OpenFilesCacheCapacity: 128,
WriteBuffer: 8 << 20, //8MB; max memory occupancy = 8*2*diskCount*diskShards MB
BlockCacheCapacity: 16 << 20, //16MB
Filter: filter.NewBloomFilter(8),
DisableSeeksCompaction: true,
}
// async open
wg := sync.WaitGroup{}
for i := 0; i < diskCount; i++ {
for j := 0; j < diskShards; j++ {
shardPath := filepath.Join(savePath, fmt.Sprintf("disk%02d", i), fmt.Sprintf("block%02d", j))
dbIndex := i*diskShards + j
wg.Add(1)
go func() {
defer wg.Done()
ldb, openErr := leveldb.OpenFile(shardPath, levelDBOptions)
if openErr != nil {
err = openErr
return
}
indexByte := make([]byte, 8)
binary.BigEndian.PutUint64(indexByte, uint64(dbIndex))
inDBIndex, getErr := ldb.Get(shardIdxKey, nil)
if getErr != nil {
if getErr == leveldb.ErrNotFound {
putErr := ldb.Put(shardIdxKey, indexByte, nil)
if putErr != nil {
err = putErr
return
}
} else {
err = getErr
return
}
} else if bytes.Compare(indexByte, inDBIndex) != 0 {
err = fmt.Errorf("db shard index error, need %v, got %v", indexByte, inDBIndex)
return
}
shard.dbs[dbIndex] = ldb
}()
}
}
wg.Wait()
return shard, err
}
func (l *LeveldbShard) mapDB(key []byte) *leveldb.DB {
return l.dbs[mapDBIndex(key, l.dbCount)]
}
// Has retrieves if a key is present in the key-value data store.
func (l *LeveldbShard) Has(key []byte) (bool, error) {
return l.mapDB(key).Has(key, nil)
}
// Get retrieves the given key if it's present in the key-value data store.
func (l *LeveldbShard) Get(key []byte) ([]byte, error) {
return l.mapDB(key).Get(key, nil)
}
// Put inserts the given value into the key-value data store.
func (l *LeveldbShard) Put(key []byte, value []byte) error {
return l.mapDB(key).Put(key, value, nil)
}
// Delete removes the key from the key-value data store.
func (l *LeveldbShard) Delete(key []byte) error {
return l.mapDB(key).Delete(key, nil)
}
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called.
func (l *LeveldbShard) NewBatch() ethdb.Batch {
return NewLeveldbShardBatch(l)
}
// NewIterator creates a binary-alphabetical iterator over the entire keyspace
// contained within the key-value database.
func (l *LeveldbShard) NewIterator() ethdb.Iterator {
return l.iterator(nil)
}
// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of
// database content starting at a particular initial key (or after, if it does
// not exist).
func (l *LeveldbShard) NewIteratorWithStart(start []byte) ethdb.Iterator {
return l.iterator(&util.Range{Start: start})
}
// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix.
func (l *LeveldbShard) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
return l.iterator(util.BytesPrefix(prefix))
}
func (l *LeveldbShard) iterator(slice *util.Range) ethdb.Iterator {
iters := make([]iterator.Iterator, l.dbCount)
for i, db := range l.dbs {
iter := db.NewIterator(slice, nil)
iters[i] = iter
}
return iterator.NewMergedIterator(iters, comparer.DefaultComparer, true)
}
// Stat returns a particular internal stat of the database.
func (l *LeveldbShard) Stat(property string) (string, error) {
sb := strings.Builder{}
for i, db := range l.dbs {
getProperty, err := db.GetProperty(property)
if err != nil {
return "", err
}
sb.WriteString(fmt.Sprintf("=== shard %02d ===\n", i))
sb.WriteString(getProperty)
sb.WriteString("\n")
}
return sb.String(), nil
}
// Compact flattens the underlying data store for the given key range. In essence,
// deleted and overwritten versions are discarded, and the data is rearranged to
// reduce the cost of operations needed to access them.
//
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both are nil, the
// entire data store is compacted.
func (l *LeveldbShard) Compact(start []byte, limit []byte) (err error) {
return parallelRunAndReturnErr(int(l.dbCount), func(i int) error {
return l.dbs[i].CompactRange(util.Range{Start: start, Limit: limit})
})
}
// Close closes all the underlying sub-databases.
func (l *LeveldbShard) Close() error {
for _, db := range l.dbs {
err := db.Close()
if err != nil {
return err
}
}
return nil
}
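
A short usage sketch of the shard as a plain ethdb key-value store (assuming the package path from this diff; the directory and disk/shard counts are illustrative): writes fan out across the sub-databases by key checksum, while the merged iterator still presents a single sorted view.

```go
package main

import (
	"fmt"
	"log"

	"github.com/harmony-one/harmony/internal/shardchain/leveldb_shard"
)

func main() {
	// 2 disks x 2 shards = 4 LevelDB instances under /tmp/sharddb-demo.
	shard, err := leveldb_shard.NewLeveldbShard("/tmp/sharddb-demo", 2, 2)
	if err != nil {
		log.Fatal(err)
	}
	defer shard.Close()

	for i := 0; i < 8; i++ {
		key := []byte(fmt.Sprintf("block-%02d", i))
		if err := shard.Put(key, []byte("payload")); err != nil {
			log.Fatal(err)
		}
	}

	// The merged iterator yields keys from all sub-databases in sorted order.
	it := shard.NewIteratorWithPrefix([]byte("block-"))
	for it.Next() {
		fmt.Printf("%s\n", it.Key())
	}
	it.Release()
}
```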

@ -0,0 +1,119 @@
package leveldb_shard
import (
"runtime"
"sync"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/syndtr/goleveldb/leveldb"
)
var batchesPool = sync.Pool{
New: func() interface{} {
return &leveldb.Batch{}
},
}
type LeveldbShardBatch struct {
shard *LeveldbShard
batches []*leveldb.Batch
batchesCount uint32
}
func NewLeveldbShardBatch(shard *LeveldbShard) *LeveldbShardBatch {
shardBatch := &LeveldbShardBatch{
batches: make([]*leveldb.Batch, shard.dbCount),
batchesCount: shard.dbCount,
shard: shard,
}
for i := uint32(0); i < shard.dbCount; i++ {
shardBatch.batches[i] = batchesPool.Get().(*leveldb.Batch)
}
runtime.SetFinalizer(shardBatch, func(o *LeveldbShardBatch) {
for _, batch := range o.batches {
batch.Reset()
batchesPool.Put(batch)
}
o.batches = nil
})
return shardBatch
}
func (l *LeveldbShardBatch) mapBatch(key []byte) *leveldb.Batch {
return l.batches[mapDBIndex(key, l.batchesCount)]
}
// Put inserts the given value into the key-value data store.
func (l *LeveldbShardBatch) Put(key []byte, value []byte) error {
l.mapBatch(key).Put(key, value)
return nil
}
// Delete removes the key from the key-value data store.
func (l *LeveldbShardBatch) Delete(key []byte) error {
l.mapBatch(key).Delete(key)
return nil
}
// ValueSize retrieves the amount of data queued up for writing.
func (l *LeveldbShardBatch) ValueSize() int {
size := 0
for _, batch := range l.batches {
size += batch.Len()
}
return size
}
// Write flushes any accumulated data to disk.
func (l *LeveldbShardBatch) Write() (err error) {
return parallelRunAndReturnErr(int(l.batchesCount), func(i int) error {
return l.shard.dbs[i].Write(l.batches[i], nil)
})
}
// Reset resets the batch for reuse.
func (l *LeveldbShardBatch) Reset() {
for _, batch := range l.batches {
batch.Reset()
}
}
// Replay replays the batch contents.
func (l *LeveldbShardBatch) Replay(w ethdb.KeyValueWriter) error {
for _, batch := range l.batches {
err := batch.Replay(&replayer{writer: w})
if err != nil {
return err
}
}
return nil
}
// replayer is a small wrapper to implement the correct replay methods.
type replayer struct {
writer ethdb.KeyValueWriter
failure error
}
// Put inserts the given value into the key-value data store.
func (r *replayer) Put(key, value []byte) {
// If the replay already failed, stop executing ops
if r.failure != nil {
return
}
r.failure = r.writer.Put(key, value)
}
// Delete removes the key from the key-value data store.
func (r *replayer) Delete(key []byte) {
// If the replay already failed, stop executing ops
if r.failure != nil {
return
}
r.failure = r.writer.Delete(key)
}
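
And a companion sketch for the batch path (same assumptions as above): each Put is buffered in the per-sub-database leveldb.Batch it maps to, and Write commits all buffers in parallel via parallelRunAndReturnErr. Note that ValueSize, as implemented above, reports the number of queued entries rather than their byte size.

```go
package main

import (
	"fmt"
	"log"

	"github.com/harmony-one/harmony/internal/shardchain/leveldb_shard"
)

func main() {
	shard, err := leveldb_shard.NewLeveldbShard("/tmp/sharddb-batch-demo", 1, 4)
	if err != nil {
		log.Fatal(err)
	}
	defer shard.Close()

	// Puts are routed into one of the 4 per-shard leveldb.Batch buffers.
	batch := shard.NewBatch()
	for i := 0; i < 100; i++ {
		key := []byte(fmt.Sprintf("receipt-%03d", i))
		if err := batch.Put(key, []byte("data")); err != nil {
			log.Fatal(err)
		}
	}
	fmt.Println("queued entries:", batch.ValueSize())

	// Write flushes every sub-batch to its LevelDB instance concurrently.
	if err := batch.Write(); err != nil {
		log.Fatal(err)
	}
}
```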

@ -0,0 +1,22 @@
package local_cache
import (
"reflect"
"unsafe"
)
func String(b []byte) (s string) {
if len(b) == 0 {
return ""
}
return *(*string)(unsafe.Pointer(&b))
}
func StringBytes(s string) []byte {
var b []byte
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
hdr.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
hdr.Cap = len(s)
hdr.Len = len(s)
return b
}
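
These helpers avoid an allocation by reinterpreting the slice's backing memory, so they are only safe for read-only use: a string produced by String must not outlive or observe mutation of the source slice, and a slice produced by StringBytes must never be written to. A minimal sketch of the intended read-only pattern (the map and keys here are illustrative; in this PR the helper feeds bigcache's string-keyed Set/Get):

```go
package main

import (
	"fmt"
	"unsafe"
)

// String reinterprets b as a string without copying, mirroring the helper above.
// The result is valid only while b stays alive and unmodified.
func String(b []byte) string {
	if len(b) == 0 {
		return ""
	}
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	cache := map[string][]byte{"LastBlock": []byte("0xabc")}

	key := []byte("LastBlock")
	// Zero-copy lookup: no string allocation just to build the lookup key.
	if v, ok := cache[String(key)]; ok {
		fmt.Printf("hit: %s\n", v)
	}
}
```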

@ -0,0 +1,83 @@
package local_cache
import (
"sync"
"github.com/ethereum/go-ethereum/ethdb"
)
type LocalCacheBatch struct {
db *LocalCacheDatabase
lock sync.Mutex
size int
batchWriteKey [][]byte
batchWriteValue [][]byte
batchDeleteKey [][]byte
}
func newLocalCacheBatch(db *LocalCacheDatabase) *LocalCacheBatch {
return &LocalCacheBatch{db: db}
}
func (b *LocalCacheBatch) Put(key []byte, value []byte) error {
b.lock.Lock()
defer b.lock.Unlock()
b.batchWriteKey = append(b.batchWriteKey, key)
b.batchWriteValue = append(b.batchWriteValue, value)
b.size += len(key) + len(value)
return nil
}
func (b *LocalCacheBatch) Delete(key []byte) error {
b.lock.Lock()
defer b.lock.Unlock()
b.batchDeleteKey = append(b.batchDeleteKey, key)
b.size += len(key)
return nil
}
func (b *LocalCacheBatch) ValueSize() int {
return b.size
}
func (b *LocalCacheBatch) Write() error {
b.lock.Lock()
defer b.lock.Unlock()
return b.db.batchWrite(b)
}
func (b *LocalCacheBatch) Reset() {
b.lock.Lock()
defer b.lock.Unlock()
b.batchWriteKey = b.batchWriteKey[:0]
b.batchWriteValue = b.batchWriteValue[:0]
b.batchDeleteKey = b.batchDeleteKey[:0]
b.size = 0
}
func (b *LocalCacheBatch) Replay(w ethdb.KeyValueWriter) error {
if len(b.batchWriteKey) > 0 {
for i, key := range b.batchWriteKey {
err := w.Put(key, b.batchWriteValue[i])
if err != nil {
return err
}
}
}
if len(b.batchDeleteKey) > 0 {
for _, key := range b.batchDeleteKey {
err := w.Delete(key)
if err != nil {
return err
}
}
}
return nil
}

@ -0,0 +1,119 @@
package local_cache
import (
"bytes"
"time"
"github.com/harmony-one/harmony/internal/utils"
"go.uber.org/zap"
"github.com/allegro/bigcache"
"github.com/ethereum/go-ethereum/ethdb"
)
type cacheWrapper struct {
*bigcache.BigCache
}
type CacheConfig struct {
CacheTime time.Duration
CacheSize int
}
func (c *cacheWrapper) Put(key []byte, value []byte) error {
return c.BigCache.Set(String(key), value)
}
func (c *cacheWrapper) Delete(key []byte) error {
return c.BigCache.Delete(String(key))
}
type LocalCacheDatabase struct {
ethdb.KeyValueStore
enableReadCache bool
deleteMap map[string]bool
readCache *cacheWrapper
}
func NewLocalCacheDatabase(remoteDB ethdb.KeyValueStore, cacheConfig CacheConfig) *LocalCacheDatabase {
config := bigcache.DefaultConfig(cacheConfig.CacheTime)
config.HardMaxCacheSize = cacheConfig.CacheSize
config.MaxEntriesInWindow = cacheConfig.CacheSize * 4 * int(cacheConfig.CacheTime.Seconds())
cache, _ := bigcache.NewBigCache(config)
db := &LocalCacheDatabase{
KeyValueStore: remoteDB,
enableReadCache: true,
deleteMap: make(map[string]bool),
readCache: &cacheWrapper{cache},
}
go func() {
for range time.Tick(time.Minute) {
utils.GetLogger().Info("local-cache", zap.Any("stats", cache.Stats()), zap.Int("count", cache.Len()), zap.Int("size", cache.Capacity()))
}
}()
return db
}
func (c *LocalCacheDatabase) Has(key []byte) (bool, error) {
return c.KeyValueStore.Has(key)
}
func (c *LocalCacheDatabase) Get(key []byte) (ret []byte, err error) {
if c.enableReadCache {
if bytes.Compare(key, []byte("LastBlock")) != 0 {
strKey := String(key)
ret, err = c.readCache.Get(strKey)
if err == nil {
return ret, nil
}
defer func() {
if err == nil {
_ = c.readCache.Set(strKey, ret)
}
}()
}
}
return c.KeyValueStore.Get(key)
}
func (c *LocalCacheDatabase) Put(key []byte, value []byte) error {
if c.enableReadCache {
_ = c.readCache.Put(key, value)
}
return c.KeyValueStore.Put(key, value)
}
func (c *LocalCacheDatabase) Delete(key []byte) error {
if c.enableReadCache {
_ = c.readCache.Delete(key)
}
return c.KeyValueStore.Delete(key)
}
func (c *LocalCacheDatabase) NewBatch() ethdb.Batch {
return newLocalCacheBatch(c)
}
func (c *LocalCacheDatabase) batchWrite(b *LocalCacheBatch) error {
if c.enableReadCache {
_ = b.Replay(c.readCache)
}
batch := c.KeyValueStore.NewBatch()
err := b.Replay(batch)
if err != nil {
return err
}
return batch.Write()
}
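
Finally, a minimal sketch of how the cache layer composes with any ethdb.KeyValueStore; a go-ethereum memorydb stands in for the sharded LevelDB here, and the cache time/size are the illustrative defaults from the config above. Reads are served from bigcache when present; misses fall through to the backing store and are cached on the way back (except the special LastBlock key).

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb/memorydb"

	"github.com/harmony-one/harmony/internal/shardchain/local_cache"
)

func main() {
	// Wrap an in-memory store with the read cache, as LDBShardFactory does with LeveldbShard.
	cached := local_cache.NewLocalCacheDatabase(memorydb.New(), local_cache.CacheConfig{
		CacheTime: 10 * time.Minute,
		CacheSize: 512, // MB (bigcache hard cap)
	})
	db := rawdb.NewDatabase(cached)
	defer db.Close()

	if err := db.Put([]byte("header-1"), []byte("rlp-bytes")); err != nil {
		log.Fatal(err)
	}
	v, err := db.Get([]byte("header-1")) // served from the local cache
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s\n", v)
}
```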

@ -24,7 +24,8 @@ RUN go mod tidy
RUN make linux_static && \
cp ./bin/harmony /root/harmony && \
cp ./rosetta/infra/run.sh /root/run.sh
cp ./rosetta/infra/run.sh /root/run.sh && \
cp ./rosetta/infra/rclone.conf /root/rclone.conf
RUN cp ./rosetta/infra/harmony-pstn.conf /root/harmony-pstn.conf && \
cp ./rosetta/infra/harmony-mainnet.conf /root/harmony-mainnet.conf && \
@ -34,13 +35,14 @@ RUN cp ./rosetta/infra/harmony-pstn.conf /root/harmony-pstn.conf && \
FROM ubuntu:latest
RUN apt update -y && \
apt install libgmp-dev libssl-dev -y && \
apt install libgmp-dev libssl-dev rclone ca-certificates -y && \
apt -y clean all
WORKDIR /root
COPY --from=build /root/harmony /root/harmony
COPY --from=build /root/run.sh /root/run.sh
COPY --from=build /root/rclone.conf /root/.config/rclone/rclone.conf
COPY --from=build /root/harmony-pstn.conf /root/harmony-pstn.conf
COPY --from=build /root/harmony-mainnet.conf /root/harmony-mainnet.conf
COPY --from=build /root/rosetta_local_fix.csv /root/rosetta_local_fix.csv

@ -0,0 +1,8 @@
[release]
type = s3
provider = AWS
env_auth = false
region = us-west-1
acl = public-read
server_side_encryption = AES256
storage_class = REDUCED_REDUNDANCY

@ -5,6 +5,11 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
DATA="$DIR/data"
LOGS="$DATA/logs"
BASE_ARGS=(--http.ip "0.0.0.0" --ws.ip "0.0.0.0" --http.rosetta --node_type "explorer" --datadir "$DATA" --log.dir "$LOGS")
DATA_NAME="${DATA_NAME:=harmony_db_0}"
if [ -n "$RCLONE_DB_0_URL" ]; then
rclone -P -L sync "$RCLONE_DB_0_URL" "$DATA/$DATA_NAME" --multi-thread-streams 4 --transfers=8
fi
mkdir -p "$LOGS"
echo -e NODE ARGS: \" "$@" "${BASE_ARGS[@]}" \"
