fix GetNextBatch to complete sync after there are no more pending states, remove extra comments, cleanup and goimports

pull/4465/head
GheisMohammadi 1 year ago
parent 99928257d0
commit c340c704ba
      api/service/stagedstreamsync/satate_sync.go

@@ -109,9 +109,6 @@ type accountTask struct {
Last common.Hash // Last account to sync in this interval
SubTasks map[common.Hash][]*storageTask // Storage intervals needing fetching for large contracts
// These fields are internals used during runtime
//req *accountRequest // Pending request to fill this task
//res *accountResponse // Validate response filling this task
pend int // Number of pending subtasks for this round
needCode []bool // Flags whether the filling accounts need code retrieval
@@ -135,10 +132,8 @@ type accountTask struct {
// the database that's going to be filled with the internal nodes on commit.
type accountResponse struct {
task *accountTask // Task which this request is filling
hashes []common.Hash // Account hashes in the returned range
accounts []*types.StateAccount // Expanded accounts in the returned range
cont bool // Whether the account range has a continuation
}
@@ -146,14 +141,9 @@ type accountResponse struct {
type storageTask struct {
Next common.Hash // Next account to sync in this interval
Last common.Hash // Last account to sync in this interval
// These fields are internals used during runtime
root common.Hash // Storage root hash for this instance
//req *storageTaskBundleuest // Pending request to fill this task
genBatch ethdb.Batch // Batch used by the node generator
genTrie *trie.StackTrie // Node generator from storage slots
requested bool
done bool // Flag whether the task can be removed
}
@@ -200,7 +190,7 @@ func (t *healRequestSort) Swap(i, j int) {
// Merge merges the pathsets, so that several storage requests concerning the
// same account are merged into one, to reduce bandwidth.
// OBS: This operation is moot if t has not first been sorted.
// This operation is moot if t has not first been sorted.
func (t *healRequestSort) Merge() []TrieNodePathSet {
var result []TrieNodePathSet
for _, path := range t.syncPaths {
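
For readers skimming the diff, the Merge comment above describes folding several path-set requests for the same account into one request to save bandwidth. Below is a minimal, hedged sketch of that idea, assuming (as in go-ethereum's snap protocol) that a TrieNodePathSet is a [][]byte whose first element is the account-trie path and the rest are storage-trie paths; mergeByAccount and the sample paths are illustrative, not helpers from this file, and the sketch glosses over the account-node-only case the real Merge keeps separate.

package main

import "fmt"

// TrieNodePathSet mirrors the snap-protocol shape (assumption): element 0 is
// the account path, elements 1..n are storage paths under that account.
type TrieNodePathSet [][]byte

// mergeByAccount folds path sets that target the same account into a single
// set, so one request carries every storage path queued for that account.
func mergeByAccount(sets []TrieNodePathSet) []TrieNodePathSet {
	merged := make(map[string]TrieNodePathSet)
	order := make([]string, 0, len(sets))
	for _, ps := range sets {
		if len(ps) == 0 {
			continue
		}
		key := string(ps[0])
		if _, ok := merged[key]; !ok {
			order = append(order, key)
			merged[key] = TrieNodePathSet{ps[0]}
		}
		merged[key] = append(merged[key], ps[1:]...)
	}
	out := make([]TrieNodePathSet, 0, len(order))
	for _, key := range order {
		out = append(out, merged[key])
	}
	return out
}

func main() {
	sets := []TrieNodePathSet{
		{[]byte("acct-1"), []byte("slot-a")},
		{[]byte("acct-1"), []byte("slot-b")},
		{[]byte("acct-2"), []byte("slot-c")},
	}
	// acct-1 ends up with slot-a and slot-b in one set; acct-2 keeps slot-c.
	fmt.Println(len(mergeByAccount(sets))) // 2
}
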
@@ -280,7 +270,6 @@ func (t *tasks) deleteAccountTask(accountTaskIndex uint64) {
if _, ok := t.accountTasks[accountTaskIndex]; ok {
delete(t.accountTasks, accountTaskIndex)
}
// t.accountTasks = append(t.accountTasks[:accountTaskIndex], t.accountTasks[accountTaskIndex+1:]...)
}
func (t *tasks) addCodeTask(h common.Hash) {
@@ -375,7 +364,6 @@ type FullStateDownloadManager struct {
root common.Hash // Current state trie root being synced
snapped bool // Flag to signal that snap phase is done
// healer *healTask // Current state healing task being executed
protocol syncProtocol
scheduler *trie.Sync // State trie sync scheduler defining the tasks
@@ -444,7 +432,6 @@ func (s *FullStateDownloadManager) setRootHash(root common.Hash) {
s.root = root
s.scheduler = state.NewStateSync(root, s.db, s.onHealState, s.scheme)
s.loadSyncStatus()
// s.sched = state.NewStateSync(root, s.bc.ChainDb(), nil, rawdb.HashScheme)
}
func (s *FullStateDownloadManager) taskDone(taskID uint64) {
@@ -554,33 +541,7 @@ func (s *FullStateDownloadManager) commitHealer(force bool) {
utils.Logger().Debug().Str("type", "trienodes").Interface("bytes", common.StorageSize(batch.ValueSize())).Msg("Persisted set of healing data")
}
// getNextBatch returns objects with a maximum of n state download
// tasks to send to the remote peer.
func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask,
codes []common.Hash,
storages *storageTaskBundle,
healtask *healTask,
codetask *healTask,
err error) {
s.lock.Lock()
defer s.lock.Unlock()
cap := StatesPerRequest
accounts, codes, storages, healtask, codetask = s.getBatchFromRetries(cap)
nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes)
cap -= nItems
if cap == 0 {
return
}
if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 {
utils.Logger().Debug().Msg("Snapshot sync already completed")
return
}
func (s *FullStateDownloadManager) SyncCompleted() {
defer func() { // Persist any progress, independent of failure
for _, task := range s.tasks.accountTasks {
s.forwardAccountTask(task)
@@ -605,27 +566,50 @@ func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask,
utils.Logger().Debug().Interface("root", s.root).Msg("Terminating snapshot sync cycle")
}()
// Refill available tasks from the scheduler.
if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 {
utils.Logger().Debug().Msg("Snapshot sync already completed")
}
// getNextBatch returns objects with a maximum of n state download
// tasks to send to the remote peer.
func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask,
codes []common.Hash,
storages *storageTaskBundle,
healtask *healTask,
codetask *healTask,
err error) {
s.lock.Lock()
defer s.lock.Unlock()
cap := StatesPerRequest
accounts, codes, storages, healtask, codetask = s.getBatchFromRetries(cap)
nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes)
cap -= nItems
if cap == 0 {
return
}
// if err = s.fillTasks(cap); err != nil {
// return
// }
if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 {
if nItems == 0 {
s.SyncCompleted()
}
return
}
includeHealtasks := true
// Refill available tasks from the scheduler.
withHealTasks := true
if healtask != nil || codetask != nil {
includeHealtasks = false
withHealTasks = false
}
newAccounts, newCodes, newStorageTaskBundle, unprocessedHealtask, unprocessedCodetask := s.getBatchFromUnprocessed(cap, includeHealtasks)
newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask := s.getBatchFromUnprocessed(cap, withHealTasks)
accounts = append(accounts, newAccounts...)
codes = append(codes, newCodes...)
storages = newStorageTaskBundle
if includeHealtasks {
healtask = unprocessedHealtask
codetask = unprocessedCodetask
if withHealTasks {
healtask = newHealTask
codetask = newCodeTask
}
return
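
Taken together, the new GetNextBatch first drains retries, then either declares the sync finished (calling SyncCompleted when no account tasks are queued, nothing is pending in the scheduler, and the retry batch came back empty) or tops the batch up from unprocessed tasks, requesting heal work only when none was already carried over. A hedged sketch of how a caller might drive it, assumed to live alongside the manager and mirroring the file's own nItems computation; runStateSync and dispatchToPeers are hypothetical and not part of this commit:

// runStateSync is an illustrative driver loop, not code from this commit.
func runStateSync(m *FullStateDownloadManager) error {
	for {
		accounts, codes, storages, healtask, codetask, err := m.GetNextBatch()
		if err != nil {
			return err
		}
		nItems := len(accounts) + len(codes) + len(storages.roots) +
			len(healtask.hashes) + len(codetask.hashes)
		if nItems == 0 {
			// Nothing left to hand out; in the fully-synced case
			// GetNextBatch has already invoked SyncCompleted.
			return nil
		}
		// dispatchToPeers is a placeholder for sending the batch over the
		// stream protocol and feeding the responses back into the manager.
		dispatchToPeers(accounts, codes, storages, healtask, codetask)
	}
}
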
@@ -690,7 +674,7 @@ func (s *FullStateDownloadManager) loadSyncStatus() {
}
s.tasks.accountTasks = progress.Tasks
for _, task := range s.tasks.accountTasks {
// task := task // closure for task.genBatch in the stacktrie writer callback
task := task // closure for task.genBatch in the stacktrie writer callback
task.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
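
The re-enabled task := task line is the standard Go idiom for taking a per-iteration copy: before Go 1.22 a range loop reuses a single loop variable, so a closure (here the stacktrie writer callback working against task.genBatch) would otherwise observe whichever task the variable holds last. A standalone illustration of the pitfall, with throwaway names:

package main

import "fmt"

func main() {
	nums := []int{1, 2, 3}

	var buggy []func() int
	for _, n := range nums {
		// Before Go 1.22, every closure captures the same variable n,
		// so each of these returns the final value 3.
		buggy = append(buggy, func() int { return n })
	}

	var fixed []func() int
	for _, n := range nums {
		n := n // per-iteration copy, the same idiom as `task := task`
		fixed = append(fixed, func() int { return n })
	}

	for i := range nums {
		fmt.Println(buggy[i](), fixed[i]()) // pre-Go 1.22 prints: 3 1, 3 2, 3 3
	}
}
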
@@ -810,11 +794,8 @@ func (s *FullStateDownloadManager) cleanAccountTasks() {
return
}
// Sync wasn't finished previously, check for any task that can be finalized
//for i := 0; i < len(s.tasks.accountTasks); i++ {
for taskID, _ := range s.tasks.accountTasks {
if s.tasks.accountTasks[taskID].done {
//s.tasks.accountTasks = append(s.tasks.accountTasks[:i], s.tasks.accountTasks[i+1:]...)
//i--
s.tasks.deleteAccountTask(taskID)
}
}
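
The deleted comments were left over from the earlier slice-backed task list, where removal meant splicing and decrementing the loop index. With tasks keyed in a map, deleting inside the loop is fine: the Go specification allows removing map entries during a range, and entries removed before being reached are simply never visited. A tiny standalone illustration:

package main

import "fmt"

func main() {
	tasks := map[uint64]bool{1: true, 2: false, 3: true} // taskID -> done
	for id, done := range tasks {
		if done {
			delete(tasks, id) // safe per the Go spec, even while ranging
		}
	}
	fmt.Println(tasks) // only the unfinished task (id 2) remains
}
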
@@ -953,7 +934,7 @@ func (s *FullStateDownloadManager) updateStats(written, duplicate, unexpected in
// getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download
// tasks to send to the remote peer.
func (s *FullStateDownloadManager) getBatchFromUnprocessed(n int, includeHealtasks bool) (
func (s *FullStateDownloadManager) getBatchFromUnprocessed(n int, withHealTasks bool) (
accounts []*accountTask,
codes []common.Hash,
storages *storageTaskBundle,
@@ -1093,7 +1074,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(n int, includeHealtas
return
}
if !includeHealtasks {
if !withHealTasks {
return
}
