@ -4,13 +4,15 @@ import (
"context"
"fmt"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
@ -38,41 +40,53 @@ var Buckets = []string{
// CreateStagedSync creates an instance of staged sync
func CreateStagedSync ( ctx context . Context ,
bc core . BlockChain ,
consensus * consensus . Consensus ,
dbDir string ,
UseMemDB bool ,
isBeaconNode bool ,
protocol syncProtocol ,
config Config ,
logger zerolog . Logger ,
logProgress bool ,
) ( * StagedStreamSync , error ) {
isBeacon := bc . ShardID ( ) == shard . BeaconChainShardID
logger . Info ( ) .
Uint32 ( "shard" , bc . ShardID ( ) ) .
Bool ( "beaconNode" , isBeaconNode ) .
Bool ( "memdb" , config . UseMemDB ) .
Str ( "dbDir" , dbDir ) .
Bool ( "serverOnly" , config . ServerOnly ) .
Int ( "minStreams" , config . MinStreams ) .
Msg ( WrapStagedSyncMsg ( "creating staged sync" ) )
var mainDB kv . RwDB
dbs := make ( [ ] kv . RwDB , config . Concurrency )
if UseMemDB {
mainDB = memdb . New ( getMemDbTempPath ( dbDir , - 1 ) )
if config . UseMemDB {
mainDB = memdb . New ( getBlockDbPath ( bc . ShardID ( ) , isBeaconNode , - 1 , dbDir ) )
for i := 0 ; i < config . Concurrency ; i ++ {
dbs [ i ] = memdb . New ( getMemDbTempPath ( dbDir , i ) )
dbPath := getBlockDbPath ( bc . ShardID ( ) , isBeaconNode , i , dbDir )
dbs [ i ] = memdb . New ( dbPath )
}
} else {
mainDB = mdbx . NewMDBX ( log . New ( ) ) . Path ( getBlockDbPath ( isBeacon , - 1 , dbDir ) ) . MustOpen ( )
logger . Info ( ) .
Str ( "path" , getBlockDbPath ( bc . ShardID ( ) , isBeaconNode , - 1 , dbDir ) ) .
Msg ( WrapStagedSyncMsg ( "creating main db" ) )
mainDB = mdbx . NewMDBX ( log . New ( ) ) . Path ( getBlockDbPath ( bc . ShardID ( ) , isBeaconNode , - 1 , dbDir ) ) . MustOpen ( )
for i := 0 ; i < config . Concurrency ; i ++ {
dbPath := getBlockDbPath ( isBeacon , i , dbDir )
dbPath := getBlockDbPath ( bc . ShardID ( ) , isBeaconNode , i , dbDir )
dbs [ i ] = mdbx . NewMDBX ( log . New ( ) ) . Path ( dbPath ) . MustOpen ( )
}
}
if errInitDB := initDB ( ctx , mainDB , dbs , config . Concurrency ) ; errInitDB != nil {
logger . Error ( ) . Err ( errInitDB ) . Msg ( "create staged sync instance failed" )
return nil , errInitDB
}
stageHeadsCfg := NewStageHeadersCfg ( bc , mainDB )
stageShortRangeCfg := NewStageShortRangeCfg ( bc , mainDB )
stageSyncEpochCfg := NewStageEpochCfg ( bc , mainDB )
stageBodiesCfg := NewStageBodiesCfg ( bc , mainDB , dbs , config . Concurrency , protocol , isBeacon , logProgress )
stageStatesCfg := NewStageStatesCfg ( bc , mainDB , dbs , config . Concurrency , logger , logProgress )
stageBodiesCfg := NewStageBodiesCfg ( bc , mainDB , dbs , config . Concurrency , protocol , isBeaconNode , config . LogProgress )
stageStatesCfg := NewStageStatesCfg ( bc , mainDB , dbs , config . Concurrency , logger , config . LogProgress )
lastMileCfg := NewStageLastMileCfg ( ctx , bc , mainDB )
stageFinishCfg := NewStageFinishCfg ( mainDB )
stages := DefaultStages ( ctx ,
@ -81,17 +95,27 @@ func CreateStagedSync(ctx context.Context,
stageShortRangeCfg ,
stageBodiesCfg ,
stageStatesCfg ,
lastMileCfg ,
stageFinishCfg ,
)
logger . Info ( ) .
Uint32 ( "shard" , bc . ShardID ( ) ) .
Bool ( "beaconNode" , isBeaconNode ) .
Bool ( "memdb" , config . UseMemDB ) .
Str ( "dbDir" , dbDir ) .
Bool ( "serverOnly" , config . ServerOnly ) .
Int ( "minStreams" , config . MinStreams ) .
Msg ( WrapStagedSyncMsg ( "staged sync created successfully" ) )
return New (
bc ,
consensus ,
mainDB ,
stages ,
isBeacon ,
isBeaconNode ,
protocol ,
isBeaconNode ,
UseMemDB ,
config ,
logger ,
) , nil
@ -147,7 +171,7 @@ func getMemDbTempPath(dbDir string, dbIndex int) string {
}
// getBlockDbPath returns the path of the cache database which stores blocks
func getBlockDbPath ( beacon bool , loopID int , dbDir string ) string {
func getBlockDbPath ( shardID uint32 , beacon bool , loopID int , dbDir string ) string {
if beacon {
if loopID >= 0 {
return fmt . Sprintf ( "%s_%d" , filepath . Join ( dbDir , "cache/beacon_blocks_db" ) , loopID )
@ -156,30 +180,57 @@ func getBlockDbPath(beacon bool, loopID int, dbDir string) string {
}
} else {
if loopID >= 0 {
return fmt . Sprintf ( "%s_%d" , filepath . Join ( dbDir , "cache/blocks_db" ) , loopID )
return fmt . Sprintf ( "%s_%d_%d " , filepath . Join ( dbDir , "cache/blocks_db" ) , shardID , loopID )
} else {
return filepath . Join ( dbDir , "cache/blocks_db_main" )
return fmt . Sprintf ( "%s_%d" , f ilepath . Join ( dbDir , "cache/blocks_db_main" ) , shardID )
}
}
}
func ( s * StagedStreamSync ) Debug ( source string , msg interface { } ) {
// only log the msg in debug mode
if ! s . config . DebugMode {
return
}
pc , _ , _ , _ := runtime . Caller ( 1 )
caller := runtime . FuncForPC ( pc ) . Name ( )
callerParts := strings . Split ( caller , "/" )
if len ( callerParts ) > 0 {
caller = callerParts [ len ( callerParts ) - 1 ]
}
src := source
if src == "" {
src = "message"
}
// SSSD: STAGED STREAM SYNC DEBUG
if msg == nil {
fmt . Printf ( "[SSSD:%s] %s: nil or no error\n" , caller , src )
} else if err , ok := msg . ( error ) ; ok {
fmt . Printf ( "[SSSD:%s] %s: %s\n" , caller , src , err . Error ( ) )
} else if str , ok := msg . ( string ) ; ok {
fmt . Printf ( "[SSSD:%s] %s: %s\n" , caller , src , str )
} else {
fmt . Printf ( "[SSSD:%s] %s: %v\n" , caller , src , msg )
}
}
// doSync does the long range sync.
// One LongRangeSync consists of several iterations.
// For each iteration, estimate the current block number, then fetch block & insert to blockchain
func ( s * StagedStreamSync ) doSync ( downloaderContext context . Context , initSync bool ) ( int , error ) {
func ( s * StagedStreamSync ) doSync ( downloaderContext context . Context , initSync bool ) ( uint64 , int , error ) {
var totalInserted int
s . initSync = initSync
if err := s . checkPrerequisites ( ) ; err != nil {
return 0 , err
return 0 , 0 , err
}
var estimatedHeight uint64
if initSync {
if h , err := s . estimateCurrentNumber ( downloaderContext ) ; err != nil {
return 0 , err
return 0 , 0 , err
} else {
estimatedHeight = h
//TODO: use directly currentCycle var
@ -187,36 +238,70 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
}
if curBN := s . bc . CurrentBlock ( ) . NumberU64 ( ) ; estimatedHeight <= curBN {
s . logger . Info ( ) . Uint64 ( "current number" , curBN ) . Uint64 ( "target number" , estimatedHeight ) .
Msg ( WrapStagedSyncMsg ( "early return of long range sync" ) )
return 0 , nil
Msg ( WrapStagedSyncMsg ( "early return of long range sync (chain is already ahead of target height)" ) )
return estimatedHeight , 0 , nil
}
}
s . startSyncing ( )
defer s . finishSyncing ( )
}
for {
ctx , cancel := context . WithCancel ( downloaderContext )
n , err := s . doSyncCycle ( ctx , initSync )
if err != nil {
utils . Logger ( ) . Error ( ) .
Err ( err ) .
Bool ( "initSync" , s . initSync ) .
Bool ( "isBeacon" , s . isBeacon ) .
Uint32 ( "shard" , s . bc . ShardID ( ) ) .
Msg ( WrapStagedSyncMsg ( "sync cycle failed" ) )
pl := s . promLabels ( )
pl [ "error" ] = err . Error ( )
numFailedDownloadCounterVec . With ( pl ) . Inc ( )
cancel ( )
return totalInserted + n , err
return estimatedHeight , totalInserted + n , err
}
cancel ( )
totalInserted += n
// if it's not long range sync, skip loop
if n < LastMileBlocksThreshold || ! initSync {
return totalInserted , nil
if n == 0 || ! initSync {
break
}
}
if totalInserted > 0 {
utils . Logger ( ) . Info ( ) .
Bool ( "initSync" , s . initSync ) .
Bool ( "isBeacon" , s . isBeacon ) .
Uint32 ( "shard" , s . bc . ShardID ( ) ) .
Int ( "blocks" , totalInserted ) .
Msg ( WrapStagedSyncMsg ( "sync cycle blocks inserted successfully" ) )
}
// add consensus last mile blocks
if s . consensus != nil {
if hashes , err := s . addConsensusLastMile ( s . Blockchain ( ) , s . consensus ) ; err != nil {
utils . Logger ( ) . Error ( ) . Err ( err ) .
Msg ( "[STAGED_STREAM_SYNC] Add consensus last mile failed" )
s . RollbackLastMileBlocks ( downloaderContext , hashes )
return estimatedHeight , totalInserted , err
} else {
totalInserted += len ( hashes )
}
// TODO: move this to explorer handler code.
if s . isExplorer {
s . consensus . UpdateConsensusInformation ( )
}
}
s . purgeLastMileBlocksFromCache ( )
return estimatedHeight , totalInserted , nil
}
func ( s * StagedStreamSync ) doSyncCycle ( ctx context . Context , initSync bool ) ( int , error ) {