diff --git a/api/service/explorer/interface.go b/api/service/explorer/interface.go
index b043ee3f9..a8e179415 100644
--- a/api/service/explorer/interface.go
+++ b/api/service/explorer/interface.go
@@ -1,11 +1,14 @@
 package explorer
 
 import (
+	"path"
+
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/syndtr/goleveldb/leveldb"
-	"github.com/syndtr/goleveldb/leveldb/filter"
-	"github.com/syndtr/goleveldb/leveldb/opt"
-	levelutil "github.com/syndtr/goleveldb/leveldb/util"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/ethdb/leveldb"
+	tikvCommon "github.com/harmony-one/harmony/internal/tikv/common"
+	"github.com/harmony-one/harmony/internal/tikv/prefix"
+	"github.com/harmony-one/harmony/internal/tikv/remote"
 )
 
 // database is an adapter for *leveldb.DB
@@ -32,54 +35,56 @@ type batch interface {
 	ValueSize() int
 }
 
-// lvlDB is the adapter for leveldb.Database
-type lvlDB struct {
-	db *leveldb.DB
+// explorerDB is the adapter for the explorer node
+type explorerDB struct {
+	db ethdb.KeyValueStore
 }
 
-func newLvlDB(dbPath string) (database, error) {
-	// https://github.com/ethereum/go-ethereum/blob/master/ethdb/leveldb/leveldb.go#L98 options.
-	// We had 0 for handles and cache params before, so set 0s for all of them. Filter opt is the same.
-	options := &opt.Options{
-		OpenFilesCacheCapacity: 500,
-		BlockCacheCapacity:     8 * 1024 * 1024, // 8 MiB
-		WriteBuffer:            4 * 1024 * 1024, // 4 MiB
-		Filter:                 filter.NewBloomFilter(10),
+// newExplorerLvlDB creates a new explorer storage backed by leveldb
+func newExplorerLvlDB(dbPath string) (database, error) {
+	db, err := leveldb.New(dbPath, 16, 500, "explorer_db")
+	if err != nil {
+		return nil, err
 	}
-	db, err := leveldb.OpenFile(dbPath, options)
+	return &explorerDB{db}, nil
+}
+
+// newExplorerTiKv creates a new explorer storage backed by TiKV
+func newExplorerTiKv(pdAddr []string, dbPath string, readOnly bool) (database, error) {
+	prefixStr := append([]byte(path.Base(dbPath)), '/')
+	db, err := remote.NewRemoteDatabase(pdAddr, readOnly)
 	if err != nil {
 		return nil, err
 	}
-	return &lvlDB{db}, nil
+	return &explorerDB{
+		db: tikvCommon.ToEthKeyValueStore(
+			prefix.NewPrefixDatabase(prefixStr, db),
+		),
+	}, nil
 }
 
-func (db *lvlDB) Put(key, val []byte) error {
-	return db.db.Put(key, val, nil)
+func (db *explorerDB) Put(key, val []byte) error {
+	return db.db.Put(key, val)
 }
 
-func (db *lvlDB) Get(key []byte) ([]byte, error) {
-	return db.db.Get(key, nil)
+func (db *explorerDB) Get(key []byte) ([]byte, error) {
+	return db.db.Get(key)
 }
 
-func (db *lvlDB) Has(key []byte) (bool, error) {
-	return db.db.Has(key, nil)
+func (db *explorerDB) Has(key []byte) (bool, error) {
+	return db.db.Has(key)
 }
 
-func (db *lvlDB) NewBatch() batch {
-	batch := new(leveldb.Batch)
-	return &lvlBatch{
-		batch: batch,
-		db:    db.db,
-	}
+func (db *explorerDB) NewBatch() batch {
+	return db.db.NewBatch()
 }
 
-func (db *lvlDB) NewPrefixIterator(prefix []byte) iterator {
-	rng := levelutil.BytesPrefix(prefix)
-	it := db.db.NewIterator(rng, nil)
+func (db *explorerDB) NewPrefixIterator(prefix []byte) iterator {
+	it := db.db.NewIteratorWithPrefix(prefix)
 	return it
 }
 
-func (db *lvlDB) NewSizedIterator(start []byte, size int) iterator {
+func (db *explorerDB) NewSizedIterator(start []byte, size int) iterator {
 	return db.newSizedIterator(start, size)
 }
 
@@ -89,9 +94,8 @@ type sizedIterator struct {
 	sizeLimit int
 }
 
-func (db *lvlDB) newSizedIterator(start []byte, size int) *sizedIterator {
-	rng := &levelutil.Range{Start: start, Limit: nil}
-	it := db.db.NewIterator(rng, nil)
+func (db *explorerDB) newSizedIterator(start []byte, size int) *sizedIterator {
+	it := db.db.NewIteratorWithStart(start)
 	return &sizedIterator{
 		it:       it,
 		curIndex: 0,
@@ -112,31 +116,6 @@ func (it *sizedIterator) Value() []byte { return it.it.Value() }
 func (it *sizedIterator) Release()     { it.it.Release() }
 func (it *sizedIterator) Error() error { return it.it.Error() }
 
-// Note: lvlBatch is not thread safe
-type lvlBatch struct {
-	batch     *leveldb.Batch
-	db        *leveldb.DB
-	valueSize int
-}
-
-func (b *lvlBatch) Put(key, val []byte) error {
-	b.batch.Put(key, val)
-	b.valueSize += len(val)
-	return nil
-}
-
-func (b *lvlBatch) Write() error {
-	if err := b.db.Write(b.batch, nil); err != nil {
-		return err
-	}
-	b.valueSize = 0
-	return nil
-}
-
-func (b *lvlBatch) ValueSize() int {
-	return b.valueSize
-}
-
 type iterator interface {
 	Next() bool
 	Key() []byte
diff --git a/api/service/explorer/interface_test.go b/api/service/explorer/interface_test.go
index bedf6cb11..71414a17c 100644
--- a/api/service/explorer/interface_test.go
+++ b/api/service/explorer/interface_test.go
@@ -73,7 +73,7 @@ func TestLevelDBPrefixIterator(t *testing.T) {
 
 func newTestLevelDB(t *testing.T, i int) database {
 	dbDir := tempTestDir(t, i)
-	db, err := newLvlDB(dbDir)
+	db, err := newExplorerLvlDB(dbDir)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/api/service/explorer/migration_test.go b/api/service/explorer/migration_test.go
index 795059355..146a6f45e 100644
--- a/api/service/explorer/migration_test.go
+++ b/api/service/explorer/migration_test.go
@@ -40,11 +40,6 @@ type migrationDBFactory struct {
 
 func (f *migrationDBFactory) makeDB(numAddr int) database {
 	db := newTestLevelDB(f.t, 0)
-	for i := 0; i != 10000; i++ {
-		if err := writeCheckpoint(db, uint64(i)); err != nil {
-			f.t.Fatal(err)
-		}
-	}
 	for i := 0; i != numAddr; i++ {
 		addr := f.newAddress()
 		addrInfo := &Address{ID: string(addr)}
diff --git a/api/service/explorer/schema.go b/api/service/explorer/schema.go
index c0d9f5583..09848573d 100644
--- a/api/service/explorer/schema.go
+++ b/api/service/explorer/schema.go
@@ -3,8 +3,8 @@ package explorer
 import (
 	"encoding/binary"
 	"fmt"
-	"math/big"
 
+	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/rlp"
 	goversion "github.com/hashicorp/go-version"
@@ -15,26 +15,39 @@ import (
 const (
 	LegAddressPrefix = "ad_"
-	CheckpointPrefix = "dc"
+	CheckpointBitmap = "checkpoint_bitmap"
 	TracePrefix      = "tr_"
 
 	oneAddrByteLen = 42 // byte size of string "one1..."
 )
 
-// Common schema
-// GetCheckpointKey ...
-func GetCheckpointKey(blockNum *big.Int) []byte {
-	return []byte(fmt.Sprintf("%s_%x", CheckpointPrefix, blockNum))
-}
+// readCheckpointBitmap reads the explorer checkpoint bitmap from storage
+func readCheckpointBitmap(db databaseReader) (*roaring64.Bitmap, error) {
+	bitmapByte, err := db.Get([]byte(CheckpointBitmap))
+	if err != nil {
+		if err == leveldb.ErrNotFound {
+			return roaring64.NewBitmap(), nil
+		}
+		return nil, err
+	}
 
-func isBlockComputedInDB(db databaseReader, bn uint64) (bool, error) {
-	key := GetCheckpointKey(new(big.Int).SetUint64(bn))
-	return db.Has(key)
+	rb := roaring64.NewBitmap()
+	err = rb.UnmarshalBinary(bitmapByte)
+	if err != nil {
+		return nil, err
+	}
+
+	return rb, nil
 }
 
-func writeCheckpoint(db databaseWriter, bn uint64) error {
-	blockCheckpoint := GetCheckpointKey(new(big.Int).SetUint64(bn))
-	return db.Put(blockCheckpoint, []byte{})
+// writeCheckpointBitmap writes the explorer checkpoint bitmap to storage
+func writeCheckpointBitmap(db databaseWriter, rb *roaring64.Bitmap) error {
+	bitmapByte, err := rb.MarshalBinary()
+	if err != nil {
+		return err
+	}
+
+	return db.Put([]byte(CheckpointBitmap), bitmapByte)
 }
 
 func getTraceResultKey(key []byte) []byte {
diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go
index 487aab568..b5ffa1b8b 100644
--- a/api/service/explorer/service.go
+++ b/api/service/explorer/service.go
@@ -11,6 +11,9 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/RoaringBitmap/roaring/roaring64"
+	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
+
 	ethCommon "github.com/ethereum/go-ethereum/common"
 	"github.com/gorilla/mux"
 	msg_pb "github.com/harmony-one/harmony/api/proto/message"
@@ -42,20 +45,21 @@ type HTTPError struct {
 
 // Service is the struct for explorer service.
 type Service struct {
-	router      *mux.Router
-	IP          string
-	Port        string
-	storage     *storage
-	server      *http.Server
-	messageChan chan *msg_pb.Message
-	blockchain  core.BlockChain
-	backend     hmy.NodeAPI
+	router        *mux.Router
+	IP            string
+	Port          string
+	storage       *storage
+	server        *http.Server
+	messageChan   chan *msg_pb.Message
+	blockchain    core.BlockChain
+	backend       hmy.NodeAPI
+	harmonyConfig *harmonyconfig.HarmonyConfig
 }
 
 // New returns explorer service.
-func New(selfPeer *p2p.Peer, bc core.BlockChain, backend hmy.NodeAPI) *Service {
+func New(harmonyConfig *harmonyconfig.HarmonyConfig, selfPeer *p2p.Peer, bc core.BlockChain, backend hmy.NodeAPI) *Service {
 	dbPath := defaultDBPath(selfPeer.IP, selfPeer.Port)
-	storage, err := newStorage(bc, dbPath)
+	storage, err := newStorage(harmonyConfig, bc, dbPath)
 	if err != nil {
 		utils.Logger().Fatal().Err(err).Msg("cannot open explorer DB")
 	}
@@ -335,6 +339,11 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
 	s.messageChan = messageChan
 }
 
+// GetCheckpointBitmap returns a copy of the explorer checkpoint bitmap
+func (s *Service) GetCheckpointBitmap() *roaring64.Bitmap {
+	return s.storage.rb.Clone()
+}
+
 func defaultDBPath(ip, port string) string {
 	return path.Join(nodeconfig.GetDefaultConfig().DBDir, "explorer_storage_"+ip+"_"+port)
 }
diff --git a/api/service/explorer/storage.go b/api/service/explorer/storage.go
index 901ae3b11..72d028180 100644
--- a/api/service/explorer/storage.go
+++ b/api/service/explorer/storage.go
@@ -9,6 +9,10 @@ import (
 	"sync"
 	"time"
 
+	"github.com/RoaringBitmap/roaring/roaring64"
+	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
+	"github.com/harmony-one/harmony/internal/tikv"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/harmony-one/abool"
 	"github.com/harmony-one/harmony/core"
@@ -22,7 +26,8 @@ import (
 )
 
 const (
-	numWorker = 8
+	numWorker        = 8
+	changedSaveCount = 100
 )
 
 // ErrExplorerNotReady is the error when querying explorer db data when
@@ -33,6 +38,7 @@ type (
 	storage struct {
 		db database
 		bc core.BlockChain
+		rb *roaring64.Bitmap
 
 		// TODO: optimize this with priority queue
 		tm *taskManager
@@ -55,17 +61,39 @@ type (
 	}
 )
 
-func newStorage(bc core.BlockChain, dbPath string) (*storage, error) {
-	utils.Logger().Info().Msg("explorer storage folder: " + dbPath)
-	db, err := newLvlDB(dbPath)
+func newExplorerDB(hc *harmonyconfig.HarmonyConfig, dbPath string) (database, error) {
+	if hc.General.RunElasticMode {
+		// init the storage using tikv
+		dbPath = fmt.Sprintf("explorer_tikv_%d", hc.General.ShardID)
+		readOnly := hc.TiKV.Role == tikv.RoleReader
+		utils.Logger().Info().Msg("explorer storage in tikv: " + dbPath)
+		return newExplorerTiKv(hc.TiKV.PDAddr, dbPath, readOnly)
+	} else {
+		// or leveldb
+		utils.Logger().Info().Msg("explorer storage folder: " + dbPath)
+		return newExplorerLvlDB(dbPath)
+	}
+}
+
+func newStorage(hc *harmonyconfig.HarmonyConfig, bc core.BlockChain, dbPath string) (*storage, error) {
+	db, err := newExplorerDB(hc, dbPath)
+	if err != nil {
+		utils.Logger().Error().Err(err).Msg("Failed to create new explorer database")
+		return nil, err
+	}
+
+	// load the checkpoint roaring bitmap from storage;
+	// a roaring bitmap is highly compressed: in our case, 1 million blocks take only about 1 KB of storage
+	bitmap, err := readCheckpointBitmap(db)
 	if err != nil {
-		utils.Logger().Error().Err(err).Msg("Failed to create new database")
 		return nil, err
 	}
+
 	return &storage{
 		db:        db,
 		bc:        bc,
-		tm:        newTaskManager(),
+		rb:        bitmap,
+		tm:        newTaskManager(bitmap),
 		resultC:   make(chan blockResult, numWorker),
 		resultT:   make(chan *traceResult, numWorker),
 		available: abool.New(),
@@ -79,6 +107,7 @@ func (s *storage) Start() {
 }
 
 func (s *storage) Close() {
+	_ = writeCheckpointBitmap(s.db, s.rb)
 	close(s.closeC)
 }
 
@@ -182,14 +211,18 @@ type taskManager struct {
 	blocksLP []*types.Block // blocks with low priorities
 
 	lock sync.Mutex
 
+	rb             *roaring64.Bitmap
+	rbChangedCount int
+
 	C chan struct{}
 	T chan *traceResult
 }
 
-func newTaskManager() *taskManager {
+func newTaskManager(bitmap *roaring64.Bitmap) *taskManager {
 	return &taskManager{
-		C: make(chan struct{}, numWorker),
-		T: make(chan *traceResult, numWorker),
+		rb: bitmap,
+		C:  make(chan struct{}, numWorker),
+		T:  make(chan *traceResult, numWorker),
 	}
 }
 
@@ -245,6 +278,30 @@ func (tm *taskManager) PullTask() *types.Block {
 	return nil
 }
 
+// markBlockDone marks a block as processed once the explorer has computed it
+func (tm *taskManager) markBlockDone(btc batch, blockNum uint64) {
+	tm.lock.Lock()
+	defer tm.lock.Unlock()
+
+	if tm.rb.CheckedAdd(blockNum) {
+		tm.rbChangedCount++
+
+		// persist the bitmap once every 100 changes
+		if tm.rbChangedCount == changedSaveCount {
+			tm.rbChangedCount = 0
+			_ = writeCheckpointBitmap(btc, tm.rb)
+		}
+	}
+}
+
+// hasBlockDone reports whether a block has already been processed
+func (tm *taskManager) hasBlockDone(blockNum uint64) bool {
+	tm.lock.Lock()
+	defer tm.lock.Unlock()
+
+	return tm.rb.Contains(blockNum)
+}
+
 func (s *storage) makeWorkersAndStart() {
 	workers := make([]*blockComputer, 0, numWorker)
 	for i := 0; i != numWorker; i++ {
@@ -321,9 +378,8 @@ LOOP:
 }
 
 func (bc *blockComputer) computeBlock(b *types.Block) (*blockResult, error) {
-	is, err := isBlockComputedInDB(bc.db, b.NumberU64())
-	if is || err != nil {
-		return nil, err
+	if bc.tm.hasBlockDone(b.NumberU64()) {
+		return nil, nil
 	}
 	btc := bc.db.NewBatch()
@@ -333,7 +389,7 @@ func (bc *blockComputer) computeBlock(b *types.Block) (*blockResult, error) {
 	for _, stk := range b.StakingTransactions() {
 		bc.computeStakingTx(btc, b, stk)
 	}
-	_ = writeCheckpoint(btc, b.NumberU64())
+	bc.tm.markBlockDone(btc, b.NumberU64())
 	return &blockResult{
 		btc: btc,
 		bn:  b.NumberU64(),
diff --git a/api/service/prometheus/service.go b/api/service/prometheus/service.go
index 43f33d9ad..01654495e 100644
--- a/api/service/prometheus/service.go
+++ b/api/service/prometheus/service.go
@@ -8,6 +8,7 @@ import (
 	"net/http"
 	"runtime/debug"
 	"runtime/pprof"
+	"strings"
 	"sync"
 	"time"
 
@@ -30,13 +31,18 @@ type Config struct {
 	Legacy   bool   // legacy or not, legacy is harmony internal node
 	NodeType string // node type, validator or exlorer node
 	Shard    uint32 // shard id, used as job suffix
-	Instance string //identifier of the instance in prometheus metrics
+	Instance string // identifier of the instance in prometheus metrics
+	TikvRole string // used for the tikv explorer node
 }
 
 func (p Config) String() string {
 	return fmt.Sprintf("%v, %v:%v, %v/%v, %v/%v/%v/%v:%v", p.Enabled, p.IP, p.Port, p.EnablePush, p.Gateway, p.Network, p.Legacy, p.NodeType, p.Shard, p.Instance)
 }
 
+func (p Config) IsUsedTiKV() bool {
+	return p.TikvRole != ""
+}
+
 // Service provides Prometheus metrics via the /metrics route. This route will
 // show all the metrics registered with the Prometheus DefaultRegisterer.
 type Service struct {
@@ -63,8 +69,9 @@ var (
 
 func (s *Service) getJobName() string {
 	var node string
 
-	// legacy nodes are harmony nodes: s0,s1,s2,s3
-	if s.config.Legacy {
+	if s.config.IsUsedTiKV() { // tikv nodes must be explorer nodes, e.g. te_reader0, te_writer0
+		node = "te_" + strings.ToLower(s.config.TikvRole)
+	} else if s.config.Legacy { // legacy nodes are harmony nodes: s0,s1,s2,s3
 		node = "s"
 	} else {
 		if s.config.NodeType == "validator" {
diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go
index 5b402271b..845a9234e 100644
--- a/cmd/harmony/main.go
+++ b/cmd/harmony/main.go
@@ -15,6 +15,10 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
+	"github.com/harmony-one/harmony/internal/tikv/redis_helper"
+	"github.com/harmony-one/harmony/internal/tikv/statedb_cache"
+
 	"github.com/harmony-one/harmony/api/service/crosslink_sending"
 	rosetta_common "github.com/harmony-one/harmony/rosetta/common"
 
@@ -297,6 +301,11 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
 		os.Exit(1)
 	}
 
+	if hc.General.RunElasticMode && hc.TiKV == nil {
+		fmt.Fprintln(os.Stderr, "TiKV config is required when running in elastic mode")
+		os.Exit(1)
+	}
+
 	// Update ethereum compatible chain ids
 	params.UpdateEthChainIDByShard(nodeConfig.ShardID)
 
@@ -427,6 +436,8 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
 		currentNode.StartGRPCSyncClient()
 	}
 
+	currentNode.NodeSyncing()
+
 	if err := currentNode.StartServices(); err != nil {
 		fmt.Fprint(os.Stderr, err.Error())
 		os.Exit(-1)
@@ -680,7 +691,9 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
 	// Current node.
 	var chainDBFactory shardchain.DBFactory
-	if hc.ShardData.EnableShardData {
+	if hc.General.RunElasticMode {
+		chainDBFactory = setupTiKV(hc)
+	} else if hc.ShardData.EnableShardData {
 		chainDBFactory = &shardchain.LDBShardFactory{
 			RootDir:   nodeConfig.DBDir,
 			DiskCount: hc.ShardData.DiskCount,
@@ -754,6 +767,28 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
 	return currentNode
 }
 
+func setupTiKV(hc harmonyconfig.HarmonyConfig) shardchain.DBFactory {
+	err := redis_helper.Init(hc.TiKV.StateDBRedisServerAddr)
+	if err != nil {
+		panic("cannot connect to redis: " + err.Error())
+	}
+
+	factory := &shardchain.TiKvFactory{
+		PDAddr: hc.TiKV.PDAddr,
+		Role:   hc.TiKV.Role,
+		CacheConfig: statedb_cache.StateDBCacheConfig{
+			CacheSizeInMB:        hc.TiKV.StateDBCacheSizeInMB,
+			CachePersistencePath: hc.TiKV.StateDBCachePersistencePath,
+			RedisServerAddr:      hc.TiKV.StateDBRedisServerAddr,
+			RedisLRUTimeInDay:    hc.TiKV.StateDBRedisLRUTimeInDay,
+			DebugHitRate:         hc.TiKV.Debug,
+		},
+	}
+
+	tikv_manage.SetDefaultTiKVFactory(factory)
+	return factory
+}
+
 func processNodeType(hc harmonyconfig.HarmonyConfig, currentNode *node.Node, currentConsensus *consensus.Consensus) {
 	switch hc.General.NodeType {
 	case nodeTypeExplorer:
@@ -796,6 +831,11 @@ func setupPrometheusService(node *node.Node, hc harmonyconfig.HarmonyConfig, sid
 		Shard:    sid,
 		Instance: myHost.GetID().Pretty(),
 	}
+
+	if hc.General.RunElasticMode {
+		prometheusConfig.TikvRole = hc.TiKV.Role
+	}
+
 	p := prometheus.NewService(prometheusConfig)
 	node.RegisterService(service.Prometheus, p)
 }
diff --git a/core/blockchain.go b/core/blockchain.go
index 0953208f2..7300603d7 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1,6 +1,7 @@
 package core
 
 import (
+	"github.com/harmony-one/harmony/internal/tikv/redis_helper"
 	"math/big"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -331,4 +332,15 @@ type BlockChain interface {
 		payout reward.Reader,
 		state *state.DB,
 	) (status WriteStatus, err error)
+
+	// ========== Only For Tikv Start ==========
+
+	// IsTikvWriterMaster returns true if this node is the tikv master writer
+	IsTikvWriterMaster() bool
+	// RedisPreempt returns the redis preempt instance used in tikv mode
+	RedisPreempt() *redis_helper.RedisPreempt
+	// SyncFromTiKVWriter is used in tikv mode by reader nodes and follower writers to sync blocks from the master writer
+	SyncFromTiKVWriter(newBlkNum uint64, logs []*types.Log) error
+
+	// ========== Only For Tikv End ==========
 }
diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go
index 6cd082ea9..56ae457dd 100644
--- a/core/blockchain_impl.go
+++ b/core/blockchain_impl.go
@@ -22,13 +22,19 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"log"
 	"math/big"
 	"os"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
+	"github.com/harmony-one/harmony/internal/tikv"
+	"github.com/harmony-one/harmony/internal/tikv/redis_helper"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/common/prque"
@@ -95,6 +101,7 @@ const (
 	maxTimeFutureBlocks = 30
 	badBlockLimit       = 10
 	triesInMemory       = 128
+	triesInRedis        = 1000
 	shardCacheLimit     = 10
 	commitsCacheLimit   = 10
 	epochCacheLimit     = 10
@@ -129,6 +136,14 @@ type BlockChainImpl struct {
 	triegc *prque.Prque  // Priority queue mapping block numbers to tries to gc
 	gcproc time.Duration // Accumulates canonical block processing for trie dumping
 
+	// The following two variables are used to clean up the redis cache in tikv mode.
+	// This can improve the redis cache hit rate.
+	latestCleanCacheNum uint64      // the most recently cleaned block number
+	cleanCacheChan      chan uint64 // receives block numbers whose cached trie data should be cleaned up
+
+	// redisPreempt is used in tikv mode: writer nodes preempt for write permission and publish updates to reader nodes
+	redisPreempt *redis_helper.RedisPreempt
+
 	hc        *HeaderChain
 	trace     bool       // atomic?
 	traceFeed event.Feed // send trace_block result to explorer
@@ -196,7 +211,7 @@ func NewBlockChainWithOptions(
 // available in the database. It initialises the default Ethereum validator and
 // Processor.
 func NewBlockChain(
-	db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig,
+	db ethdb.Database, stateCache state.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig,
 	engine consensus_engine.Engine, vmConfig vm.Config,
 	shouldPreserve func(block *types.Block) bool,
 ) (*BlockChainImpl, error) {
@@ -235,7 +250,7 @@ func newBlockChainWithOptions(
 		cacheConfig:    cacheConfig,
 		db:             db,
 		triegc:         prque.New(nil),
-		stateCache:     state.NewDatabase(db),
+		stateCache:     stateCache,
 		quit:           make(chan struct{}),
 		shouldPreserve: shouldPreserve,
 		bodyCache:      bodyCache,
@@ -355,6 +370,9 @@ func (bc *BlockChainImpl) loadLastState() error {
 		}
 	}
 	// Everything seems to be fine, set as the head block
+	if currentBlock.NumberU64() > triesInRedis {
+		bc.latestCleanCacheNum = currentBlock.NumberU64() - triesInRedis
+	}
 	bc.currentBlock.Store(currentBlock)
 	headBlockGauge.Update(int64(currentBlock.NumberU64()))
 
@@ -689,6 +705,30 @@ func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
 	return nil
 }
 
+// tikvFastForward writes a new head block in tikv mode, used by reader nodes and follower writer nodes
+func (bc *BlockChainImpl) tikvFastForward(block *types.Block, logs []*types.Log) error {
+	bc.currentBlock.Store(block)
+	headBlockGauge.Update(int64(block.NumberU64()))
+
+	if err := bc.hc.SetCurrentHeader(block.Header()); err != nil {
+		return errors.Wrap(err, "HeaderChain SetCurrentHeader")
+	}
+
+	bc.currentFastBlock.Store(block)
+	headFastBlockGauge.Update(int64(block.NumberU64()))
+
+	var events []interface{}
+	events = append(events, ChainEvent{block, block.Hash(), logs})
+	events = append(events, ChainHeadEvent{block})
+
+	if block.NumberU64() > triesInRedis {
+		bc.latestCleanCacheNum = block.NumberU64() - triesInRedis
+	}
+
+	bc.PostChainEvents(events, logs)
+	return nil
+}
+
 // insert injects a new head block into the current block chain. This method
 // assumes that the block is indeed a true head. It will also reset the head
 // header and the head fast sync block to this very same block if they are older
@@ -846,6 +886,10 @@ func (bc *BlockChainImpl) TrieNode(hash common.Hash) ([]byte, error) {
 }
 
 func (bc *BlockChainImpl) Stop() {
+	if bc == nil {
+		return
+	}
+
 	if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
 		return
 	}
@@ -1184,6 +1228,14 @@ func (bc *BlockChainImpl) WriteBlockWithState(
 			}
 			return NonStatTy, err
 		}
+
+		// clean the block's trie info from redis, used in tikv mode
+		if block.NumberU64() > triesInRedis {
+			select {
+			case bc.cleanCacheChan <- block.NumberU64() - triesInRedis:
+			default:
+			}
+		}
 	} else {
 		// Full but not archive node, do proper garbage collection
 		triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
@@ -1300,8 +1352,17 @@ func (bc *BlockChainImpl) GetMaxGarbageCollectedBlockNumber() int64 {
 }
 
 func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) {
+	// in tikv mode, a writer node must either preempt the master role or become a follower
+	if bc.isInitTiKV() && !bc.tikvPreemptMaster(bc.rangeBlock(chain)) {
+		return len(chain), nil
+	}
+
 	n, events, logs, err := bc.insertChain(chain, verifyHeaders)
 	bc.PostChainEvents(events, logs)
+	if bc.isInitTiKV() && err != nil {
+		// on error, the master writer node releases its write permission
+		_, _ = bc.redisPreempt.Unlock()
+	}
 	return n, err
 }
@@ -1542,6 +1603,14 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
 			events = append(events, ChainEvent{block, block.Hash(), logs})
 			lastCanon = block
 
+			// in tikv mode, the writer node publishes updates to all reader nodes
+			if bc.isInitTiKV() {
+				err = redis_helper.PublishShardUpdate(bc.ShardID(), block.NumberU64(), logs)
+				if err != nil {
+					utils.Logger().Info().Err(err).Msg("redis publish shard update error")
+				}
+			}
+
 			// Only count canonical blocks for GC processing time
 			bc.gcproc += proctime
 		}
@@ -3028,6 +3097,172 @@ func (bc *BlockChainImpl) IsEnablePruneBeaconChainFeature() bool {
 	return bc.pruneBeaconChainEnable
 }
 
+// SyncFromTiKVWriter is used in tikv mode by reader nodes and follower writers to sync blocks from the master writer
+func (bc *BlockChainImpl) SyncFromTiKVWriter(newBlkNum uint64, logs []*types.Log) error {
+	head := rawdb.ReadHeadBlockHash(bc.db)
+	dbBlock := bc.GetBlockByHash(head)
+	currentBlock := bc.CurrentBlock()
+
+	if dbBlock == nil || currentBlock == nil {
+		return nil
+	}
+
+	currentBlockNum := currentBlock.NumberU64()
+	if currentBlockNum < newBlkNum {
+		start := time.Now()
+		for i := currentBlockNum; i <= newBlkNum; i++ {
+			blk := bc.GetBlockByNumber(i)
+			if blk == nil {
+				// cluster synchronization may be in progress
+				utils.Logger().Warn().
+					Uint64("currentBlockNum", i).
+					Msg("[tikv sync] sync from writer got block nil, cluster synchronization may be in progress")
+				return nil
+			}
+
+			err := bc.tikvFastForward(blk, logs)
+			if err != nil {
+				return err
+			}
+		}
+		utils.Logger().Info().
+			Uint64("currentBlockNum", currentBlockNum).
+			Dur("usedTime", time.Now().Sub(start)).
+			Msg("[tikv sync] sync from writer")
+	}
+
+	return nil
+}
+
+// tikvCleanCache is used in tikv mode: it cleans block trie data from redis
+func (bc *BlockChainImpl) tikvCleanCache() {
+	var count int
+	for to := range bc.cleanCacheChan {
+		for i := bc.latestCleanCacheNum + 1; i <= to; i++ {
+			// build the previous block's statedb
+			fromBlock := bc.GetBlockByNumber(i)
+			fromTrie, err := state.New(fromBlock.Root(), bc.stateCache)
+			if err != nil {
+				continue
+			}
+
+			// build the current block's statedb
+			toBlock := bc.GetBlockByNumber(i + 1)
+			toTrie, err := state.New(toBlock.Root(), bc.stateCache)
+			if err != nil {
+				continue
+			}
+
+			// diff the two statedbs and delete the redis cache entries
+			start := time.Now()
+			count, err = fromTrie.DiffAndCleanCache(bc.ShardID(), toTrie)
+			if err != nil {
+				utils.Logger().Warn().
+					Err(err).
+					Msg("[tikv clean cache] error")
+				break
+			}
+
+			utils.Logger().Info().
+				Uint64("blockNum", i).
+				Int("removeCacheEntriesCount", count).
+				Dur("usedTime", time.Now().Sub(start)).
+				Msg("[tikv clean cache] success")
+			bc.latestCleanCacheNum = i
+		}
+	}
+}
+
+func (bc *BlockChainImpl) isInitTiKV() bool {
+	return bc.redisPreempt != nil
+}
+
+// tikvPreemptMaster is used in tikv mode: a writer node either preempts the master role or becomes a follower
+func (bc *BlockChainImpl) tikvPreemptMaster(fromBlock, toBlock uint64) bool {
+	for {
+		// preempt master
+		if ok, _ := bc.redisPreempt.TryLock(60); ok {
+			return true
+		}
+
+		// follower
+		if bc.CurrentBlock().NumberU64() >= toBlock {
+			return false
+		}
+
+		time.Sleep(time.Second)
+	}
+}
+
+// rangeBlock returns the min and max block numbers in the given blocks
+func (bc *BlockChainImpl) rangeBlock(blocks types.Blocks) (uint64, uint64) {
+	if len(blocks) == 0 {
+		return 0, 0
+	}
+	max := blocks[0].NumberU64()
+	min := max
+	for _, tmpBlock := range blocks {
+		if tmpBlock.NumberU64() > max {
+			max = tmpBlock.NumberU64()
+		} else if tmpBlock.NumberU64() < min {
+			min = tmpBlock.NumberU64()
+		}
+	}
+
+	return min, max
+}
+
+// RedisPreempt returns the redis preempt instance used in tikv mode
+func (bc *BlockChainImpl) RedisPreempt() *redis_helper.RedisPreempt {
+	if bc == nil {
+		return nil
+	}
+	return bc.redisPreempt
+}
+
+func (bc *BlockChainImpl) IsTikvWriterMaster() bool {
+	if bc == nil || bc.redisPreempt == nil {
+		return false
+	}
+
+	return bc.redisPreempt.LastLockStatus()
+}
+
+// InitTiKV initializes tikv mode
+func (bc *BlockChainImpl) InitTiKV(conf *harmonyconfig.TiKVConfig) {
+	bc.cleanCacheChan = make(chan uint64, 10)
+
+	if conf.Role == tikv.RoleWriter {
+		// only the writer needs to preempt for write permission
+		bc.redisPreempt = redis_helper.CreatePreempt(fmt.Sprintf("shard_%d_preempt", bc.ShardID()))
+	}
+
+	if conf.Debug {
+		// used to warm up the redis cache.
+		// If redis is empty, the hit rate will be too low and block synchronization will be slow;
+		// setting LOAD_PRE_FETCH=yes can significantly improve this.
+		if os.Getenv("LOAD_PRE_FETCH") == "yes" {
+			if trie, err := state.New(bc.CurrentBlock().Root(), bc.stateCache); err == nil {
+				trie.Prefetch(512)
+			} else {
+				log.Println("LOAD_PRE_FETCH ERR: ", err)
+			}
+		}
+
+		// If redis is empty, there is no need to clean the cache for the last ~1000 blocks;
+		// setting CLEAN_INIT to the latest block number skips this slow and unneeded cleanup.
+		if os.Getenv("CLEAN_INIT") != "" {
+			from, err := strconv.Atoi(os.Getenv("CLEAN_INIT"))
+			if err == nil {
+				bc.latestCleanCacheNum = uint64(from)
+			}
+		}
+	}
+
+	// start the block trie data cleaning process
+	go bc.tikvCleanCache()
+}
+
 var (
 	leveldbErrSpec         = "leveldb"
 	tooManyOpenFilesErrStr = "Too many open files"
diff --git a/core/evm_test.go b/core/evm_test.go
index 1f63adad7..0e564d18f 100644
--- a/core/evm_test.go
+++ b/core/evm_test.go
@@ -46,7 +46,7 @@ func getTestEnvironment(testBankKey ecdsa.PrivateKey) (*BlockChainImpl, *state.D
 	genesis := gspec.MustCommit(database)
 
 	// fake blockchain
-	chain, _ := NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil)
+	chain, _ := NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
 	db, _ := chain.StateAt(genesis.Root())
 
 	// make a fake block header (use epoch 1 so that locked tokens can be tested)
diff --git a/core/state/prefeth.go b/core/state/prefeth.go
new file mode 100644
index 000000000..50094fa63
--- /dev/null
+++ b/core/state/prefeth.go
@@ -0,0 +1,167 @@
+package state
+
+import (
+	"bytes"
+	"log"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethereum/go-ethereum/trie"
+	"github.com/harmony-one/harmony/internal/utils"
+)
+
+type prefetchJob struct {
+	accountAddr []byte
+	account     *Account
+	start, end  []byte
+}
+
+// Prefetch loads the latest block's statedb into redis in parallel.
+// If redis is empty, the hit rate will be too low and block synchronization will be slow;
+// this function is used for debugging or when initializing a tikv cluster for the first time.
+func (s *DB) Prefetch(parallel int) {
+	wg := sync.WaitGroup{}
+
+	jobChan := make(chan *prefetchJob, 10000)
+	waitWorker := int64(parallel)
+
+	// start parallel workers
+	for i := 0; i < parallel; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+			for job := range jobChan {
+				atomic.AddInt64(&waitWorker, -1)
+				s.prefetchWorker(job, jobChan)
+				atomic.AddInt64(&waitWorker, 1)
+			}
+		}()
+	}
+
+	// add the initial jobs, one per leading key byte
+	for i := 0; i < 255; i++ {
+		start := []byte{byte(i)}
+		end := []byte{byte(i + 1)}
+		if i == 254 {
+			end = nil
+		} else if i == 0 {
+			start = nil
+		}
+
+		jobChan <- &prefetchJob{
+			start: start,
+			end:   end,
+		}
+	}
+
+	// wait for all workers to finish
+	start := time.Now()
+	log.Println("Prefetch start")
+	var sleepCount int
+	for sleepCount < 3 {
+		time.Sleep(time.Second)
+		waitWorkerCount := atomic.LoadInt64(&waitWorker)
+		if waitWorkerCount >= int64(parallel) {
+			sleepCount++
+		} else {
+			sleepCount = 0
+		}
+	}
+
+	close(jobChan)
+	wg.Wait()
+	end := time.Now()
+	log.Println("Prefetch end, use", end.Sub(start))
+}
+
+// prefetchWorker processes one job
+func (s *DB) prefetchWorker(job *prefetchJob, jobs chan *prefetchJob) {
+	if job.account == nil {
+		// scan the main account trie
+		nodeIterator := s.trie.NodeIterator(job.start)
+		it := trie.NewIterator(nodeIterator)
+
+		for it.Next() {
+			if job.end != nil && bytes.Compare(it.Key, job.end) >= 0 {
+				return
+			}
+
+			// build account data from the main trie
+			var data Account
+			if err := rlp.DecodeBytes(it.Value, &data); err != nil {
+				panic(err)
+			}
+			addrBytes := s.trie.GetKey(it.Key)
+			addr := common.BytesToAddress(addrBytes)
+			obj := newObject(s, addr, data)
+			if data.CodeHash != nil {
+				obj.Code(s.db)
+			}
+
+			// build the account's storage trie iterator
+			storageIt := trie.NewIterator(obj.getTrie(s.db).NodeIterator(nil))
+			storageJob := &prefetchJob{
+				accountAddr: addrBytes,
+				account:     &data,
+			}
+
+			// fetch data
+			s.prefetchAccountStorage(jobs, storageJob, storageIt)
+		}
+	} else {
+		// scan one account's storage trie
+		obj := newObject(s, common.BytesToAddress(job.accountAddr), *job.account)
+		storageIt := trie.NewIterator(obj.getTrie(s.db).NodeIterator(job.start))
+
+		// fetch data
+		s.prefetchAccountStorage(jobs, job, storageIt)
+	}
+}
+
+// prefetchAccountStorage fetches the storage of one account
+func (s *DB) prefetchAccountStorage(jobs chan *prefetchJob, job *prefetchJob, it *trie.Iterator) {
+	start := time.Now()
+	count := 0
+
+	for it.Next() {
+		if job.end != nil && bytes.Compare(it.Key, job.end) >= 0 {
+			return
+		}
+
+		count++
+
+		// if fetching one account has taken more than 15s, split the job
+		if count%10000 == 0 && time.Now().Sub(start) > 15*time.Second {
+			middle := utils.BytesMiddle(it.Key, job.end)
+			startJob := &prefetchJob{
+				accountAddr: job.accountAddr,
+				account:     job.account,
+				start:       it.Key,
+				end:         middle,
+			}
+			endJob := &prefetchJob{
+				accountAddr: job.accountAddr,
+				account:     job.account,
+				start:       middle,
+				end:         job.end,
+			}
+
+			select {
+			case jobs <- endJob:
+				select {
+				case jobs <- startJob:
+					return
+				default:
+					job.end = startJob.end
+					start = time.Now()
+				}
+			default:
+				log.Println("job full, continue")
+			}
+		}
+	}
+}
diff --git a/core/state/tikv_clean.go b/core/state/tikv_clean.go
new file mode 100644
index 000000000..a62d42901
--- /dev/null
+++ b/core/state/tikv_clean.go
@@ -0,0 +1,80 @@
+package state
+
+import (
+	"bytes"
+	"sync"
+	"sync/atomic"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethereum/go-ethereum/trie"
+	"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
+)
+
+var secureKeyPrefix = []byte("secure-key-")
+
+// DiffAndCleanCache cleans block trie data from redis, used to reduce redis storage and increase the hit rate
+func (s *DB) DiffAndCleanCache(shardId uint32, to *DB) (int, error) {
+	// create a difference iterator
+	it, _ := trie.NewDifferenceIterator(to.trie.NodeIterator(nil), s.trie.NodeIterator(nil))
+	db, err := tikv_manage.GetDefaultTiKVFactory().NewCacheStateDB(shardId)
+	if err != nil {
+		return 0, err
+	}
+
+	batch := db.NewBatch()
+	wg := &sync.WaitGroup{}
+	count := uint64(0)
+	for it.Next(true) {
+		if !it.Leaf() {
+			// delete the trie node if it is not a leaf
+			atomic.AddUint64(&count, 1)
+			_ = batch.Delete(it.Hash().Bytes())
+		} else {
+
+			// build account data
+			addrBytes := s.trie.GetKey(it.LeafKey())
+			addr := common.BytesToAddress(addrBytes)
+
+			var fromAccount, toAccount Account
+			if err := rlp.DecodeBytes(it.LeafBlob(), &fromAccount); err != nil {
+				continue
+			}
+
+			if toByte, err := to.trie.TryGet(addrBytes); err != nil {
+				continue
+			} else if err := rlp.DecodeBytes(toByte, &toAccount); err != nil {
+				continue
+			}
+
+			// if the account has not changed, skip it
+			if bytes.Equal(fromAccount.Root.Bytes(), toAccount.Root.Bytes()) {
+				continue
+			}
+
+			// create an account difference iterator
+			fromAccountTrie := newObject(s, addr, fromAccount).getTrie(s.db)
+			toAccountTrie := newObject(to, addr, toAccount).getTrie(to.db)
+			accountIt, _ := trie.NewDifferenceIterator(toAccountTrie.NodeIterator(nil), fromAccountTrie.NodeIterator(nil))
+
+			// delete data in parallel
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for accountIt.Next(true) {
+					atomic.AddUint64(&count, 1)
+
+					if !accountIt.Leaf() {
+						_ = batch.Delete(accountIt.Hash().Bytes())
+					} else {
+						_ = batch.Delete(append(append([]byte{}, secureKeyPrefix...), accountIt.LeafKey()...))
+					}
+				}
+			}()
+		}
+	}
+
+	wg.Wait()
+
+	return int(count), db.ReplayCache(batch)
+}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index c481886fa..2efaf3be0 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -170,6 +170,8 @@ type TxPoolConfig struct {
 
 	Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
 
+	AddEvent func(tx types.PoolTransaction, local bool) // fired when a transaction is added
+
 	Blacklist  map[common.Address]struct{}      // Set of accounts that cannot be a part of any transaction
 	AllowedTxs map[common.Address]AllowedTxData // Set of allowed transactions can break the blocklist
 }
@@ -958,7 +960,14 @@ func (pool *TxPool) pendingEpoch() *big.Int {
 // If a newly added transaction is marked as local, its sending account will be
 // whitelisted, preventing any associated transaction from being dropped out of
 // the pool due to pricing constraints.
-func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
+func (pool *TxPool) add(tx types.PoolTransaction, local bool) (replaced bool, err error) {
+	defer func() {
+		if err == nil && pool.config.AddEvent != nil {
+			// in tikv mode, the writer publishes txpool changes to all readers to keep their state consistent
+			pool.config.AddEvent(tx, local)
+		}
+	}()
+
 	logger := utils.Logger().With().Stack().Logger()
 	// If the transaction is in the error sink, remove it as it may succeed
 	if pool.txErrorSink.Contains(tx.Hash().String()) {
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index a618d1d8b..b951bcea6 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -161,7 +161,7 @@ func createBlockChain() *BlockChainImpl {
 	genesis := gspec.MustCommit(database)
 	_ = genesis
 	engine := chain2.NewEngine()
-	blockchain, _ := NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil)
+	blockchain, _ := NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
 	return blockchain
 }
diff --git a/go.mod b/go.mod
index d766636c7..3e3cd932e 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,8 @@ module github.com/harmony-one/harmony
 go 1.18
 
 require (
+	github.com/RoaringBitmap/roaring v1.1.0
+	github.com/VictoriaMetrics/fastcache v1.5.7
 	github.com/Workiva/go-datastructures v1.0.50
 	github.com/allegro/bigcache v1.2.1
 	github.com/aws/aws-sdk-go v1.30.1
@@ -13,6 +15,8 @@ require (
 	github.com/davecgh/go-spew v1.1.1
 	github.com/deckarep/golang-set v1.7.1
 	github.com/ethereum/go-ethereum v1.9.25
+	github.com/fjl/memsize v0.0.0-20180929194037-2a09253e352a // indirect
+	github.com/go-redis/redis/v8 v8.11.5
 	github.com/golang/mock v1.6.0
 	github.com/golang/protobuf v1.5.2
 	github.com/golangci/golangci-lint v1.22.2
@@ -38,7 +42,7 @@ require (
 	github.com/pborman/uuid v1.2.0
 	github.com/pelletier/go-toml v1.9.3
 	github.com/pkg/errors v0.9.1
-	github.com/prometheus/client_golang v1.10.0
+	github.com/prometheus/client_golang v1.11.0
 	github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
 	github.com/rjeczalik/notify v0.9.2
 	github.com/rs/cors v1.7.0
@@ -48,13 +52,15 @@ require (
 	github.com/spf13/viper v1.6.1
 	github.com/stretchr/testify v1.7.0
 	github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca
+	github.com/tikv/client-go/v2 v2.0.1
 	github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
 	go.uber.org/ratelimit v0.1.0
-	go.uber.org/zap v1.16.0
+	go.uber.org/zap v1.20.0
 	golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
-	google.golang.org/grpc v1.33.2
+	golang.org/x/tools v0.1.7 // indirect
+	google.golang.org/grpc v1.43.0
 	google.golang.org/protobuf v1.26.0
 	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
 	gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
diff --git a/hmy/blockchain.go b/hmy/blockchain.go
index 1a306671a..f72be5d83 100644
--- a/hmy/blockchain.go
+++ b/hmy/blockchain.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"math/big"
 
+	v3 "github.com/harmony-one/harmony/block/v3"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/bloombits"
 	"github.com/ethereum/go-ethereum/event"
@@ -174,10 +176,20 @@ func (hmy *Harmony) GetPreStakingBlockRewards(
 
 // GetLatestChainHeaders ..
 func (hmy *Harmony) GetLatestChainHeaders() *block.HeaderPair {
-	return &block.HeaderPair{
-		BeaconHeader: hmy.BeaconChain.CurrentHeader(),
-		ShardHeader:  hmy.BlockChain.CurrentHeader(),
+	pair := &block.HeaderPair{
+		BeaconHeader: &block.Header{Header: v3.NewHeader()},
+		ShardHeader:  &block.Header{Header: v3.NewHeader()},
+	}
+
+	if hmy.BeaconChain != nil {
+		pair.BeaconHeader = hmy.BeaconChain.CurrentHeader()
 	}
+
+	if hmy.BlockChain != nil {
+		pair.ShardHeader = hmy.BlockChain.CurrentHeader()
+	}
+
+	return pair
 }
 
 // GetLastCrossLinks ..
diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go
index 7154f535b..1ad5f0e72 100644
--- a/internal/configs/harmony/harmony.go
+++ b/internal/configs/harmony/harmony.go
@@ -27,6 +27,7 @@ type HarmonyConfig struct {
 	Revert     *RevertConfig     `toml:",omitempty"`
 	Legacy     *LegacyConfig     `toml:",omitempty"`
 	Prometheus *PrometheusConfig `toml:",omitempty"`
+	TiKV       *TiKVConfig       `toml:",omitempty"`
 	DNSSync    DnsSync
 	ShardData  ShardDataConfig
 }
@@ -66,6 +67,18 @@ type GeneralConfig struct {
 	DataDir                string
 	TraceEnable            bool
 	EnablePruneBeaconChain bool
+	RunElasticMode         bool
+}
+
+type TiKVConfig struct {
+	Debug bool
+
+	PDAddr                      []string
+	Role                        string
+	StateDBCacheSizeInMB        uint32
+	StateDBCachePersistencePath string
+	StateDBRedisServerAddr      []string
+	StateDBRedisLRUTimeInDay    uint32
 }
 
 type ShardDataConfig struct {
diff --git a/internal/shardchain/dbfactory_tikv.go b/internal/shardchain/dbfactory_tikv.go
new file mode 100644
index 000000000..b1bcbcf47
--- /dev/null
+++ b/internal/shardchain/dbfactory_tikv.go
@@ -0,0 +1,126 @@
+package shardchain
+
+import (
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/harmony-one/harmony/internal/tikv"
+	tikvCommon "github.com/harmony-one/harmony/internal/tikv/common"
+	"github.com/harmony-one/harmony/internal/tikv/prefix"
+	"github.com/harmony-one/harmony/internal/tikv/remote"
+	"github.com/harmony-one/harmony/internal/tikv/statedb_cache"
+
+	"github.com/ethereum/go-ethereum/ethdb"
+)
+
+const (
+	LDBTiKVPrefix = "harmony_tikv"
+)
+
+type TiKvCacheConfig struct {
+	StateDBCacheSizeInMB        uint32
+	StateDBCachePersistencePath string
+	StateDBRedisServerAddr      string
+	StateDBRedisLRUTimeInDay    uint32
+}
+
+// TiKvFactory is a TiKV-backed blockchain database factory.
+type TiKvFactory struct {
+	cacheDBMap sync.Map
+
+	PDAddr      []string
+	Role        string
+	CacheConfig statedb_cache.StateDBCacheConfig
+}
+
+// getRemoteDB creates the remote tikv storage for a shard
+func (f *TiKvFactory) getRemoteDB(shardID uint32) (*prefix.PrefixDatabase, error) {
+	key := fmt.Sprintf("remote_%d_%s", shardID, f.Role)
+	if db, ok := f.cacheDBMap.Load(key); ok {
+		return db.(*prefix.PrefixDatabase), nil
+	} else {
+		prefixStr := []byte(fmt.Sprintf("%s_%d/", LDBTiKVPrefix, shardID))
+		remoteDatabase, err := remote.NewRemoteDatabase(f.PDAddr, f.Role == tikv.RoleReader)
+		if err != nil {
+			return nil, err
+		}
+
+		tmpDB := prefix.NewPrefixDatabase(prefixStr, remoteDatabase)
+		if loadedDB, loaded := f.cacheDBMap.LoadOrStore(key, tmpDB); loaded {
+			_ = tmpDB.Close()
+			return loadedDB.(*prefix.PrefixDatabase), nil
+		}
+
+		return tmpDB, nil
+	}
+}
+
+// getStateDB creates statedb storage with a local memory cache
+func (f *TiKvFactory) getStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error) {
+	key := fmt.Sprintf("state_db_%d_%s", shardID, f.Role)
+	if db, ok := f.cacheDBMap.Load(key); ok {
+		return db.(*statedb_cache.StateDBCacheDatabase), nil
+	} else {
+		db, err := f.getRemoteDB(shardID)
+		if err != nil {
+			return nil, err
+		}
+
+		tmpDB, err := statedb_cache.NewStateDBCacheDatabase(db, f.CacheConfig, f.Role == tikv.RoleReader)
+		if err != nil {
+			return nil, err
+		}
+
+		if loadedDB, loaded := f.cacheDBMap.LoadOrStore(key, tmpDB); loaded {
+			_ = tmpDB.Close()
+			return loadedDB.(*statedb_cache.StateDBCacheDatabase), nil
+		}
+
+		return tmpDB, nil
+	}
+}
+
+// NewChainDB returns a new TiKV-backed chain database for the given shard.
+func (f *TiKvFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
+	var database ethdb.KeyValueStore
+
+	db, err := f.getRemoteDB(shardID)
+	if err != nil {
+		return nil, err
+	}
+
+	database = tikvCommon.ToEthKeyValueStore(db)
+
+	return rawdb.NewDatabase(database), nil
+}
+
+// NewStateDB creates the shard statedb storage backed by tikv
+func (f *TiKvFactory) NewStateDB(shardID uint32) (ethdb.Database, error) {
+	var database ethdb.KeyValueStore
+
+	cacheDatabase, err := f.getStateDB(shardID)
+	if err != nil {
+		return nil, err
+	}
+
+	database = tikvCommon.ToEthKeyValueStore(cacheDatabase)
+
+	return rawdb.NewDatabase(database), nil
+}
+
+// NewCacheStateDB creates shard statedb storage with a memory cache
+func (f *TiKvFactory) NewCacheStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error) {
+	return f.getStateDB(shardID)
+}
+
+// CloseAllDB closes all tikv databases
+func (f *TiKvFactory) CloseAllDB() {
+	f.cacheDBMap.Range(func(_, value interface{}) bool {
+		if closer, ok := value.(io.Closer); ok {
+			_ = closer.Close()
+		}
+		return true
+	})
+}
diff --git a/internal/shardchain/shardchains.go b/internal/shardchain/shardchains.go
index 4c9d127e8..aa327aae8 100644
--- a/internal/shardchain/shardchains.go
+++ b/internal/shardchain/shardchains.go
@@ -4,6 +4,10 @@ import (
 	"math/big"
 	"sync"
 
+	"github.com/harmony-one/harmony/core/state"
+	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
+	"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
+
 	"github.com/harmony-one/harmony/shard"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -33,13 +37,14 @@ type Collection interface {
 // CollectionImpl is the main implementation of the shard chain collection.
 // See the Collection interface for details.
 type CollectionImpl struct {
-	dbFactory    DBFactory
-	dbInit       DBInitializer
-	engine       engine.Engine
-	mtx          sync.Mutex
-	pool         map[uint32]core.BlockChain
-	disableCache map[uint32]bool
-	chainConfig  *params.ChainConfig
+	dbFactory     DBFactory
+	dbInit        DBInitializer
+	engine        engine.Engine
+	mtx           sync.Mutex
+	pool          map[uint32]core.BlockChain
+	disableCache  map[uint32]bool
+	chainConfig   *params.ChainConfig
+	harmonyconfig *harmonyconfig.HarmonyConfig
 }
 
 // NewCollection creates and returns a new shard chain collection.
@@ -49,16 +54,18 @@ type CollectionImpl struct {
 // dbInit is the shard chain initializer to use when the database returned by
 // the factory is brand new (empty).
 func NewCollection(
+	harmonyconfig *harmonyconfig.HarmonyConfig,
 	dbFactory DBFactory, dbInit DBInitializer, engine engine.Engine,
 	chainConfig *params.ChainConfig,
 ) *CollectionImpl {
 	return &CollectionImpl{
-		dbFactory:    dbFactory,
-		dbInit:       dbInit,
-		engine:       engine,
-		pool:         make(map[uint32]core.BlockChain),
-		disableCache: make(map[uint32]bool),
-		chainConfig:  chainConfig,
+		harmonyconfig: harmonyconfig,
+		dbFactory:     dbFactory,
+		dbInit:        dbInit,
+		engine:        engine,
+		pool:          make(map[uint32]core.BlockChain),
+		disableCache:  make(map[uint32]bool),
+		chainConfig:   chainConfig,
 	}
 }
 
@@ -105,7 +112,6 @@ func (sc *CollectionImpl) ShardChain(shardID uint32, options ...core.Options) (c
 		// For beacon chain inside a shard chain, need to reset the eth chainID to shard 0's eth chainID in the config
 		chainConfig.EthCompatibleChainID = big.NewInt(chainConfig.EthCompatibleShard0ChainID.Int64())
 	}
-
 	opts := core.Options{}
 	if len(options) == 1 {
 		opts = options[0]
@@ -114,8 +120,13 @@ func (sc *CollectionImpl) ShardChain(shardID uint32, options ...core.Options) (c
 	if opts.EpochChain {
 		bc, err = core.NewEpochChain(db, &chainConfig, sc.engine, vm.Config{})
 	} else {
+		var stateCache state.Database
+		stateCache, err = initStateCache(db, sc, shardID)
+		if err != nil {
+			return nil, err
+		}
 		bc, err = core.NewBlockChainWithOptions(
-			db, cacheConfig, &chainConfig, sc.engine, vm.Config{}, nil, opts,
+			db, stateCache, cacheConfig, &chainConfig, sc.engine, vm.Config{}, nil, opts,
 		)
 	}
 
@@ -124,9 +132,28 @@ func (sc *CollectionImpl) ShardChain(shardID uint32, options ...core.Options) (c
 	}
 	db = nil // don't close
 	sc.pool[shardID] = bc
+
+	if sc.harmonyconfig.General.RunElasticMode {
+		// init tikv mode
+		bc.InitTiKV(sc.harmonyconfig.TiKV)
+	}
+
 	return bc, nil
 }
 
+func initStateCache(db ethdb.Database, sc *CollectionImpl, shardID uint32) (state.Database, error) {
+	if sc.harmonyconfig.General.RunElasticMode {
+		// in tikv mode, init the state db using tikv storage
+		stateDB, err := tikv_manage.GetDefaultTiKVFactory().NewStateDB(shardID)
+		if err != nil {
+			return nil, err
+		}
+		return state.NewDatabaseWithCache(stateDB, 64), nil
+	} else {
+		return state.NewDatabase(db), nil
+	}
+}
+
 // DisableCache disables caching mode for newly opened chains.
 // It does not affect already open chains. For best effect,
 // use this immediately after creating collection.
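Note on the TiKV storage layering: the stores created above compose three pieces, a remote store that talks to the TiKV cluster through PD, a key-prefix wrapper that carves one logical database per shard out of the shared keyspace ("harmony_tikv_<shard>/" for chain data via LDBTiKVPrefix, the explorer db path base name for explorer data), and an adapter to ethdb.KeyValueStore. A minimal sketch of that composition, using a hypothetical PD address (the production wiring is TiKvFactory.getRemoteDB and newExplorerTiKv above):

	// Sketch only: compose a prefixed, ethdb-compatible KV store on TiKV.
	// The PD address below is hypothetical; real wiring lives in
	// TiKvFactory.getRemoteDB and newExplorerTiKv above.
	remoteDB, err := remote.NewRemoteDatabase([]string{"127.0.0.1:2379"}, false /* readOnly */)
	if err != nil {
		panic(err)
	}
	prefixed := prefix.NewPrefixDatabase([]byte("harmony_tikv_0/"), remoteDB)
	store := tikvCommon.ToEthKeyValueStore(prefixed) // ethdb.KeyValueStore
	chainDB := rawdb.NewDatabase(store)              // ethdb.Database used by the blockchain
	_ = chainDB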
diff --git a/internal/shardchain/tikv_manage/factory_manage.go b/internal/shardchain/tikv_manage/factory_manage.go
new file mode 100644
index 000000000..e3893af76
--- /dev/null
+++ b/internal/shardchain/tikv_manage/factory_manage.go
@@ -0,0 +1,28 @@
+package tikv_manage
+
+import (
+	"sync"
+
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/harmony-one/harmony/internal/tikv/statedb_cache"
+)
+
+type TiKvFactory interface {
+	NewChainDB(shardID uint32) (ethdb.Database, error)
+	NewStateDB(shardID uint32) (ethdb.Database, error)
+	NewCacheStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error)
+	CloseAllDB()
+}
+
+var tikvInit = sync.Once{}
+var tikvFactory TiKvFactory
+
+func SetDefaultTiKVFactory(factory TiKvFactory) {
+	tikvInit.Do(func() {
+		tikvFactory = factory
+	})
+}
+
+func GetDefaultTiKVFactory() (factory TiKvFactory) {
+	return tikvFactory
+}
diff --git a/internal/tikv/byte_alloc/alloc.go b/internal/tikv/byte_alloc/alloc.go
new file mode 100644
index 000000000..1d4031f1a
--- /dev/null
+++ b/internal/tikv/byte_alloc/alloc.go
@@ -0,0 +1,73 @@
+// from https://github.com/xtaci/smux/blob/master/alloc.go
+package byte_alloc
+
+import (
+	"sync"
+)
+
+// magic number
+var debruijinPos = [...]byte{0, 9, 1, 10, 13, 21, 2, 29, 11, 14, 16, 18, 22, 25, 3, 30, 8, 12, 20, 28, 15, 17, 24, 7, 19, 27, 23, 6, 26, 5, 4, 31}
+
+// Allocator for incoming frames, optimized to prevent overwriting after zeroing
+type Allocator struct {
+	buffers []sync.Pool
+}
+
+// NewAllocator initiates a []byte allocator for frames less than 1048576 bytes,
+// the waste(memory fragmentation) of space allocation is guaranteed to be
+// no more than 50%.
+func NewAllocator() *Allocator {
+	alloc := new(Allocator)
+	alloc.buffers = make([]sync.Pool, 21) // 1B -> 1M
+	for k := range alloc.buffers {
+		size := 1 << uint32(k)
+		alloc.buffers[k].New = func() interface{} {
+			return make([]byte, size)
+		}
+	}
+	return alloc
+}
+
+// Get a []byte from pool with most appropriate cap
+func (alloc *Allocator) Get(size int) []byte {
+	if size <= 0 {
+		return nil
+	}
+
+	if size > 1048576 {
+		return make([]byte, size)
+	}
+
+	bits := msb(size)
+	if size == 1<<bits {
+		return alloc.buffers[bits].Get().([]byte)[:size]
+	}
+
+	return alloc.buffers[bits+1].Get().([]byte)[:size]
+}
+
+// Put returns a []byte to the pool for future use;
+// the cap must be exactly a power of two
+func (alloc *Allocator) Put(buf []byte) {
+	bufCap := cap(buf)
+	bits := msb(bufCap)
+	if bufCap == 0 || bufCap > 65536 || bufCap != 1<<bits {
+		return
+	}
+
+	alloc.buffers[bits].Put(buf)
+}
+
+// msb returns the position of the most significant bit
+func msb(size int) byte {
+	v := uint32(size)
+	v |= v >> 1
+	v |= v >> 2
+	v |= v >> 4
+	v |= v >> 8
+	v |= v >> 16
+	return debruijinPos[(v*0x07C4ACDD)>>27]
+}
diff --git a/internal/tikv/byte_alloc/defailt.go b/internal/tikv/byte_alloc/defailt.go
new file mode 100644
index 000000000..6725d8239
--- /dev/null
+++ b/internal/tikv/byte_alloc/defailt.go
@@ -0,0 +1,15 @@
+package byte_alloc
+
+var defaultAllocator *Allocator
+
+func init() {
+	defaultAllocator = NewAllocator()
+}
+
+func Get(size int) []byte {
+	return defaultAllocator.Get(size)
+}
+
+func Put(buf []byte) {
+	defaultAllocator.Put(buf)
+}
diff --git a/internal/tikv/common/errors.go b/internal/tikv/common/errors.go
new file mode 100644
index 000000000..62670b259
--- /dev/null
+++ b/internal/tikv/common/errors.go
@@ -0,0 +1,10 @@
+package common
+
+import (
+	"errors"
+
+	"github.com/syndtr/goleveldb/leveldb"
+)
+
+var ErrEmptyKey = errors.New("empty key is not supported")
+var ErrNotFound = leveldb.ErrNotFound
diff --git a/internal/tikv/common/hack.go b/internal/tikv/common/hack.go
new file mode 100644
index 000000000..6c52ebfaa
--- /dev/null
+++ b/internal/tikv/common/hack.go
@@ -0,0 +1,20 @@
+package common
+
+import (
+	"unsafe"
+)
+
+// String converts byte slice to string.
+func String(b []byte) string {
+	return *(*string)(unsafe.Pointer(&b))
+}
+
+// StringBytes converts string to byte slice.
+func StringBytes(s string) []byte {
+	return *(*[]byte)(unsafe.Pointer(
+		&struct {
+			string
+			Cap int
+		}{s, len(s)},
+	))
+}
diff --git a/internal/tikv/common/tikv_store.go b/internal/tikv/common/tikv_store.go
new file mode 100644
index 000000000..2ad342ad8
--- /dev/null
+++ b/internal/tikv/common/tikv_store.go
@@ -0,0 +1,41 @@
+package common
+
+import (
+	"io"
+
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/syndtr/goleveldb/leveldb/util"
+)
+
+type TiKVStore interface {
+	ethdb.KeyValueReader
+	ethdb.KeyValueWriter
+	ethdb.Batcher
+	ethdb.Stater
+	ethdb.Compacter
+	io.Closer
+
+	NewIterator(start, end []byte) ethdb.Iterator
+}
+
+// TiKVStoreWrapper is a simple wrapper that converts a TiKVStore to an ethdb.KeyValueStore
+type TiKVStoreWrapper struct {
+	TiKVStore
+}
+
+func (t *TiKVStoreWrapper) NewIterator() ethdb.Iterator {
+	return t.TiKVStore.NewIterator(nil, nil)
+}
+
+func (t *TiKVStoreWrapper) NewIteratorWithStart(start []byte) ethdb.Iterator {
+	return t.TiKVStore.NewIterator(start, nil)
+}
+
+func (t *TiKVStoreWrapper) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
+	bytesPrefix := util.BytesPrefix(prefix)
+	return t.TiKVStore.NewIterator(bytesPrefix.Start, bytesPrefix.Limit)
+}
+
+func ToEthKeyValueStore(store TiKVStore) ethdb.KeyValueStore {
+	return &TiKVStoreWrapper{TiKVStore: store}
+}
diff --git a/internal/tikv/consts.go b/internal/tikv/consts.go
new file mode 100644
index 000000000..952fc0e55
--- /dev/null
+++ b/internal/tikv/consts.go
@@ -0,0 +1,6 @@
+package tikv
+
+const (
+	RoleReader = "Reader"
+	RoleWriter = "Writer"
+)
diff --git a/internal/tikv/prefix/prefix_batch.go b/internal/tikv/prefix/prefix_batch.go
new file mode 100644
index 000000000..733496cbe
--- /dev/null
+++ b/internal/tikv/prefix/prefix_batch.go
@@ -0,0 +1,42 @@
+package prefix
+
+import "github.com/ethereum/go-ethereum/ethdb"
+
+type PrefixBatch struct {
+	prefix []byte
+	batch  ethdb.Batch
+}
+
+func newPrefixBatch(prefix []byte, batch ethdb.Batch) *PrefixBatch {
+	return &PrefixBatch{prefix: prefix, batch: batch}
+}
+
+// Put inserts the given value into the key-value data store.
+func (p *PrefixBatch) Put(key []byte, value []byte) error {
+	return p.batch.Put(append(append([]byte{}, p.prefix...), key...), value)
+}
+
+// Delete removes the key from the key-value data store.
+func (p *PrefixBatch) Delete(key []byte) error {
+	return p.batch.Delete(append(append([]byte{}, p.prefix...), key...))
+}
+
+// ValueSize retrieves the amount of data queued up for writing.
+func (p *PrefixBatch) ValueSize() int {
+	return p.batch.ValueSize()
+}
+
+// Write flushes any accumulated data to disk.
+func (p *PrefixBatch) Write() error {
+	return p.batch.Write()
+}
+
+// Reset resets the batch for reuse.
+func (p *PrefixBatch) Reset() {
+	p.batch.Reset()
+}
+
+// Replay replays the batch contents.
+func (p *PrefixBatch) Replay(w ethdb.KeyValueWriter) error {
+	return p.batch.Replay(newPrefixBatchReplay(p.prefix, w))
+}
diff --git a/internal/tikv/prefix/prefix_batch_replay.go b/internal/tikv/prefix/prefix_batch_replay.go
new file mode 100644
index 000000000..6f84fa207
--- /dev/null
+++ b/internal/tikv/prefix/prefix_batch_replay.go
@@ -0,0 +1,35 @@
+package prefix
+
+import (
+	"bytes"
+
+	"github.com/ethereum/go-ethereum/ethdb"
+)
+
+type PrefixBatchReplay struct {
+	prefix    []byte
+	prefixLen int
+	w         ethdb.KeyValueWriter
+}
+
+func newPrefixBatchReplay(prefix []byte, w ethdb.KeyValueWriter) *PrefixBatchReplay {
+	return &PrefixBatchReplay{prefix: prefix, prefixLen: len(prefix), w: w}
+}
+
+// Put inserts the given value into the key-value data store.
+func (p *PrefixBatchReplay) Put(key []byte, value []byte) error {
+	if bytes.HasPrefix(key, p.prefix) {
+		return p.w.Put(key[p.prefixLen:], value)
+	} else {
+		return p.w.Put(key, value)
+	}
+}
+
+// Delete removes the key from the key-value data store.
+func (p *PrefixBatchReplay) Delete(key []byte) error {
+	if bytes.HasPrefix(key, p.prefix) {
+		return p.w.Delete(key[p.prefixLen:])
+	} else {
+		return p.w.Delete(key)
+	}
+}
diff --git a/internal/tikv/prefix/prefix_database.go b/internal/tikv/prefix/prefix_database.go
new file mode 100644
index 000000000..7bfe9d945
--- /dev/null
+++ b/internal/tikv/prefix/prefix_database.go
@@ -0,0 +1,109 @@
+package prefix
+
+import (
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/harmony-one/harmony/internal/tikv/byte_alloc"
+	"github.com/harmony-one/harmony/internal/tikv/common"
+)
+
+// PrefixDatabase is a wrapper that splits the storage by key prefix
+type PrefixDatabase struct {
+	prefix   []byte
+	db       common.TiKVStore
+	keysPool *byte_alloc.Allocator
+}
+
+func NewPrefixDatabase(prefix []byte, db common.TiKVStore) *PrefixDatabase {
+	return &PrefixDatabase{
+		prefix:   prefix,
+		db:       db,
+		keysPool: byte_alloc.NewAllocator(),
+	}
+}
+
+// makeKey creates a key with the prefix; keysPool reduces GC pressure
+func (p *PrefixDatabase) makeKey(keys []byte) []byte {
+	prefixLen := len(p.prefix)
+	byt := p.keysPool.Get(len(keys) + prefixLen)
+	copy(byt, p.prefix)
+	copy(byt[prefixLen:], keys)
+
+	return byt
+}
+
+// Has retrieves if a key is present in the key-value data store.
+func (p *PrefixDatabase) Has(key []byte) (bool, error) {
+	return p.db.Has(p.makeKey(key))
+}
+
+// Get retrieves the given key if it's present in the key-value data store.
+func (p *PrefixDatabase) Get(key []byte) ([]byte, error) {
+	return p.db.Get(p.makeKey(key))
+}
+
+// Put inserts the given value into the key-value data store.
+func (p *PrefixDatabase) Put(key []byte, value []byte) error {
+	return p.db.Put(p.makeKey(key), value)
+}
+
+// Delete removes the key from the key-value data store.
+func (p *PrefixDatabase) Delete(key []byte) error {
+	return p.db.Delete(p.makeKey(key))
+}
+
+// NewBatch creates a write-only database that buffers changes to its host db
+// until a final write is called.
+
+// NewBatch creates a write-only database that buffers changes to its host db
+// until a final write is called.
+func (p *PrefixDatabase) NewBatch() ethdb.Batch {
+	return newPrefixBatch(p.prefix, p.db.NewBatch())
+}
+
+// buildLimitUsePrefix builds the exclusive upper bound of the key range covered
+// by the prefix, by incrementing the last byte that is below 0xff; a nil limit
+// (all bytes 0xff) means no upper bound.
+func (p *PrefixDatabase) buildLimitUsePrefix() []byte {
+	var limit []byte
+	for i := len(p.prefix) - 1; i >= 0; i-- {
+		c := p.prefix[i]
+		if c < 0xff {
+			limit = make([]byte, i+1)
+			copy(limit, p.prefix)
+			limit[i] = c + 1
+			break
+		}
+	}
+
+	return limit
+}
+
+// NewIterator creates a binary-alphabetical iterator over the start to end keyspace
+// contained within the key-value database.
+func (p *PrefixDatabase) NewIterator(start, end []byte) ethdb.Iterator {
+	// copy the prefix so the appends cannot alias its backing array
+	start = append(append([]byte{}, p.prefix...), start...)
+
+	if len(end) == 0 {
+		end = p.buildLimitUsePrefix()
+	} else {
+		end = append(append([]byte{}, p.prefix...), end...)
+	}
+
+	return newPrefixIterator(p.prefix, p.db.NewIterator(start, end))
+}
+
+// Stat returns a particular internal stat of the database.
+func (p *PrefixDatabase) Stat(property string) (string, error) {
+	return p.db.Stat(property)
+}
+
+// Compact flattens the underlying data store for the given key range. In essence,
+// deleted and overwritten versions are discarded, and the data is rearranged to
+// reduce the cost of operations needed to access them.
+//
+// A nil start is treated as a key before all keys in the data store; a nil limit
+// is treated as a key after all keys in the data store. If both are nil, the
+// entire data store will be compacted.
+func (p *PrefixDatabase) Compact(start []byte, limit []byte) error {
+	return p.db.Compact(start, limit)
+}
+
+// Close the storage
+func (p *PrefixDatabase) Close() error {
+	return p.db.Close()
+}
diff --git a/internal/tikv/prefix/prefix_iterator.go b/internal/tikv/prefix/prefix_iterator.go
new file mode 100644
index 000000000..7f8562603
--- /dev/null
+++ b/internal/tikv/prefix/prefix_iterator.go
@@ -0,0 +1,53 @@
+package prefix
+
+import (
+	"github.com/ethereum/go-ethereum/ethdb"
+)
+
+type PrefixIterator struct {
+	prefix    []byte
+	prefixLen int
+
+	it ethdb.Iterator
+}
+
+func newPrefixIterator(prefix []byte, it ethdb.Iterator) *PrefixIterator {
+	return &PrefixIterator{prefix: prefix, prefixLen: len(prefix), it: it}
+}
+
+// Next moves the iterator to the next key/value pair. It returns whether the
+// iterator is exhausted.
+func (i *PrefixIterator) Next() bool {
+	return i.it.Next()
+}
+
+// Error returns any accumulated error. Exhausting all the key/value pairs
+// is not considered to be an error.
+func (i *PrefixIterator) Error() error {
+	return i.it.Error()
+}
+
+// Key returns the key of the current key/value pair, or nil if done. The caller
+// should not modify the contents of the returned slice, and its contents may
+// change on the next call to Next.
+func (i *PrefixIterator) Key() []byte {
+	key := i.it.Key()
+	if len(key) < i.prefixLen {
+		return nil
+	}
+
+	return key[i.prefixLen:]
+}
+
+// Value returns the value of the current key/value pair, or nil if done. The
+// caller should not modify the contents of the returned slice, and its contents
+// may change on the next call to Next.
+func (i *PrefixIterator) Value() []byte {
+	return i.it.Value()
+}
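A worked example of the limit computation: for prefix {0x61, 0x62} ("ab") the exclusive limit is {0x61, 0x63} ("ac"), so the range covers every key starting with "ab"; an all-0xff prefix yields nil, i.e. no upper bound. A sketch in the same hypothetical test file (the nil TiKVStore is fine here because buildLimitUsePrefix never touches the db):

func TestBuildLimitUsePrefix(t *testing.T) {
	p := NewPrefixDatabase([]byte("ab"), nil)
	if got := p.buildLimitUsePrefix(); !bytes.Equal(got, []byte("ac")) {
		t.Fatalf("got %q, want %q", got, "ac")
	}
	p = NewPrefixDatabase([]byte{0xff, 0xff}, nil)
	if got := p.buildLimitUsePrefix(); got != nil {
		t.Fatalf("got %v, want nil (no upper bound)", got)
	}
}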
+
+// Release releases associated resources. Release should always succeed and can
+// be called multiple times without causing error.
+func (i *PrefixIterator) Release() {
+	i.it.Release()
+}
diff --git a/internal/tikv/redis_helper/common.go b/internal/tikv/redis_helper/common.go
new file mode 100644
index 000000000..9a65a6884
--- /dev/null
+++ b/internal/tikv/redis_helper/common.go
@@ -0,0 +1,34 @@
+package redis_helper
+
+import (
+	"context"
+
+	"github.com/go-redis/redis/v8"
+)
+
+var redisInstance *redis.ClusterClient
+
+// Init initializes the redis instance in tikv mode
+func Init(serverAddr []string) error {
+	option := &redis.ClusterOptions{
+		Addrs:    serverAddr,
+		PoolSize: 2,
+	}
+
+	rdb := redis.NewClusterClient(option)
+	err := rdb.Ping(context.Background()).Err()
+	if err != nil {
+		return err
+	}
+
+	redisInstance = rdb
+	return nil
+}
+
+// Close disconnects the redis instance in tikv mode
+func Close() error {
+	if redisInstance != nil {
+		return redisInstance.Close()
+	}
+	return nil
+}
diff --git a/internal/tikv/redis_helper/lock.go b/internal/tikv/redis_helper/lock.go
new file mode 100644
index 000000000..6f92b7acc
--- /dev/null
+++ b/internal/tikv/redis_helper/lock.go
@@ -0,0 +1,72 @@
+package redis_helper
+
+import (
+	"context"
+	"crypto/rand"
+	"encoding/base64"
+
+	"github.com/go-redis/redis/v8"
+)
+
+type RedisPreempt struct {
+	key                      string
+	password                 string
+	lockScript, unlockScript *redis.Script
+	lastLockStatus           bool
+}
+
+// CreatePreempt creates a redis preempt instance
+func CreatePreempt(key string) *RedisPreempt {
+	p := &RedisPreempt{
+		key: key,
+	}
+	p.init()
+
+	return p
+}
+
+// init prepares the one-time lock password and the lock/unlock scripts
+func (p *RedisPreempt) init() {
+	byt := make([]byte, 18)
+	_, _ = rand.Read(byt)
+	p.password = base64.StdEncoding.EncodeToString(byt)
+	p.lockScript = redis.NewScript(`
+		if redis.call('get',KEYS[1]) == ARGV[1] then
+			return redis.call('expire', KEYS[1], ARGV[2])
+		else
+			return redis.call('set', KEYS[1], ARGV[1], 'ex', ARGV[2], 'nx')
+		end
+	`)
+	p.unlockScript = redis.NewScript(`
+		if redis.call('get',KEYS[1]) == ARGV[1] then
+			return redis.call('del', KEYS[1])
+		else
+			return 0
+		end
+	`)
+}
+
+// TryLock attempts to acquire (or extend) the master lock for ttlSecond seconds
+func (p *RedisPreempt) TryLock(ttlSecond int) (ok bool, err error) {
+	ok, err = p.lockScript.Run(context.Background(), redisInstance, []string{p.key}, p.password, ttlSecond).Bool()
+	p.lastLockStatus = ok
+	return
+}
+
+// Unlock releases the master lock if this instance still holds it
+func (p *RedisPreempt) Unlock() (bool, error) {
+	if p == nil {
+		return false, nil
+	}
+
+	return p.unlockScript.Run(context.Background(), redisInstance, []string{p.key}, p.password).Bool()
+}
+
+// LastLockStatus returns the result of the most recent TryLock
+func (p *RedisPreempt) LastLockStatus() bool {
+	if p == nil {
+		return false
+	}
+
+	return p.lastLockStatus
+}
diff --git a/internal/tikv/redis_helper/pubsub.go b/internal/tikv/redis_helper/pubsub.go
new file mode 100644
index 000000000..86fbea01f
--- /dev/null
+++ b/internal/tikv/redis_helper/pubsub.go
@@ -0,0 +1,130 @@
+package redis_helper
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/harmony-one/harmony/core/types"
+	"github.com/harmony-one/harmony/internal/utils"
+	stakingTypes "github.com/harmony-one/harmony/staking/types"
+	"github.com/pkg/errors"
+)
+
+// BlockUpdate block update event
+type BlockUpdate struct {
+	BlkNum uint64
+	Logs   []*types.Log
+}
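The lock script above either extends the TTL when this instance's random password already holds the key, or does a SET NX EX to claim it; unlock deletes the key only if the password still matches. How a writer might drive this for master election, as an illustrative sketch only (the redis address and lock key are hypothetical, and Init must have succeeded first):

package main

import (
	"time"

	"github.com/harmony-one/harmony/internal/tikv/redis_helper"
)

func main() {
	_ = redis_helper.Init([]string{"127.0.0.1:6379"}) // assumed cluster address
	p := redis_helper.CreatePreempt("shard_0_master") // example lock key
	for range time.Tick(5 * time.Second) {
		if ok, _ := p.TryLock(15); !ok {
			continue // another node holds the lock; stay in standby
		}
		// this node is (still) the master writer for the next 15 seconds
	}
}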
+
+// SubscribeShardUpdate subscribes to block update events
+func SubscribeShardUpdate(shardID uint32, cb func(blkNum uint64, logs []*types.Log)) {
+	pubsub := redisInstance.Subscribe(context.Background(),
+		fmt.Sprintf("shard_update_%d", shardID))
+	for message := range pubsub.Channel() {
+		block := &BlockUpdate{}
+		err := rlp.DecodeBytes([]byte(message.Payload), block)
+		if err != nil {
+			utils.Logger().Info().Err(err).Msg("redis subscribe shard update error")
+			continue
+		}
+		cb(block.BlkNum, block.Logs)
+	}
+}
+
+// PublishShardUpdate publishes a block update event
+func PublishShardUpdate(shardID uint32, blkNum uint64, logs []*types.Log) error {
+	msg, err := rlp.EncodeToBytes(&BlockUpdate{
+		BlkNum: blkNum,
+		Logs:   logs,
+	})
+	if err != nil {
+		return err
+	}
+	return redisInstance.Publish(context.Background(), fmt.Sprintf("shard_update_%d", shardID), msg).Err()
+}
+
+// TxPoolUpdate tx pool update event
+type TxPoolUpdate struct {
+	typ   string
+	Local bool
+	Tx    types.PoolTransaction
+}
+
+// DecodeRLP decodes the struct from a binary stream
+func (t *TxPoolUpdate) DecodeRLP(stream *rlp.Stream) error {
+	if err := stream.Decode(&t.typ); err != nil {
+		return err
+	}
+	if err := stream.Decode(&t.Local); err != nil {
+		return err
+	}
+
+	switch t.typ {
+	case "types.EthTransaction":
+		var tmp = &types.EthTransaction{}
+		if err := stream.Decode(tmp); err != nil {
+			return err
+		}
+		t.Tx = tmp
+	case "types.Transaction":
+		var tmp = &types.Transaction{}
+		if err := stream.Decode(tmp); err != nil {
+			return err
+		}
+		t.Tx = tmp
+	case "stakingTypes.StakingTransaction":
+		var tmp = &stakingTypes.StakingTransaction{}
+		if err := stream.Decode(tmp); err != nil {
+			return err
+		}
+		t.Tx = tmp
+	default:
+		return errors.New("unknown txpool type")
+	}
+	return nil
+}
+
+// EncodeRLP encodes the struct to a binary stream
+func (t *TxPoolUpdate) EncodeRLP(w io.Writer) error {
+	switch t.Tx.(type) {
+	case *types.EthTransaction:
+		t.typ = "types.EthTransaction"
+	case *types.Transaction:
+		t.typ = "types.Transaction"
+	case *stakingTypes.StakingTransaction:
+		t.typ = "stakingTypes.StakingTransaction"
+	}
+
+	if err := rlp.Encode(w, t.typ); err != nil {
+		return err
+	}
+	if err := rlp.Encode(w, t.Local); err != nil {
+		return err
+	}
+	return rlp.Encode(w, t.Tx)
+}
+
+// SubscribeTxPoolUpdate subscribes to tx pool update events
+func SubscribeTxPoolUpdate(shardID uint32, cb func(tx types.PoolTransaction, local bool)) {
+	pubsub := redisInstance.Subscribe(context.Background(), fmt.Sprintf("txpool_update_%d", shardID))
+	for message := range pubsub.Channel() {
+		txu := &TxPoolUpdate{}
+		err := rlp.DecodeBytes([]byte(message.Payload), &txu)
+		if err != nil {
+			utils.Logger().Info().Err(err).Msg("redis subscribe txpool update error")
+			continue
+		}
+		cb(txu.Tx, txu.Local)
+	}
+}
+
+// PublishTxPoolUpdate publishes a tx pool update event
+func PublishTxPoolUpdate(shardID uint32, tx types.PoolTransaction, local bool) error {
+	txu := &TxPoolUpdate{Local: local, Tx: tx}
+	msg, err := rlp.EncodeToBytes(txu)
+	if err != nil {
+		return err
+	}
+	return redisInstance.Publish(context.Background(), fmt.Sprintf("txpool_update_%d", shardID), msg).Err()
+}
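TxPoolUpdate is RLP-encoded as a type tag followed by the payload, so the decoder can reconstruct the right concrete transaction type behind the interface. A self-contained analogue of the same tag-dispatch technique with toy types (A, B and Tagged are made up for illustration; only the pattern matches the patch):

package main

import (
	"fmt"
	"io"

	"github.com/ethereum/go-ethereum/rlp"
)

type A struct{ X uint64 }
type B struct{ S string }

// Tagged is a union of *A and *B, encoded as: tag, then payload.
type Tagged struct {
	typ string
	Val interface{}
}

func (t *Tagged) EncodeRLP(w io.Writer) error {
	switch t.Val.(type) {
	case *A:
		t.typ = "A"
	case *B:
		t.typ = "B"
	}
	if err := rlp.Encode(w, t.typ); err != nil {
		return err
	}
	return rlp.Encode(w, t.Val)
}

func (t *Tagged) DecodeRLP(s *rlp.Stream) error {
	if err := s.Decode(&t.typ); err != nil {
		return err
	}
	switch t.typ {
	case "A":
		v := new(A)
		if err := s.Decode(v); err != nil {
			return err
		}
		t.Val = v
	case "B":
		v := new(B)
		if err := s.Decode(v); err != nil {
			return err
		}
		t.Val = v
	default:
		return fmt.Errorf("unknown tag %q", t.typ)
	}
	return nil
}

func main() {
	raw, _ := rlp.EncodeToBytes(&Tagged{Val: &B{S: "hello"}})
	var out Tagged
	_ = rlp.DecodeBytes(raw, &out)
	fmt.Printf("%T %+v\n", out.Val, out.Val) // *main.B &{S:hello}
}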
diff --git a/internal/tikv/remote/remote_batch.go b/internal/tikv/remote/remote_batch.go
new file mode 100644
index 000000000..ca9572a30
--- /dev/null
+++ b/internal/tikv/remote/remote_batch.go
@@ -0,0 +1,119 @@
+package remote
+
+import (
+	"bytes"
+	"context"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/harmony-one/harmony/internal/tikv/common"
+)
+
+type RemoteBatch struct {
+	db   *RemoteDatabase
+	lock sync.Mutex
+
+	size            int
+	batchWriteKey   [][]byte
+	batchWriteValue [][]byte
+	batchDeleteKey  [][]byte
+}
+
+func newRemoteBatch(db *RemoteDatabase) *RemoteBatch {
+	return &RemoteBatch{db: db}
+}
+
+// Put inserts the given value into the key-value data store.
+func (b *RemoteBatch) Put(key []byte, value []byte) error {
+	if len(key) == 0 {
+		return common.ErrEmptyKey
+	}
+
+	if len(value) == 0 {
+		value = EmptyValueStub
+	}
+
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	b.batchWriteKey = append(b.batchWriteKey, key)
+	b.batchWriteValue = append(b.batchWriteValue, value)
+	b.size += len(key) + len(value)
+	return nil
+}
+
+// Delete removes the key from the key-value data store.
+func (b *RemoteBatch) Delete(key []byte) error {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	b.batchDeleteKey = append(b.batchDeleteKey, key)
+	b.size += len(key)
+	return nil
+}
+
+// ValueSize retrieves the amount of data queued up for writing.
+func (b *RemoteBatch) ValueSize() int {
+	return b.size
+}
+
+// Write flushes any accumulated data to TiKV.
+func (b *RemoteBatch) Write() error {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	if len(b.batchWriteKey) > 0 {
+		err := b.db.client.BatchPut(context.Background(), b.batchWriteKey, b.batchWriteValue)
+		if err != nil {
+			return err
+		}
+	}
+
+	if len(b.batchDeleteKey) > 0 {
+		err := b.db.client.BatchDelete(context.Background(), b.batchDeleteKey)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Reset resets the batch for reuse.
+func (b *RemoteBatch) Reset() {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	b.batchWriteKey = b.batchWriteKey[:0]
+	b.batchWriteValue = b.batchWriteValue[:0]
+	b.batchDeleteKey = b.batchDeleteKey[:0]
+	b.size = 0
+}
+
+// Replay replays the batch contents.
+func (b *RemoteBatch) Replay(w ethdb.KeyValueWriter) error {
+	for i, key := range b.batchWriteKey {
+		if bytes.Equal(b.batchWriteValue[i], EmptyValueStub) {
+			err := w.Put(key, []byte{})
+			if err != nil {
+				return err
+			}
+		} else {
+			err := w.Put(key, b.batchWriteValue[i])
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	for _, key := range b.batchDeleteKey {
+		err := w.Delete(key)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/internal/tikv/remote/remote_database.go b/internal/tikv/remote/remote_database.go
new file mode 100644
index 000000000..915e12375
--- /dev/null
+++ b/internal/tikv/remote/remote_database.go
@@ -0,0 +1,139 @@
+package remote
+
+import (
+	"bytes"
+	"context"
+	"runtime/trace"
+	"sync/atomic"
+
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/harmony-one/harmony/internal/tikv/common"
+	"github.com/tikv/client-go/v2/config"
+	"github.com/tikv/client-go/v2/rawkv"
+)
+
+var EmptyValueStub = []byte("HarmonyTiKVEmptyValueStub")
+
+type RemoteDatabase struct {
+	client   *rawkv.Client
+	readOnly bool
+	isClose  uint64
+}
+
+func NewRemoteDatabase(pdAddr []string, readOnly bool) (*RemoteDatabase, error) {
+	client, err := rawkv.NewClient(context.Background(), pdAddr, config.DefaultConfig().Security)
+	if err != nil {
+		return nil, err
+	}
+
+	db := &RemoteDatabase{
+		client:   client,
+		readOnly: readOnly,
+	}
+
+	return db, nil
+}
+
+// ReadOnly sets the storage to readonly mode
+func (d *RemoteDatabase) ReadOnly() {
+	d.readOnly = true
+}
+
+// Has retrieves if a key is present in the key-value data store.
+func (d *RemoteDatabase) Has(key []byte) (bool, error) {
+	data, err := d.Get(key)
+	if err != nil {
+		if err == common.ErrNotFound {
+			return false, nil
+		}
+		return false, err
+	} else {
+		return len(data) != 0, nil
+	}
+}
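TiKV's raw API cannot distinguish an empty value from a missing key, hence EmptyValueStub: empty values are written as the stub and translated back on read. The translation in isolation (encodeValue/decodeValue are hypothetical helpers mirroring what Put and Get do):

package main

import (
	"bytes"
	"fmt"
)

var emptyValueStub = []byte("HarmonyTiKVEmptyValueStub")

// encodeValue is what Put does before writing to TiKV.
func encodeValue(v []byte) []byte {
	if len(v) == 0 {
		return emptyValueStub
	}
	return v
}

// decodeValue is what Get does after reading from TiKV.
func decodeValue(v []byte) []byte {
	if len(v) == len(emptyValueStub) && bytes.Equal(v, emptyValueStub) {
		return v[:0]
	}
	return v
}

func main() {
	stored := encodeValue(nil)
	fmt.Printf("stored as %q, read back as %q\n", stored, decodeValue(stored))
}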
+
+// Get retrieves the given key if it's present in the key-value data store.
+func (d *RemoteDatabase) Get(key []byte) ([]byte, error) {
+	if len(key) == 0 {
+		return nil, common.ErrEmptyKey
+	}
+
+	region := trace.StartRegion(context.Background(), "tikv Get")
+	defer region.End()
+
+	get, err := d.client.Get(context.Background(), key)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(get) == 0 {
+		return nil, common.ErrNotFound
+	}
+
+	if len(get) == len(EmptyValueStub) && bytes.Equal(get, EmptyValueStub) {
+		get = get[:0]
+	}
+
+	return get, nil
+}
+
+// Put inserts the given value into the key-value data store.
+func (d *RemoteDatabase) Put(key []byte, value []byte) error {
+	if len(key) == 0 {
+		return common.ErrEmptyKey
+	}
+	if d.readOnly {
+		return nil
+	}
+
+	if len(value) == 0 {
+		value = EmptyValueStub
+	}
+
+	return d.client.Put(context.Background(), key, value)
+}
+
+// Delete removes the key from the key-value data store.
+func (d *RemoteDatabase) Delete(key []byte) error {
+	if len(key) == 0 {
+		return common.ErrEmptyKey
+	}
+	if d.readOnly {
+		return nil
+	}
+
+	return d.client.Delete(context.Background(), key)
+}
+
+// NewBatch creates a write-only database that buffers changes to its host db
+// until a final write is called.
+func (d *RemoteDatabase) NewBatch() ethdb.Batch {
+	if d.readOnly {
+		return newNopRemoteBatch(d)
+	}
+
+	return newRemoteBatch(d)
+}
+
+func (d *RemoteDatabase) NewIterator(start, end []byte) ethdb.Iterator {
+	return newRemoteIterator(d, start, end)
+}
+
+// Stat returns a particular internal stat of the database; it is not supported by the remote database.
+func (d *RemoteDatabase) Stat(property string) (string, error) {
+	return "", common.ErrNotFound
+}
+
+// Compact is a no-op: TiKV currently does not support manual compaction.
+func (d *RemoteDatabase) Compact(start []byte, limit []byte) error {
+	return nil
+}
+
+// Close disconnects from tikv
+func (d *RemoteDatabase) Close() error {
+	if atomic.CompareAndSwapUint64(&d.isClose, 0, 1) {
+		return d.client.Close()
+	}
+
+	return nil
+}
diff --git a/internal/tikv/remote/remote_iterator.go b/internal/tikv/remote/remote_iterator.go
new file mode 100644
index 000000000..7eaf34cf2
--- /dev/null
+++ b/internal/tikv/remote/remote_iterator.go
@@ -0,0 +1,110 @@
+package remote
+
+import (
+	"context"
+	"sync"
+)
+
+const (
+	iteratorOnce = 300
+)
+
+type RemoteIterator struct {
+	db   *RemoteDatabase
+	lock sync.Mutex
+
+	limit []byte
+	start []byte
+
+	err error
+	end bool
+	pos int
+
+	keys, values             [][]byte
+	currentKey, currentValue []byte
+}
+
+func newRemoteIterator(db *RemoteDatabase, start, limit []byte) *RemoteIterator {
+	return &RemoteIterator{
+		db:    db,
+		start: start,
+		limit: limit,
+	}
+}
+
+// Next moves the iterator to the next key/value pair. It returns whether the
+// iterator is exhausted.
+func (i *RemoteIterator) Next() bool {
+	if i.end {
+		return false
+	}
+
+	if i.keys == nil {
+		if next := i.scanNext(); !next {
+			return false
+		}
+	}
+
+	i.currentKey = i.keys[i.pos]
+	i.currentValue = i.values[i.pos]
+	i.pos++
+
+	if i.pos >= len(i.keys) {
+		i.scanNext()
+	}
+
+	return true
+}
+
+// scanNext performs the real scan from tikv and caches the returned page
+func (i *RemoteIterator) scanNext() bool {
+	keys, values, err := i.db.client.Scan(context.Background(), i.start, i.limit, iteratorOnce)
+	if err != nil {
+		i.err = err
+		i.end = true
+		return false
+	}
+
+	if len(keys) == 0 {
+		i.end = true
+		return false
+	} else {
+		i.start = append(keys[len(keys)-1], 0)
+	}
+
+	i.pos = 0
+	i.keys = keys
+	i.values = values
+	return true
+}
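scanNext pages through the keyspace iteratorOnce entries at a time; each new page starts at the strict successor of the last key seen, which for byte strings is that key with a zero byte appended. The paging cursor as a toy, runnable example (the in-memory loop stands in for client.Scan, and all names are hypothetical):

package main

import "fmt"

// nextPageStart returns the smallest key strictly greater than lastKey,
// i.e. lastKey followed by a zero byte.
func nextPageStart(lastKey []byte) []byte {
	return append(append([]byte{}, lastKey...), 0)
}

func main() {
	keys := []string{"a", "a\x00b", "ab", "b"} // already sorted
	start := ""
	for page := 1; ; page++ {
		var got []string
		for _, k := range keys { // stand-in for a remote Scan(start, limit, 2)
			if k >= start && len(got) < 2 {
				got = append(got, k)
			}
		}
		if len(got) == 0 {
			break
		}
		fmt.Println("page", page, got)
		start = string(nextPageStart([]byte(got[len(got)-1])))
	}
}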
+
+// Error returns any accumulated error. Exhausting all the key/value pairs
+// is not considered to be an error.
+func (i *RemoteIterator) Error() error {
+	return i.err
+}
+
+// Key returns the key of the current key/value pair, or nil if done. The caller
+// should not modify the contents of the returned slice, and its contents may
+// change on the next call to Next.
+func (i *RemoteIterator) Key() []byte {
+	return i.currentKey
+}
+
+// Value returns the value of the current key/value pair, or nil if done. The
+// caller should not modify the contents of the returned slice, and its contents
+// may change on the next call to Next.
+func (i *RemoteIterator) Value() []byte {
+	return i.currentValue
+}
+
+// Release releases associated resources. Release should always succeed and can
+// be called multiple times without causing error.
+func (i *RemoteIterator) Release() {
+	i.db = nil
+	i.end = true
+	i.keys = nil
+	i.values = nil
+	i.currentKey = nil
+	i.currentValue = nil
+}
diff --git a/internal/tikv/remote/remote_nop_batch.go b/internal/tikv/remote/remote_nop_batch.go
new file mode 100644
index 000000000..d58cbf9c7
--- /dev/null
+++ b/internal/tikv/remote/remote_nop_batch.go
@@ -0,0 +1,36 @@
+package remote
+
+import (
+	"github.com/ethereum/go-ethereum/ethdb"
+)
+
+// NopRemoteBatch is used in readonly mode: every write operation is silently discarded
+type NopRemoteBatch struct {
+}
+
+func newNopRemoteBatch(db *RemoteDatabase) *NopRemoteBatch {
+	return &NopRemoteBatch{}
+}
+
+func (b *NopRemoteBatch) Put(key []byte, value []byte) error {
+	return nil
+}
+
+func (b *NopRemoteBatch) Delete(key []byte) error {
+	return nil
+}
+
+func (b *NopRemoteBatch) ValueSize() int {
+	return 0
+}
+
+func (b *NopRemoteBatch) Write() error {
+	return nil
+}
+
+func (b *NopRemoteBatch) Reset() {
+}
+
+func (b *NopRemoteBatch) Replay(w ethdb.KeyValueWriter) error {
+	return nil
+}
diff --git a/internal/tikv/statedb_cache/statedb_cache_batch.go b/internal/tikv/statedb_cache/statedb_cache_batch.go
new file mode 100644
index 000000000..4be66f589
--- /dev/null
+++ b/internal/tikv/statedb_cache/statedb_cache_batch.go
@@ -0,0 +1,46 @@
+package statedb_cache
+
+import (
+	"github.com/ethereum/go-ethereum/ethdb"
+)
+
+type StateDBCacheBatch struct {
+	db          *StateDBCacheDatabase
+	remoteBatch ethdb.Batch
+}
+
+func newStateDBCacheBatch(db *StateDBCacheDatabase, batch ethdb.Batch) *StateDBCacheBatch {
+	return &StateDBCacheBatch{db: db, remoteBatch: batch}
+}
+
+func (b *StateDBCacheBatch) Put(key []byte, value []byte) error {
+	return b.remoteBatch.Put(key, value)
+}
+
+func (b *StateDBCacheBatch) Delete(key []byte) error {
+	return b.remoteBatch.Delete(key)
+}
+
+func (b *StateDBCacheBatch) ValueSize() int {
+	// ethdb callers flush a batch once ValueSize reaches IdealBatchSize;
+	// reporting 1/40 of the real size makes remote commits ~40x less frequent
+	return b.remoteBatch.ValueSize() / 40
+}
+
+func (b *StateDBCacheBatch) Write() (err error) {
+	defer func() {
+		if err == nil {
+			_ = b.db.cacheWrite(b)
+		}
+	}()
+
+	return b.remoteBatch.Write()
+}
+
+func (b *StateDBCacheBatch) Reset() {
+	b.remoteBatch.Reset()
+}
+
+func (b *StateDBCacheBatch) Replay(w ethdb.KeyValueWriter) error {
+	return b.remoteBatch.Replay(w)
+}
diff --git a/internal/tikv/statedb_cache/statedb_cache_database.go b/internal/tikv/statedb_cache/statedb_cache_database.go
new file mode 100644
index 000000000..15f9a2f56
--- /dev/null
+++ b/internal/tikv/statedb_cache/statedb_cache_database.go
@@ -0,0 +1,373 @@
+package statedb_cache
+
+import (
+	"context"
+	"log"
+	"runtime"
+	"sync/atomic"
+	"time"
+
+	"github.com/VictoriaMetrics/fastcache"
+	"github.com/ethereum/go-ethereum/ethdb"
+	redis "github.com/go-redis/redis/v8"
+	"github.com/harmony-one/harmony/internal/tikv/common"
+)
+
+const (
+	maxMemoryEntrySize      = 64 * 1024
+	maxPipelineEntriesCount = 10000
+	cacheMinGapInSecond     = 600
+)
+
+type cacheWrapper struct {
+	*fastcache.Cache
+}
+
+// Put inserts the given value into the key-value data store.
+func (c *cacheWrapper) Put(key []byte, value []byte) error {
+	c.Cache.Set(key, value)
+	return nil
+}
+
+// Delete removes the key from the key-value data store.
+func (c *cacheWrapper) Delete(key []byte) error {
+	c.Cache.Del(key)
+	return nil
+}
+
+type redisPipelineWrapper struct {
+	redis.Pipeliner
+
+	expiredTime time.Duration
+}
+
+// Put inserts the given value into the key-value data store.
+func (c *redisPipelineWrapper) Put(key []byte, value []byte) error {
+	c.SetEX(context.Background(), string(key), value, c.expiredTime)
+	return nil
+}
+
+// Delete removes the key from the key-value data store.
+func (c *redisPipelineWrapper) Delete(key []byte) error {
+	c.Del(context.Background(), string(key))
+	return nil
+}
+
+type StateDBCacheConfig struct {
+	CacheSizeInMB        uint32
+	CachePersistencePath string
+	RedisServerAddr      []string
+	RedisLRUTimeInDay    uint32
+	DebugHitRate         bool
+}
+
+type StateDBCacheDatabase struct {
+	config          StateDBCacheConfig
+	enableReadCache bool
+	isClose         uint64
+	remoteDB        common.TiKVStore
+
+	// l1cache (memory)
+	l1Cache *cacheWrapper
+
+	// l2cache (redis)
+	l2Cache           *redis.ClusterClient
+	l2ReadOnly        bool
+	l2ExpiredTime     time.Duration
+	l2NextExpiredTime time.Time
+	l2ExpiredRefresh  chan string
+
+	// stats
+	l1HitCount           uint64
+	l2HitCount           uint64
+	missCount            uint64
+	notFoundOrErrorCount uint64
+}
+
+func NewStateDBCacheDatabase(remoteDB common.TiKVStore, config StateDBCacheConfig, readOnly bool) (*StateDBCacheDatabase, error) {
+	// create or load memory cache from file
+	cache := fastcache.LoadFromFileOrNew(config.CachePersistencePath, int(config.CacheSizeInMB)*1024*1024)
+
+	// create options
+	option := &redis.ClusterOptions{
+		Addrs:       config.RedisServerAddr,
+		PoolSize:    32,
+		IdleTimeout: 60 * time.Second,
+	}
+
+	if readOnly {
+		option.PoolSize = 16
+		option.ReadOnly = true
+	}
+
+	// check redis connection
+	rdb := redis.NewClusterClient(option)
+	err := rdb.Ping(context.Background()).Err()
+	if err != nil {
+		return nil, err
+	}
+
+	// reload redis cluster slots status
+	rdb.ReloadState(context.Background())
+
+	db := &StateDBCacheDatabase{
+		config:          config,
+		enableReadCache: true,
+		remoteDB:        remoteDB,
+		l1Cache:         &cacheWrapper{cache},
+		l2Cache:         rdb,
+		l2ReadOnly:      readOnly,
+		l2ExpiredTime:   time.Duration(config.RedisLRUTimeInDay) * 24 * time.Hour,
+	}
+
+	if !readOnly {
+		// in writer mode, refresh the redis TTL of keys served from the memory
+		// cache in the background, so hot entries are not recycled by redis LRU
+		db.l2ExpiredRefresh = make(chan string, 100000)
+		go db.startL2ExpiredRefresh()
+	}
+
+	// print debug info
+	if config.DebugHitRate {
+		go func() {
+			for range time.Tick(5 * time.Second) {
+				db.cacheStatusPrint()
+			}
+		}()
+	}
+
+	return db, nil
+}
+
+// Has retrieves if a key is present in the key-value data store.
+func (c *StateDBCacheDatabase) Has(key []byte) (bool, error) {
+	return c.remoteDB.Has(key)
+}
+
+// Get retrieves the given key if it's present in the key-value data store.
+func (c *StateDBCacheDatabase) Get(key []byte) (ret []byte, err error) {
+	if c.enableReadCache {
+		var ok bool
+
+		// first, get data from memory cache
+		keyStr := string(key)
+		if ret, ok = c.l1Cache.HasGet(nil, key); ok {
+			if !c.l2ReadOnly {
+				select {
+				// refresh read time to prevent recycling by lru
+				case c.l2ExpiredRefresh <- keyStr:
+				default:
+				}
+			}
+
+			atomic.AddUint64(&c.l1HitCount, 1)
+			return ret, nil
+		}
+
+		defer func() {
+			if err == nil {
+				if len(ret) < maxMemoryEntrySize {
+					// set data to memory db if loaded from redis cache or remote storage
+					c.l1Cache.Set(key, ret)
+				}
+			}
+		}()
+
+		timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancelFunc()
+
+		// load data from redis cache
+		data := c.l2Cache.Get(timeoutCtx, keyStr)
+		if data.Err() == nil {
+			atomic.AddUint64(&c.l2HitCount, 1)
+			return data.Bytes()
+		} else if data.Err() != redis.Nil {
+			log.Printf("[Get WARN]Redis Error: %v", data.Err())
+		}
+
+		if !c.l2ReadOnly {
+			defer func() {
+				// set data to redis db if loaded from remote storage
+				if err == nil {
+					c.l2Cache.SetEX(context.Background(), keyStr, ret, c.l2ExpiredTime)
+				}
+			}()
+		}
+	}
+
+	ret, err = c.remoteDB.Get(key)
+	if err != nil {
+		atomic.AddUint64(&c.notFoundOrErrorCount, 1)
+	} else {
+		atomic.AddUint64(&c.missCount, 1)
+	}
+
+	return
+}
+
+// Put inserts the given value into the key-value data store.
+func (c *StateDBCacheDatabase) Put(key []byte, value []byte) (err error) {
+	if c.enableReadCache {
+		defer func() {
+			if err == nil {
+				if len(value) < maxMemoryEntrySize {
+					// memory db only accepts small entries
+					c.l1Cache.Set(key, value)
+				}
+				if !c.l2ReadOnly {
+					timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
+					defer cancelFunc()
+
+					// send the data to redis
+					res := c.l2Cache.SetEX(timeoutCtx, string(key), value, c.l2ExpiredTime)
+					if res.Err() != nil {
+						log.Printf("[Put WARN]Redis Error: %v", res.Err())
+					}
+				}
+			}
+		}()
+	}
+
+	return c.remoteDB.Put(key, value)
+}
+
+// Delete removes the key from the key-value data store.
+func (c *StateDBCacheDatabase) Delete(key []byte) (err error) {
+	if c.enableReadCache {
+		defer func() {
+			if err == nil {
+				c.l1Cache.Del(key)
+				if !c.l2ReadOnly {
+					c.l2Cache.Del(context.Background(), string(key))
+				}
+			}
+		}()
+	}
+
+	return c.remoteDB.Delete(key)
+}
+
+// NewBatch creates a write-only database that buffers changes to its host db
+// until a final write is called.
+func (c *StateDBCacheDatabase) NewBatch() ethdb.Batch {
+	return newStateDBCacheBatch(c, c.remoteDB.NewBatch())
+}
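The read path above refreshes a hot key's redis TTL through a buffered channel, and deliberately drops the refresh rather than blocking when the channel is full. The select/default idiom in isolation:

package main

import "fmt"

func main() {
	refresh := make(chan string, 2) // tiny buffer just for demonstration
	for _, key := range []string{"a", "b", "c"} {
		select {
		case refresh <- key: // queued for background TTL refresh
		default: // buffer full: drop instead of stalling the read path
			fmt.Println("dropped", key)
		}
	}
}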
+
+// Stat returns a particular internal stat of the database; "tikv.save.cache" also persists the memory cache to disk.
+func (c *StateDBCacheDatabase) Stat(property string) (string, error) {
+	switch property {
+	case "tikv.save.cache":
+		_ = c.l1Cache.SaveToFileConcurrent(c.config.CachePersistencePath, runtime.NumCPU())
+	}
+	return c.remoteDB.Stat(property)
+}
+
+func (c *StateDBCacheDatabase) Compact(start []byte, limit []byte) error {
+	return c.remoteDB.Compact(start, limit)
+}
+
+func (c *StateDBCacheDatabase) NewIterator(start, end []byte) ethdb.Iterator {
+	return c.remoteDB.NewIterator(start, end)
+}
+
+// Close disconnects the redis client and saves the memory cache to file
+func (c *StateDBCacheDatabase) Close() error {
+	if atomic.CompareAndSwapUint64(&c.isClose, 0, 1) {
+		err := c.l1Cache.SaveToFileConcurrent(c.config.CachePersistencePath, runtime.NumCPU())
+		if err != nil {
+			log.Printf("save file to '%s' error: %v", c.config.CachePersistencePath, err)
+		}
+
+		if !c.l2ReadOnly {
+			close(c.l2ExpiredRefresh)
+			time.Sleep(time.Second)
+
+			for range c.l2ExpiredRefresh {
+				// nop, drain the channel
+			}
+		}
+
+		_ = c.l2Cache.Close()
+
+		return c.remoteDB.Close()
+	}
+
+	return nil
+}
+
+// cacheWrite writes the batch to both cache layers
+func (c *StateDBCacheDatabase) cacheWrite(b ethdb.Batch) error {
+	if !c.l2ReadOnly {
+		pipeline := c.l2Cache.Pipeline()
+
+		err := b.Replay(&redisPipelineWrapper{Pipeliner: pipeline, expiredTime: c.l2ExpiredTime})
+		if err != nil {
+			return err
+		}
+
+		_, err = pipeline.Exec(context.Background())
+		if err != nil {
+			log.Printf("[BatchWrite WARN]Redis Error: %v", err)
+		}
+	}
+
+	return b.Replay(c.l1Cache)
+}
+
+func (c *StateDBCacheDatabase) refreshL2ExpiredTime() {
+	unix := time.Now().Add(c.l2ExpiredTime).Unix()
+	unix = unix - (unix % cacheMinGapInSecond) + cacheMinGapInSecond
+	c.l2NextExpiredTime = time.Unix(unix, 0)
+}
+
+// startL2ExpiredRefresh batches redis TTL refreshes through a pipeline
+func (c *StateDBCacheDatabase) startL2ExpiredRefresh() {
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	pipeline := c.l2Cache.Pipeline()
+	lastWrite := time.Now()
+
+	for {
+		select {
+		case key, ok := <-c.l2ExpiredRefresh:
+			if !ok {
+				return
+			}
+
+			pipeline.ExpireAt(context.Background(), key, c.l2NextExpiredTime)
+			if pipeline.Len() > maxPipelineEntriesCount {
+				_, _ = pipeline.Exec(context.Background())
+				lastWrite = time.Now()
+			}
+		case now := <-ticker.C:
+			c.refreshL2ExpiredTime()
+
+			if pipeline.Len() > 0 && now.Sub(lastWrite) > time.Second {
+				_, _ = pipeline.Exec(context.Background())
+				lastWrite = time.Now()
+			}
+		}
+	}
+}
+
+// cacheStatusPrint logs cache hit-rate and redis connection statistics
+func (c *StateDBCacheDatabase) cacheStatusPrint() {
+	var state = &fastcache.Stats{}
+	state.Reset()
+	c.l1Cache.UpdateStats(state)
+
+	stats := c.l2Cache.PoolStats()
+	log.Printf("redis: TotalConns: %d, IdleConns: %d, StaleConns: %d", stats.TotalConns, stats.IdleConns, stats.StaleConns)
+	total := float64(c.l1HitCount + c.l2HitCount + c.missCount + c.notFoundOrErrorCount)
+
+	log.Printf(
+		"total: l1Hit: %d(%.2f%%), l2Hit: %d(%.2f%%), miss: %d(%.2f%%), notFoundOrErrorCount: %d, fastcache: GetCalls: %d, SetCalls: %d, Misses: %d, EntriesCount: %d, BytesSize: %d",
+		c.l1HitCount, float64(c.l1HitCount)/total*100, c.l2HitCount, float64(c.l2HitCount)/total*100, c.missCount, float64(c.missCount)/total*100, c.notFoundOrErrorCount,
+		state.GetCalls, state.SetCalls, state.Misses, state.EntriesCount, state.BytesSize,
+	)
+}
+
+func (c *StateDBCacheDatabase) ReplayCache(batch ethdb.Batch) error {
+	return c.cacheWrite(batch)
+}
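startL2ExpiredRefresh drains the refresh channel into a redis pipeline and flushes on either a size bound or a time bound. The same batching skeleton without redis (flush is a hypothetical stand-in for pipeline.Exec):

package main

import (
	"fmt"
	"time"
)

func main() {
	work := make(chan string, 100)
	go func() {
		for i := 0; i < 25; i++ {
			work <- fmt.Sprint("key-", i)
		}
		close(work)
	}()

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	var pending []string
	flush := func() { // stand-in for pipeline.Exec
		if len(pending) > 0 {
			fmt.Println("flush", len(pending), "entries")
			pending = pending[:0]
		}
	}
	for {
		select {
		case key, ok := <-work:
			if !ok {
				flush()
				return
			}
			pending = append(pending, key)
			if len(pending) > 10 { // size bound, like maxPipelineEntriesCount
				flush()
			}
		case <-ticker.C: // time bound, like the 1s ticker in the patch
			flush()
		}
	}
}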
diff --git a/internal/utils/bytes.go b/internal/utils/bytes.go
index 065dd543f..c1618192a
--- a/internal/utils/bytes.go
+++ b/internal/utils/bytes.go
@@ -1,6 +1,9 @@
 package utils
 
-import "encoding/hex"
+import (
+	"encoding/hex"
+	"math/big"
+)
 
 // use to look up number of 1 bit in 4 bits
 var halfByteLookup = [16]int{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4}
@@ -44,3 +47,38 @@ func CountOneBits(arr []byte) int64 {
 	}
 	return int64(count)
 }
+
+// BytesMiddle returns the midpoint of a and b, treating a nil a as the
+// smallest possible key and a nil b as the largest possible key.
+func BytesMiddle(a, b []byte) []byte {
+	if a == nil && b == nil {
+		return []byte{128}
+	}
+
+	if len(a) > len(b) {
+		tmp := make([]byte, len(a))
+		if b == nil {
+			for i := range tmp {
+				tmp[i] = 255
+			}
+		}
+		copy(tmp, b)
+		b = tmp
+	} else if len(a) < len(b) {
+		tmp := make([]byte, len(b))
+		if a == nil {
+			for i := range tmp {
+				tmp[i] = 0
+			}
+		}
+		copy(tmp, a)
+		a = tmp
+	}
+
+	aI := big.NewInt(0).SetBytes(a)
+	bI := big.NewInt(0).SetBytes(b)
+
+	aI.Add(aI, bI)
+	aI.Div(aI, big.NewInt(2))
+	return aI.Bytes()
+}
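BytesMiddle averages two keys as big integers. A quick check of the arithmetic (hypothetical inputs):

package main

import (
	"fmt"

	"github.com/harmony-one/harmony/internal/utils"
)

func main() {
	// (0x10 + 0x20) / 2 = 0x18
	fmt.Printf("% x\n", utils.BytesMiddle([]byte{0x10}, []byte{0x20}))
	// nil bounds fall back to the middle of the whole key space
	fmt.Printf("% x\n", utils.BytesMiddle(nil, nil)) // 80
}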
diff --git a/node/node.go b/node/node.go
index 1d8a50abb..ebe419a04 100644
--- a/node/node.go
+++ b/node/node.go
@@ -3,12 +3,17 @@ package node
 import (
 	"context"
 	"fmt"
+	"log"
 	"math/big"
 	"os"
 	"strings"
 	"sync"
 	"time"
 
+	"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
+	"github.com/harmony-one/harmony/internal/tikv"
+	"github.com/harmony-one/harmony/internal/tikv/redis_helper"
+
 	"github.com/ethereum/go-ethereum/rlp"
 	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
 	"github.com/harmony-one/harmony/internal/utils/crosslinks"
@@ -152,6 +157,11 @@ func (node *Node) Blockchain() core.BlockChain {
 
 // Beaconchain returns the beaconchain from node.
 func (node *Node) Beaconchain() core.BlockChain {
+	// tikv mode does not have the BeaconChain storage
+	if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.General.ShardID != shard.BeaconChainShardID {
+		return nil
+	}
+
 	return node.chain(shard.BeaconChainShardID, core.Options{})
 }
 
@@ -222,6 +232,13 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) []error {
 			Msg("[addPendingTransactions] Node out of sync, ignoring transactions")
 		return nil
 	}
+
+	// in tikv mode, the reader accepts pending transactions only from the writer node and ignores p2p messages
+	if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.TiKV.Role == tikv.RoleReader {
+		log.Printf("skip reader addPendingTransactions: %#v", newTxs)
+		return nil
+	}
+
 	poolTxs := types.PoolTransactions{}
 	errs := []error{}
 	acceptCx := node.Blockchain().Config().AcceptsCrossTx(node.Blockchain().CurrentHeader().Epoch())
@@ -434,6 +451,11 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) (
 	case proto_node.Sync:
 		nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "block_sync"}).Inc()
+
+		// in tikv mode, the BeaconChain sync message is not needed
+		if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.General.ShardID != shard.BeaconChainShardID {
+			return nil, 0, errIgnoreBeaconMsg
+		}
+
 		// checks whether the beacon block is larger than current block number
 		blocksPayload := payload[p2pNodeMsgPrefixSize+1:]
 		var blocks []*types.Block
@@ -997,7 +1019,7 @@ func New(
 	engine := chain.NewEngine()
 
 	collection := shardchain.NewCollection(
-		chainDBFactory, &core.GenesisInitializer{NetworkType: node.NodeConfig.GetNetworkType()}, engine, &chainConfig,
+		harmonyconfig, chainDBFactory, &core.GenesisInitializer{NetworkType: node.NodeConfig.GetNetworkType()}, engine, &chainConfig,
 	)
 
 	for shardID, archival := range isArchival {
@@ -1022,16 +1044,19 @@ func New(
 	if b1, b2 := beaconChain == nil, blockchain == nil; b1 || b2 {
-		var err error
-		if b2 {
-			shardID := node.NodeConfig.ShardID
-			// HACK get the real error reason
-			_, err = node.shardChains.ShardChain(shardID)
-		} else {
-			_, err = node.shardChains.ShardChain(shard.BeaconChainShardID)
+		// in tikv mode, the BeaconChain is not needed
+		if !(node.HarmonyConfig != nil && node.HarmonyConfig.General.RunElasticMode) || node.HarmonyConfig.General.ShardID == shard.BeaconChainShardID {
+			var err error
+			if b2 {
+				shardID := node.NodeConfig.ShardID
+				// HACK get the real error reason
+				_, err = node.shardChains.ShardChain(shardID)
+			} else {
+				_, err = node.shardChains.ShardChain(shard.BeaconChainShardID)
+			}
+			fmt.Fprintf(os.Stderr, "Cannot initialize node: %v\n", err)
+			os.Exit(-1)
 		}
-		fmt.Fprintf(os.Stderr, "Cannot initialize node: %v\n", err)
-		os.Exit(-1)
 	}
 
 	node.BlockChannel = make(chan *types.Block)
@@ -1053,6 +1078,16 @@ func New(
 	txPoolConfig.Blacklist = blacklist
 	txPoolConfig.AllowedTxs = allowedTxs
 	txPoolConfig.Journal = fmt.Sprintf("%v/%v", node.NodeConfig.DBDir, txPoolConfig.Journal)
+	txPoolConfig.AddEvent = func(tx types.PoolTransaction, local bool) {
+		// in tikv mode, the writer publishes tx pool updates to all readers
+		if node.Blockchain().IsTikvWriterMaster() {
+			err := redis_helper.PublishTxPoolUpdate(uint32(harmonyconfig.General.ShardID), tx, local)
+			if err != nil {
+				utils.Logger().Info().Err(err).Msg("redis publish txpool update error")
+			}
+		}
+	}
+
 	node.TxPool = core.NewTxPool(txPoolConfig, node.Blockchain().Config(), blockchain, node.TransactionErrorSink)
 	node.CxPool = core.NewCxPool(core.CxPoolSize)
 	node.Worker = worker.New(node.Blockchain().Config(), blockchain, engine)
@@ -1060,6 +1095,13 @@ func New(
 	node.deciderCache, _ = lru.New(16)
 	node.committeeCache, _ = lru.New(16)
 
+	// in tikv mode, the beacon worker is not needed
+	if !node.HarmonyConfig.General.RunElasticMode && node.Blockchain().ShardID() != shard.BeaconChainShardID {
+		node.BeaconWorker = worker.New(
+			node.Beaconchain().Config(), beaconChain, engine,
+		)
+	}
+
 	node.pendingCXReceipts = map[string]*types.CXReceiptsProof{}
 	node.proposedBlock = map[uint64]*types.Block{}
 	node.Consensus.VerifiedNewBlock = make(chan *types.Block, 1)
@@ -1108,8 +1150,11 @@ func New(
 		}()
 	}
 
-	// update reward values now that node is ready
-	node.updateInitialRewardValues()
+	// in tikv mode, the BeaconChain is not needed
+	if !(node.HarmonyConfig != nil && node.HarmonyConfig.General.RunElasticMode) || node.HarmonyConfig.General.ShardID == shard.BeaconChainShardID {
+		// update reward values now that node is ready
+		node.updateInitialRewardValues()
+	}
 
 	// init metrics
 	initMetrics()
@@ -1273,6 +1318,18 @@ func (node *Node) ShutDown() {
 	node.Blockchain().Stop()
 	node.Beaconchain().Stop()
 
+	if node.HarmonyConfig.General.RunElasticMode {
+		_, _ = node.Blockchain().RedisPreempt().Unlock()
+		_, _ = node.Beaconchain().RedisPreempt().Unlock()
+
+		_ = redis_helper.Close()
+		time.Sleep(time.Second)
+
+		if storage := tikv_manage.GetDefaultTiKVFactory(); storage != nil {
+			storage.CloseAllDB()
+		}
+	}
+
 	const msg = "Successfully shut down!\n"
 	utils.Logger().Print(msg)
 	fmt.Print(msg)
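In elastic mode, chain and txpool state fan out from the master writer to readers over redis pub/sub; the publishing side lives in the AddEvent hook above, the consuming side in syncFromTiKVWriter below. A compressed, illustrative view of the wiring (the shard ID and redis address are hypothetical and error handling is elided):

package main

import (
	"github.com/harmony-one/harmony/core/types"
	"github.com/harmony-one/harmony/internal/tikv/redis_helper"
)

func main() {
	const shardID = uint32(0)                         // hypothetical shard
	_ = redis_helper.Init([]string{"127.0.0.1:6379"}) // assumed cluster address

	// reader / backup-writer side: apply each update as it arrives (blocks forever)
	go redis_helper.SubscribeShardUpdate(shardID, func(blkNum uint64, logs []*types.Log) {
		// e.g. bc.SyncFromTiKVWriter(blkNum, logs), as in syncFromTiKVWriter below
	})

	// master writer side: fan an update out after committing a block
	_ = redis_helper.PublishShardUpdate(shardID, 42, nil)

	select {} // keep the subscriber alive in this sketch
}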
@@ -1365,3 +1422,38 @@ func (node *Node) GetAddresses(epoch *big.Int) map[string]common.Address {
 func (node *Node) IsRunningBeaconChain() bool {
 	return node.NodeConfig.ShardID == shard.BeaconChainShardID
 }
+
+// syncFromTiKVWriter is used in tikv mode to subscribe to chain data published by the tikv writer
+func (node *Node) syncFromTiKVWriter() {
+	bc := node.Blockchain()
+
+	// subscribe block update
+	go redis_helper.SubscribeShardUpdate(bc.ShardID(), func(blkNum uint64, logs []*types.Log) {
+		err := bc.SyncFromTiKVWriter(blkNum, logs)
+		if err != nil {
+			utils.Logger().Warn().
+				Err(err).
+				Msg("cannot sync block from tikv writer")
+			return
+		}
+	})
+
+	// subscribe txpool update
+	if node.HarmonyConfig.TiKV.Role == tikv.RoleReader {
+		go redis_helper.SubscribeTxPoolUpdate(bc.ShardID(), func(tx types.PoolTransaction, local bool) {
+			var err error
+			if local {
+				err = node.TxPool.AddLocal(tx)
+			} else {
+				err = node.TxPool.AddRemote(tx)
+			}
+			if err != nil {
+				utils.Logger().Debug().
+					Err(err).
+					Interface("tx", tx).
+					Msg("cannot sync txpool from tikv writer")
+				return
+			}
+		})
+	}
+}
diff --git a/node/node_explorer.go b/node/node_explorer.go
index 0b263b89a..09993291f 100644
--- a/node/node_explorer.go
+++ b/node/node_explorer.go
@@ -5,6 +5,8 @@ import (
 	"encoding/json"
 	"sync"
 
+	"github.com/harmony-one/harmony/internal/tikv"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/rlp"
 	msg_pb "github.com/harmony-one/harmony/api/proto/message"
@@ -145,6 +147,11 @@ func (node *Node) TraceLoopForExplorer() {
 
 // AddNewBlockForExplorer add new block for explorer.
 func (node *Node) AddNewBlockForExplorer(block *types.Block) {
+	if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.TiKV.Role == tikv.RoleReader {
+		node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64())
+		return
+	}
+
 	utils.Logger().Info().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node")
 
 	if _, err := node.Blockchain().InsertChain([]*types.Block{block}, false); err == nil {
@@ -153,23 +160,38 @@ func (node *Node) AddNewBlockForExplorer(block *types.Block) {
 		}
 		// Clean up the blocks to avoid OOM.
 		node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64())
-		// Do dump all blocks from state syncing for explorer one time
-		// TODO: some blocks can be dumped before state syncing finished.
-		// And they would be dumped again here. Please fix it.
-		once.Do(func() {
-			utils.Logger().Info().Int64("starting height", int64(block.NumberU64())-1).
-				Msg("[Explorer] Populating explorer data from state synced blocks")
-			go func() {
-				exp, err := node.getExplorerService()
-				if err != nil {
-					// shall be unreachable
-					utils.Logger().Fatal().Err(err).Msg("critical error in explorer node")
-				}
-				for blockHeight := int64(block.NumberU64()) - 1; blockHeight >= 0; blockHeight-- {
-					exp.DumpCatchupBlock(node.Blockchain().GetBlockByNumber(uint64(blockHeight)))
-				}
-			}()
-		})
+
+		// in tikv mode, only the master writer node needs to dump all explorer blocks
+		if !node.HarmonyConfig.General.RunElasticMode || node.Blockchain().IsTikvWriterMaster() {
+			// Do dump all blocks from state syncing for explorer one time
+			// TODO: some blocks can be dumped before state syncing finished.
+			// And they would be dumped again here. Please fix it.
+			once.Do(func() {
+				utils.Logger().Info().Int64("starting height", int64(block.NumberU64())-1).
+					Msg("[Explorer] Populating explorer data from state synced blocks")
+				go func() {
+					exp, err := node.getExplorerService()
+					if err != nil {
+						// shall be unreachable
+						utils.Logger().Fatal().Err(err).Msg("critical error in explorer node")
+					}
+
+					if block.NumberU64() == 0 {
+						return
+					}
+
+					// get the checkpoint bitmap and flip every bit
+					bitmap := exp.GetCheckpointBitmap()
+					bitmap.Flip(0, block.NumberU64())
+
+					// find all unprocessed blocks and dump them
+					iterator := bitmap.ReverseIterator()
+					for iterator.HasNext() {
+						exp.DumpCatchupBlock(node.Blockchain().GetBlockByNumber(iterator.Next()))
+					}
+				}()
+			})
+		}
 	} else {
 		utils.Logger().Error().Err(err).Msg("[Explorer] Error when adding new block for explorer node")
 	}
@@ -177,17 +199,20 @@ func (node *Node) AddNewBlockForExplorer(block *types.Block) {
 // ExplorerMessageHandler passes received message in node_handler to explorer service.
 func (node *Node) commitBlockForExplorer(block *types.Block) {
-	if block.ShardID() != node.NodeConfig.ShardID {
-		return
-	}
-	// Dump new block into level db.
-	utils.Logger().Info().Uint64("blockNum", block.NumberU64()).Msg("[Explorer] Committing block into explorer DB")
-	exp, err := node.getExplorerService()
-	if err != nil {
-		// shall be unreachable
-		utils.Logger().Fatal().Err(err).Msg("critical error in explorer node")
+	// in tikv mode, only the master writer node needs to dump explorer blocks
+	if !node.HarmonyConfig.General.RunElasticMode || (node.HarmonyConfig.TiKV.Role == tikv.RoleWriter && node.Blockchain().IsTikvWriterMaster()) {
+		if block.ShardID() != node.NodeConfig.ShardID {
+			return
+		}
+		// Dump new block into level db.
+		utils.Logger().Info().Uint64("blockNum", block.NumberU64()).Msg("[Explorer] Committing block into explorer DB")
+		exp, err := node.getExplorerService()
+		if err != nil {
+			// shall be unreachable
+			utils.Logger().Fatal().Err(err).Msg("critical error in explorer node")
+		}
+		exp.DumpNewBlock(block)
 	}
-	exp.DumpNewBlock(block)
 
 	curNum := block.NumberU64()
 	if curNum-100 > 0 {
diff --git a/node/node_syncing.go b/node/node_syncing.go
index fa16faa04..63631daea 100644
--- a/node/node_syncing.go
+++ b/node/node_syncing.go
@@ -8,6 +8,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/harmony-one/harmony/internal/tikv"
+
 	prom "github.com/harmony-one/harmony/api/service/prometheus"
 	"github.com/prometheus/client_golang/prometheus"
 
@@ -216,6 +218,10 @@ func (node *Node) doBeaconSyncing() {
 		return
 	}
 
+	if node.HarmonyConfig.General.RunElasticMode {
+		return
+	}
+
 	if !node.NodeConfig.Downloader {
 		// If Downloader is not working, we need also deal with blocks from beaconBlockChannel
 		go func(node *Node) {
@@ -319,7 +325,22 @@ func (node *Node) StartGRPCSyncClient() {
 			Msg("SupportBeaconSyncing")
 		go node.doBeaconSyncing()
 	}
-	node.supportSyncing()
+}
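The explorer catch-up above flips a checkpoint bitmap and walks it in reverse so only never-dumped blocks are processed. The same pattern with the RoaringBitmap library, assuming GetCheckpointBitmap returns a roaring64 bitmap of processed heights (which the Flip/ReverseIterator calls suggest):

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring/roaring64"
)

func main() {
	processed := roaring64.New()
	processed.AddMany([]uint64{0, 1, 2, 5, 7}) // heights already dumped

	const tip = uint64(9)
	processed.Flip(0, tip) // now holds exactly the heights in [0, tip) never dumped

	it := processed.ReverseIterator()
	for it.HasNext() {
		fmt.Println("dump block", it.Next()) // 8, 6, 4, 3
	}
}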
+
+// NodeSyncing makes sure to start all the processes needed to sync the node based on different configuration factors.
+func (node *Node) NodeSyncing() {
+	if node.HarmonyConfig.General.RunElasticMode {
+		node.syncFromTiKVWriter() // this is for both reader and backup writers
+
+		if node.HarmonyConfig.TiKV.Role == tikv.RoleReader {
+			node.Consensus.UpdateConsensusInformation()
+		}
+		if node.HarmonyConfig.TiKV.Role == tikv.RoleWriter {
+			node.supportSyncing() // the writer needs to be in sync with its other peers
+		}
+	} else if !node.HarmonyConfig.General.IsOffline && node.HarmonyConfig.DNSSync.Client {
+		node.supportSyncing() // for non-writer-reader mode, a.k.a. non-tikv nodes
+	}
+}
 
 // supportSyncing keeps sleeping until it's doing consensus or it's a leader.
diff --git a/node/service_setup.go b/node/service_setup.go
index b11a1e12d..89acaf8fb 100644
--- a/node/service_setup.go
+++ b/node/service_setup.go
@@ -27,7 +27,7 @@ func (node *Node) RegisterValidatorServices() {
 func (node *Node) RegisterExplorerServices() {
 	// Register explorer service.
 	node.serviceManager.Register(
-		service.SupportExplorer, explorer.New(&node.SelfPeer, node.Blockchain(), node),
+		service.SupportExplorer, explorer.New(node.HarmonyConfig, &node.SelfPeer, node.Blockchain(), node),
 	)
 }
diff --git a/node/worker/worker_test.go b/node/worker/worker_test.go
index 6b9b42151..cbb806bfb 100644
--- a/node/worker/worker_test.go
+++ b/node/worker/worker_test.go
@@ -5,6 +5,8 @@ import (
 	"math/rand"
 	"testing"
 
+	"github.com/harmony-one/harmony/core/state"
+
 	"github.com/ethereum/go-ethereum/core/rawdb"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -43,7 +45,7 @@ func TestNewWorker(t *testing.T) {
 	genesis := gspec.MustCommit(database)
 	_ = genesis
 
-	chain, err := core.NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil)
+	chain, err := core.NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
 
 	if err != nil {
 		t.Error(err)
@@ -70,7 +72,7 @@ func TestCommitTransactions(t *testing.T) {
 	)
 	gspec.MustCommit(database)
 
-	chain, _ := core.NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil)
+	chain, _ := core.NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
 
 	// Create a new worker
 	worker := New(params.TestChainConfig, chain, engine)
diff --git a/rosetta/services/call_service.go b/rosetta/services/call_service.go
index 6bee731fe..46f528d7b 100644
--- a/rosetta/services/call_service.go
+++ b/rosetta/services/call_service.go
@@ -85,7 +85,7 @@ func (c *CallAPIService) Call(
 func NewCallAPIService(hmy *hmy.Harmony, limiterEnable bool, rateLimit int) server.CallAPIServicer {
 	return &CallAPIService{
 		hmy:                 hmy,
-		publicContractAPI:   rpc2.NewPublicContractAPI(hmy, rpc2.V2),
+		publicContractAPI:   rpc2.NewPublicContractAPI(hmy, rpc2.V2, limiterEnable, rateLimit),
 		publicStakingAPI:    rpc2.NewPublicStakingAPI(hmy, rpc2.V2),
 		publicBlockChainAPI: rpc2.NewPublicBlockchainAPI(hmy, rpc2.V2, limiterEnable, rateLimit),
 	}
diff --git a/rpc/contract.go b/rpc/contract.go
index 6bf3a393e..337bea7cd 100644
--- a/rpc/contract.go
+++ b/rpc/contract.go
@@ -35,11 +35,16 @@ type PublicContractService struct {
 }
 
 // NewPublicContractAPI creates a new API for the RPC interface
-func NewPublicContractAPI(hmy *hmy.Harmony, version Version) rpc.API {
+func NewPublicContractAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API {
+	var limiter *rate.Limiter
+	if limiterEnable {
+		limiter = rate.NewLimiter(rate.Limit(limit), limit)
+	}
+
 	return rpc.API{
 		Namespace: version.Namespace(),
 		Version:   APIVersion,
-		Service:   &PublicContractService{hmy, version, rate.NewLimiter(200, 1500)},
+		Service:   &PublicContractService{hmy, version, limiter},
 		Public:    true,
 	}
 }
diff --git a/rpc/pool.go b/rpc/pool.go
index 2fa0ab58c..b0c83fea7 100644
--- a/rpc/pool.go
+++ b/rpc/pool.go
@@ -37,11 +37,15 @@ type PublicPoolService struct {
 }
 
 // NewPublicPoolAPI creates a new API for the RPC interface
-func NewPublicPoolAPI(hmy *hmy.Harmony, version Version) rpc.API {
+func NewPublicPoolAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API {
+	var limiter *rate.Limiter
+	if limiterEnable {
+		limiter = rate.NewLimiter(rate.Limit(limit), limit)
+	}
 	return rpc.API{
 		Namespace: version.Namespace(),
 		Version:   APIVersion,
-		Service:   &PublicPoolService{hmy, version, rate.NewLimiter(2, 5)},
+		Service:   &PublicPoolService{hmy, version, limiter},
 		Public:    true,
 	}
 }
diff --git a/rpc/rpc.go b/rpc/rpc.go
index e06f87993..2c94f0a5e 100644
--- a/rpc/rpc.go
+++ b/rpc/rpc.go
@@ -158,12 +158,12 @@ func getAPIs(hmy *hmy.Harmony, config nodeconfig.RPCServerConfig) []rpc.API {
 		NewPublicHarmonyAPI(hmy, V2),
 		NewPublicBlockchainAPI(hmy, V1, config.RateLimiterEnabled, config.RequestsPerSecond),
 		NewPublicBlockchainAPI(hmy, V2, config.RateLimiterEnabled, config.RequestsPerSecond),
-		NewPublicContractAPI(hmy, V1),
-		NewPublicContractAPI(hmy, V2),
+		NewPublicContractAPI(hmy, V1, config.RateLimiterEnabled, config.RequestsPerSecond),
+		NewPublicContractAPI(hmy, V2, config.RateLimiterEnabled, config.RequestsPerSecond),
 		NewPublicTransactionAPI(hmy, V1),
 		NewPublicTransactionAPI(hmy, V2),
-		NewPublicPoolAPI(hmy, V1),
-		NewPublicPoolAPI(hmy, V2),
+		NewPublicPoolAPI(hmy, V1, config.RateLimiterEnabled, config.RequestsPerSecond),
+		NewPublicPoolAPI(hmy, V2, config.RateLimiterEnabled, config.RequestsPerSecond),
 	}
 
 	// Legacy methods (subject to removal)
@@ -185,9 +185,9 @@ func getAPIs(hmy *hmy.Harmony, config nodeconfig.RPCServerConfig) []rpc.API {
 	publicAPIs = append(publicAPIs,
 		NewPublicHarmonyAPI(hmy, Eth),
 		NewPublicBlockchainAPI(hmy, Eth, config.RateLimiterEnabled, config.RequestsPerSecond),
-		NewPublicContractAPI(hmy, Eth),
+		NewPublicContractAPI(hmy, Eth, config.RateLimiterEnabled, config.RequestsPerSecond),
 		NewPublicTransactionAPI(hmy, Eth),
-		NewPublicPoolAPI(hmy, Eth),
+		NewPublicPoolAPI(hmy, Eth, config.RateLimiterEnabled, config.RequestsPerSecond),
 		eth.NewPublicEthService(hmy, "eth"),
 	)
 }
diff --git a/test/chain/main.go b/test/chain/main.go
index b1851645b..7de6d9c98 100644
--- a/test/chain/main.go
+++ b/test/chain/main.go
@@ -15,6 +15,7 @@ import (
 	blockfactory "github.com/harmony-one/harmony/block/factory"
 	"github.com/harmony-one/harmony/core"
 	core_state "github.com/harmony-one/harmony/core/state"
+	harmonyState "github.com/harmony-one/harmony/core/state"
 	"github.com/harmony-one/harmony/core/types"
 	"github.com/harmony-one/harmony/core/vm"
 	"github.com/harmony-one/harmony/crypto/hash"
@@ -205,7 +206,7 @@ func playFaucetContract(chain core.BlockChain) {
 func main() {
 	genesis := gspec.MustCommit(database)
 
-	chain, _ := core.NewBlockChain(database, nil, gspec.Config, chain.Engine(), vm.Config{}, nil)
+	chain, _ := core.NewBlockChain(database, harmonyState.NewDatabase(database), nil, gspec.Config, chain.Engine(), vm.Config{}, nil)
 
 	txpool := core.NewTxPool(core.DefaultTxPoolConfig, chainConfig, chain, types.NewTransactionErrorSink())
 	backend := &testWorkerBackend{
diff --git a/test/chain/reward/main.go b/test/chain/reward/main.go
index 6d78f4dd8..8ed6726f3 100644
--- a/test/chain/reward/main.go
+++ b/test/chain/reward/main.go
@@ -109,7 +109,7 @@ func main() {
 	genesis := gspec.MustCommit(database)
 	_ = genesis
 	engine := chain.NewEngine()
-	bc, _ := core.NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil)
+	bc, _ := core.NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
 	statedb, _ := state.New(common2.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()))
 	msg := createValidator()
 	statedb.AddBalance(msg.ValidatorAddress, new(big.Int).Mul(big.NewInt(5e18), big.NewInt(2000)))
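The RPC changes above thread the rate-limiter flag through every API constructor, and a nil *rate.Limiter means "unlimited". The pattern in isolation (handle is a hypothetical request handler):

package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

func newLimiter(enabled bool, limit int) *rate.Limiter {
	if !enabled {
		return nil // nil is understood as "no limiting" by callers that check
	}
	return rate.NewLimiter(rate.Limit(limit), limit)
}

func handle(l *rate.Limiter) error {
	if l != nil { // limiting disabled when nil
		if err := l.Wait(context.Background()); err != nil {
			return err
		}
	}
	fmt.Println("request served")
	return nil
}

func main() {
	_ = handle(newLimiter(true, 100))
	_ = handle(newLimiter(false, 0))
}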