tested tikv code

Branch: pull/4149/head
Author: lutty (3 years ago), committed by Lutty
Parent: 9df446a23d
Commit: 720f9a6a02
52 changed files (changed lines shown per file):

  101  api/service/explorer/interface.go
    2  api/service/explorer/interface_test.go
    5  api/service/explorer/migration_test.go
   39  api/service/explorer/schema.go
   29  api/service/explorer/service.go
   82  api/service/explorer/storage.go
   13  api/service/prometheus/service.go
   42  cmd/harmony/main.go
   12  core/blockchain.go
  239  core/blockchain_impl.go
    2  core/evm_test.go
  167  core/state/prefeth.go
   80  core/state/tikv_clean.go
   11  core/tx_pool.go
    2  core/tx_pool_test.go
   12  go.mod
   18  hmy/blockchain.go
   13  internal/configs/harmony/harmony.go
  126  internal/shardchain/dbfactory_tikv.go
   57  internal/shardchain/shardchains.go
   28  internal/shardchain/tikv_manage/factory_manage.go
   73  internal/tikv/byte_alloc/alloc.go
   15  internal/tikv/byte_alloc/defailt.go
   10  internal/tikv/common/errors.go
   20  internal/tikv/common/hack.go
   41  internal/tikv/common/tikv_store.go
    6  internal/tikv/consts.go
   42  internal/tikv/prefix/prefix_batch.go
   35  internal/tikv/prefix/prefix_batch_replay.go
  109  internal/tikv/prefix/prefix_database.go
   53  internal/tikv/prefix/prefix_iterator.go
   34  internal/tikv/redis_helper/common.go
   72  internal/tikv/redis_helper/lock.go
  130  internal/tikv/redis_helper/pubsub.go
  119  internal/tikv/remote/remote_batch.go
  139  internal/tikv/remote/remote_database.go
  110  internal/tikv/remote/remote_iterator.go
   36  internal/tikv/remote/remote_nop_batch.go
   45  internal/tikv/statedb_cache/statedb_cache_batch.go
  373  internal/tikv/statedb_cache/statedb_cache_database.go
   38  internal/utils/bytes.go
  116  node/node.go
   79  node/node_explorer.go
   23  node/node_syncing.go
    2  node/service_setup.go
    6  node/worker/worker_test.go
    2  rosetta/services/call_service.go
    9  rpc/contract.go
    8  rpc/pool.go
   12  rpc/rpc.go
    3  test/chain/main.go
    2  test/chain/reward/main.go

api/service/explorer/interface.go

@@ -1,11 +1,14 @@
 package explorer

 import (
+	"path"
+
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/syndtr/goleveldb/leveldb"
-	"github.com/syndtr/goleveldb/leveldb/filter"
-	"github.com/syndtr/goleveldb/leveldb/opt"
-	levelutil "github.com/syndtr/goleveldb/leveldb/util"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/ethdb/leveldb"
+	tikvCommon "github.com/harmony-one/harmony/internal/tikv/common"
+	"github.com/harmony-one/harmony/internal/tikv/prefix"
+	"github.com/harmony-one/harmony/internal/tikv/remote"
 )

 // database is an adapter for *leveldb.DB

@@ -32,54 +35,56 @@ type batch interface {
 	ValueSize() int
 }

-// lvlDB is the adapter for leveldb.Database
-type lvlDB struct {
-	db *leveldb.DB
+// explorerDB is the adapter for the explorer node
+type explorerDB struct {
+	db ethdb.KeyValueStore
 }

-func newLvlDB(dbPath string) (database, error) {
-	// https://github.com/ethereum/go-ethereum/blob/master/ethdb/leveldb/leveldb.go#L98 options.
-	// We had 0 for handles and cache params before, so set 0s for all of them. Filter opt is the same.
-	options := &opt.Options{
-		OpenFilesCacheCapacity: 500,
-		BlockCacheCapacity:     8 * 1024 * 1024, // 8 MiB
-		WriteBuffer:            4 * 1024 * 1024, // 4 MiB
-		Filter:                 filter.NewBloomFilter(10),
-	}
-	db, err := leveldb.OpenFile(dbPath, options)
+// newExplorerLvlDB creates explorer storage backed by leveldb
+func newExplorerLvlDB(dbPath string) (database, error) {
+	db, err := leveldb.New(dbPath, 16, 500, "explorer_db")
+	if err != nil {
+		return nil, err
+	}
+	return &explorerDB{db}, nil
+}
+
+// newExplorerTiKv creates explorer storage backed by tikv
+func newExplorerTiKv(pdAddr []string, dbPath string, readOnly bool) (database, error) {
+	prefixStr := append([]byte(path.Base(dbPath)), '/')
+	db, err := remote.NewRemoteDatabase(pdAddr, readOnly)
 	if err != nil {
 		return nil, err
 	}
-	return &lvlDB{db}, nil
+	return &explorerDB{
+		db: tikvCommon.ToEthKeyValueStore(
+			prefix.NewPrefixDatabase(prefixStr, db),
+		),
+	}, nil
 }

-func (db *lvlDB) Put(key, val []byte) error {
-	return db.db.Put(key, val, nil)
+func (db *explorerDB) Put(key, val []byte) error {
+	return db.db.Put(key, val)
 }

-func (db *lvlDB) Get(key []byte) ([]byte, error) {
-	return db.db.Get(key, nil)
+func (db *explorerDB) Get(key []byte) ([]byte, error) {
+	return db.db.Get(key)
 }

-func (db *lvlDB) Has(key []byte) (bool, error) {
-	return db.db.Has(key, nil)
+func (db *explorerDB) Has(key []byte) (bool, error) {
+	return db.db.Has(key)
 }

-func (db *lvlDB) NewBatch() batch {
-	batch := new(leveldb.Batch)
-	return &lvlBatch{
-		batch: batch,
-		db:    db.db,
-	}
+func (db *explorerDB) NewBatch() batch {
+	return db.db.NewBatch()
 }

-func (db *lvlDB) NewPrefixIterator(prefix []byte) iterator {
-	rng := levelutil.BytesPrefix(prefix)
-	it := db.db.NewIterator(rng, nil)
+func (db *explorerDB) NewPrefixIterator(prefix []byte) iterator {
+	it := db.db.NewIteratorWithPrefix(prefix)
 	return it
 }

-func (db *lvlDB) NewSizedIterator(start []byte, size int) iterator {
+func (db *explorerDB) NewSizedIterator(start []byte, size int) iterator {
 	return db.newSizedIterator(start, size)
 }

@@ -89,9 +94,8 @@ type sizedIterator struct {
 	sizeLimit int
 }

-func (db *lvlDB) newSizedIterator(start []byte, size int) *sizedIterator {
-	rng := &levelutil.Range{Start: start, Limit: nil}
-	it := db.db.NewIterator(rng, nil)
+func (db *explorerDB) newSizedIterator(start []byte, size int) *sizedIterator {
+	it := db.db.NewIteratorWithStart(start)
 	return &sizedIterator{
 		it:       it,
 		curIndex: 0,

@@ -112,31 +116,6 @@ func (it *sizedIterator) Value() []byte { return it.it.Value() }
 func (it *sizedIterator) Release() { it.it.Release() }
 func (it *sizedIterator) Error() error { return it.it.Error() }

-// Note: lvlBatch is not thread safe
-type lvlBatch struct {
-	batch     *leveldb.Batch
-	db        *leveldb.DB
-	valueSize int
-}
-
-func (b *lvlBatch) Put(key, val []byte) error {
-	b.batch.Put(key, val)
-	b.valueSize += len(val)
-	return nil
-}
-
-func (b *lvlBatch) Write() error {
-	if err := b.db.Write(b.batch, nil); err != nil {
-		return err
-	}
-	b.valueSize = 0
-	return nil
-}
-
-func (b *lvlBatch) ValueSize() int {
-	return b.valueSize
-}
-
 type iterator interface {
 	Next() bool
 	Key() []byte

api/service/explorer/interface_test.go

@@ -73,7 +73,7 @@ func TestLevelDBPrefixIterator(t *testing.T) {
 func newTestLevelDB(t *testing.T, i int) database {
 	dbDir := tempTestDir(t, i)
-	db, err := newLvlDB(dbDir)
+	db, err := newExplorerLvlDB(dbDir)
 	if err != nil {
 		t.Fatal(err)
 	}

api/service/explorer/migration_test.go

@@ -40,11 +40,6 @@ type migrationDBFactory struct {
 func (f *migrationDBFactory) makeDB(numAddr int) database {
 	db := newTestLevelDB(f.t, 0)
-	for i := 0; i != 10000; i++ {
-		if err := writeCheckpoint(db, uint64(i)); err != nil {
-			f.t.Fatal(err)
-		}
-	}
 	for i := 0; i != numAddr; i++ {
 		addr := f.newAddress()
 		addrInfo := &Address{ID: string(addr)}

api/service/explorer/schema.go

@@ -3,8 +3,8 @@ package explorer
 import (
 	"encoding/binary"
 	"fmt"
-	"math/big"

+	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/rlp"
 	goversion "github.com/hashicorp/go-version"

@@ -15,26 +15,39 @@ import (
 const (
 	LegAddressPrefix = "ad_"
-	CheckpointPrefix = "dc"
+	CheckpointBitmap = "checkpoint_bitmap"
 	TracePrefix      = "tr_"

 	oneAddrByteLen = 42 // byte size of string "one1..."
 )

-// Common schema
-
-// GetCheckpointKey ...
-func GetCheckpointKey(blockNum *big.Int) []byte {
-	return []byte(fmt.Sprintf("%s_%x", CheckpointPrefix, blockNum))
-}
-
-func isBlockComputedInDB(db databaseReader, bn uint64) (bool, error) {
-	key := GetCheckpointKey(new(big.Int).SetUint64(bn))
-	return db.Has(key)
+// readCheckpointBitmap reads the explorer checkpoint bitmap from storage
+func readCheckpointBitmap(db databaseReader) (*roaring64.Bitmap, error) {
+	bitmapByte, err := db.Get([]byte(CheckpointBitmap))
+	if err != nil {
+		if err == leveldb.ErrNotFound {
+			return roaring64.NewBitmap(), nil
+		}
+		return nil, err
+	}
+
+	rb := roaring64.NewBitmap()
+	err = rb.UnmarshalBinary(bitmapByte)
+	if err != nil {
+		return nil, err
+	}
+
+	return rb, nil
 }

-func writeCheckpoint(db databaseWriter, bn uint64) error {
-	blockCheckpoint := GetCheckpointKey(new(big.Int).SetUint64(bn))
-	return db.Put(blockCheckpoint, []byte{})
+// writeCheckpointBitmap writes the explorer checkpoint bitmap to storage
+func writeCheckpointBitmap(db databaseWriter, rb *roaring64.Bitmap) error {
+	bitmapByte, err := rb.MarshalBinary()
+	if err != nil {
+		return err
+	}
+	return db.Put([]byte(CheckpointBitmap), bitmapByte)
 }

 func getTraceResultKey(key []byte) []byte {
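
The change above replaces one checkpoint key per computed block with a single roaring bitmap of computed block numbers under one key. A minimal, self-contained sketch of the roaring64 calls the new schema relies on (CheckedAdd, Contains, and the binary round-trip used by readCheckpointBitmap/writeCheckpointBitmap); the printed sizes are illustrative, not measurements:

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring/roaring64"
)

func main() {
	rb := roaring64.NewBitmap()

	// Mark one million consecutive blocks as computed, as the explorer would.
	for bn := uint64(0); bn < 1_000_000; bn++ {
		rb.CheckedAdd(bn) // returns true only when bn was not already present
	}

	// Membership replaces the old per-block db.Has(checkpointKey) lookup.
	fmt.Println(rb.Contains(123_456)) // true

	// Dense runs compress extremely well, which is what lets the whole
	// checkpoint state live under the single CheckpointBitmap key.
	data, _ := rb.MarshalBinary()
	fmt.Println(len(data), "bytes for", rb.GetCardinality(), "blocks")

	// Round-trip, as readCheckpointBitmap does on startup.
	rb2 := roaring64.NewBitmap()
	_ = rb2.UnmarshalBinary(data)
	fmt.Println(rb2.Contains(123_456)) // true
}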

api/service/explorer/service.go

@@ -11,6 +11,9 @@ import (
 	"strconv"
 	"time"

+	"github.com/RoaringBitmap/roaring/roaring64"
+	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
+
 	ethCommon "github.com/ethereum/go-ethereum/common"
 	"github.com/gorilla/mux"
 	msg_pb "github.com/harmony-one/harmony/api/proto/message"

@@ -42,20 +45,21 @@ type HTTPError struct {
 // Service is the struct for explorer service.
 type Service struct {
 	router        *mux.Router
 	IP            string
 	Port          string
 	storage       *storage
 	server        *http.Server
 	messageChan   chan *msg_pb.Message
 	blockchain    core.BlockChain
 	backend       hmy.NodeAPI
+	harmonyConfig *harmonyconfig.HarmonyConfig
 }

 // New returns explorer service.
-func New(selfPeer *p2p.Peer, bc core.BlockChain, backend hmy.NodeAPI) *Service {
+func New(harmonyConfig *harmonyconfig.HarmonyConfig, selfPeer *p2p.Peer, bc core.BlockChain, backend hmy.NodeAPI) *Service {
 	dbPath := defaultDBPath(selfPeer.IP, selfPeer.Port)
-	storage, err := newStorage(bc, dbPath)
+	storage, err := newStorage(harmonyConfig, bc, dbPath)
 	if err != nil {
 		utils.Logger().Fatal().Err(err).Msg("cannot open explorer DB")
 	}

@@ -335,6 +339,11 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
 	s.messageChan = messageChan
 }

+// GetCheckpointBitmap returns a copy of the explorer checkpoint bitmap
+func (s *Service) GetCheckpointBitmap() *roaring64.Bitmap {
+	return s.storage.rb.Clone()
+}
+
 func defaultDBPath(ip, port string) string {
 	return path.Join(nodeconfig.GetDefaultConfig().DBDir, "explorer_storage_"+ip+"_"+port)
 }

api/service/explorer/storage.go

@@ -9,6 +9,10 @@ import (
 	"sync"
 	"time"

+	"github.com/RoaringBitmap/roaring/roaring64"
+	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
+	"github.com/harmony-one/harmony/internal/tikv"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/harmony-one/abool"
 	"github.com/harmony-one/harmony/core"

@@ -22,7 +26,8 @@ import (
 )

 const (
-	numWorker = 8
+	numWorker        = 8
+	changedSaveCount = 100
 )

 // ErrExplorerNotReady is the error when querying explorer db data when

@@ -33,6 +38,7 @@ type (
 	storage struct {
 		db database
 		bc core.BlockChain
+		rb *roaring64.Bitmap

 		// TODO: optimize this with priority queue
 		tm *taskManager

@@ -55,17 +61,39 @@
 	}
 )

-func newStorage(bc core.BlockChain, dbPath string) (*storage, error) {
-	utils.Logger().Info().Msg("explorer storage folder: " + dbPath)
-	db, err := newLvlDB(dbPath)
+func newExplorerDB(hc *harmonyconfig.HarmonyConfig, dbPath string) (database, error) {
+	if hc.General.RunElasticMode {
+		// init the storage using tikv
+		dbPath = fmt.Sprintf("explorer_tikv_%d", hc.General.ShardID)
+		readOnly := hc.TiKV.Role == tikv.RoleReader
+		utils.Logger().Info().Msg("explorer storage in tikv: " + dbPath)
+		return newExplorerTiKv(hc.TiKV.PDAddr, dbPath, readOnly)
+	} else {
+		// or leveldb
+		utils.Logger().Info().Msg("explorer storage folder: " + dbPath)
+		return newExplorerLvlDB(dbPath)
+	}
+}
+
+func newStorage(hc *harmonyconfig.HarmonyConfig, bc core.BlockChain, dbPath string) (*storage, error) {
+	db, err := newExplorerDB(hc, dbPath)
+	if err != nil {
+		utils.Logger().Error().Err(err).Msg("Failed to create new explorer database")
+		return nil, err
+	}
+
+	// load the checkpoint roaring bitmap from storage;
+	// the roaring bitmap is highly compressed: in our scenario one million blocks take roughly 1 KB of storage
+	bitmap, err := readCheckpointBitmap(db)
 	if err != nil {
-		utils.Logger().Error().Err(err).Msg("Failed to create new database")
 		return nil, err
 	}
+
 	return &storage{
 		db:        db,
 		bc:        bc,
-		tm:        newTaskManager(),
+		rb:        bitmap,
+		tm:        newTaskManager(bitmap),
 		resultC:   make(chan blockResult, numWorker),
 		resultT:   make(chan *traceResult, numWorker),
 		available: abool.New(),

@@ -79,6 +107,7 @@ func (s *storage) Start() {
 }

 func (s *storage) Close() {
+	_ = writeCheckpointBitmap(s.db, s.rb)
 	close(s.closeC)
 }

@@ -182,14 +211,18 @@ type taskManager struct {
 	blocksLP []*types.Block // blocks with low priorities
 	lock     sync.Mutex

+	rb             *roaring64.Bitmap
+	rbChangedCount int
+
 	C chan struct{}
 	T chan *traceResult
 }

-func newTaskManager() *taskManager {
+func newTaskManager(bitmap *roaring64.Bitmap) *taskManager {
 	return &taskManager{
-		C: make(chan struct{}, numWorker),
-		T: make(chan *traceResult, numWorker),
+		rb: bitmap,
+		C:  make(chan struct{}, numWorker),
+		T:  make(chan *traceResult, numWorker),
 	}
 }

@@ -245,6 +278,30 @@ func (tm *taskManager) PullTask() *types.Block {
 	return nil
 }

+// markBlockDone marks a block as processed once the explorer has computed it
+func (tm *taskManager) markBlockDone(btc batch, blockNum uint64) {
+	tm.lock.Lock()
+	defer tm.lock.Unlock()
+
+	if tm.rb.CheckedAdd(blockNum) {
+		tm.rbChangedCount++
+
+		// persist the bitmap once every 100 changes
+		if tm.rbChangedCount == changedSaveCount {
+			tm.rbChangedCount = 0
+			_ = writeCheckpointBitmap(btc, tm.rb)
+		}
+	}
+}
+
+// hasBlockDone reports whether a block has already been processed
+func (tm *taskManager) hasBlockDone(blockNum uint64) bool {
+	tm.lock.Lock()
+	defer tm.lock.Unlock()
+
+	return tm.rb.Contains(blockNum)
+}
+
 func (s *storage) makeWorkersAndStart() {
 	workers := make([]*blockComputer, 0, numWorker)
 	for i := 0; i != numWorker; i++ {

@@ -321,9 +378,8 @@ LOOP:
 }

 func (bc *blockComputer) computeBlock(b *types.Block) (*blockResult, error) {
-	is, err := isBlockComputedInDB(bc.db, b.NumberU64())
-	if is || err != nil {
-		return nil, err
+	if bc.tm.hasBlockDone(b.NumberU64()) {
+		return nil, nil
 	}

 	btc := bc.db.NewBatch()

@@ -333,7 +389,7 @@ func (bc *blockComputer) computeBlock(b *types.Block) (*blockResult, error) {
 	for _, stk := range b.StakingTransactions() {
 		bc.computeStakingTx(btc, b, stk)
 	}
-	_ = writeCheckpoint(btc, b.NumberU64())
+	bc.tm.markBlockDone(btc, b.NumberU64())
 	return &blockResult{
 		btc: btc,
 		bn:  b.NumberU64(),

api/service/prometheus/service.go

@@ -8,6 +8,7 @@ import (
 	"net/http"
 	"runtime/debug"
 	"runtime/pprof"
+	"strings"
 	"sync"
 	"time"

@@ -30,13 +31,18 @@ type Config struct {
 	Legacy   bool   // legacy or not, legacy is harmony internal node
 	NodeType string // node type, validator or explorer node
 	Shard    uint32 // shard id, used as job suffix
-	Instance string //identifier of the instance in prometheus metrics
+	Instance string // identifier of the instance in prometheus metrics
+	TikvRole string // used for tikv explorer nodes
 }

 func (p Config) String() string {
 	return fmt.Sprintf("%v, %v:%v, %v/%v, %v/%v/%v/%v:%v", p.Enabled, p.IP, p.Port, p.EnablePush, p.Gateway, p.Network, p.Legacy, p.NodeType, p.Shard, p.Instance)
 }

+func (p Config) IsUsedTiKV() bool {
+	return p.TikvRole != ""
+}
+
 // Service provides Prometheus metrics via the /metrics route. This route will
 // show all the metrics registered with the Prometheus DefaultRegisterer.
 type Service struct {

@@ -63,8 +69,9 @@ var (
 func (s *Service) getJobName() string {
 	var node string

-	// legacy nodes are harmony nodes: s0,s1,s2,s3
-	if s.config.Legacy {
+	if s.config.IsUsedTiKV() { // a tikv node must be an explorer node, e.g.: te_reader0, te_writer0
+		node = "te_" + strings.ToLower(s.config.TikvRole)
+	} else if s.config.Legacy { // legacy nodes are harmony nodes: s0,s1,s2,s3
 		node = "s"
 	} else {
 		if s.config.NodeType == "validator" {

cmd/harmony/main.go

@@ -15,6 +15,10 @@ import (
 	"syscall"
 	"time"

+	"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
+	"github.com/harmony-one/harmony/internal/tikv/redis_helper"
+	"github.com/harmony-one/harmony/internal/tikv/statedb_cache"
+
 	"github.com/harmony-one/harmony/api/service/crosslink_sending"
 	rosetta_common "github.com/harmony-one/harmony/rosetta/common"

@@ -297,6 +301,11 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
 		os.Exit(1)
 	}

+	if hc.General.RunElasticMode && hc.TiKV == nil {
+		fmt.Fprintf(os.Stderr, "running in elastic (tikv) mode requires a TiKV config section")
+		os.Exit(1)
+	}
+
 	// Update ethereum compatible chain ids
 	params.UpdateEthChainIDByShard(nodeConfig.ShardID)

@@ -427,6 +436,8 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
 		currentNode.StartGRPCSyncClient()
 	}

+	currentNode.NodeSyncing()
+
 	if err := currentNode.StartServices(); err != nil {
 		fmt.Fprint(os.Stderr, err.Error())
 		os.Exit(-1)

@@ -680,7 +691,9 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
 	// Current node.
 	var chainDBFactory shardchain.DBFactory
-	if hc.ShardData.EnableShardData {
+	if hc.General.RunElasticMode {
+		chainDBFactory = setupTiKV(hc)
+	} else if hc.ShardData.EnableShardData {
 		chainDBFactory = &shardchain.LDBShardFactory{
 			RootDir:   nodeConfig.DBDir,
 			DiskCount: hc.ShardData.DiskCount,

@@ -754,6 +767,28 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
 	return currentNode
 }

+func setupTiKV(hc harmonyconfig.HarmonyConfig) shardchain.DBFactory {
+	err := redis_helper.Init(hc.TiKV.StateDBRedisServerAddr)
+	if err != nil {
+		panic("can not connect to redis: " + err.Error())
+	}
+
+	factory := &shardchain.TiKvFactory{
+		PDAddr: hc.TiKV.PDAddr,
+		Role:   hc.TiKV.Role,
+		CacheConfig: statedb_cache.StateDBCacheConfig{
+			CacheSizeInMB:        hc.TiKV.StateDBCacheSizeInMB,
+			CachePersistencePath: hc.TiKV.StateDBCachePersistencePath,
+			RedisServerAddr:      hc.TiKV.StateDBRedisServerAddr,
+			RedisLRUTimeInDay:    hc.TiKV.StateDBRedisLRUTimeInDay,
+			DebugHitRate:         hc.TiKV.Debug,
+		},
+	}
+
+	tikv_manage.SetDefaultTiKVFactory(factory)
+	return factory
+}
+
 func processNodeType(hc harmonyconfig.HarmonyConfig, currentNode *node.Node, currentConsensus *consensus.Consensus) {
 	switch hc.General.NodeType {
 	case nodeTypeExplorer:

@@ -796,6 +831,11 @@ func setupPrometheusService(node *node.Node, hc harmonyconfig.HarmonyConfig, sid
 		Shard:    sid,
 		Instance: myHost.GetID().Pretty(),
 	}
+
+	if hc.General.RunElasticMode {
+		prometheusConfig.TikvRole = hc.TiKV.Role
+	}
+
 	p := prometheus.NewService(prometheusConfig)
 	node.RegisterService(service.Prometheus, p)
 }

core/blockchain.go

@@ -1,6 +1,7 @@
 package core

 import (
+	"github.com/harmony-one/harmony/internal/tikv/redis_helper"
 	"math/big"

 	"github.com/ethereum/go-ethereum/common"

@@ -331,4 +332,15 @@ type BlockChain interface {
 		payout reward.Reader,
 		state *state.DB,
 	) (status WriteStatus, err error)
+
+	// ========== Only For Tikv Start ==========
+
+	// IsTikvWriterMaster returns true if this node is the tikv master writer
+	IsTikvWriterMaster() bool
+	// RedisPreempt used in tikv mode; returns the redis preempt instance
+	RedisPreempt() *redis_helper.RedisPreempt
+	// SyncFromTiKVWriter used in tikv mode; readers and follower writers use it to sync blocks from the master writer
+	SyncFromTiKVWriter(newBlkNum uint64, logs []*types.Log) error
+
+	// ========== Only For Tikv End ==========
 }

core/blockchain_impl.go

@@ -22,13 +22,19 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"log"
 	"math/big"
 	"os"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"

+	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
+	"github.com/harmony-one/harmony/internal/tikv"
+	"github.com/harmony-one/harmony/internal/tikv/redis_helper"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/common/prque"

@@ -95,6 +101,7 @@
 	maxTimeFutureBlocks = 30
 	badBlockLimit       = 10
 	triesInMemory       = 128
+	triesInRedis        = 1000
 	shardCacheLimit     = 10
 	commitsCacheLimit   = 10
 	epochCacheLimit     = 10

@@ -129,6 +136,14 @@ type BlockChainImpl struct {
 	triegc *prque.Prque  // Priority queue mapping block numbers to tries to gc
 	gcproc time.Duration // Accumulates canonical block processing for trie dumping

+	// The following two variables are used to clean up the redis cache in tikv mode.
+	// This improves the redis cache hit rate.
+	latestCleanCacheNum uint64      // block number of the most recently cleaned cache
+	cleanCacheChan      chan uint64 // used to notify which blocks should be cleaned up
+
+	// redisPreempt used in tikv mode; writer nodes preempt for write permission and publish updates to reader nodes
+	redisPreempt *redis_helper.RedisPreempt
+
 	hc        *HeaderChain
 	trace     bool       // atomic?
 	traceFeed event.Feed // send trace_block result to explorer

@@ -196,7 +211,7 @@
 // available in the database. It initialises the default Ethereum validator and
 // Processor.
 func NewBlockChain(
-	db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig,
+	db ethdb.Database, stateCache state.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig,
 	engine consensus_engine.Engine, vmConfig vm.Config,
 	shouldPreserve func(block *types.Block) bool,
 ) (*BlockChainImpl, error) {

@@ -235,7 +250,7 @@ func newBlockChainWithOptions(
 		cacheConfig:    cacheConfig,
 		db:             db,
 		triegc:         prque.New(nil),
-		stateCache:     state.NewDatabase(db),
+		stateCache:     stateCache,
 		quit:           make(chan struct{}),
 		shouldPreserve: shouldPreserve,
 		bodyCache:      bodyCache,

@@ -355,6 +370,7 @@ func (bc *BlockChainImpl) loadLastState() error {
 		}
 	}
 	// Everything seems to be fine, set as the head block
+	bc.latestCleanCacheNum = currentBlock.NumberU64() - triesInRedis
 	bc.currentBlock.Store(currentBlock)
 	headBlockGauge.Update(int64(currentBlock.NumberU64()))

@@ -689,6 +705,30 @@ func (bc *BlockChainImpl) writeHeadBlock(block *types.Block) error {
 	return nil
 }

+// tikvFastForward writes a new head block in tikv mode; used by reader nodes and follower writer nodes
+func (bc *BlockChainImpl) tikvFastForward(block *types.Block, logs []*types.Log) error {
+	bc.currentBlock.Store(block)
+	headBlockGauge.Update(int64(block.NumberU64()))
+
+	if err := bc.hc.SetCurrentHeader(block.Header()); err != nil {
+		return errors.Wrap(err, "HeaderChain SetCurrentHeader")
+	}
+
+	bc.currentFastBlock.Store(block)
+	headFastBlockGauge.Update(int64(block.NumberU64()))
+
+	var events []interface{}
+	events = append(events, ChainEvent{block, block.Hash(), logs})
+	events = append(events, ChainHeadEvent{block})
+
+	if block.NumberU64() > triesInRedis {
+		bc.latestCleanCacheNum = block.NumberU64() - triesInRedis
+	}
+
+	bc.PostChainEvents(events, logs)
+	return nil
+}
+
 // insert injects a new head block into the current block chain. This method
 // assumes that the block is indeed a true head. It will also reset the head
 // header and the head fast sync block to this very same block if they are older

@@ -846,6 +886,10 @@ func (bc *BlockChainImpl) TrieNode(hash common.Hash) ([]byte, error) {
 }

 func (bc *BlockChainImpl) Stop() {
+	if bc == nil {
+		return
+	}
+
 	if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
 		return
 	}

@@ -1184,6 +1228,14 @@ func (bc *BlockChainImpl) WriteBlockWithState(
 			}
 			return NonStatTy, err
 		}
+
+		// clean block trie info in redis, used in tikv mode
+		if block.NumberU64() > triesInRedis {
+			select {
+			case bc.cleanCacheChan <- block.NumberU64() - triesInRedis:
+			default:
+			}
+		}
 	} else {
 		// Full but not archive node, do proper garbage collection
 		triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive

@@ -1300,8 +1352,17 @@ func (bc *BlockChainImpl) GetMaxGarbageCollectedBlockNumber() int64 {
 }

 func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (int, error) {
+	// in tikv mode a writer node must either preempt the master role or become a follower
+	if bc.isInitTiKV() && !bc.tikvPreemptMaster(bc.rangeBlock(chain)) {
+		return len(chain), nil
+	}
+
 	n, events, logs, err := bc.insertChain(chain, verifyHeaders)
 	bc.PostChainEvents(events, logs)
+	if bc.isInitTiKV() && err != nil {
+		// on error, the master writer node releases the write permission
+		_, _ = bc.redisPreempt.Unlock()
+	}
 	return n, err
 }

@@ -1542,6 +1603,14 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
 		events = append(events, ChainEvent{block, block.Hash(), logs})
 		lastCanon = block

+		// used in tikv mode: the writer node publishes updates to all reader nodes
+		if bc.isInitTiKV() {
+			err = redis_helper.PublishShardUpdate(bc.ShardID(), block.NumberU64(), logs)
+			if err != nil {
+				utils.Logger().Info().Err(err).Msg("redis publish shard update error")
+			}
+		}
+
 		// Only count canonical blocks for GC processing time
 		bc.gcproc += proctime
 	}

@@ -3028,6 +3097,172 @@ func (bc *BlockChainImpl) IsEnablePruneBeaconChainFeature() bool {
 	return bc.pruneBeaconChainEnable
 }

+// SyncFromTiKVWriter used in tikv mode; readers and follower writers use it to sync blocks from the master writer
+func (bc *BlockChainImpl) SyncFromTiKVWriter(newBlkNum uint64, logs []*types.Log) error {
+	head := rawdb.ReadHeadBlockHash(bc.db)
+	dbBlock := bc.GetBlockByHash(head)
+	currentBlock := bc.CurrentBlock()
+
+	if dbBlock == nil || currentBlock == nil {
+		return nil
+	}
+
+	currentBlockNum := currentBlock.NumberU64()
+	if currentBlockNum < newBlkNum {
+		start := time.Now()
+		for i := currentBlockNum; i <= newBlkNum; i++ {
+			blk := bc.GetBlockByNumber(i)
+			if blk == nil {
+				// cluster synchronization may be in progress
+				utils.Logger().Warn().
+					Uint64("currentBlockNum", i).
+					Msg("[tikv sync] sync from writer got nil block, cluster synchronization may be in progress")
+				return nil
+			}
+
+			err := bc.tikvFastForward(blk, logs)
+			if err != nil {
+				return err
+			}
+		}
+		utils.Logger().Info().
+			Uint64("currentBlockNum", currentBlockNum).
+			Dur("usedTime", time.Now().Sub(start)).
+			Msg("[tikv sync] sync from writer")
+	}
+
+	return nil
+}
+
+// tikvCleanCache used in tikv mode; cleans block trie data from redis
+func (bc *BlockChainImpl) tikvCleanCache() {
+	var count int
+	for to := range bc.cleanCacheChan {
+		for i := bc.latestCleanCacheNum + 1; i <= to; i++ {
+			// build the previous block statedb
+			fromBlock := bc.GetBlockByNumber(i)
+			fromTrie, err := state.New(fromBlock.Root(), bc.stateCache)
+			if err != nil {
+				continue
+			}
+
+			// build the current block statedb
+			toBlock := bc.GetBlockByNumber(i + 1)
+			toTrie, err := state.New(toBlock.Root(), bc.stateCache)
+			if err != nil {
+				continue
+			}
+
+			// diff the two statedbs and delete the redis cache
+			start := time.Now()
+			count, err = fromTrie.DiffAndCleanCache(bc.ShardID(), toTrie)
+			if err != nil {
+				utils.Logger().Warn().
+					Err(err).
+					Msg("[tikv clean cache] error")
+				break
+			}
+
+			utils.Logger().Info().
+				Uint64("blockNum", i).
+				Int("removeCacheEntriesCount", count).
+				Dur("usedTime", time.Now().Sub(start)).
+				Msg("[tikv clean cache] success")
+			bc.latestCleanCacheNum = i
+		}
+	}
+}
+
+func (bc *BlockChainImpl) isInitTiKV() bool {
+	return bc.redisPreempt != nil
+}
+
+// tikvPreemptMaster used in tikv mode; a writer node either preempts the master role or becomes a follower
+func (bc *BlockChainImpl) tikvPreemptMaster(fromBlock, toBlock uint64) bool {
+	for {
+		// try to preempt the master role
+		if ok, _ := bc.redisPreempt.TryLock(60); ok {
+			return true
+		}
+
+		// follower
+		if bc.CurrentBlock().NumberU64() >= toBlock {
+			return false
+		}
+
+		time.Sleep(time.Second)
+	}
+}
+
+// rangeBlock returns the min and max block numbers of the given blocks
+func (bc *BlockChainImpl) rangeBlock(blocks types.Blocks) (uint64, uint64) {
+	if len(blocks) == 0 {
+		return 0, 0
+	}
+	max := blocks[0].NumberU64()
+	min := max
+	for _, tmpBlock := range blocks {
+		if tmpBlock.NumberU64() > max {
+			max = tmpBlock.NumberU64()
+		} else if tmpBlock.NumberU64() < min {
+			min = tmpBlock.NumberU64()
+		}
+	}
+
+	return min, max
+}
+
+// RedisPreempt used in tikv mode; returns the redis preempt instance
+func (bc *BlockChainImpl) RedisPreempt() *redis_helper.RedisPreempt {
+	if bc == nil {
+		return nil
+	}
+	return bc.redisPreempt
+}
+
+func (bc *BlockChainImpl) IsTikvWriterMaster() bool {
+	if bc == nil || bc.redisPreempt == nil {
+		return false
+	}
+
+	return bc.redisPreempt.LastLockStatus()
+}
+
+// InitTiKV used in tikv mode; initializes tikv mode for this chain
+func (bc *BlockChainImpl) InitTiKV(conf *harmonyconfig.TiKVConfig) {
+	bc.cleanCacheChan = make(chan uint64, 10)
+
+	if conf.Role == tikv.RoleWriter {
+		// only the writer needs to preempt for write permission
+		bc.redisPreempt = redis_helper.CreatePreempt(fmt.Sprintf("shard_%d_preempt", bc.ShardID()))
+	}
+
+	if conf.Debug {
+		// used to initialize the redis cache:
+		// if redis is empty, the hit rate will be too low and block synchronization will be slow;
+		// setting LOAD_PRE_FETCH=yes improves this significantly
+		if os.Getenv("LOAD_PRE_FETCH") == "yes" {
+			if trie, err := state.New(bc.CurrentBlock().Root(), bc.stateCache); err == nil {
+				trie.Prefetch(512)
+			} else {
+				log.Println("LOAD_PRE_FETCH ERR: ", err)
+			}
+		}
+
+		// if redis is empty, there is no need to clear the cache of the last ~1000 blocks;
+		// setting CLEAN_INIT to the latest block number skips the slow and unneeded cleanup
+		if os.Getenv("CLEAN_INIT") != "" {
+			from, err := strconv.Atoi(os.Getenv("CLEAN_INIT"))
+			if err == nil {
+				bc.latestCleanCacheNum = uint64(from)
+			}
+		}
+	}
+
+	// start the block trie cleanup process
+	go bc.tikvCleanCache()
+}
+
 var (
 	leveldbErrSpec         = "leveldb"
 	tooManyOpenFilesErrStr = "Too many open files"
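
The writer-election logic above (tikvPreemptMaster, the Unlock on a failed insert, LastLockStatus) leans on the RedisPreempt type added in internal/tikv/redis_helper/lock.go, which is not shown in this excerpt. Below is a self-contained sketch of the same election loop with a stub standing in for redis; locker, stubLock, and preemptMaster are illustrative names, not part of the PR:

package main

import (
	"fmt"
	"time"
)

// locker abstracts the two RedisPreempt calls the diff uses; a stub stands in
// for redis so the election loop itself can run anywhere.
type locker interface {
	TryLock(ttlSeconds int) (bool, error)
	Unlock() (bool, error)
}

type stubLock struct{ held bool }

func (s *stubLock) TryLock(int) (bool, error) { ok := !s.held; s.held = true; return ok, nil }
func (s *stubLock) Unlock() (bool, error)     { s.held = false; return true, nil }

// preemptMaster mirrors tikvPreemptMaster: keep trying to become the master
// writer; give up and stay a follower once the chain head reaches toBlock.
func preemptMaster(l locker, head func() uint64, toBlock uint64) bool {
	for {
		if ok, _ := l.TryLock(60); ok {
			return true // this node may now insert blocks
		}
		if head() >= toBlock {
			return false // another writer already advanced the chain
		}
		time.Sleep(time.Second)
	}
}

func main() {
	head := uint64(10)
	fmt.Println(preemptMaster(&stubLock{}, func() uint64 { return head }, 12))
}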

core/evm_test.go

@@ -46,7 +46,7 @@ func getTestEnvironment(testBankKey ecdsa.PrivateKey) (*BlockChainImpl, *state.D
 	genesis := gspec.MustCommit(database)

 	// fake blockchain
-	chain, _ := NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil)
+	chain, _ := NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
 	db, _ := chain.StateAt(genesis.Root())

 	// make a fake block header (use epoch 1 so that locked tokens can be tested)

core/state/prefeth.go (new file)

@@ -0,0 +1,167 @@
package state

import (
	"bytes"
	"log"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/trie"
	"github.com/harmony-one/harmony/internal/utils"
)

type prefetchJob struct {
	accountAddr []byte
	account     *Account
	start, end  []byte
}

// Prefetch loads the latest block's statedb into redis in parallel. If redis is
// empty, the cache hit rate is too low and block synchronization is slow; this
// function is used for debugging or when initializing a tikv cluster for the first time.
func (s *DB) Prefetch(parallel int) {
	wg := sync.WaitGroup{}

	jobChan := make(chan *prefetchJob, 10000)
	waitWorker := int64(parallel)

	// start parallel workers
	for i := 0; i < parallel; i++ {
		go func() {
			defer wg.Done()
			for job := range jobChan {
				atomic.AddInt64(&waitWorker, -1)
				s.prefetchWorker(job, jobChan)
				atomic.AddInt64(&waitWorker, 1)
			}
		}()
		wg.Add(1)
	}

	// add the initial jobs
	for i := 0; i < 255; i++ {
		start := []byte{byte(i)}
		end := []byte{byte(i + 1)}
		if i == parallel-1 {
			end = nil
		} else if i == 0 {
			start = nil
		}
		jobChan <- &prefetchJob{
			start: start,
			end:   end,
		}
	}

	// wait until all workers are done
	start := time.Now()
	log.Println("Prefetch start")
	var sleepCount int
	for sleepCount < 3 {
		time.Sleep(time.Second)
		waitWorkerCount := atomic.LoadInt64(&waitWorker)
		if waitWorkerCount >= int64(parallel) {
			sleepCount++
		} else {
			sleepCount = 0
		}
	}

	close(jobChan)
	wg.Wait()

	end := time.Now()
	log.Println("Prefetch end, use", end.Sub(start))
}

// prefetchWorker processes a single job
func (s *DB) prefetchWorker(job *prefetchJob, jobs chan *prefetchJob) {
	if job.account == nil {
		// scan a range of the main trie
		nodeIterator := s.trie.NodeIterator(job.start)
		it := trie.NewIterator(nodeIterator)
		for it.Next() {
			if job.end != nil && bytes.Compare(it.Key, job.end) >= 0 {
				return
			}

			// build the account data from the main trie
			var data Account
			if err := rlp.DecodeBytes(it.Value, &data); err != nil {
				panic(err)
			}
			addrBytes := s.trie.GetKey(it.Key)
			addr := common.BytesToAddress(addrBytes)
			obj := newObject(s, addr, data)
			if data.CodeHash != nil {
				obj.Code(s.db)
			}

			// iterate the account storage trie
			storageIt := trie.NewIterator(obj.getTrie(s.db).NodeIterator(nil))
			storageJob := &prefetchJob{
				accountAddr: addrBytes,
				account:     &data,
			}

			// fetch the storage data
			s.prefetchAccountStorage(jobs, storageJob, storageIt)
		}
	} else {
		// resume scanning one account's storage trie
		obj := newObject(s, common.BytesToAddress(job.accountAddr), *job.account)
		storageIt := trie.NewIterator(obj.getTrie(s.db).NodeIterator(job.start))

		// fetch the storage data
		s.prefetchAccountStorage(jobs, job, storageIt)
	}
}

// prefetchAccountStorage fetches the storage of one account
func (s *DB) prefetchAccountStorage(jobs chan *prefetchJob, job *prefetchJob, it *trie.Iterator) {
	start := time.Now()
	count := 0

	for it.Next() {
		if job.end != nil && bytes.Compare(it.Key, job.end) >= 0 {
			return
		}
		count++

		// if fetching one account takes more than 15s, split the job
		if count%10000 == 0 && time.Now().Sub(start) > 15*time.Second {
			middle := utils.BytesMiddle(it.Key, job.end)
			startJob := &prefetchJob{
				accountAddr: job.accountAddr,
				account:     job.account,
				start:       it.Key,
				end:         middle,
			}
			endJob := &prefetchJob{
				accountAddr: job.accountAddr,
				account:     job.account,
				start:       middle,
				end:         job.end,
			}

			select {
			case jobs <- endJob:
				select {
				case jobs <- startJob:
					return
				default:
					job.end = startJob.end
					start = time.Now()
				}
			default:
				log.Println("job full, continue")
			}
		}
	}
}
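
Prefetch cannot use a bare WaitGroup to detect completion because a running job may enqueue follow-up jobs (the 15-second split above). A self-contained sketch of the idle-counter drain pattern it uses instead; all names here are illustrative:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	const parallel = 4
	jobs := make(chan int, 1024)
	idle := int64(parallel)
	var wg sync.WaitGroup

	for i := 0; i < parallel; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				atomic.AddInt64(&idle, -1)
				if j > 0 {
					jobs <- j - 1 // a job may spawn a follow-up job
				}
				atomic.AddInt64(&idle, 1)
			}
		}()
	}
	jobs <- 8

	// Close the queue only after every worker has stayed idle for three
	// consecutive polls, mirroring the sleepCount loop in Prefetch.
	for quiet := 0; quiet < 3; {
		time.Sleep(10 * time.Millisecond)
		if atomic.LoadInt64(&idle) >= parallel {
			quiet++
		} else {
			quiet = 0
		}
	}
	close(jobs)
	wg.Wait()
	fmt.Println("drained")
}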

core/state/tikv_clean.go (new file)

@@ -0,0 +1,80 @@
package state

import (
	"bytes"
	"sync"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/trie"
	"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
)

var secureKeyPrefix = []byte("secure-key-")

// DiffAndCleanCache cleans block trie data from redis, used to reduce redis storage and increase the hit rate
func (s *DB) DiffAndCleanCache(shardId uint32, to *DB) (int, error) {
	// create a difference iterator between the two tries
	it, _ := trie.NewDifferenceIterator(to.trie.NodeIterator(nil), s.trie.NodeIterator(nil))
	db, err := tikv_manage.GetDefaultTiKVFactory().NewCacheStateDB(shardId)
	if err != nil {
		return 0, err
	}

	batch := db.NewBatch()
	wg := &sync.WaitGroup{}
	count := uint64(0)
	for it.Next(true) {
		if !it.Leaf() {
			// delete the trie node if it is not a leaf
			atomic.AddUint64(&count, 1)
			_ = batch.Delete(it.Hash().Bytes())
		} else {
			// build the account data
			addrBytes := s.trie.GetKey(it.LeafKey())
			addr := common.BytesToAddress(addrBytes)

			var fromAccount, toAccount Account
			if err := rlp.DecodeBytes(it.LeafBlob(), &fromAccount); err != nil {
				continue
			}

			if toByte, err := to.trie.TryGet(addrBytes); err != nil {
				continue
			} else if err := rlp.DecodeBytes(toByte, &toAccount); err != nil {
				continue
			}

			// skip the account if it has not changed
			if bytes.Compare(fromAccount.Root.Bytes(), toAccount.Root.Bytes()) == 0 {
				continue
			}

			// create a difference iterator for the account storage trie
			fromAccountTrie := newObject(s, addr, fromAccount).getTrie(s.db)
			toAccountTrie := newObject(to, addr, toAccount).getTrie(to.db)
			accountIt, _ := trie.NewDifferenceIterator(toAccountTrie.NodeIterator(nil), fromAccountTrie.NodeIterator(nil))

			// delete the account data in parallel
			wg.Add(1)
			go func() {
				defer wg.Done()

				for accountIt.Next(true) {
					atomic.AddUint64(&count, 1)

					if !accountIt.Leaf() {
						_ = batch.Delete(accountIt.Hash().Bytes())
					} else {
						_ = batch.Delete(append(append([]byte{}, secureKeyPrefix...), accountIt.LeafKey()...))
					}
				}
			}()
		}
	}

	wg.Wait()
	return int(count), db.ReplayCache(batch)
}

core/tx_pool.go

@@ -170,6 +170,8 @@ type TxPoolConfig struct {
 	Lifetime time.Duration // Maximum amount of time non-executable transaction are queued

+	AddEvent func(tx types.PoolTransaction, local bool) // Fired after a successful add
+
 	Blacklist  map[common.Address]struct{}      // Set of accounts that cannot be a part of any transaction
 	AllowedTxs map[common.Address]AllowedTxData // Set of allowed transactions that can break the blocklist
 }

@@ -958,7 +960,14 @@ func (pool *TxPool) pendingEpoch() *big.Int {
 // If a newly added transaction is marked as local, its sending account will be
 // whitelisted, preventing any associated transaction from being dropped out of
 // the pool due to pricing constraints.
-func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
+func (pool *TxPool) add(tx types.PoolTransaction, local bool) (replaced bool, err error) {
+	defer func() {
+		if err == nil && pool.config.AddEvent != nil {
+			// used in tikv mode: the writer publishes txpool changes to all readers to keep state consistent
+			pool.config.AddEvent(tx, local)
+		}
+	}()
+
 	logger := utils.Logger().With().Stack().Logger()
 	// If the transaction is in the error sink, remove it as it may succeed
 	if pool.txErrorSink.Contains(tx.Hash().String()) {
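
The AddEvent hook fires only after a successful add, which is what lets a tikv writer mirror txpool changes to reader nodes. A self-contained sketch of the same named-return-plus-defer pattern; the pool types here are simplified stand-ins, not the real TxPool:

package main

import "fmt"

type poolConfig struct {
	AddEvent func(tx string, local bool) // mirrors TxPoolConfig.AddEvent
}

type pool struct{ config poolConfig }

// add mirrors TxPool.add: the named return value lets the deferred hook
// observe whether the add succeeded before publishing it.
func (p *pool) add(tx string, local bool) (replaced bool, err error) {
	defer func() {
		if err == nil && p.config.AddEvent != nil {
			p.config.AddEvent(tx, local)
		}
	}()
	// ... validation and queueing would happen here ...
	return false, nil
}

func main() {
	p := &pool{config: poolConfig{AddEvent: func(tx string, local bool) {
		fmt.Println("publish to readers:", tx, local)
	}}}
	_, _ = p.add("0xdeadbeef", true)
}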

core/tx_pool_test.go

@@ -161,7 +161,7 @@ func createBlockChain() *BlockChainImpl {
 	genesis := gspec.MustCommit(database)
 	_ = genesis
 	engine := chain2.NewEngine()
-	blockchain, _ := NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil)
+	blockchain, _ := NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
 	return blockchain
 }

go.mod

@@ -3,6 +3,8 @@ module github.com/harmony-one/harmony
 go 1.18

 require (
+	github.com/RoaringBitmap/roaring v1.1.0
+	github.com/VictoriaMetrics/fastcache v1.5.7
 	github.com/Workiva/go-datastructures v1.0.50
 	github.com/allegro/bigcache v1.2.1
 	github.com/aws/aws-sdk-go v1.30.1

@@ -13,6 +15,8 @@ require (
 	github.com/davecgh/go-spew v1.1.1
 	github.com/deckarep/golang-set v1.7.1
 	github.com/ethereum/go-ethereum v1.9.25
+	github.com/fjl/memsize v0.0.0-20180929194037-2a09253e352a // indirect
+	github.com/go-redis/redis/v8 v8.11.5
 	github.com/golang/mock v1.6.0
 	github.com/golang/protobuf v1.5.2
 	github.com/golangci/golangci-lint v1.22.2

@@ -38,7 +42,7 @@ require (
 	github.com/pborman/uuid v1.2.0
 	github.com/pelletier/go-toml v1.9.3
 	github.com/pkg/errors v0.9.1
-	github.com/prometheus/client_golang v1.10.0
+	github.com/prometheus/client_golang v1.11.0
 	github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
 	github.com/rjeczalik/notify v0.9.2
 	github.com/rs/cors v1.7.0

@@ -48,13 +52,15 @@ require (
 	github.com/spf13/viper v1.6.1
 	github.com/stretchr/testify v1.7.0
 	github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca
+	github.com/tikv/client-go/v2 v2.0.1
 	github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
 	go.uber.org/ratelimit v0.1.0
-	go.uber.org/zap v1.16.0
+	go.uber.org/zap v1.20.0
 	golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
-	google.golang.org/grpc v1.33.2
+	golang.org/x/tools v0.1.7 // indirect
+	google.golang.org/grpc v1.43.0
 	google.golang.org/protobuf v1.26.0
 	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
 	gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce

hmy/blockchain.go

@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"math/big"

+	v3 "github.com/harmony-one/harmony/block/v3"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/bloombits"
 	"github.com/ethereum/go-ethereum/event"

@@ -174,10 +176,20 @@ func (hmy *Harmony) GetPreStakingBlockRewards(
 // GetLatestChainHeaders ..
 func (hmy *Harmony) GetLatestChainHeaders() *block.HeaderPair {
-	return &block.HeaderPair{
-		BeaconHeader: hmy.BeaconChain.CurrentHeader(),
-		ShardHeader:  hmy.BlockChain.CurrentHeader(),
+	pair := &block.HeaderPair{
+		BeaconHeader: &block.Header{Header: v3.NewHeader()},
+		ShardHeader:  &block.Header{Header: v3.NewHeader()},
 	}
+
+	if hmy.BeaconChain != nil {
+		pair.BeaconHeader = hmy.BeaconChain.CurrentHeader()
+	}
+
+	if hmy.BlockChain != nil {
+		pair.ShardHeader = hmy.BlockChain.CurrentHeader()
+	}
+
+	return pair
 }

 // GetLastCrossLinks ..

internal/configs/harmony/harmony.go

@@ -27,6 +27,7 @@ type HarmonyConfig struct {
 	Revert     *RevertConfig     `toml:",omitempty"`
 	Legacy     *LegacyConfig     `toml:",omitempty"`
 	Prometheus *PrometheusConfig `toml:",omitempty"`
+	TiKV       *TiKVConfig       `toml:",omitempty"`
 	DNSSync    DnsSync
 	ShardData  ShardDataConfig
 }

@@ -66,6 +67,18 @@ type GeneralConfig struct {
 	DataDir                string
 	TraceEnable            bool
 	EnablePruneBeaconChain bool
+	RunElasticMode         bool
+}
+
+type TiKVConfig struct {
+	Debug bool
+
+	PDAddr                      []string
+	Role                        string
+	StateDBCacheSizeInMB        uint32
+	StateDBCachePersistencePath string
+	StateDBRedisServerAddr      []string
+	StateDBRedisLRUTimeInDay    uint32
 }

 type ShardDataConfig struct {
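
For reference, a hypothetical elastic-mode configuration assembled from the new fields; all values are placeholders, and the surrounding HarmonyConfig layout is assumed from this hunk:

package main

import harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"

func main() {
	hc := harmonyconfig.HarmonyConfig{}
	hc.General.RunElasticMode = true // without this, the TiKV section is ignored
	hc.TiKV = &harmonyconfig.TiKVConfig{
		Debug:                       false,
		PDAddr:                      []string{"127.0.0.1:2379"}, // placement driver endpoints (placeholder)
		Role:                        "Writer",                   // tikv.RoleWriter or tikv.RoleReader
		StateDBCacheSizeInMB:        1024,
		StateDBCachePersistencePath: "/data/statedb_cache",
		StateDBRedisServerAddr:      []string{"127.0.0.1:6379"},
		StateDBRedisLRUTimeInDay:    7,
	}
	_ = hc
}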

internal/shardchain/dbfactory_tikv.go (new file)

@@ -0,0 +1,126 @@
package shardchain

import (
	"fmt"
	"io"
	"sync"

	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/harmony-one/harmony/internal/tikv"
	tikvCommon "github.com/harmony-one/harmony/internal/tikv/common"
	"github.com/harmony-one/harmony/internal/tikv/prefix"
	"github.com/harmony-one/harmony/internal/tikv/remote"
	"github.com/harmony-one/harmony/internal/tikv/statedb_cache"

	"github.com/ethereum/go-ethereum/ethdb"
)

const (
	LDBTiKVPrefix = "harmony_tikv"
)

type TiKvCacheConfig struct {
	StateDBCacheSizeInMB        uint32
	StateDBCachePersistencePath string
	StateDBRedisServerAddr      string
	StateDBRedisLRUTimeInDay    uint32
}

// TiKvFactory is a tikv-backed blockchain database factory.
type TiKvFactory struct {
	cacheDBMap sync.Map

	PDAddr      []string
	Role        string
	CacheConfig statedb_cache.StateDBCacheConfig
}

// getRemoteDB creates a tikv-backed remote database with a per-shard prefix
func (f *TiKvFactory) getRemoteDB(shardID uint32) (*prefix.PrefixDatabase, error) {
	key := fmt.Sprintf("remote_%d_%s", shardID, f.Role)
	if db, ok := f.cacheDBMap.Load(key); ok {
		return db.(*prefix.PrefixDatabase), nil
	} else {
		prefixStr := []byte(fmt.Sprintf("%s_%d/", LDBTiKVPrefix, shardID))
		remoteDatabase, err := remote.NewRemoteDatabase(f.PDAddr, f.Role == tikv.RoleReader)
		if err != nil {
			return nil, err
		}

		tmpDB := prefix.NewPrefixDatabase(prefixStr, remoteDatabase)
		if loadedDB, loaded := f.cacheDBMap.LoadOrStore(key, tmpDB); loaded {
			_ = tmpDB.Close()
			return loadedDB.(*prefix.PrefixDatabase), nil
		}

		return tmpDB, nil
	}
}

// getStateDB creates statedb storage with a local memory cache
func (f *TiKvFactory) getStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error) {
	key := fmt.Sprintf("state_db_%d_%s", shardID, f.Role)
	if db, ok := f.cacheDBMap.Load(key); ok {
		return db.(*statedb_cache.StateDBCacheDatabase), nil
	} else {
		db, err := f.getRemoteDB(shardID)
		if err != nil {
			return nil, err
		}

		tmpDB, err := statedb_cache.NewStateDBCacheDatabase(db, f.CacheConfig, f.Role == tikv.RoleReader)
		if err != nil {
			return nil, err
		}

		if loadedDB, loaded := f.cacheDBMap.LoadOrStore(key, tmpDB); loaded {
			_ = tmpDB.Close()
			return loadedDB.(*statedb_cache.StateDBCacheDatabase), nil
		}

		return tmpDB, nil
	}
}

// NewChainDB returns a new tikv-backed chain database for the given shard.
func (f *TiKvFactory) NewChainDB(shardID uint32) (ethdb.Database, error) {
	var database ethdb.KeyValueStore

	db, err := f.getRemoteDB(shardID)
	if err != nil {
		return nil, err
	}

	database = tikvCommon.ToEthKeyValueStore(db)

	return rawdb.NewDatabase(database), nil
}

// NewStateDB creates the shard tikv statedb database
func (f *TiKvFactory) NewStateDB(shardID uint32) (ethdb.Database, error) {
	var database ethdb.KeyValueStore

	cacheDatabase, err := f.getStateDB(shardID)
	if err != nil {
		return nil, err
	}

	database = tikvCommon.ToEthKeyValueStore(cacheDatabase)

	return rawdb.NewDatabase(database), nil
}

// NewCacheStateDB creates shard statedb storage with a memory cache
func (f *TiKvFactory) NewCacheStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error) {
	return f.getStateDB(shardID)
}

// CloseAllDB closes all tikv databases
func (f *TiKvFactory) CloseAllDB() {
	f.cacheDBMap.Range(func(_, value interface{}) bool {
		if closer, ok := value.(io.Closer); ok {
			_ = closer.Close()
		}
		return true
	})
}

internal/shardchain/shardchains.go

@@ -4,6 +4,10 @@ import (
 	"math/big"
 	"sync"

+	"github.com/harmony-one/harmony/core/state"
+	harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
+	"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
+
 	"github.com/harmony-one/harmony/shard"

 	"github.com/ethereum/go-ethereum/common"

@@ -33,13 +37,14 @@ type Collection interface {
 // CollectionImpl is the main implementation of the shard chain collection.
 // See the Collection interface for details.
 type CollectionImpl struct {
 	dbFactory     DBFactory
 	dbInit        DBInitializer
 	engine        engine.Engine
 	mtx           sync.Mutex
 	pool          map[uint32]core.BlockChain
 	disableCache  map[uint32]bool
 	chainConfig   *params.ChainConfig
+	harmonyconfig *harmonyconfig.HarmonyConfig
 }

 // NewCollection creates and returns a new shard chain collection.

@@ -49,16 +54,18 @@
 // dbInit is the shard chain initializer to use when the database returned by
 // the factory is brand new (empty).
 func NewCollection(
+	harmonyconfig *harmonyconfig.HarmonyConfig,
 	dbFactory DBFactory, dbInit DBInitializer, engine engine.Engine,
 	chainConfig *params.ChainConfig,
 ) *CollectionImpl {
 	return &CollectionImpl{
+		harmonyconfig: harmonyconfig,
 		dbFactory:     dbFactory,
 		dbInit:        dbInit,
 		engine:        engine,
 		pool:          make(map[uint32]core.BlockChain),
 		disableCache:  make(map[uint32]bool),
 		chainConfig:   chainConfig,
 	}
 }

@@ -105,7 +112,6 @@ func (sc *CollectionImpl) ShardChain(shardID uint32, options ...core.Options) (c
 		// For beacon chain inside a shard chain, need to reset the eth chainID to shard 0's eth chainID in the config
 		chainConfig.EthCompatibleChainID = big.NewInt(chainConfig.EthCompatibleShard0ChainID.Int64())
 	}
-
 	opts := core.Options{}
 	if len(options) == 1 {
 		opts = options[0]

@@ -114,8 +120,10 @@
 	if opts.EpochChain {
 		bc, err = core.NewEpochChain(db, &chainConfig, sc.engine, vm.Config{})
 	} else {
+		stateCache, err := initStateCache(db, sc, shardID)
 		bc, err = core.NewBlockChainWithOptions(
-			db, cacheConfig, &chainConfig, sc.engine, vm.Config{}, nil, opts,
+			db, stateCache, cacheConfig, &chainConfig, sc.engine, vm.Config{}, nil, opts,
 		)
 	}

@@ -124,9 +132,28 @@
 	}
 	db = nil // don't close
 	sc.pool[shardID] = bc
+
+	if sc.harmonyconfig.General.RunElasticMode {
+		// init the tikv mode
+		bc.InitTiKV(sc.harmonyconfig.TiKV)
+	}
+
 	return bc, nil
 }

+func initStateCache(db ethdb.Database, sc *CollectionImpl, shardID uint32) (state.Database, error) {
+	if sc.harmonyconfig.General.RunElasticMode {
+		// in tikv mode, init the state db with tikv storage
+		stateDB, err := tikv_manage.GetDefaultTiKVFactory().NewStateDB(shardID)
+		if err != nil {
+			return nil, err
+		}
+		return state.NewDatabaseWithCache(stateDB, 64), nil
+	} else {
+		return state.NewDatabase(db), nil
+	}
+}
+
 // DisableCache disables caching mode for newly opened chains.
 // It does not affect already open chains. For best effect,
 // use this immediately after creating collection.

internal/shardchain/tikv_manage/factory_manage.go (new file)

@@ -0,0 +1,28 @@
package tikv_manage

import (
	"sync"

	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/harmony-one/harmony/internal/tikv/statedb_cache"
)

type TiKvFactory interface {
	NewChainDB(shardID uint32) (ethdb.Database, error)
	NewStateDB(shardID uint32) (ethdb.Database, error)
	NewCacheStateDB(shardID uint32) (*statedb_cache.StateDBCacheDatabase, error)
	CloseAllDB()
}

var tikvInit = sync.Once{}
var tikvFactory TiKvFactory

// SetDefaultTiKVFactory sets the process-wide tikv factory; only the first call takes effect
func SetDefaultTiKVFactory(factory TiKvFactory) {
	tikvInit.Do(func() {
		tikvFactory = factory
	})
}

// GetDefaultTiKVFactory returns the process-wide tikv factory
func GetDefaultTiKVFactory() (factory TiKvFactory) {
	return tikvFactory
}

@ -0,0 +1,73 @@
// from https://github.com/xtaci/smux/blob/master/alloc.go
package byte_alloc
import (
"sync"
)
// magic number
var debruijinPos = [...]byte{0, 9, 1, 10, 13, 21, 2, 29, 11, 14, 16, 18, 22, 25, 3, 30, 8, 12, 20, 28, 15, 17, 24, 7, 19, 27, 23, 6, 26, 5, 4, 31}
// Allocator for incoming frames, optimized to prevent overwriting after zeroing
type Allocator struct {
buffers []sync.Pool
}
// NewAllocator creates a []byte allocator for frames smaller than 1048576 bytes;
// the waste (memory fragmentation) of an allocation is guaranteed to be
// no more than 50%.
func NewAllocator() *Allocator {
alloc := new(Allocator)
alloc.buffers = make([]sync.Pool, 21) // 1B -> 1M
for k := range alloc.buffers {
size := 1 << uint32(k)
alloc.buffers[k].New = func() interface{} {
return make([]byte, size)
}
}
return alloc
}
// Get returns a []byte from the pool with the smallest sufficient capacity.
func (alloc *Allocator) Get(size int) []byte {
if size <= 0 {
return nil
}
if size > 1048576 {
return make([]byte, size)
}
bits := msb(size)
if size == 1<<bits {
return alloc.buffers[bits].Get().([]byte)[:size]
} else {
return alloc.buffers[bits+1].Get().([]byte)[:size]
}
}
// Put returns a []byte to the pool for future use;
// its cap must be exactly a power of two (the pools cover 1B..1MB).
func (alloc *Allocator) Put(buf []byte) {
bufCap := cap(buf)
bits := msb(bufCap)
if bufCap == 0 || bufCap > 1048576 || bufCap != 1<<bits {
return
}
alloc.buffers[bits].Put(buf)
}
// msb returns the position of the most significant bit.
// Equivalent to: uint(math.Floor(math.Log2(float64(n))))
// http://supertech.csail.mit.edu/papers/debruijn.pdf
func msb(size int) byte {
v := uint32(size)
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
return debruijinPos[(v*0x07C4ACDD)>>27]
}

@ -0,0 +1,15 @@
package byte_alloc
var defaultAllocator *Allocator
func init() {
defaultAllocator = NewAllocator()
}
// Get fetches a []byte of the given size from the default allocator.
func Get(size int) []byte {
return defaultAllocator.Get(size)
}
// Put returns a []byte to the default allocator.
func Put(buf []byte) {
defaultAllocator.Put(buf)
}

@ -0,0 +1,10 @@
package common
import (
"errors"
"github.com/syndtr/goleveldb/leveldb"
)
var ErrEmptyKey = errors.New("empty key is not supported")
var ErrNotFound = leveldb.ErrNotFound

@ -0,0 +1,20 @@
package common
import (
"unsafe"
)
// String converts a byte slice to a string without copying;
// the caller must not mutate b afterwards.
func String(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
// StringBytes converts a string to a byte slice without copying;
// the returned slice must not be mutated.
func StringBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s, len(s)},
))
}

@ -0,0 +1,41 @@
package common
import (
"io"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/syndtr/goleveldb/leveldb/util"
)
type TiKVStore interface {
ethdb.KeyValueReader
ethdb.KeyValueWriter
ethdb.Batcher
ethdb.Stater
ethdb.Compacter
io.Closer
NewIterator(start, end []byte) ethdb.Iterator
}
// TiKVStoreWrapper is a simple wrapper to convert a TiKVStore to an ethdb.KeyValueStore
type TiKVStoreWrapper struct {
TiKVStore
}
func (t *TiKVStoreWrapper) NewIterator() ethdb.Iterator {
return t.TiKVStore.NewIterator(nil, nil)
}
func (t *TiKVStoreWrapper) NewIteratorWithStart(start []byte) ethdb.Iterator {
return t.TiKVStore.NewIterator(start, nil)
}
func (t *TiKVStoreWrapper) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
bytesPrefix := util.BytesPrefix(prefix)
return t.TiKVStore.NewIterator(bytesPrefix.Start, bytesPrefix.Limit)
}
func ToEthKeyValueStore(store TiKVStore) ethdb.KeyValueStore {
return &TiKVStoreWrapper{TiKVStore: store}
}

@ -0,0 +1,6 @@
package tikv
const (
RoleReader = "Reader"
RoleWriter = "Writer"
)

@ -0,0 +1,42 @@
package prefix
import "github.com/ethereum/go-ethereum/ethdb"
// PrefixBatch is an ethdb.Batch that prepends a fixed prefix to every key.
type PrefixBatch struct {
prefix []byte
batch ethdb.Batch
}
func newPrefixBatch(prefix []byte, batch ethdb.Batch) *PrefixBatch {
return &PrefixBatch{prefix: prefix, batch: batch}
}
// Put inserts the given value into the key-value data store.
func (p *PrefixBatch) Put(key []byte, value []byte) error {
return p.batch.Put(append(append([]byte{}, p.prefix...), key...), value)
}
// Delete removes the key from the key-value data store.
func (p *PrefixBatch) Delete(key []byte) error {
return p.batch.Delete(append(append([]byte{}, p.prefix...), key...))
}
// ValueSize retrieves the amount of data queued up for writing.
func (p *PrefixBatch) ValueSize() int {
return p.batch.ValueSize()
}
// Write flushes any accumulated data to disk.
func (p *PrefixBatch) Write() error {
return p.batch.Write()
}
// Reset resets the batch for reuse.
func (p *PrefixBatch) Reset() {
p.batch.Reset()
}
// Replay replays the batch contents.
func (p *PrefixBatch) Replay(w ethdb.KeyValueWriter) error {
return p.batch.Replay(newPrefixBatchReplay(p.prefix, w))
}

@ -0,0 +1,35 @@
package prefix
import (
"bytes"
"github.com/ethereum/go-ethereum/ethdb"
)
// PrefixBatchReplay strips the prefix from keys while replaying a batch into w.
type PrefixBatchReplay struct {
prefix []byte
prefixLen int
w ethdb.KeyValueWriter
}
func newPrefixBatchReplay(prefix []byte, w ethdb.KeyValueWriter) *PrefixBatchReplay {
return &PrefixBatchReplay{prefix: prefix, prefixLen: len(prefix), w: w}
}
// Put inserts the given value into the key-value data store.
func (p *PrefixBatchReplay) Put(key []byte, value []byte) error {
if bytes.HasPrefix(key, p.prefix) {
return p.w.Put(key[p.prefixLen:], value)
} else {
return p.w.Put(key, value)
}
}
// Delete removes the key from the key-value data store.
func (p *PrefixBatchReplay) Delete(key []byte) error {
if bytes.HasPrefix(key, p.prefix) {
return p.w.Delete(key[p.prefixLen:])
} else {
return p.w.Delete(key)
}
}

@ -0,0 +1,109 @@
package prefix
import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/harmony-one/harmony/internal/tikv/byte_alloc"
"github.com/harmony-one/harmony/internal/tikv/common"
)
// PrefixDatabase is a wrapper that partitions the underlying storage by key prefix
type PrefixDatabase struct {
prefix []byte
db common.TiKVStore
keysPool *byte_alloc.Allocator
}
func NewPrefixDatabase(prefix []byte, db common.TiKVStore) *PrefixDatabase {
return &PrefixDatabase{
prefix: prefix,
db: db,
keysPool: byte_alloc.NewAllocator(),
}
}
// makeKey prepends the prefix to a key; allocating from keysPool reduces GC pressure
func (p *PrefixDatabase) makeKey(keys []byte) []byte {
prefixLen := len(p.prefix)
byt := p.keysPool.Get(len(keys) + prefixLen)
copy(byt, p.prefix)
copy(byt[prefixLen:], keys)
return byt
}
// Has retrieves if a key is present in the key-value data store.
func (p *PrefixDatabase) Has(key []byte) (bool, error) {
return p.db.Has(p.makeKey(key))
}
// Get retrieves the given key if it's present in the key-value data store.
func (p *PrefixDatabase) Get(key []byte) ([]byte, error) {
return p.db.Get(p.makeKey(key))
}
// Put inserts the given value into the key-value data store.
func (p *PrefixDatabase) Put(key []byte, value []byte) error {
return p.db.Put(p.makeKey(key), value)
}
// Delete removes the key from the key-value data store.
func (p *PrefixDatabase) Delete(key []byte) error {
return p.db.Delete(p.makeKey(key))
}
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called.
func (p *PrefixDatabase) NewBatch() ethdb.Batch {
return newPrefixBatch(p.prefix, p.db.NewBatch())
}
// buildLimitUsePrefix builds the exclusive upper bound for the prefix:
// the smallest key greater than every key that carries the prefix
func (p *PrefixDatabase) buildLimitUsePrefix() []byte {
var limit []byte
for i := len(p.prefix) - 1; i >= 0; i-- {
c := p.prefix[i]
if c < 0xff {
limit = make([]byte, i+1)
copy(limit, p.prefix)
limit[i] = c + 1
break
}
}
return limit
}
// NewIterator creates a binary-alphabetical iterator over the start to end keyspace
// contained within the key-value database.
func (p *PrefixDatabase) NewIterator(start, end []byte) ethdb.Iterator {
// copy before appending so the shared prefix slice is never mutated or aliased
start = append(append([]byte{}, p.prefix...), start...)
if len(end) == 0 {
end = p.buildLimitUsePrefix()
} else {
end = append(append([]byte{}, p.prefix...), end...)
}
return newPrefixIterator(p.prefix, p.db.NewIterator(start, end))
}
// Stat returns a particular internal stat of the database.
func (p *PrefixDatabase) Stat(property string) (string, error) {
return p.db.Stat(property)
}
// Compact flattens the underlying data store for the given key range. In essence,
// deleted and overwritten versions are discarded, and the data is rearranged to
// reduce the cost of operations needed to access them.
//
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both are nil, the
// entire data store is compacted.
func (p *PrefixDatabase) Compact(start []byte, limit []byte) error {
return p.db.Compact(start, limit)
}
// Close the storage
func (p *PrefixDatabase) Close() error {
return p.db.Close()
}

@ -0,0 +1,53 @@
package prefix
import (
"github.com/ethereum/go-ethereum/ethdb"
)
// PrefixIterator wraps an iterator over prefixed keys, stripping the prefix from Key().
type PrefixIterator struct {
prefix []byte
prefixLen int
it ethdb.Iterator
}
func newPrefixIterator(prefix []byte, it ethdb.Iterator) *PrefixIterator {
return &PrefixIterator{prefix: prefix, prefixLen: len(prefix), it: it}
}
// Next moves the iterator to the next key/value pair. It returns whether the
// iterator is exhausted.
func (i *PrefixIterator) Next() bool {
return i.it.Next()
}
// Error returns any accumulated error. Exhausting all the key/value pairs
// is not considered to be an error.
func (i *PrefixIterator) Error() error {
return i.it.Error()
}
// Key returns the key of the current key/value pair, or nil if done. The caller
// should not modify the contents of the returned slice, and its contents may
// change on the next call to Next.
func (i *PrefixIterator) Key() []byte {
key := i.it.Key()
if len(key) < len(i.prefix) {
return nil
}
return key[i.prefixLen:]
}
// Value returns the value of the current key/value pair, or nil if done. The
// caller should not modify the contents of the returned slice, and its contents
// may change on the next call to Next.
func (i *PrefixIterator) Value() []byte {
return i.it.Value()
}
// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
func (i *PrefixIterator) Release() {
i.it.Release()
}

@ -0,0 +1,34 @@
package redis_helper
import (
"context"
"github.com/go-redis/redis/v8"
)
var redisInstance *redis.ClusterClient
// Init initializes the shared redis cluster client used in tikv mode
func Init(serverAddr []string) error {
option := &redis.ClusterOptions{
Addrs: serverAddr,
PoolSize: 2,
}
rdb := redis.NewClusterClient(option)
err := rdb.Ping(context.Background()).Err()
if err != nil {
return err
}
redisInstance = rdb
return nil
}
// Close disconnects the redis instance used in tikv mode
func Close() error {
if redisInstance != nil {
return redisInstance.Close()
}
return nil
}

@ -0,0 +1,72 @@
package redis_helper
import (
"context"
"crypto/rand"
"encoding/base64"
"github.com/go-redis/redis/v8"
)
// RedisPreempt implements a TTL-based distributed lock used to elect the master writer.
type RedisPreempt struct {
key string
password string
lockScript, unlockScript *redis.Script
lastLockStatus bool
}
// CreatePreempt creates a redis preempt instance for the given key
func CreatePreempt(key string) *RedisPreempt {
p := &RedisPreempt{
key: key,
}
p.init()
return p
}
// init generates a random lock token and prepares the lock/unlock scripts
func (p *RedisPreempt) init() {
byt := make([]byte, 18)
_, _ = rand.Read(byt)
p.password = base64.StdEncoding.EncodeToString(byt)
p.lockScript = redis.NewScript(`
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('expire', KEYS[1], ARGV[2])
else
return redis.call('set', KEYS[1], ARGV[1], 'ex', ARGV[2], 'nx')
end
`)
p.unlockScript = redis.NewScript(`
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
`)
}
// TryLock attempts to acquire (or refresh) the master lock for ttlSecond seconds
func (p *RedisPreempt) TryLock(ttlSecond int) (ok bool, err error) {
ok, err = p.lockScript.Run(context.Background(), redisInstance, []string{p.key}, p.password, ttlSecond).Bool()
p.lastLockStatus = ok
return
}
// Unlock tries to release the master lock; it only succeeds while we still hold it
func (p *RedisPreempt) Unlock() (bool, error) {
if p == nil {
return false, nil
}
return p.unlockScript.Run(context.Background(), redisInstance, []string{p.key}, p.password).Bool()
}
// LastLockStatus returns the result of the most recent TryLock
func (p *RedisPreempt) LastLockStatus() bool {
if p == nil {
return false
}
return p.lastLockStatus
}

@ -0,0 +1,130 @@
package redis_helper
import (
"context"
"fmt"
"io"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
stakingTypes "github.com/harmony-one/harmony/staking/types"
"github.com/pkg/errors"
)
// BlockUpdate block update event
type BlockUpdate struct {
BlkNum uint64
Logs []*types.Log
}
// SubscribeShardUpdate subscribes to block update events; it blocks, invoking cb for each message
func SubscribeShardUpdate(shardID uint32, cb func(blkNum uint64, logs []*types.Log)) {
pubsub := redisInstance.Subscribe(context.Background(), fmt.Sprintf("shard_update_%d", shardID))
for message := range pubsub.Channel() {
block := &BlockUpdate{}
err := rlp.DecodeBytes([]byte(message.Payload), block)
if err != nil {
utils.Logger().Info().Err(err).Msg("redis subscribe shard update error")
continue
}
cb(block.BlkNum, block.Logs)
}
}
// PublishShardUpdate publishes a block update event
func PublishShardUpdate(shardID uint32, blkNum uint64, logs []*types.Log) error {
msg, err := rlp.EncodeToBytes(&BlockUpdate{
BlkNum: blkNum,
Logs: logs,
})
if err != nil {
return err
}
return redisInstance.Publish(context.Background(), fmt.Sprintf("shard_update_%d", shardID), msg).Err()
}
// TxPoolUpdate tx pool update event
type TxPoolUpdate struct {
typ string
Local bool
Tx types.PoolTransaction
}
// DecodeRLP decodes the struct from a binary stream
func (t *TxPoolUpdate) DecodeRLP(stream *rlp.Stream) error {
if err := stream.Decode(&t.typ); err != nil {
return err
}
if err := stream.Decode(&t.Local); err != nil {
return err
}
switch t.typ {
case "types.EthTransaction":
var tmp = &types.EthTransaction{}
if err := stream.Decode(tmp); err != nil {
return err
}
t.Tx = tmp
case "types.Transaction":
var tmp = &types.Transaction{}
if err := stream.Decode(tmp); err != nil {
return err
}
t.Tx = tmp
case "stakingTypes.StakingTransaction":
var tmp = &stakingTypes.StakingTransaction{}
if err := stream.Decode(tmp); err != nil {
return err
}
t.Tx = tmp
default:
return errors.New("unknown txpool type")
}
return nil
}
// EncodeRLP encodes the struct to a binary stream
func (t *TxPoolUpdate) EncodeRLP(w io.Writer) error {
switch t.Tx.(type) {
case *types.EthTransaction:
t.typ = "types.EthTransaction"
case *types.Transaction:
t.typ = "types.Transaction"
case *stakingTypes.StakingTransaction:
t.typ = "stakingTypes.StakingTransaction"
}
if err := rlp.Encode(w, t.typ); err != nil {
return err
}
if err := rlp.Encode(w, t.Local); err != nil {
return err
}
return rlp.Encode(w, t.Tx)
}
// SubscribeTxPoolUpdate subscribes to tx pool update events; it blocks, invoking cb for each message
func SubscribeTxPoolUpdate(shardID uint32, cb func(tx types.PoolTransaction, local bool)) {
pubsub := redisInstance.Subscribe(context.Background(), fmt.Sprintf("txpool_update_%d", shardID))
for message := range pubsub.Channel() {
txu := &TxPoolUpdate{}
err := rlp.DecodeBytes([]byte(message.Payload), &txu)
if err != nil {
utils.Logger().Info().Err(err).Msg("redis subscribe txpool update error")
continue
}
cb(txu.Tx, txu.Local)
}
}
// PublishTxPoolUpdate publishes a tx pool update event
func PublishTxPoolUpdate(shardID uint32, tx types.PoolTransaction, local bool) error {
txu := &TxPoolUpdate{Local: local, Tx: tx}
msg, err := rlp.EncodeToBytes(txu)
if err != nil {
return err
}
return redisInstance.Publish(context.Background(), fmt.Sprintf("txpool_update_%d", shardID), msg).Err()
}

@ -0,0 +1,119 @@
package remote
import (
"bytes"
"context"
"sync"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/harmony-one/harmony/internal/tikv/common"
)
// RemoteBatch buffers writes and deletes and flushes them to TiKV in bulk.
type RemoteBatch struct {
db *RemoteDatabase
lock sync.Mutex
size int
batchWriteKey [][]byte
batchWriteValue [][]byte
batchDeleteKey [][]byte
}
func newRemoteBatch(db *RemoteDatabase) *RemoteBatch {
return &RemoteBatch{db: db}
}
// Put inserts the given value into the key-value data store.
func (b *RemoteBatch) Put(key []byte, value []byte) error {
if len(key) == 0 {
return common.ErrEmptyKey
}
if len(value) == 0 {
value = EmptyValueStub
}
b.lock.Lock()
defer b.lock.Unlock()
b.batchWriteKey = append(b.batchWriteKey, key)
b.batchWriteValue = append(b.batchWriteValue, value)
b.size += len(key) + len(value)
return nil
}
// Delete removes the key from the key-value data store.
func (b *RemoteBatch) Delete(key []byte) error {
b.lock.Lock()
defer b.lock.Unlock()
b.batchDeleteKey = append(b.batchDeleteKey, key)
b.size += len(key)
return nil
}
// ValueSize retrieves the amount of data queued up for writing.
func (b *RemoteBatch) ValueSize() int {
return b.size
}
// Write flushes any accumulated data to disk.
func (b *RemoteBatch) Write() error {
b.lock.Lock()
defer b.lock.Unlock()
if len(b.batchWriteKey) > 0 {
err := b.db.client.BatchPut(context.Background(), b.batchWriteKey, b.batchWriteValue)
if err != nil {
return err
}
}
if len(b.batchDeleteKey) > 0 {
err := b.db.client.BatchDelete(context.Background(), b.batchDeleteKey)
if err != nil {
return err
}
}
return nil
}
// Reset resets the batch for reuse.
func (b *RemoteBatch) Reset() {
b.lock.Lock()
defer b.lock.Unlock()
b.batchWriteKey = b.batchWriteKey[:0]
b.batchWriteValue = b.batchWriteValue[:0]
b.batchDeleteKey = b.batchDeleteKey[:0]
b.size = 0
}
// Replay replays the batch contents.
func (b *RemoteBatch) Replay(w ethdb.KeyValueWriter) error {
for i, key := range b.batchWriteKey {
if bytes.Equal(b.batchWriteValue[i], EmptyValueStub) {
err := w.Put(key, []byte{})
if err != nil {
return err
}
} else {
err := w.Put(key, b.batchWriteValue[i])
if err != nil {
return err
}
}
}
for _, key := range b.batchDeleteKey {
err := w.Delete(key)
if err != nil {
return err
}
}
return nil
}

@ -0,0 +1,139 @@
package remote
import (
"bytes"
"context"
"runtime/trace"
"sync/atomic"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/harmony-one/harmony/internal/tikv/common"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/rawkv"
)
// EmptyValueStub marks logically-empty values, since TiKV raw KV cannot store zero-length values.
var EmptyValueStub = []byte("HarmonyTiKVEmptyValueStub")
// RemoteDatabase is an ethdb-style key-value store backed by the TiKV raw KV API.
type RemoteDatabase struct {
client *rawkv.Client
readOnly bool
isClose uint64
}
func NewRemoteDatabase(pdAddr []string, readOnly bool) (*RemoteDatabase, error) {
client, err := rawkv.NewClient(context.Background(), pdAddr, config.DefaultConfig().Security)
if err != nil {
return nil, err
}
db := &RemoteDatabase{
client: client,
readOnly: readOnly,
}
return db, nil
}
// ReadOnly sets the storage to read-only mode
func (d *RemoteDatabase) ReadOnly() {
d.readOnly = true
}
// Has retrieves if a key is present in the key-value data store.
func (d *RemoteDatabase) Has(key []byte) (bool, error) {
data, err := d.Get(key)
if err != nil {
if err == common.ErrNotFound {
return false, nil
}
return false, err
} else {
return len(data) != 0, nil
}
}
// Get retrieves the given key if it's present in the key-value data store.
func (d *RemoteDatabase) Get(key []byte) ([]byte, error) {
if len(key) == 0 {
return nil, common.ErrEmptyKey
}
region := trace.StartRegion(context.Background(), "tikv Get")
defer region.End()
get, err := d.client.Get(context.Background(), key)
if err != nil {
return nil, err
}
if len(get) == 0 {
return nil, common.ErrNotFound
}
if bytes.Equal(get, EmptyValueStub) {
get = get[:0]
}
return get, nil
}
// Put inserts the given value into the key-value data store.
func (d *RemoteDatabase) Put(key []byte, value []byte) error {
if len(key) == 0 {
return common.ErrEmptyKey
}
if d.readOnly {
return nil
}
if len(value) == 0 {
value = EmptyValueStub
}
return d.client.Put(context.Background(), key, value)
}
// Delete removes the key from the key-value data store.
func (d *RemoteDatabase) Delete(key []byte) error {
if len(key) == 0 {
return common.ErrEmptyKey
}
if d.readOnly {
return nil
}
return d.client.Delete(context.Background(), key)
}
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called.
func (d *RemoteDatabase) NewBatch() ethdb.Batch {
if d.readOnly {
return newNopRemoteBatch(d)
}
return newRemoteBatch(d)
}
func (d *RemoteDatabase) NewIterator(start, end []byte) ethdb.Iterator {
return newRemoteIterator(d, start, end)
}
// Stat returns a particular internal stat of the database.
func (d *RemoteDatabase) Stat(property string) (string, error) {
return "", common.ErrNotFound
}
// Compact is a no-op; TiKV currently does not support manual compaction
func (d *RemoteDatabase) Compact(start []byte, limit []byte) error {
return nil
}
// Close disconnects from TiKV; it is safe to call multiple times
func (d *RemoteDatabase) Close() error {
if atomic.CompareAndSwapUint64(&d.isClose, 0, 1) {
return d.client.Close()
}
return nil
}

@ -0,0 +1,110 @@
package remote
import (
"context"
"sync"
)
const (
iteratorOnce = 300
)
// RemoteIterator iterates a TiKV key range, fetching iteratorOnce keys per scan.
type RemoteIterator struct {
db *RemoteDatabase
lock sync.Mutex
limit []byte
start []byte
err error
end bool
pos int
keys, values [][]byte
currentKey, currentValue []byte
}
func newRemoteIterator(db *RemoteDatabase, start, limit []byte) *RemoteIterator {
return &RemoteIterator{
db: db,
start: start,
limit: limit,
}
}
// Next moves the iterator to the next key/value pair. It returns whether the
// iterator is exhausted.
func (i *RemoteIterator) Next() bool {
if i.end {
return false
}
if i.keys == nil {
if next := i.scanNext(); !next {
return false
}
}
i.currentKey = i.keys[i.pos]
i.currentValue = i.values[i.pos]
i.pos++
if i.pos >= len(i.keys) {
i.scanNext()
}
return true
}
// scanNext performs the actual TiKV scan and caches the page of results
func (i *RemoteIterator) scanNext() bool {
keys, values, err := i.db.client.Scan(context.Background(), i.start, i.limit, iteratorOnce)
if err != nil {
i.err = err
i.end = true
return false
}
if len(keys) == 0 {
i.end = true
return false
}
// resume the next page just past the last key returned
i.start = append(keys[len(keys)-1], 0)
i.pos = 0
i.keys = keys
i.values = values
return true
}
// Error returns any accumulated error. Exhausting all the key/value pairs
// is not considered to be an error.
func (i *RemoteIterator) Error() error {
return i.err
}
// Key returns the key of the current key/value pair, or nil if done. The caller
// should not modify the contents of the returned slice, and its contents may
// change on the next call to Next.
func (i *RemoteIterator) Key() []byte {
return i.currentKey
}
// Value returns the value of the current key/value pair, or nil if done. The
// caller should not modify the contents of the returned slice, and its contents
// may change on the next call to Next.
func (i *RemoteIterator) Value() []byte {
return i.currentValue
}
// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
func (i *RemoteIterator) Release() {
i.db = nil
i.end = true
i.keys = nil
i.values = nil
i.currentKey = nil
i.currentValue = nil
}

@ -0,0 +1,36 @@
package remote
import (
"github.com/ethereum/go-ethereum/ethdb"
)
// NopRemoteBatch is returned in read-only mode; all write operations are silently dropped
type NopRemoteBatch struct {
}
func newNopRemoteBatch(db *RemoteDatabase) *NopRemoteBatch {
return &NopRemoteBatch{}
}
func (b *NopRemoteBatch) Put(key []byte, value []byte) error {
return nil
}
func (b *NopRemoteBatch) Delete(key []byte) error {
return nil
}
func (b *NopRemoteBatch) ValueSize() int {
return 0
}
func (b *NopRemoteBatch) Write() error {
return nil
}
func (b *NopRemoteBatch) Reset() {
}
func (b *NopRemoteBatch) Replay(w ethdb.KeyValueWriter) error {
return nil
}

@ -0,0 +1,45 @@
package statedb_cache
import (
"github.com/ethereum/go-ethereum/ethdb"
)
// StateDBCacheBatch wraps a remote batch and mirrors successful writes into the cache.
type StateDBCacheBatch struct {
db *StateDBCacheDatabase
remoteBatch ethdb.Batch
}
func newStateDBCacheBatch(db *StateDBCacheDatabase, batch ethdb.Batch) *StateDBCacheBatch {
return &StateDBCacheBatch{db: db, remoteBatch: batch}
}
func (b *StateDBCacheBatch) Put(key []byte, value []byte) error {
return b.remoteBatch.Put(key, value)
}
func (b *StateDBCacheBatch) Delete(key []byte) error {
return b.remoteBatch.Delete(key)
}
func (b *StateDBCacheBatch) ValueSize() int {
// ethdb callers flush once ValueSize exceeds a threshold; dividing by 40 makes each commit larger and thus less frequent
return b.remoteBatch.ValueSize() / 40
}
func (b *StateDBCacheBatch) Write() (err error) {
defer func() {
if err == nil {
_ = b.db.cacheWrite(b)
}
}()
return b.remoteBatch.Write()
}
func (b *StateDBCacheBatch) Reset() {
b.remoteBatch.Reset()
}
func (b *StateDBCacheBatch) Replay(w ethdb.KeyValueWriter) error {
return b.remoteBatch.Replay(w)
}

@ -0,0 +1,373 @@
package statedb_cache
import (
"context"
"log"
"runtime"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/ethdb"
redis "github.com/go-redis/redis/v8"
"github.com/harmony-one/harmony/internal/tikv/common"
)
const (
maxMemoryEntrySize = 64 * 1024
maxPipelineEntriesCount = 10000
cacheMinGapInSecond = 600
)
type cacheWrapper struct {
*fastcache.Cache
}
// Put inserts the given value into the key-value data store.
func (c *cacheWrapper) Put(key []byte, value []byte) error {
c.Cache.Set(key, value)
return nil
}
// Delete removes the key from the key-value data store.
func (c *cacheWrapper) Delete(key []byte) error {
c.Cache.Del(key)
return nil
}
type redisPipelineWrapper struct {
redis.Pipeliner
expiredTime time.Duration
}
// Put inserts the given value into the key-value data store.
func (c *redisPipelineWrapper) Put(key []byte, value []byte) error {
c.SetEX(context.Background(), string(key), value, c.expiredTime)
return nil
}
// Delete removes the key from the key-value data store.
func (c *redisPipelineWrapper) Delete(key []byte) error {
c.Del(context.Background(), string(key))
return nil
}
type StateDBCacheConfig struct {
CacheSizeInMB uint32
CachePersistencePath string
RedisServerAddr []string
RedisLRUTimeInDay uint32
DebugHitRate bool
}
// StateDBCacheDatabase layers an in-memory fastcache (L1) and a redis cluster (L2) read cache over a remote TiKV store.
type StateDBCacheDatabase struct {
config StateDBCacheConfig
enableReadCache bool
isClose uint64
remoteDB common.TiKVStore
// l1cache (memory)
l1Cache *cacheWrapper
// l2cache (redis)
l2Cache *redis.ClusterClient
l2ReadOnly bool
l2ExpiredTime time.Duration
l2NextExpiredTime time.Time
l2ExpiredRefresh chan string
// stats
l1HitCount uint64
l2HitCount uint64
missCount uint64
notFoundOrErrorCount uint64
}
// NewStateDBCacheDatabase creates the two-level cached store; readOnly disables all cache writes to redis.
func NewStateDBCacheDatabase(remoteDB common.TiKVStore, config StateDBCacheConfig, readOnly bool) (*StateDBCacheDatabase, error) {
// create or load memory cache from file
cache := fastcache.LoadFromFileOrNew(config.CachePersistencePath, int(config.CacheSizeInMB)*1024*1024)
// create options
option := &redis.ClusterOptions{
Addrs: config.RedisServerAddr,
PoolSize: 32,
IdleTimeout: 60 * time.Second,
}
if readOnly {
option.PoolSize = 16
option.ReadOnly = true
}
// check redis connection
rdb := redis.NewClusterClient(option)
err := rdb.Ping(context.Background()).Err()
if err != nil {
return nil, err
}
// reload redis cluster slots status
rdb.ReloadState(context.Background())
db := &StateDBCacheDatabase{
config: config,
enableReadCache: true,
remoteDB: remoteDB,
l1Cache: &cacheWrapper{cache},
l2Cache: rdb,
l2ReadOnly: readOnly,
l2ExpiredTime: time.Duration(config.RedisLRUTimeInDay) * 24 * time.Hour,
}
if !readOnly {
// on an L1 cache hit, refresh the key's redis expiry to improve the hit
// rate and keep entries that are still being read from being recycled by lru
db.l2ExpiredRefresh = make(chan string, 100000)
go db.startL2ExpiredRefresh()
}
// print debug info
if config.DebugHitRate {
go func() {
for range time.Tick(5 * time.Second) {
db.cacheStatusPrint()
}
}()
}
return db, nil
}
// Has retrieves if a key is present in the key-value data store.
func (c *StateDBCacheDatabase) Has(key []byte) (bool, error) {
return c.remoteDB.Has(key)
}
// Get retrieves the given key if it's present in the key-value data store.
func (c *StateDBCacheDatabase) Get(key []byte) (ret []byte, err error) {
if c.enableReadCache {
var ok bool
// first, get data from memory cache
keyStr := string(key)
if ret, ok = c.l1Cache.HasGet(nil, key); ok {
if !c.l2ReadOnly {
select {
// refresh read time to prevent recycling by lru
case c.l2ExpiredRefresh <- keyStr:
default:
}
}
atomic.AddUint64(&c.l1HitCount, 1)
return ret, nil
}
defer func() {
if err == nil {
if len(ret) < maxMemoryEntrySize {
// set data to the memory cache if it was loaded from redis or the remote store
c.l1Cache.Set(key, ret)
}
}
}()
timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc()
// load data from redis cache
data := c.l2Cache.Get(timeoutCtx, keyStr)
if data.Err() == nil {
atomic.AddUint64(&c.l2HitCount, 1)
return data.Bytes()
} else if data.Err() != redis.Nil {
log.Printf("[Get WARN]Redis Error: %v", data.Err())
}
if !c.l2ReadOnly {
defer func() {
// set data to redis if it was loaded from the remote store
if err == nil {
c.l2Cache.SetEX(context.Background(), keyStr, ret, c.l2ExpiredTime)
}
}()
}
}
ret, err = c.remoteDB.Get(key)
if err != nil {
atomic.AddUint64(&c.notFoundOrErrorCount, 1)
} else {
atomic.AddUint64(&c.missCount, 1)
}
return
}
// Put inserts the given value into the key-value data store.
func (c *StateDBCacheDatabase) Put(key []byte, value []byte) (err error) {
if c.enableReadCache {
defer func() {
if err == nil {
if len(value) < maxMemoryEntrySize {
// the memory cache only accepts small entries
c.l1Cache.Set(key, value)
}
if !c.l2ReadOnly {
timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc()
// send the data to redis
res := c.l2Cache.SetEX(timeoutCtx, string(key), value, c.l2ExpiredTime)
if res.Err() != nil {
log.Printf("[Put WARN]Redis Error: %v", res.Err())
}
}
}
}()
}
return c.remoteDB.Put(key, value)
}
// Delete removes the key from the key-value data store.
func (c *StateDBCacheDatabase) Delete(key []byte) (err error) {
if c.enableReadCache {
defer func() {
if err == nil {
c.l1Cache.Del(key)
if !c.l2ReadOnly {
c.l2Cache.Del(context.Background(), string(key))
}
}
}()
}
return c.remoteDB.Delete(key)
}
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called.
func (c *StateDBCacheDatabase) NewBatch() ethdb.Batch {
return newStateDBCacheBatch(c, c.remoteDB.NewBatch())
}
// Stat returns a particular internal stat of the database.
func (c *StateDBCacheDatabase) Stat(property string) (string, error) {
switch property {
case "tikv.save.cache":
_ = c.l1Cache.SaveToFileConcurrent(c.config.CachePersistencePath, runtime.NumCPU())
}
return c.remoteDB.Stat(property)
}
func (c *StateDBCacheDatabase) Compact(start []byte, limit []byte) error {
return c.remoteDB.Compact(start, limit)
}
func (c *StateDBCacheDatabase) NewIterator(start, end []byte) ethdb.Iterator {
return c.remoteDB.NewIterator(start, end)
}
// Close disconnect the redis and save memory cache to file
func (c *StateDBCacheDatabase) Close() error {
if atomic.CompareAndSwapUint64(&c.isClose, 0, 1) {
err := c.l1Cache.SaveToFileConcurrent(c.config.CachePersistencePath, runtime.NumCPU())
if err != nil {
log.Printf("save file to '%s' error: %v", c.config.CachePersistencePath, err)
}
if !c.l2ReadOnly {
close(c.l2ExpiredRefresh)
time.Sleep(time.Second)
for range c.l2ExpiredRefresh {
// nop, clear chan
}
}
_ = c.l2Cache.Close()
return c.remoteDB.Close()
}
return nil
}
// cacheWrite replays the batch into the redis (L2) and memory (L1) caches
func (c *StateDBCacheDatabase) cacheWrite(b ethdb.Batch) error {
if !c.l2ReadOnly {
pipeline := c.l2Cache.Pipeline()
err := b.Replay(&redisPipelineWrapper{Pipeliner: pipeline, expiredTime: c.l2ExpiredTime})
if err != nil {
return err
}
_, err = pipeline.Exec(context.Background())
if err != nil {
log.Printf("[BatchWrite WARN]Redis Error: %v", err)
}
}
return b.Replay(c.l1Cache)
}
func (c *StateDBCacheDatabase) refreshL2ExpiredTime() {
unix := time.Now().Add(c.l2ExpiredTime).Unix()
unix = unix - (unix % cacheMinGapInSecond) + cacheMinGapInSecond
c.l2NextExpiredTime = time.Unix(unix, 0)
}
// startL2ExpiredRefresh batches expiration refreshes of redis cache entries
func (c *StateDBCacheDatabase) startL2ExpiredRefresh() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
pipeline := c.l2Cache.Pipeline()
lastWrite := time.Now()
for {
select {
case key, ok := <-c.l2ExpiredRefresh:
if !ok {
return
}
pipeline.ExpireAt(context.Background(), key, c.l2NextExpiredTime)
if pipeline.Len() > maxPipelineEntriesCount {
_, _ = pipeline.Exec(context.Background())
lastWrite = time.Now()
}
case now := <-ticker.C:
c.refreshL2ExpiredTime()
if pipeline.Len() > 0 && now.Sub(lastWrite) > time.Second {
_, _ = pipeline.Exec(context.Background())
lastWrite = time.Now()
}
}
}
}
// cacheStatusPrint prints cache statistics to stdout
func (c *StateDBCacheDatabase) cacheStatusPrint() {
var state = &fastcache.Stats{}
state.Reset()
c.l1Cache.UpdateStats(state)
stats := c.l2Cache.PoolStats()
log.Printf("redis: TotalConns: %d, IdleConns: %d, StaleConns: %d", stats.TotalConns, stats.IdleConns, stats.StaleConns)
total := float64(c.l1HitCount + c.l2HitCount + c.missCount + c.notFoundOrErrorCount)
log.Printf(
"total: l1Hit: %d(%.2f%%), l2Hit: %d(%.2f%%), miss: %d(%.2f%%), notFoundOrErrorCount: %d, fastcache: GetCalls: %d, SetCalls: %d, Misses: %d, EntriesCount: %d, BytesSize: %d",
c.l1HitCount, float64(c.l1HitCount)/total*100, c.l2HitCount, float64(c.l2HitCount)/total*100, c.missCount, float64(c.missCount)/total*100, c.notFoundOrErrorCount,
state.GetCalls, state.SetCalls, state.Misses, state.EntriesCount, state.BytesSize,
)
}
func (c *StateDBCacheDatabase) ReplayCache(batch ethdb.Batch) error {
return c.cacheWrite(batch)
}

@ -1,6 +1,9 @@
package utils package utils
import "encoding/hex" import (
"encoding/hex"
"math/big"
)
// use to look up number of 1 bit in 4 bits // use to look up number of 1 bit in 4 bits
var halfByteLookup = [16]int{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4} var halfByteLookup = [16]int{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4}
@ -44,3 +47,36 @@ func CountOneBits(arr []byte) int64 {
} }
return int64(count) return int64(count)
} }
// BytesMiddle returns the byte-wise midpoint between a and b, used to split a key range.
func BytesMiddle(a, b []byte) []byte {
if a == nil && b == nil {
return []byte{128}
}
if len(a) > len(b) {
tmp := make([]byte, len(a))
if b == nil {
for i := range tmp {
tmp[i] = 255
}
}
copy(tmp, b)
b = tmp
} else if len(a) < len(b) {
tmp := make([]byte, len(b))
if a == nil {
for i := range tmp {
tmp[i] = 0
}
}
copy(tmp, a)
a = tmp
}
aI := big.NewInt(0).SetBytes(a)
bI := big.NewInt(0).SetBytes(b)
aI.Add(aI, bI)
aI.Div(aI, big.NewInt(2))
return aI.Bytes()
}

@ -3,12 +3,17 @@ package node
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"math/big" "math/big"
"os" "os"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/harmony-one/harmony/internal/tikv/redis_helper"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony" harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
"github.com/harmony-one/harmony/internal/utils/crosslinks" "github.com/harmony-one/harmony/internal/utils/crosslinks"
@ -152,6 +157,11 @@ func (node *Node) Blockchain() core.BlockChain {
// Beaconchain returns the beaconchain from node. // Beaconchain returns the beaconchain from node.
func (node *Node) Beaconchain() core.BlockChain { func (node *Node) Beaconchain() core.BlockChain {
// tikv mode does not have BeaconChain storage
if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.General.ShardID != shard.BeaconChainShardID {
return nil
}
return node.chain(shard.BeaconChainShardID, core.Options{}) return node.chain(shard.BeaconChainShardID, core.Options{})
} }
@ -222,6 +232,13 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) []error {
Msg("[addPendingTransactions] Node out of sync, ignoring transactions") Msg("[addPendingTransactions] Node out of sync, ignoring transactions")
return nil return nil
} }
// in tikv mode, the reader only accepts pending transactions from the writer node and ignores the p2p message
if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.TiKV.Role == tikv.RoleReader {
log.Printf("skip reader addPendingTransactions: %#v", newTxs)
return nil
}
poolTxs := types.PoolTransactions{} poolTxs := types.PoolTransactions{}
errs := []error{} errs := []error{}
acceptCx := node.Blockchain().Config().AcceptsCrossTx(node.Blockchain().CurrentHeader().Epoch()) acceptCx := node.Blockchain().Config().AcceptsCrossTx(node.Blockchain().CurrentHeader().Epoch())
@ -434,6 +451,11 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) (
case proto_node.Sync: case proto_node.Sync:
nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "block_sync"}).Inc() nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "block_sync"}).Inc()
// in tikv mode, the BeaconChain message is not needed
if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.General.ShardID != shard.BeaconChainShardID {
return nil, 0, errIgnoreBeaconMsg
}
// checks whether the beacon block is larger than current block number // checks whether the beacon block is larger than current block number
blocksPayload := payload[p2pNodeMsgPrefixSize+1:] blocksPayload := payload[p2pNodeMsgPrefixSize+1:]
var blocks []*types.Block var blocks []*types.Block
@ -997,7 +1019,7 @@ func New(
engine := chain.NewEngine() engine := chain.NewEngine()
collection := shardchain.NewCollection( collection := shardchain.NewCollection(
chainDBFactory, &core.GenesisInitializer{NetworkType: node.NodeConfig.GetNetworkType()}, engine, &chainConfig, harmonyconfig, chainDBFactory, &core.GenesisInitializer{NetworkType: node.NodeConfig.GetNetworkType()}, engine, &chainConfig,
) )
for shardID, archival := range isArchival { for shardID, archival := range isArchival {
@ -1022,16 +1044,19 @@ func New(
} }
if b1, b2 := beaconChain == nil, blockchain == nil; b1 || b2 { if b1, b2 := beaconChain == nil, blockchain == nil; b1 || b2 {
var err error // in tikv mode, not need BeaconChain
if b2 { if !(node.HarmonyConfig != nil && node.HarmonyConfig.General.RunElasticMode) || node.HarmonyConfig.General.ShardID == shard.BeaconChainShardID {
shardID := node.NodeConfig.ShardID var err error
// HACK get the real error reason if b2 {
_, err = node.shardChains.ShardChain(shardID) shardID := node.NodeConfig.ShardID
} else { // HACK get the real error reason
_, err = node.shardChains.ShardChain(shard.BeaconChainShardID) _, err = node.shardChains.ShardChain(shardID)
} else {
_, err = node.shardChains.ShardChain(shard.BeaconChainShardID)
}
fmt.Fprintf(os.Stderr, "Cannot initialize node: %v\n", err)
os.Exit(-1)
} }
fmt.Fprintf(os.Stderr, "Cannot initialize node: %v\n", err)
os.Exit(-1)
} }
node.BlockChannel = make(chan *types.Block) node.BlockChannel = make(chan *types.Block)
@ -1053,6 +1078,16 @@ func New(
txPoolConfig.Blacklist = blacklist txPoolConfig.Blacklist = blacklist
txPoolConfig.AllowedTxs = allowedTxs txPoolConfig.AllowedTxs = allowedTxs
txPoolConfig.Journal = fmt.Sprintf("%v/%v", node.NodeConfig.DBDir, txPoolConfig.Journal) txPoolConfig.Journal = fmt.Sprintf("%v/%v", node.NodeConfig.DBDir, txPoolConfig.Journal)
txPoolConfig.AddEvent = func(tx types.PoolTransaction, local bool) {
// in tikv mode, the writer publishes tx pool updates to all readers
if node.Blockchain().IsTikvWriterMaster() {
err := redis_helper.PublishTxPoolUpdate(uint32(harmonyconfig.General.ShardID), tx, local)
if err != nil {
utils.Logger().Info().Err(err).Msg("redis publish txpool update error")
}
}
}
node.TxPool = core.NewTxPool(txPoolConfig, node.Blockchain().Config(), blockchain, node.TransactionErrorSink) node.TxPool = core.NewTxPool(txPoolConfig, node.Blockchain().Config(), blockchain, node.TransactionErrorSink)
node.CxPool = core.NewCxPool(core.CxPoolSize) node.CxPool = core.NewCxPool(core.CxPoolSize)
node.Worker = worker.New(node.Blockchain().Config(), blockchain, engine) node.Worker = worker.New(node.Blockchain().Config(), blockchain, engine)
@ -1060,6 +1095,13 @@ func New(
node.deciderCache, _ = lru.New(16) node.deciderCache, _ = lru.New(16)
node.committeeCache, _ = lru.New(16) node.committeeCache, _ = lru.New(16)
// in tikv mode, the BeaconChain is not needed
if !node.HarmonyConfig.General.RunElasticMode && node.Blockchain().ShardID() != shard.BeaconChainShardID {
node.BeaconWorker = worker.New(
node.Beaconchain().Config(), beaconChain, engine,
)
}
node.pendingCXReceipts = map[string]*types.CXReceiptsProof{} node.pendingCXReceipts = map[string]*types.CXReceiptsProof{}
node.proposedBlock = map[uint64]*types.Block{} node.proposedBlock = map[uint64]*types.Block{}
node.Consensus.VerifiedNewBlock = make(chan *types.Block, 1) node.Consensus.VerifiedNewBlock = make(chan *types.Block, 1)
@ -1108,8 +1150,11 @@ func New(
}() }()
} }
// update reward values now that node is ready // in tikv mode, the BeaconChain is not needed
node.updateInitialRewardValues() if !(node.HarmonyConfig != nil && node.HarmonyConfig.General.RunElasticMode) || node.HarmonyConfig.General.ShardID == shard.BeaconChainShardID {
// update reward values now that node is ready
node.updateInitialRewardValues()
}
// init metrics // init metrics
initMetrics() initMetrics()
@ -1273,6 +1318,18 @@ func (node *Node) ShutDown() {
node.Blockchain().Stop() node.Blockchain().Stop()
node.Beaconchain().Stop() node.Beaconchain().Stop()
if node.HarmonyConfig.General.RunElasticMode {
_, _ = node.Blockchain().RedisPreempt().Unlock()
_, _ = node.Beaconchain().RedisPreempt().Unlock()
_ = redis_helper.Close()
time.Sleep(time.Second)
if storage := tikv_manage.GetDefaultTiKVFactory(); storage != nil {
storage.CloseAllDB()
}
}
const msg = "Successfully shut down!\n" const msg = "Successfully shut down!\n"
utils.Logger().Print(msg) utils.Logger().Print(msg)
fmt.Print(msg) fmt.Print(msg)
@ -1365,3 +1422,38 @@ func (node *Node) GetAddresses(epoch *big.Int) map[string]common.Address {
func (node *Node) IsRunningBeaconChain() bool { func (node *Node) IsRunningBeaconChain() bool {
return node.NodeConfig.ShardID == shard.BeaconChainShardID return node.NodeConfig.ShardID == shard.BeaconChainShardID
} }
// syncFromTiKVWriter is used in tikv mode to subscribe to data from the tikv writer
func (node *Node) syncFromTiKVWriter() {
bc := node.Blockchain()
// subscribe block update
go redis_helper.SubscribeShardUpdate(bc.ShardID(), func(blkNum uint64, logs []*types.Log) {
err := bc.SyncFromTiKVWriter(blkNum, logs)
if err != nil {
utils.Logger().Warn().
Err(err).
Msg("cannot sync block from tikv writer")
return
}
})
// subscribe txpool update
if node.HarmonyConfig.TiKV.Role == tikv.RoleReader {
go redis_helper.SubscribeTxPoolUpdate(bc.ShardID(), func(tx types.PoolTransaction, local bool) {
var err error
if local {
err = node.TxPool.AddLocal(tx)
} else {
err = node.TxPool.AddRemote(tx)
}
if err != nil {
utils.Logger().Debug().
Err(err).
Interface("tx", tx).
Msg("cannot sync txpool from tikv writer")
return
}
})
}
}

@ -5,6 +5,8 @@ import (
"encoding/json" "encoding/json"
"sync" "sync"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
@ -145,6 +147,11 @@ func (node *Node) TraceLoopForExplorer() {
// AddNewBlockForExplorer add new block for explorer. // AddNewBlockForExplorer add new block for explorer.
func (node *Node) AddNewBlockForExplorer(block *types.Block) { func (node *Node) AddNewBlockForExplorer(block *types.Block) {
if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.TiKV.Role == tikv.RoleReader {
node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64())
return
}
utils.Logger().Info().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node") utils.Logger().Info().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node")
if _, err := node.Blockchain().InsertChain([]*types.Block{block}, false); err == nil { if _, err := node.Blockchain().InsertChain([]*types.Block{block}, false); err == nil {
@ -153,23 +160,38 @@ func (node *Node) AddNewBlockForExplorer(block *types.Block) {
} }
// Clean up the blocks to avoid OOM. // Clean up the blocks to avoid OOM.
node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64()) node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64())
// Do dump all blocks from state syncing for explorer one time
// TODO: some blocks can be dumped before state syncing finished. // if in tikv mode, only the master writer node needs to dump all explorer blocks
// And they would be dumped again here. Please fix it. if !node.HarmonyConfig.General.RunElasticMode || node.Blockchain().IsTikvWriterMaster() {
once.Do(func() { // Do dump all blocks from state syncing for explorer one time
utils.Logger().Info().Int64("starting height", int64(block.NumberU64())-1). // TODO: some blocks can be dumped before state syncing finished.
Msg("[Explorer] Populating explorer data from state synced blocks") // And they would be dumped again here. Please fix it.
go func() { once.Do(func() {
exp, err := node.getExplorerService() utils.Logger().Info().Int64("starting height", int64(block.NumberU64())-1).
if err != nil { Msg("[Explorer] Populating explorer data from state synced blocks")
// shall be unreachable go func() {
utils.Logger().Fatal().Err(err).Msg("critical error in explorer node") exp, err := node.getExplorerService()
} if err != nil {
for blockHeight := int64(block.NumberU64()) - 1; blockHeight >= 0; blockHeight-- { // shall be unreachable
exp.DumpCatchupBlock(node.Blockchain().GetBlockByNumber(uint64(blockHeight))) utils.Logger().Fatal().Err(err).Msg("critical error in explorer node")
} }
}()
}) if block.NumberU64() == 0 {
return
}
// get checkpoint bitmap and flip all bit
bitmap := exp.GetCheckpointBitmap()
bitmap.Flip(0, block.NumberU64())
// find all not processed block and dump it
iterator := bitmap.ReverseIterator()
for iterator.HasNext() {
exp.DumpCatchupBlock(node.Blockchain().GetBlockByNumber(iterator.Next()))
}
}()
})
}
} else { } else {
utils.Logger().Error().Err(err).Msg("[Explorer] Error when adding new block for explorer node") utils.Logger().Error().Err(err).Msg("[Explorer] Error when adding new block for explorer node")
} }
@ -177,17 +199,20 @@ func (node *Node) AddNewBlockForExplorer(block *types.Block) {
// ExplorerMessageHandler passes received message in node_handler to explorer service. // ExplorerMessageHandler passes received message in node_handler to explorer service.
func (node *Node) commitBlockForExplorer(block *types.Block) { func (node *Node) commitBlockForExplorer(block *types.Block) {
if block.ShardID() != node.NodeConfig.ShardID { // if in tikv mode, only the master writer node needs to dump explorer blocks
return if !node.HarmonyConfig.General.RunElasticMode || (node.HarmonyConfig.TiKV.Role == tikv.RoleWriter && node.Blockchain().IsTikvWriterMaster()) {
} if block.ShardID() != node.NodeConfig.ShardID {
// Dump new block into level db. return
utils.Logger().Info().Uint64("blockNum", block.NumberU64()).Msg("[Explorer] Committing block into explorer DB") }
exp, err := node.getExplorerService() // Dump new block into level db.
if err != nil { utils.Logger().Info().Uint64("blockNum", block.NumberU64()).Msg("[Explorer] Committing block into explorer DB")
// shall be unreachable exp, err := node.getExplorerService()
utils.Logger().Fatal().Err(err).Msg("critical error in explorer node") if err != nil {
// shall be unreachable
utils.Logger().Fatal().Err(err).Msg("critical error in explorer node")
}
exp.DumpNewBlock(block)
} }
exp.DumpNewBlock(block)
curNum := block.NumberU64() curNum := block.NumberU64()
if curNum-100 > 0 { if curNum-100 > 0 {

@ -8,6 +8,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/harmony-one/harmony/internal/tikv"
prom "github.com/harmony-one/harmony/api/service/prometheus" prom "github.com/harmony-one/harmony/api/service/prometheus"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -216,6 +218,10 @@ func (node *Node) doBeaconSyncing() {
return return
} }
if node.HarmonyConfig.General.RunElasticMode {
return
}
if !node.NodeConfig.Downloader { if !node.NodeConfig.Downloader {
// If Downloader is not working, we need also deal with blocks from beaconBlockChannel // If Downloader is not working, we need also deal with blocks from beaconBlockChannel
go func(node *Node) { go func(node *Node) {
@ -319,7 +325,22 @@ func (node *Node) StartGRPCSyncClient() {
Msg("SupportBeaconSyncing") Msg("SupportBeaconSyncing")
go node.doBeaconSyncing() go node.doBeaconSyncing()
} }
node.supportSyncing() }
// NodeSyncing makes sure to start all the processes needed to sync the node based on different configuration factors.
func (node *Node) NodeSyncing() {
if node.HarmonyConfig.General.RunElasticMode {
node.syncFromTiKVWriter() // this is for both reader and backup writers
if node.HarmonyConfig.TiKV.Role == tikv.RoleReader {
node.Consensus.UpdateConsensusInformation()
}
if node.HarmonyConfig.TiKV.Role == tikv.RoleWriter {
node.supportSyncing() // the writer needs to be in sync with its other peers
}
} else if !node.HarmonyConfig.General.IsOffline && node.HarmonyConfig.DNSSync.Client {
node.supportSyncing() // for nodes not running in writer/reader mode, a.k.a. non-tikv nodes
}
} }
// supportSyncing keeps sleeping until it's doing consensus or it's a leader. // supportSyncing keeps sleeping until it's doing consensus or it's a leader.

@ -27,7 +27,7 @@ func (node *Node) RegisterValidatorServices() {
func (node *Node) RegisterExplorerServices() { func (node *Node) RegisterExplorerServices() {
// Register explorer service. // Register explorer service.
node.serviceManager.Register( node.serviceManager.Register(
service.SupportExplorer, explorer.New(&node.SelfPeer, node.Blockchain(), node), service.SupportExplorer, explorer.New(node.HarmonyConfig, &node.SelfPeer, node.Blockchain(), node),
) )
} }

@ -5,6 +5,8 @@ import (
"math/rand" "math/rand"
"testing" "testing"
"github.com/harmony-one/harmony/core/state"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -43,7 +45,7 @@ func TestNewWorker(t *testing.T) {
genesis := gspec.MustCommit(database) genesis := gspec.MustCommit(database)
_ = genesis _ = genesis
chain, err := core.NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil) chain, err := core.NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -70,7 +72,7 @@ func TestCommitTransactions(t *testing.T) {
) )
gspec.MustCommit(database) gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil) chain, _ := core.NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
// Create a new worker // Create a new worker
worker := New(params.TestChainConfig, chain, engine) worker := New(params.TestChainConfig, chain, engine)

@ -85,7 +85,7 @@ func (c *CallAPIService) Call(
func NewCallAPIService(hmy *hmy.Harmony, limiterEnable bool, rateLimit int) server.CallAPIServicer { func NewCallAPIService(hmy *hmy.Harmony, limiterEnable bool, rateLimit int) server.CallAPIServicer {
return &CallAPIService{ return &CallAPIService{
hmy: hmy, hmy: hmy,
publicContractAPI: rpc2.NewPublicContractAPI(hmy, rpc2.V2), publicContractAPI: rpc2.NewPublicContractAPI(hmy, rpc2.V2, limiterEnable, rateLimit),
publicStakingAPI: rpc2.NewPublicStakingAPI(hmy, rpc2.V2), publicStakingAPI: rpc2.NewPublicStakingAPI(hmy, rpc2.V2),
publicBlockChainAPI: rpc2.NewPublicBlockchainAPI(hmy, rpc2.V2, limiterEnable, rateLimit), publicBlockChainAPI: rpc2.NewPublicBlockchainAPI(hmy, rpc2.V2, limiterEnable, rateLimit),
} }

@ -35,11 +35,16 @@ type PublicContractService struct {
} }
// NewPublicContractAPI creates a new API for the RPC interface // NewPublicContractAPI creates a new API for the RPC interface
func NewPublicContractAPI(hmy *hmy.Harmony, version Version) rpc.API { func NewPublicContractAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API {
var limiter *rate.Limiter
if limiterEnable {
limiter = rate.NewLimiter(rate.Limit(limit), limit)
}
return rpc.API{ return rpc.API{
Namespace: version.Namespace(), Namespace: version.Namespace(),
Version: APIVersion, Version: APIVersion,
Service: &PublicContractService{hmy, version, rate.NewLimiter(200, 1500)}, Service: &PublicContractService{hmy, version, limiter},
Public: true, Public: true,
} }
} }

@ -37,11 +37,15 @@ type PublicPoolService struct {
} }
// NewPublicPoolAPI creates a new API for the RPC interface // NewPublicPoolAPI creates a new API for the RPC interface
func NewPublicPoolAPI(hmy *hmy.Harmony, version Version) rpc.API { func NewPublicPoolAPI(hmy *hmy.Harmony, version Version, limiterEnable bool, limit int) rpc.API {
var limiter *rate.Limiter
if limiterEnable {
limiter = rate.NewLimiter(rate.Limit(limit), limit)
}
return rpc.API{ return rpc.API{
Namespace: version.Namespace(), Namespace: version.Namespace(),
Version: APIVersion, Version: APIVersion,
Service: &PublicPoolService{hmy, version, rate.NewLimiter(2, 5)}, Service: &PublicPoolService{hmy, version, limiter},
Public: true, Public: true,
} }
} }

@ -158,12 +158,12 @@ func getAPIs(hmy *hmy.Harmony, config nodeconfig.RPCServerConfig) []rpc.API {
NewPublicHarmonyAPI(hmy, V2), NewPublicHarmonyAPI(hmy, V2),
NewPublicBlockchainAPI(hmy, V1, config.RateLimiterEnabled, config.RequestsPerSecond), NewPublicBlockchainAPI(hmy, V1, config.RateLimiterEnabled, config.RequestsPerSecond),
NewPublicBlockchainAPI(hmy, V2, config.RateLimiterEnabled, config.RequestsPerSecond), NewPublicBlockchainAPI(hmy, V2, config.RateLimiterEnabled, config.RequestsPerSecond),
NewPublicContractAPI(hmy, V1), NewPublicContractAPI(hmy, V1, config.RateLimiterEnabled, config.RequestsPerSecond),
NewPublicContractAPI(hmy, V2), NewPublicContractAPI(hmy, V2, config.RateLimiterEnabled, config.RequestsPerSecond),
NewPublicTransactionAPI(hmy, V1), NewPublicTransactionAPI(hmy, V1),
NewPublicTransactionAPI(hmy, V2), NewPublicTransactionAPI(hmy, V2),
NewPublicPoolAPI(hmy, V1), NewPublicPoolAPI(hmy, V1, config.RateLimiterEnabled, config.RequestsPerSecond),
NewPublicPoolAPI(hmy, V2), NewPublicPoolAPI(hmy, V2, config.RateLimiterEnabled, config.RequestsPerSecond),
} }
// Legacy methods (subject to removal) // Legacy methods (subject to removal)
@ -185,9 +185,9 @@ func getAPIs(hmy *hmy.Harmony, config nodeconfig.RPCServerConfig) []rpc.API {
publicAPIs = append(publicAPIs, publicAPIs = append(publicAPIs,
NewPublicHarmonyAPI(hmy, Eth), NewPublicHarmonyAPI(hmy, Eth),
NewPublicBlockchainAPI(hmy, Eth, config.RateLimiterEnabled, config.RequestsPerSecond), NewPublicBlockchainAPI(hmy, Eth, config.RateLimiterEnabled, config.RequestsPerSecond),
NewPublicContractAPI(hmy, Eth), NewPublicContractAPI(hmy, Eth, config.RateLimiterEnabled, config.RequestsPerSecond),
NewPublicTransactionAPI(hmy, Eth), NewPublicTransactionAPI(hmy, Eth),
NewPublicPoolAPI(hmy, Eth), NewPublicPoolAPI(hmy, Eth, config.RateLimiterEnabled, config.RequestsPerSecond),
eth.NewPublicEthService(hmy, "eth"), eth.NewPublicEthService(hmy, "eth"),
) )
} }

@ -15,6 +15,7 @@ import (
blockfactory "github.com/harmony-one/harmony/block/factory" blockfactory "github.com/harmony-one/harmony/block/factory"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
core_state "github.com/harmony-one/harmony/core/state" core_state "github.com/harmony-one/harmony/core/state"
harmonyState "github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm" "github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/crypto/hash" "github.com/harmony-one/harmony/crypto/hash"
@ -205,7 +206,7 @@ func playFaucetContract(chain core.BlockChain) {
func main() { func main() {
genesis := gspec.MustCommit(database) genesis := gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, nil, gspec.Config, chain.Engine(), vm.Config{}, nil) chain, _ := core.NewBlockChain(database, harmonyState.NewDatabase(database), nil, gspec.Config, chain.Engine(), vm.Config{}, nil)
txpool := core.NewTxPool(core.DefaultTxPoolConfig, chainConfig, chain, types.NewTransactionErrorSink()) txpool := core.NewTxPool(core.DefaultTxPoolConfig, chainConfig, chain, types.NewTransactionErrorSink())
backend := &testWorkerBackend{ backend := &testWorkerBackend{

@ -109,7 +109,7 @@ func main() {
genesis := gspec.MustCommit(database) genesis := gspec.MustCommit(database)
_ = genesis _ = genesis
engine := chain.NewEngine() engine := chain.NewEngine()
bc, _ := core.NewBlockChain(database, nil, gspec.Config, engine, vm.Config{}, nil) bc, _ := core.NewBlockChain(database, state.NewDatabase(database), nil, gspec.Config, engine, vm.Config{}, nil)
statedb, _ := state.New(common2.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase())) statedb, _ := state.New(common2.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()))
msg := createValidator() msg := createValidator()
statedb.AddBalance(msg.ValidatorAddress, new(big.Int).Mul(big.NewInt(5e18), big.NewInt(2000))) statedb.AddBalance(msg.ValidatorAddress, new(big.Int).Mul(big.NewInt(5e18), big.NewInt(2000)))
