change cache folder in sync and move it under data folder (#4438)

pull/4439/head
Gheis Mohammadi 2 years ago committed by GitHub
parent c4427231a6
commit 3d4bf3f49c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      api/service/stagedstreamsync/downloader.go
  2. 4
      api/service/stagedstreamsync/downloaders.go
  3. 4
      api/service/stagedstreamsync/service.go
  4. 16
      api/service/stagedstreamsync/syncing.go
  5. 10
      api/service/stagedsync/stage_blockhashes.go
  6. 10
      api/service/stagedsync/stage_bodies.go
  7. 12
      api/service/stagedsync/syncing.go
  8. 3
      cmd/harmony/main.go
  9. 3
      node/node_syncing.go

@@ -37,7 +37,7 @@ type (
)
// NewDownloader creates a new downloader
func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config Config) *Downloader {
func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode bool, config Config) *Downloader {
config.fixValues()
sp := sync.NewProtocol(sync.Config{
@@ -68,7 +68,7 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config
ctx, cancel := context.WithCancel(context.Background())
//TODO: use mem db should be in config file
stagedSyncInstance, err := CreateStagedSync(ctx, bc, false, isBeaconNode, sp, config, logger, config.LogProgress)
stagedSyncInstance, err := CreateStagedSync(ctx, bc, dbDir, false, isBeaconNode, sp, config, logger, config.LogProgress)
if err != nil {
cancel()
return nil

@@ -15,7 +15,7 @@ type Downloaders struct {
}
// NewDownloaders creates Downloaders for sync of multiple blockchains
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, config Config) *Downloaders {
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config Config) *Downloaders {
ds := make(map[uint32]*Downloader)
isBeaconNode := len(bcs) == 1
for _, bc := range bcs {
@@ -25,7 +25,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, config Config) *Downlo
if _, ok := ds[bc.ShardID()]; ok {
continue
}
ds[bc.ShardID()] = NewDownloader(host, bc, isBeaconNode, config)
ds[bc.ShardID()] = NewDownloader(host, bc, dbDir, isBeaconNode, config)
}
return &Downloaders{
ds: ds,

@@ -11,9 +11,9 @@ type StagedStreamSyncService struct {
}
// NewService creates a new downloader service
func NewService(host p2p.Host, bcs []core.BlockChain, config Config) *StagedStreamSyncService {
func NewService(host p2p.Host, bcs []core.BlockChain, config Config, dbDir string) *StagedStreamSyncService {
return &StagedStreamSyncService{
Downloaders: NewDownloaders(host, bcs, config),
Downloaders: NewDownloaders(host, bcs, dbDir, config),
}
}

@@ -3,6 +3,7 @@ package stagedstreamsync
import (
"context"
"fmt"
"path/filepath"
"sync"
"time"
@@ -37,6 +38,7 @@ var Buckets = []string{
// CreateStagedSync creates an instance of staged sync
func CreateStagedSync(ctx context.Context,
bc core.BlockChain,
dbDir string,
UseMemDB bool,
isBeaconNode bool,
protocol syncProtocol,
@@ -55,9 +57,9 @@ func CreateStagedSync(ctx context.Context,
dbs[i] = memdb.New()
}
} else {
mainDB = mdbx.NewMDBX(log.New()).Path(GetBlockDbPath(isBeacon, -1)).MustOpen()
mainDB = mdbx.NewMDBX(log.New()).Path(GetBlockDbPath(isBeacon, -1, dbDir)).MustOpen()
for i := 0; i < config.Concurrency; i++ {
dbPath := GetBlockDbPath(isBeacon, i)
dbPath := GetBlockDbPath(isBeacon, i, dbDir)
dbs[i] = mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen()
}
}
@@ -137,18 +139,18 @@ func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB, concurrency int)
}
// GetBlockDbPath returns the path of the cache database which stores blocks
func GetBlockDbPath(beacon bool, loopID int) string {
func GetBlockDbPath(beacon bool, loopID int, dbDir string) string {
if beacon {
if loopID >= 0 {
return fmt.Sprintf("%s_%d", "cache/beacon_blocks_db", loopID)
return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/beacon_blocks_db"), loopID)
} else {
return "cache/beacon_blocks_db_main"
return filepath.Join(dbDir, "cache/beacon_blocks_db_main")
}
} else {
if loopID >= 0 {
return fmt.Sprintf("%s_%d", "cache/blocks_db", loopID)
return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/blocks_db"), loopID)
} else {
return "cache/blocks_db_main"
return filepath.Join(dbDir, "cache/blocks_db_main")
}
}
}

@@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"path/filepath"
"strconv"
"time"
@@ -37,8 +38,8 @@ func NewStageBlockHashes(cfg StageBlockHashesCfg) *StageBlockHashes {
}
}
func NewStageBlockHashesCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB, isBeacon bool, turbo bool, logProgress bool) StageBlockHashesCfg {
cachedb, err := initHashesCacheDB(ctx, isBeacon)
func NewStageBlockHashesCfg(ctx context.Context, bc core.BlockChain, dbDir string, db kv.RwDB, isBeacon bool, turbo bool, logProgress bool) StageBlockHashesCfg {
cachedb, err := initHashesCacheDB(ctx, dbDir, isBeacon)
if err != nil {
panic("can't initialize sync caches")
}
@@ -53,13 +54,14 @@ func NewStageBlockHashesCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB,
}
}
func initHashesCacheDB(ctx context.Context, isBeacon bool) (db kv.RwDB, err error) {
func initHashesCacheDB(ctx context.Context, dbDir string, isBeacon bool) (db kv.RwDB, err error) {
// create caches db
cachedbName := BlockHashesCacheDB
if isBeacon {
cachedbName = "beacon_" + cachedbName
}
cachedb := mdbx.NewMDBX(log.New()).Path(cachedbName).MustOpen()
dbPath := filepath.Join(dbDir, cachedbName)
cachedb := mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen()
// create transaction on cachedb
tx, errRW := cachedb.BeginRw(ctx)
if errRW != nil {

@@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"path/filepath"
"strconv"
"sync"
"time"
@@ -37,8 +38,8 @@ func NewStageBodies(cfg StageBodiesCfg) *StageBodies {
}
}
func NewStageBodiesCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB, isBeacon bool, turbo bool, logProgress bool) StageBodiesCfg {
cachedb, err := initBlocksCacheDB(ctx, isBeacon)
func NewStageBodiesCfg(ctx context.Context, bc core.BlockChain, dbDir string, db kv.RwDB, isBeacon bool, turbo bool, logProgress bool) StageBodiesCfg {
cachedb, err := initBlocksCacheDB(ctx, dbDir, isBeacon)
if err != nil {
panic("can't initialize sync caches")
}
@@ -53,13 +54,14 @@ func NewStageBodiesCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB, isBe
}
}
func initBlocksCacheDB(ctx context.Context, isBeacon bool) (db kv.RwDB, err error) {
func initBlocksCacheDB(ctx context.Context, dbDir string, isBeacon bool) (db kv.RwDB, err error) {
// create caches db
cachedbName := BlockCacheDB
if isBeacon {
cachedbName = "beacon_" + cachedbName
}
cachedb := mdbx.NewMDBX(log.New()).Path(cachedbName).MustOpen()
dbPath := filepath.Join(dbDir, cachedbName)
cachedb := mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen()
tx, errRW := cachedb.BeginRw(ctx)
if errRW != nil {
utils.Logger().Error().

@@ -3,6 +3,7 @@ package stagedsync
import (
"context"
"fmt"
"path/filepath"
"time"
"github.com/c2h5oh/datasize"
@@ -50,6 +51,7 @@ func CreateStagedSync(
port string,
peerHash [20]byte,
bc core.BlockChain,
dbDir string,
role nodeconfig.Role,
isExplorer bool,
TurboMode bool,
@@ -82,9 +84,11 @@ func CreateStagedSync(
db = mdbx.NewMDBX(log.New()).MapSize(dbMapSize).InMem("cache_db").MustOpen()
} else {
if isBeacon {
db = mdbx.NewMDBX(log.New()).Path("cache_beacon_db").MustOpen()
dbPath := filepath.Join(dbDir, "cache_beacon_db")
db = mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen()
} else {
db = mdbx.NewMDBX(log.New()).Path("cache_shard_db").MustOpen()
dbPath := filepath.Join(dbDir, "cache_shard_db")
db = mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen()
}
}
@@ -93,8 +97,8 @@ func CreateStagedSync(
}
headsCfg := NewStageHeadersCfg(ctx, bc, db)
blockHashesCfg := NewStageBlockHashesCfg(ctx, bc, db, isBeacon, TurboMode, logProgress)
bodiesCfg := NewStageBodiesCfg(ctx, bc, db, isBeacon, TurboMode, logProgress)
blockHashesCfg := NewStageBlockHashesCfg(ctx, bc, dbDir, db, isBeacon, TurboMode, logProgress)
bodiesCfg := NewStageBodiesCfg(ctx, bc, dbDir, db, isBeacon, TurboMode, logProgress)
statesCfg := NewStageStatesCfg(ctx, bc, db, logProgress)
lastMileCfg := NewStageLastMileCfg(ctx, bc, db)
finishCfg := NewStageFinishCfg(ctx, db)

@@ -939,9 +939,8 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
InsertHook: node.BeaconSyncHook,
}
}
//Setup stream sync service
s := stagedstreamsync.NewService(host, blockchains, sConfig)
s := stagedstreamsync.NewService(host, blockchains, sConfig, hc.General.DataDir)
node.RegisterService(service.StagedStreamSync, s)

@@ -109,9 +109,10 @@ func (node *Node) createStagedSync(bc core.BlockChain) *stagedsync.StagedSync {
mutatedPort := strconv.Itoa(mySyncPort + legacysync.SyncingPortDifference)
role := node.NodeConfig.Role()
isExplorer := node.NodeConfig.Role() == nodeconfig.ExplorerNode
dbDir := node.NodeConfig.DBDir
if s, err := stagedsync.CreateStagedSync(node.SelfPeer.IP, mutatedPort,
node.GetSyncID(), bc, role, isExplorer,
node.GetSyncID(), bc, dbDir, role, isExplorer,
node.NodeConfig.StagedSyncTurboMode,
node.NodeConfig.UseMemDB,
node.NodeConfig.DoubleCheckBlockHashes,

Loading…
Cancel
Save