@@ -108,6 +108,11 @@ var (
type accountTask struct {
id uint64 //unique id for account task
root common.Hash
origin common.Hash
limit common.Hash
cap int
// These fields get serialized to leveldb on shutdown
Next common.Hash // Next account to sync in this interval
Last common.Hash // Last account to sync in this interval
@@ -229,16 +234,19 @@ type byteCodeTasksBundle struct {
id uint64 //unique id for bytecode task bundle
task *accountTask
hashes []common.Hash
cap int
}
type storageTaskBundle struct {
id uint64 //unique id for storage task bundle
root common.Hash
accounts []common.Hash
roots []common.Hash
mainTask *accountTask
subtask *storageTask
origin common.Hash
limit common.Hash
cap int
}
// healTask represents the sync task for healing the snap-synced chunk boundaries.
@@ -251,6 +259,7 @@ type healTask struct {
pathsets []*message.TrieNodePathSet
task *healTask
root common.Hash
bytes int
byteCodeReq bool
}
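// Sketch of intent (an assumption inferred from how these fields are assigned
// below): the new cap/bytes fields carry a per-request response-size budget,
// set from maxRequestSize when the task is scheduled in getBatchFromUnprocessed.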
@@ -259,7 +268,6 @@ type tasks struct {
storageTasks map[uint64]*storageTaskBundle // Set of storage task bundles currently queued for retrieval, indexed by id
codeTasks map[uint64]*byteCodeTasksBundle // Set of byte code task bundles currently queued for retrieval, indexed by id
healer map[uint64]*healTask
snapped bool // Flag to signal that snap phase is done
}
func newTasks() *tasks {
@@ -268,7 +276,6 @@ func newTasks() *tasks {
storageTasks: make(map[uint64]*storageTaskBundle, 0),
codeTasks: make(map[uint64]*byteCodeTasksBundle),
healer: make(map[uint64]*healTask, 0),
snapped: false,
}
}
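// Illustrative sketch (assuming the add*/delete* helpers used later in this
// diff): the manager keeps four of these registries (tasks, requesting,
// processing, retries) and moves a task between them as its state changes, e.g.
//
//	s.requesting.addAccountTask(task.id, task)
//	s.retries.deleteAccountTask(task.id)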
@@ -399,8 +406,6 @@ type FullStateDownloadManager struct {
storageSynced uint64 // Number of storage slots downloaded
storageBytes common.StorageSize // Number of storage trie bytes persisted to disk
pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
stateWriter ethdb.Batch // Shared batch writer used for persisting raw states
accountHealed uint64 // Number of accounts downloaded during the healing stage
accountHealedBytes common.StorageSize // Number of raw account bytes persisted to disk during the healing stage
@@ -420,6 +425,9 @@ type FullStateDownloadManager struct {
bytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk
bytecodeHealDups uint64 // Number of bytecodes already processed
bytecodeHealNops uint64 // Number of bytecodes not requested
startTime time.Time // Time instance when snapshot sync started
logTime time.Time // Time instance when status was last reported
}
func newFullStateDownloadManager(db ethdb.KeyValueStore,
@@ -430,18 +438,19 @@ func newFullStateDownloadManager(db ethdb.KeyValueStore,
logger zerolog.Logger) *FullStateDownloadManager {
return &FullStateDownloadManager{
db: db,
scheme: scheme,
bc: bc,
stateWriter: db.NewBatch(),
tx: tx,
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
concurrency: concurrency,
logger: logger,
tasks: newTasks(),
requesting: newTasks(),
processing: newTasks(),
retries: newTasks(),
db: db,
scheme: scheme,
bc: bc,
stateWriter: db.NewBatch(),
tx: tx,
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
concurrency: concurrency,
logger: logger,
tasks: newTasks(),
requesting: newTasks(),
processing: newTasks(),
retries: newTasks(),
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
}
}
@@ -531,6 +540,12 @@ func (s *FullStateDownloadManager) commitHealer(force bool) {
utils.Logger().Debug().Str("type", "trienodes").Interface("bytes", common.StorageSize(batch.ValueSize())).Msg("Persisted set of healing data")
}
func (s *FullStateDownloadManager) SyncStarted() {
if s.startTime == (time.Time{}) {
s.startTime = time.Now()
}
}
func (s *FullStateDownloadManager) SyncCompleted() {
defer func() { // Persist any progress, independent of failure
for _, task := range s.tasks.accountTasks {
@@ -556,7 +571,8 @@ func (s *FullStateDownloadManager) SyncCompleted() {
utils.Logger().Debug().Interface("root", s.root).Msg("Terminating snapshot sync cycle")
}()
utils.Logger().Debug().Msg("Snapshot sync already completed")
elapsed := time.Since(s.startTime)
utils.Logger().Debug().Interface("elapsed", elapsed).Msg("Snapshot sync already completed")
}
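// Timing sketch for the two hooks above (the sdm variable and call sites are
// assumptions for illustration): SyncStarted records the start time once, and
// SyncCompleted reports the elapsed duration on top of it.
//
//	sdm.SyncStarted()   // sets startTime on the first call only
//	// ... account, storage, bytecode and heal phases run ...
//	sdm.SyncCompleted() // logs time.Since(startTime) as "elapsed"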
// getNextBatch returns objects with a maximum of n state download
@@ -566,38 +582,30 @@ func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask,
storages *storageTaskBundle,
healtask *healTask,
codetask *healTask,
nItems int,
err error) {
s.lock.Lock()
defer s.lock.Unlock()
accounts, codes, storages, healtask, codetask = s.getBatchFromRetries()
nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes)
accounts, codes, storages, healtask, codetask, nItems = s.getBatchFromRetries()
if nItems > 0 {
return
}
if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 {
if nItems == 0 {
s.SyncCompleted()
}
s.SyncCompleted()
return
}
// Refill available tasks from the scheduler.
withHealTasks := true
if healtask != nil || codetask != nil {
withHealTasks = false
}
newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask := s.getBatchFromUnprocessed(withHealTasks)
newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask, nItems := s.getBatchFromUnprocessed()
accounts = append(accounts, newAccounts...)
codes = append(codes, newCodes...)
storages = newStorageTaskBundle
if withHealTasks {
healtask = newHealTask
codetask = newCodeTask
}
healtask = newHealTask
codetask = newCodeTask
return
}
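// Usage sketch for the updated GetNextBatch signature (the surrounding loop
// and the sdm variable are assumptions for illustration): retried work is
// served first, and nItems tells the caller whether anything was scheduled.
//
//	accounts, codes, storages, healtask, codetask, nItems, err := sdm.GetNextBatch()
//	if err != nil || nItems == 0 {
//		return // nothing to request in this round
//	}
//	// dispatch the returned account, bytecode, storage and heal tasks to a stream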
@@ -714,7 +722,7 @@ func (s *FullStateDownloadManager) loadSyncStatus() {
// Either we've failed to decode the previous state, or there was none.
// Start a fresh sync by chunking up the account range and scheduling
// them for retrieval.
s.tasks.accountTasks = nil
s.tasks = newTasks()
s.accountSynced, s.accountBytes = 0, 0
s.bytecodeSynced, s.bytecodeBytes = 0, 0
s.storageSynced, s.storageBytes = 0, 0
@@ -921,16 +929,18 @@ func (s *FullStateDownloadManager) updateStats(written, duplicate, unexpected in
// getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download
// tasks to send to the remote peer.
func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
func (s *FullStateDownloadManager) getBatchFromUnprocessed() (
accounts []*accountTask,
codes []*byteCodeTasksBundle,
storages *storageTaskBundle,
healtask *healTask,
codetask *healTask) {
codetask *healTask,
count int) {
// over trie nodes as those can be written to disk and forgotten about.
codes = make([]*byteCodeTasksBundle, 0)
accounts = make([]*accountTask, 0)
count = 0
for i, task := range s.tasks.accountTasks {
// Stop when we've gathered enough requests
@@ -956,12 +966,18 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
break
}
task.root = s.root
task.origin = task.Next
task.limit = task.Last
task.cap = maxRequestSize
task.requested = true
s.tasks.accountTasks[i].requested = true
accounts = append(accounts, task)
s.requesting.addAccountTask(task.id, task)
s.tasks.addAccountTask(task.id, task)
// one account task is enough for a stream
count = len(accounts)
return
}
@@ -997,6 +1013,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
id: taskID,
hashes: hashes,
task: task,
cap: maxRequestSize,
}
codes = append(codes, bytecodeTask)
@@ -1005,12 +1022,14 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
// Stop when we've gathered enough requests
if totalHashes >= maxCodeRequestCount {
count = totalHashes
return
}
}
// if we found some codes, we can assign them to the node
if totalHashes > 0 {
count = totalHashes
return
}
@@ -1020,14 +1039,8 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
continue
}
// TODO: check cap calculations (shouldn't give us a big chunk)
// if cap > maxRequestSize {
// cap = maxRequestSize
// }
// if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
// cap = minRequestSize
// }
storageSets := maxRequestSize / 1024
cap := maxRequestSize
storageSets := cap / 1024
storages = &storageTaskBundle{
accounts: make([]common.Hash, 0, storageSets),
@@ -1089,23 +1102,21 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
storages.origin = storages.subtask.Next
storages.limit = storages.subtask.Last
}
storages.root = s.root
storages.cap = cap
s.tasks.addStorageTaskBundle(taskID, storages)
s.requesting.addStorageTaskBundle(taskID, storages)
count = len(storages.accounts)
return
}
if len(storages.accounts) > 0 {
return
}
if !withHealTasks {
count = len(storages.accounts)
return
}
// Sync phase done, run heal phase
// Iterate over pending tasks and try to find a peer to retrieve with
// Iterate over pending tasks
for (len(s.tasks.healer) > 0 && len(s.tasks.healer[0].hashes) > 0) || s.scheduler.Pending() > 0 {
// If there are not enough trie tasks queued to fully assign, fill the
// queue from the state sync scheduler. The trie synced schedules these
@@ -1129,7 +1140,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
// If all the heal tasks are bytecodes or already downloading, bail
if len(s.tasks.healer[0].trieTasks) == 0 {
return
break
}
// Generate the network query and send it to the peer
// if cap > maxTrieRequestCount {
@@ -1177,6 +1188,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
pathsets: pathsets,
root: s.root,
task: s.tasks.healer[0],
bytes: maxRequestSize,
byteCodeReq: false,
}
@@ -1184,6 +1196,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
s.requesting.addHealerTask(taskID, healtask)
if len(hashes) > 0 {
count = len(hashes)
return
}
}
@@ -1205,7 +1218,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
// If all the heal tasks are trienodes or already downloading, bail
if len(s.tasks.healer[0].codeTasks) == 0 {
return
break
}
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
@@ -1243,9 +1256,10 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(withHealTasks bool) (
id: taskID,
hashes: hashes,
task: s.tasks.healer[0],
bytes: maxRequestSize,
byteCodeReq: true,
}
count = len(hashes)
s.tasks.healer[taskID] = codetask
s.requesting.addHealerTask(taskID, codetask)
}
@@ -1272,7 +1286,8 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
codes []*byteCodeTasksBundle,
storages *storageTaskBundle,
healtask *healTask,
codetask *healTask) {
codetask *healTask,
count int) {
// over trie nodes as those can be written to disk and forgotten about.
accounts = make([]*accountTask, 0)
@@ -1290,6 +1305,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
if len(accounts) > 0 {
count = len(accounts)
return
}
@@ -1301,6 +1317,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
if len(codes) > 0 {
count = len(codes)
return
}
@@ -1316,10 +1333,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
s.requesting.addStorageTaskBundle(storages.id, storages)
s.retries.deleteStorageTaskBundle(storages.id)
return
}
if len(storages.accounts) > 0 {
count = len(storages.accounts)
return
}
@@ -1338,6 +1352,7 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
s.requesting.addHealerTask(id, task)
s.retries.deleteHealerTask(id)
count = len(task.hashes)
return
}
if task.byteCodeReq {
@@ -1352,11 +1367,13 @@ func (s *FullStateDownloadManager) getBatchFromRetries() (
}
s.requesting.addHealerTask(id, task)
s.retries.deleteHealerTask(id)
count = len(task.hashes)
return
}
}
}
count = 0
return
}
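// Sketch of how the new count return value is consumed (see GetNextBatch
// above): retried tasks are drained first, and only when count is zero does
// the manager fall back to getBatchFromUnprocessed or, with nothing pending,
// to SyncCompleted.
//
//	accounts, codes, storages, healtask, codetask, nItems = s.getBatchFromRetries()
//	if nItems > 0 {
//		return
//	}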
@@ -1371,14 +1388,18 @@ func (s *FullStateDownloadManager) HandleRequestError(accounts []*accountTask,
s.lock.Lock()
defer s.lock.Unlock()
for _, task := range accounts {
s.requesting.deleteAccountTask(task.id)
s.retries.addAccountTask(task.id, task)
if accounts != nil && len(accounts) > 0 {
for _, task := range accounts {
s.requesting.deleteAccountTask(task.id)
s.retries.addAccountTask(task.id, task)
}
}
for _, code := range codes {
s.requesting.deleteCodeTask(code.id)
s.retries.addCodeTask(code.id, code)
if codes != nil && len(codes) > 0 {
for _, code := range codes {
s.requesting.deleteCodeTask(code.id)
s.retries.addCodeTask(code.id, code)
}
}
if storages != nil {