The core protocol of WoopChain
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
woop/internal/tikv/statedb_cache/statedb_cache_database.go

380 lines
9.5 KiB

package statedb_cache
import (
"context"
"log"
"runtime"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/ethdb"
redis "github.com/go-redis/redis/v8"
"github.com/woop-chain/woop/internal/tikv/common"
)
// Tunables for the two-level (memory + redis) read cache.
const (
// maxMemoryEntrySize caps values stored in the in-memory layer; larger values skip L1.
maxMemoryEntrySize = 64 * 1024
// maxPipelineEntriesCount caps queued redis pipeline commands before a flush.
maxPipelineEntriesCount = 10000
// cacheMinGapInSecond rounds the redis expiry deadline so batched ExpireAt calls share one timestamp.
cacheMinGapInSecond = 600
)
// cacheWrapper adapts fastcache.Cache to a Put/Delete-with-error interface
// so the memory cache can be used as a batch replay target (see cacheWrite).
type cacheWrapper struct {
*fastcache.Cache
}
// Put stores value under key in the in-memory cache. It never fails; the
// error return exists only to satisfy the replay-target interface.
func (c *cacheWrapper) Put(key []byte, value []byte) error {
	c.Set(key, value)
	return nil
}
// Delete evicts key from the in-memory cache. It never fails; the error
// return exists only to satisfy the replay-target interface.
func (c *cacheWrapper) Delete(key []byte) error {
	c.Del(key)
	return nil
}
// redisPipelineWrapper adapts a redis pipeline to a Put/Delete-with-error
// interface for batch replay; queued commands run when the caller Execs.
type redisPipelineWrapper struct {
redis.Pipeliner
// expiredTime is the TTL applied to every key written through Put.
expiredTime time.Duration
}
// Put queues a SETEX for key with the configured TTL. Command errors only
// surface from the pipeline's Exec, so this always returns nil.
func (c *redisPipelineWrapper) Put(key []byte, value []byte) error {
	c.Pipeliner.SetEX(context.Background(), string(key), value, c.expiredTime)
	return nil
}
// Delete queues a DEL for key. Command errors only surface from the
// pipeline's Exec, so this always returns nil.
func (c *redisPipelineWrapper) Delete(key []byte) error {
	c.Pipeliner.Del(context.Background(), string(key))
	return nil
}
// StateDBCacheConfig holds the tuning knobs for StateDBCacheDatabase.
type StateDBCacheConfig struct {
// CacheSizeInMB is the in-memory (fastcache) capacity, in MiB.
CacheSizeInMB uint32
// CachePersistencePath is the file the memory cache is loaded from and saved to.
CachePersistencePath string
// RedisServerAddr lists the redis cluster node addresses.
RedisServerAddr []string
// RedisLRUTimeInDay is the redis key TTL, in days.
RedisLRUTimeInDay uint32
// DebugHitRate enables a goroutine that logs hit-rate stats every 5 seconds.
DebugHitRate bool
}
// StateDBCacheDatabase layers an in-memory cache (L1, fastcache) and a redis
// cluster cache (L2) in front of a remote TiKV store.
type StateDBCacheDatabase struct {
config StateDBCacheConfig
// enableReadCache gates all cache lookups and back-fills (set true in NewStateDBCacheDatabase).
enableReadCache bool
// isClose is a CAS guard (0 open, 1 closed) so Close runs its teardown once.
isClose uint64
remoteDB common.TiKVStore
// l1cache (memory)
l1Cache *cacheWrapper
// l2cache (redis)
l2Cache *redis.ClusterClient
// l2ReadOnly suppresses all writes and TTL refreshes to redis when true.
l2ReadOnly bool
// l2ExpiredTime is the TTL applied to keys written to redis.
l2ExpiredTime time.Duration
// l2NextExpiredTime is the rounded deadline used by the TTL refresh loop's ExpireAt calls.
l2NextExpiredTime time.Time
// l2ExpiredRefresh carries keys whose redis TTL should be extended after an L1 hit.
l2ExpiredRefresh chan string
// stats — updated with atomic.AddUint64 on the read path
l1HitCount uint64
l2HitCount uint64
missCount uint64
notFoundOrErrorCount uint64
}
// NewStateDBCacheDatabase builds a two-level read cache (fastcache + redis
// cluster) in front of remoteDB. It fails fast if the redis cluster cannot
// be pinged. When readOnly is false it also starts the background goroutine
// that batches redis TTL refreshes for keys hit in the memory cache.
func NewStateDBCacheDatabase(remoteDB common.TiKVStore, config StateDBCacheConfig, readOnly bool) (*StateDBCacheDatabase, error) {
// create or load memory cache from file
cache := fastcache.LoadFromFileOrNew(config.CachePersistencePath, int(config.CacheSizeInMB)*1024*1024)
// create options
option := &redis.ClusterOptions{
Addrs: config.RedisServerAddr,
PoolSize: 32,
IdleTimeout: 60 * time.Second,
}
if readOnly {
// read-only mode: smaller pool, and allow routing reads to replicas
option.PoolSize = 16
option.ReadOnly = true
}
// check redis connection
rdb := redis.NewClusterClient(option)
err := rdb.Ping(context.Background()).Err()
if err != nil {
return nil, err
}
// reload redis cluster slots status
rdb.ReloadState(context.Background())
db := &StateDBCacheDatabase{
config: config,
enableReadCache: true,
remoteDB: remoteDB,
l1Cache: &cacheWrapper{cache},
l2Cache: rdb,
l2ReadOnly: readOnly,
l2ExpiredTime: time.Duration(config.RedisLRUTimeInDay) * 24 * time.Hour,
}
if !readOnly {
// Read a copy of the memory hit data into redis to improve the hit rate
// refresh read time to prevent recycling by lru
db.l2ExpiredRefresh = make(chan string, 100000)
go db.startL2ExpiredRefresh()
}
// print debug info
if config.DebugHitRate {
// stats dumper runs for the process lifetime; time.Tick's ticker is never stopped
go func() {
for range time.Tick(5 * time.Second) {
db.cacheStatusPrint()
}
}()
}
return db, nil
}
// AncientDatadir implements the ethdb interface; this store has no
// ancient/freezer directory, so it returns an empty path and no error.
func (c *StateDBCacheDatabase) AncientDatadir() (string, error) {
return "", nil
}
// NewBatchWithSize is not supported by this store and returns nil; use
// NewBatch instead. NOTE(review): a nil ethdb.Batch will panic on first
// use — confirm no caller reaches this path.
func (c *StateDBCacheDatabase) NewBatchWithSize(size int) ethdb.Batch {
return nil
}
// Has reports whether key exists by asking the remote store directly;
// the cache layers are intentionally bypassed.
func (c *StateDBCacheDatabase) Has(key []byte) (bool, error) {
return c.remoteDB.Has(key)
}
// Get retrieves key, trying L1 (memory), then L2 (redis, with a 5s timeout),
// and finally the remote store. Values found in a lower layer are back-filled
// into the faster layers by the deferred closures: L1 only for values under
// maxMemoryEntrySize, L2 only when the cache is not read-only. An L1 hit also
// queues a redis TTL refresh (non-blocking; dropped if the channel is full).
func (c *StateDBCacheDatabase) Get(key []byte) (ret []byte, err error) {
if c.enableReadCache {
var ok bool
// first, get data from memory cache
keyStr := string(key)
if ret, ok = c.l1Cache.HasGet(nil, key); ok {
if !c.l2ReadOnly {
select {
// refresh read time to prevent recycling by lru
case c.l2ExpiredRefresh <- keyStr:
default:
}
}
atomic.AddUint64(&c.l1HitCount, 1)
return ret, nil
}
// back-fill L1 on success, whether the value came from redis or the remote store
defer func() {
if err == nil {
if len(ret) < maxMemoryEntrySize {
// set data to memory db if loaded from redis cache or leveldb
c.l1Cache.Set(key, ret)
}
}
}()
timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc()
// load data from redis cache
data := c.l2Cache.Get(timeoutCtx, keyStr)
if data.Err() == nil {
atomic.AddUint64(&c.l2HitCount, 1)
return data.Bytes()
} else if data.Err() != redis.Nil {
// a real redis failure (redis.Nil just means "key not cached")
log.Printf("[Get WARN]Redis Error: %v", data.Err())
}
if !c.l2ReadOnly {
// back-fill redis on success; registered after the L1 defer, so it runs first (LIFO)
defer func() {
// set data to redis db if loaded from leveldb
if err == nil {
c.l2Cache.SetEX(context.Background(), keyStr, ret, c.l2ExpiredTime)
}
}()
}
}
ret, err = c.remoteDB.Get(key)
if err != nil {
atomic.AddUint64(&c.notFoundOrErrorCount, 1)
} else {
atomic.AddUint64(&c.missCount, 1)
}
return
}
// Put writes key/value to the remote store and, on success, mirrors it into
// the caches: L1 only for values under maxMemoryEntrySize, and L2 (redis,
// with a 5s timeout) unless the cache is read-only. Cache failures are
// logged but never returned — the remote write is the source of truth.
func (c *StateDBCacheDatabase) Put(key []byte, value []byte) (err error) {
	if c.enableReadCache {
		defer func() {
			if err == nil {
				if len(value) < maxMemoryEntrySize {
					// memory db only accept the small data
					c.l1Cache.Set(key, value)
				}
				if !c.l2ReadOnly {
					timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
					defer cancelFunc()
					// send the data to redis
					res := c.l2Cache.SetEX(timeoutCtx, string(key), value, c.l2ExpiredTime)
					// BUGFIX: the condition was previously `== nil`, which logged a
					// "Redis Error" on every successful write and stayed silent on
					// actual failures. Warn only when the SETEX really failed.
					if res.Err() != nil {
						log.Printf("[Put WARN]Redis Error: %v", res.Err())
					}
				}
			}
		}()
	}
	return c.remoteDB.Put(key, value)
}
// Delete removes key from the remote store; when that write succeeds, the
// entry is also evicted from the memory cache and — unless the cache is
// read-only — from redis.
func (c *StateDBCacheDatabase) Delete(key []byte) (err error) {
	if !c.enableReadCache {
		return c.remoteDB.Delete(key)
	}
	defer func() {
		if err != nil {
			return
		}
		c.l1Cache.Del(key)
		if !c.l2ReadOnly {
			c.l2Cache.Del(context.Background(), string(key))
		}
	}()
	return c.remoteDB.Delete(key)
}
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called. The batch wraps the remote store's batch
// via newStateDBCacheBatch (defined elsewhere in this package).
func (c *StateDBCacheDatabase) NewBatch() ethdb.Batch {
return newStateDBCacheBatch(c, c.remoteDB.NewBatch())
}
// Stat returns a particular internal stat from the remote store. The special
// property "tikv.save.cache" additionally snapshots the memory cache to the
// configured persistence file (best-effort; the save error is discarded).
func (c *StateDBCacheDatabase) Stat(property string) (string, error) {
	if property == "tikv.save.cache" {
		// best-effort snapshot of the L1 cache; ignore the save error
		_ = c.l1Cache.SaveToFileConcurrent(c.config.CachePersistencePath, runtime.NumCPU())
	}
	return c.remoteDB.Stat(property)
}
// Compact forwards range compaction straight to the remote store; the cache
// layers are unaffected.
func (c *StateDBCacheDatabase) Compact(start []byte, limit []byte) error {
return c.remoteDB.Compact(start, limit)
}
// NewIterator iterates the remote store directly over the given key range;
// cached entries are not consulted.
func (c *StateDBCacheDatabase) NewIterator(start, end []byte) ethdb.Iterator {
return c.remoteDB.NewIterator(start, end)
}
// Close disconnect the redis and save memory cache to file. A CAS on isClose
// makes the teardown run exactly once; repeat calls are no-ops. The remote
// store and redis client are closed via defers after the snapshot attempt.
func (c *StateDBCacheDatabase) Close() error {
if atomic.CompareAndSwapUint64(&c.isClose, 0, 1) {
defer c.remoteDB.Close()
defer c.l2Cache.Close()
err := c.l1Cache.SaveToFileConcurrent(c.config.CachePersistencePath, runtime.NumCPU())
if err != nil {
log.Printf("save file to '%s' error: %v", c.config.CachePersistencePath, err)
}
if !c.l2ReadOnly {
// give in-flight senders a moment before closing the refresh channel,
// then drain it so the refresh goroutine can observe the close and exit
time.Sleep(time.Second)
close(c.l2ExpiredRefresh)
for range c.l2ExpiredRefresh {
// nop, clear chan
}
}
}
return nil
}
// cacheWrite replays batch b into both cache layers: first into a redis
// pipeline (skipped entirely when read-only; pipeline exec errors are only
// logged), then into the memory cache.
func (c *StateDBCacheDatabase) cacheWrite(b ethdb.Batch) error {
	if !c.l2ReadOnly {
		pipe := c.l2Cache.Pipeline()
		target := &redisPipelineWrapper{Pipeliner: pipe, expiredTime: c.l2ExpiredTime}
		if err := b.Replay(target); err != nil {
			return err
		}
		if _, err := pipe.Exec(context.Background()); err != nil {
			log.Printf("[BatchWrite WARN]Redis Error: %v", err)
		}
	}
	return b.Replay(c.l1Cache)
}
// refreshL2ExpiredTime recomputes the shared redis expiry deadline:
// now + TTL, rounded up to the next cacheMinGapInSecond boundary so the
// refresh loop's batched ExpireAt calls reuse a single timestamp.
func (c *StateDBCacheDatabase) refreshL2ExpiredTime() {
	deadline := time.Now().Add(c.l2ExpiredTime).Unix()
	rounded := deadline - deadline%cacheMinGapInSecond + cacheMinGapInSecond
	c.l2NextExpiredTime = time.Unix(rounded, 0)
}
// startL2ExpiredRefresh batch refresh redis cache. It consumes keys from
// l2ExpiredRefresh and queues redis ExpireAt commands into a pipeline,
// flushing when the pipeline exceeds maxPipelineEntriesCount, or on the
// one-second tick when the pipeline is non-empty and idle for over a second.
// The loop exits when the channel is closed (see Close).
func (c *StateDBCacheDatabase) startL2ExpiredRefresh() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
pipeline := c.l2Cache.Pipeline()
lastWrite := time.Now()
for {
select {
case key, ok := <-c.l2ExpiredRefresh:
if !ok {
// channel closed: shut down; any still-queued commands are dropped
return
}
// NOTE(review): l2NextExpiredTime stays zero until the first ticker tick
// calls refreshL2ExpiredTime — confirm a zero-time ExpireAt is acceptable.
pipeline.ExpireAt(context.Background(), key, c.l2NextExpiredTime)
if pipeline.Len() > maxPipelineEntriesCount {
_, _ = pipeline.Exec(context.Background())
lastWrite = time.Now()
}
case now := <-ticker.C:
c.refreshL2ExpiredTime()
// flush pending refreshes if nothing has been flushed for over a second
if pipeline.Len() > 0 && now.Sub(lastWrite) > time.Second {
_, _ = pipeline.Exec(context.Background())
lastWrite = time.Now()
}
}
}
}
// cacheStatusPrint logs redis pool statistics, L1/L2 hit-rate percentages,
// and fastcache internals. The hit counters are read with atomic loads —
// they are updated with atomic.AddUint64 on the Get path while this runs on
// a separate goroutine — and the percentage math guards against a zero
// total so it does not print NaN before any request has been served.
func (c *StateDBCacheDatabase) cacheStatusPrint() {
	var state = &fastcache.Stats{}
	state.Reset()
	c.l1Cache.UpdateStats(state)

	stats := c.l2Cache.PoolStats()
	log.Printf("redis: TotalConns: %d, IdleConns: %d, StaleConns: %d", stats.TotalConns, stats.IdleConns, stats.StaleConns)

	// snapshot the counters race-free; they are written via atomic.AddUint64
	l1Hit := atomic.LoadUint64(&c.l1HitCount)
	l2Hit := atomic.LoadUint64(&c.l2HitCount)
	miss := atomic.LoadUint64(&c.missCount)
	notFound := atomic.LoadUint64(&c.notFoundOrErrorCount)

	total := float64(l1Hit + l2Hit + miss + notFound)
	if total == 0 {
		total = 1 // avoid NaN percentages before any traffic
	}
	log.Printf(
		"total: l1Hit: %d(%.2f%%), l2Hit: %d(%.2f%%), miss: %d(%.2f%%), notFoundOrErrorCount: %d, fastcache: GetCalls: %d, SetCalls: %d, Misses: %d, EntriesCount: %d, BytesSize: %d",
		l1Hit, float64(l1Hit)/total*100, l2Hit, float64(l2Hit)/total*100, miss, float64(miss)/total*100, notFound,
		state.GetCalls, state.SetCalls, state.Misses, state.EntriesCount, state.BytesSize,
	)
}
// ReplayCache replays an already-persisted batch into both cache layers so
// subsequent reads see the fresh data (see cacheWrite).
func (c *StateDBCacheDatabase) ReplayCache(batch ethdb.Batch) error {
return c.cacheWrite(batch)
}