You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
374 lines
9.4 KiB
374 lines
9.4 KiB
3 years ago
|
package statedb_cache
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"log"
|
||
|
"runtime"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"github.com/VictoriaMetrics/fastcache"
|
||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||
|
redis "github.com/go-redis/redis/v8"
|
||
|
"github.com/harmony-one/harmony/internal/tikv/common"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
maxMemoryEntrySize = 64 * 1024
|
||
|
maxPipelineEntriesCount = 10000
|
||
|
cacheMinGapInSecond = 600
|
||
|
)
|
||
|
|
||
|
type cacheWrapper struct {
|
||
|
*fastcache.Cache
|
||
|
}
|
||
|
|
||
|
// Put inserts the given value into the key-value data store.
|
||
|
func (c *cacheWrapper) Put(key []byte, value []byte) error {
|
||
|
c.Cache.Set(key, value)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Delete removes the key from the key-value data store.
|
||
|
func (c *cacheWrapper) Delete(key []byte) error {
|
||
|
c.Cache.Del(key)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type redisPipelineWrapper struct {
|
||
|
redis.Pipeliner
|
||
|
|
||
|
expiredTime time.Duration
|
||
|
}
|
||
|
|
||
|
// Put inserts the given value into the key-value data store.
|
||
|
func (c *redisPipelineWrapper) Put(key []byte, value []byte) error {
|
||
|
c.SetEX(context.Background(), string(key), value, c.expiredTime)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Delete removes the key from the key-value data store.
|
||
|
func (c *redisPipelineWrapper) Delete(key []byte) error {
|
||
|
c.Del(context.Background(), string(key))
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type StateDBCacheConfig struct {
|
||
|
CacheSizeInMB uint32
|
||
|
CachePersistencePath string
|
||
|
RedisServerAddr []string
|
||
|
RedisLRUTimeInDay uint32
|
||
|
DebugHitRate bool
|
||
|
}
|
||
|
|
||
|
type StateDBCacheDatabase struct {
|
||
|
config StateDBCacheConfig
|
||
|
enableReadCache bool
|
||
|
isClose uint64
|
||
|
remoteDB common.TiKVStore
|
||
|
|
||
|
// l1cache (memory)
|
||
|
l1Cache *cacheWrapper
|
||
|
|
||
|
// l2cache (redis)
|
||
|
l2Cache *redis.ClusterClient
|
||
|
l2ReadOnly bool
|
||
|
l2ExpiredTime time.Duration
|
||
|
l2NextExpiredTime time.Time
|
||
|
l2ExpiredRefresh chan string
|
||
|
|
||
|
// stats
|
||
|
l1HitCount uint64
|
||
|
l2HitCount uint64
|
||
|
missCount uint64
|
||
|
notFoundOrErrorCount uint64
|
||
|
}
|
||
|
|
||
|
func NewStateDBCacheDatabase(remoteDB common.TiKVStore, config StateDBCacheConfig, readOnly bool) (*StateDBCacheDatabase, error) {
|
||
|
// create or load memory cache from file
|
||
|
cache := fastcache.LoadFromFileOrNew(config.CachePersistencePath, int(config.CacheSizeInMB)*1024*1024)
|
||
|
|
||
|
// create options
|
||
|
option := &redis.ClusterOptions{
|
||
|
Addrs: config.RedisServerAddr,
|
||
|
PoolSize: 32,
|
||
|
IdleTimeout: 60 * time.Second,
|
||
|
}
|
||
|
|
||
|
if readOnly {
|
||
|
option.PoolSize = 16
|
||
|
option.ReadOnly = true
|
||
|
}
|
||
|
|
||
|
// check redis connection
|
||
|
rdb := redis.NewClusterClient(option)
|
||
|
err := rdb.Ping(context.Background()).Err()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// reload redis cluster slots status
|
||
|
rdb.ReloadState(context.Background())
|
||
|
|
||
|
db := &StateDBCacheDatabase{
|
||
|
config: config,
|
||
|
enableReadCache: true,
|
||
|
remoteDB: remoteDB,
|
||
|
l1Cache: &cacheWrapper{cache},
|
||
|
l2Cache: rdb,
|
||
|
l2ReadOnly: readOnly,
|
||
|
l2ExpiredTime: time.Duration(config.RedisLRUTimeInDay) * 24 * time.Hour,
|
||
|
}
|
||
|
|
||
|
if !readOnly {
|
||
|
// Read a copy of the memory hit data into redis to improve the hit rate
|
||
|
// refresh read time to prevent recycling by lru
|
||
|
db.l2ExpiredRefresh = make(chan string, 100000)
|
||
|
go db.startL2ExpiredRefresh()
|
||
|
}
|
||
|
|
||
|
// print debug info
|
||
|
if config.DebugHitRate {
|
||
|
go func() {
|
||
|
for range time.Tick(5 * time.Second) {
|
||
|
db.cacheStatusPrint()
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
return db, nil
|
||
|
}
|
||
|
|
||
|
// Has retrieves if a key is present in the key-value data store.
|
||
|
func (c *StateDBCacheDatabase) Has(key []byte) (bool, error) {
|
||
|
return c.remoteDB.Has(key)
|
||
|
}
|
||
|
|
||
|
// Get retrieves the given key if it's present in the key-value data store.
|
||
|
func (c *StateDBCacheDatabase) Get(key []byte) (ret []byte, err error) {
|
||
|
if c.enableReadCache {
|
||
|
var ok bool
|
||
|
|
||
|
// first, get data from memory cache
|
||
|
keyStr := string(key)
|
||
|
if ret, ok = c.l1Cache.HasGet(nil, key); ok {
|
||
|
if !c.l2ReadOnly {
|
||
|
select {
|
||
|
// refresh read time to prevent recycling by lru
|
||
|
case c.l2ExpiredRefresh <- keyStr:
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
|
||
|
atomic.AddUint64(&c.l1HitCount, 1)
|
||
|
return ret, nil
|
||
|
}
|
||
|
|
||
|
defer func() {
|
||
|
if err == nil {
|
||
|
if len(ret) < maxMemoryEntrySize {
|
||
|
// set data to memory db if loaded from redis cache or leveldb
|
||
|
c.l1Cache.Set(key, ret)
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
|
||
|
defer cancelFunc()
|
||
|
|
||
|
// load data from redis cache
|
||
|
data := c.l2Cache.Get(timeoutCtx, keyStr)
|
||
|
if data.Err() == nil {
|
||
|
atomic.AddUint64(&c.l2HitCount, 1)
|
||
|
return data.Bytes()
|
||
|
} else if data.Err() != redis.Nil {
|
||
|
log.Printf("[Get WARN]Redis Error: %v", data.Err())
|
||
|
}
|
||
|
|
||
|
if !c.l2ReadOnly {
|
||
|
defer func() {
|
||
|
// set data to redis db if loaded from leveldb
|
||
|
if err == nil {
|
||
|
c.l2Cache.SetEX(context.Background(), keyStr, ret, c.l2ExpiredTime)
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
ret, err = c.remoteDB.Get(key)
|
||
|
if err != nil {
|
||
|
atomic.AddUint64(&c.notFoundOrErrorCount, 1)
|
||
|
} else {
|
||
|
atomic.AddUint64(&c.missCount, 1)
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Put inserts the given value into the key-value data store.
|
||
|
func (c *StateDBCacheDatabase) Put(key []byte, value []byte) (err error) {
|
||
|
if c.enableReadCache {
|
||
|
defer func() {
|
||
|
if err == nil {
|
||
|
if len(value) < maxMemoryEntrySize {
|
||
|
// memory db only accept the small data
|
||
|
c.l1Cache.Set(key, value)
|
||
|
}
|
||
|
if !c.l2ReadOnly {
|
||
|
timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
|
||
|
defer cancelFunc()
|
||
|
|
||
|
// send the data to redis
|
||
|
res := c.l2Cache.SetEX(timeoutCtx, string(key), value, c.l2ExpiredTime)
|
||
|
if res.Err() == nil {
|
||
|
log.Printf("[Put WARN]Redis Error: %v", res.Err())
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
return c.remoteDB.Put(key, value)
|
||
|
}
|
||
|
|
||
|
// Delete removes the key from the key-value data store.
|
||
|
func (c *StateDBCacheDatabase) Delete(key []byte) (err error) {
|
||
|
if c.enableReadCache {
|
||
|
defer func() {
|
||
|
if err == nil {
|
||
|
c.l1Cache.Del(key)
|
||
|
if !c.l2ReadOnly {
|
||
|
c.l2Cache.Del(context.Background(), string(key))
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
return c.remoteDB.Delete(key)
|
||
|
}
|
||
|
|
||
|
// NewBatch creates a write-only database that buffers changes to its host db
|
||
|
// until a final write is called.
|
||
|
func (c *StateDBCacheDatabase) NewBatch() ethdb.Batch {
|
||
|
return newStateDBCacheBatch(c, c.remoteDB.NewBatch())
|
||
|
}
|
||
|
|
||
|
// Stat returns a particular internal stat of the database.
|
||
|
func (c *StateDBCacheDatabase) Stat(property string) (string, error) {
|
||
|
switch property {
|
||
|
case "tikv.save.cache":
|
||
|
_ = c.l1Cache.SaveToFileConcurrent(c.config.CachePersistencePath, runtime.NumCPU())
|
||
|
}
|
||
|
return c.remoteDB.Stat(property)
|
||
|
}
|
||
|
|
||
|
func (c *StateDBCacheDatabase) Compact(start []byte, limit []byte) error {
|
||
|
return c.remoteDB.Compact(start, limit)
|
||
|
}
|
||
|
|
||
|
func (c *StateDBCacheDatabase) NewIterator(start, end []byte) ethdb.Iterator {
|
||
|
return c.remoteDB.NewIterator(start, end)
|
||
|
}
|
||
|
|
||
|
// Close disconnect the redis and save memory cache to file
|
||
|
func (c *StateDBCacheDatabase) Close() error {
|
||
|
if atomic.CompareAndSwapUint64(&c.isClose, 0, 1) {
|
||
|
err := c.l1Cache.SaveToFileConcurrent(c.config.CachePersistencePath, runtime.NumCPU())
|
||
|
if err != nil {
|
||
|
log.Printf("save file to '%s' error: %v", c.config.CachePersistencePath, err)
|
||
|
}
|
||
|
|
||
|
if !c.l2ReadOnly {
|
||
|
close(c.l2ExpiredRefresh)
|
||
|
time.Sleep(time.Second)
|
||
|
|
||
|
for range c.l2ExpiredRefresh {
|
||
|
// nop, clear chan
|
||
|
}
|
||
|
}
|
||
|
|
||
|
_ = c.l2Cache.Close()
|
||
|
|
||
|
return c.remoteDB.Close()
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// cacheWrite write batch to cache
|
||
|
func (c *StateDBCacheDatabase) cacheWrite(b ethdb.Batch) error {
|
||
|
if !c.l2ReadOnly {
|
||
|
pipeline := c.l2Cache.Pipeline()
|
||
|
|
||
|
err := b.Replay(&redisPipelineWrapper{Pipeliner: pipeline, expiredTime: c.l2ExpiredTime})
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
_, err = pipeline.Exec(context.Background())
|
||
|
if err != nil {
|
||
|
log.Printf("[BatchWrite WARN]Redis Error: %v", err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return b.Replay(c.l1Cache)
|
||
|
}
|
||
|
|
||
|
func (c *StateDBCacheDatabase) refreshL2ExpiredTime() {
|
||
|
unix := time.Now().Add(c.l2ExpiredTime).Unix()
|
||
|
unix = unix - (unix % cacheMinGapInSecond) + cacheMinGapInSecond
|
||
|
c.l2NextExpiredTime = time.Unix(unix, 0)
|
||
|
}
|
||
|
|
||
|
// startL2ExpiredRefresh batch refresh redis cache
|
||
|
func (c *StateDBCacheDatabase) startL2ExpiredRefresh() {
|
||
|
ticker := time.NewTicker(time.Second)
|
||
|
defer ticker.Stop()
|
||
|
|
||
|
pipeline := c.l2Cache.Pipeline()
|
||
|
lastWrite := time.Now()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case key, ok := <-c.l2ExpiredRefresh:
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
pipeline.ExpireAt(context.Background(), key, c.l2NextExpiredTime)
|
||
|
if pipeline.Len() > maxPipelineEntriesCount {
|
||
|
_, _ = pipeline.Exec(context.Background())
|
||
|
lastWrite = time.Now()
|
||
|
}
|
||
|
case now := <-ticker.C:
|
||
|
c.refreshL2ExpiredTime()
|
||
|
|
||
|
if pipeline.Len() > 0 && now.Sub(lastWrite) > time.Second {
|
||
|
_, _ = pipeline.Exec(context.Background())
|
||
|
lastWrite = time.Now()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// print stats to stdout
|
||
|
func (c *StateDBCacheDatabase) cacheStatusPrint() {
|
||
|
var state = &fastcache.Stats{}
|
||
|
state.Reset()
|
||
|
c.l1Cache.UpdateStats(state)
|
||
|
|
||
|
stats := c.l2Cache.PoolStats()
|
||
|
log.Printf("redis: TotalConns: %d, IdleConns: %d, StaleConns: %d", stats.TotalConns, stats.IdleConns, stats.StaleConns)
|
||
|
total := float64(c.l1HitCount + c.l2HitCount + c.missCount + c.notFoundOrErrorCount)
|
||
|
|
||
|
log.Printf(
|
||
|
"total: l1Hit: %d(%.2f%%), l2Hit: %d(%.2f%%), miss: %d(%.2f%%), notFoundOrErrorCount: %d, fastcache: GetCalls: %d, SetCalls: %d, Misses: %d, EntriesCount: %d, BytesSize: %d",
|
||
|
c.l1HitCount, float64(c.l1HitCount)/total*100, c.l2HitCount, float64(c.l2HitCount)/total*100, c.missCount, float64(c.missCount)/total*100, c.notFoundOrErrorCount,
|
||
|
state.GetCalls, state.SetCalls, state.Misses, state.EntriesCount, state.BytesSize,
|
||
|
)
|
||
|
}
|
||
|
|
||
|
func (c *StateDBCacheDatabase) ReplayCache(batch ethdb.Batch) error {
|
||
|
return c.cacheWrite(batch)
|
||
|
}
|