From c340c704ba6928787ccdd7ff2c4903d7dfad2650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 16 Nov 2023 20:55:52 +0800 Subject: [PATCH] fix GetNextBatch to complete sync after there is no more pending states,remove extra comments, cleanup and goimports --- api/service/stagedstreamsync/satate_sync.go | 113 ++++++++------------ 1 file changed, 47 insertions(+), 66 deletions(-) diff --git a/api/service/stagedstreamsync/satate_sync.go b/api/service/stagedstreamsync/satate_sync.go index e90640a9a..1bf685826 100644 --- a/api/service/stagedstreamsync/satate_sync.go +++ b/api/service/stagedstreamsync/satate_sync.go @@ -109,9 +109,6 @@ type accountTask struct { Last common.Hash // Last account to sync in this interval SubTasks map[common.Hash][]*storageTask // Storage intervals needing fetching for large contracts - // These fields are internals used during runtime - //req *accountRequest // Pending request to fill this task - //res *accountResponse // Validate response filling this task pend int // Number of pending subtasks for this round needCode []bool // Flags whether the filling accounts need code retrieval @@ -134,26 +131,19 @@ type accountTask struct { // range request. It contains the subtrie for the requested account range and // the database that's going to be filled with the internal nodes on commit. type accountResponse struct { - task *accountTask // Task which this request is filling - + task *accountTask // Task which this request is filling hashes []common.Hash // Account hashes in the returned range accounts []*types.StateAccount // Expanded accounts in the returned range - - cont bool // Whether the account range has a continuation + cont bool // Whether the account range has a continuation } // storageTask represents the sync task for a chunk of the storage snapshot. type storageTask struct { - Next common.Hash // Next account to sync in this interval - Last common.Hash // Last account to sync in this interval - - // These fields are internals used during runtime - root common.Hash // Storage root hash for this instance - //req *storageTaskBundleuest // Pending request to fill this task - - genBatch ethdb.Batch // Batch used by the node generator - genTrie *trie.StackTrie // Node generator from storage slots - + Next common.Hash // Next account to sync in this interval + Last common.Hash // Last account to sync in this interval + root common.Hash // Storage root hash for this instance + genBatch ethdb.Batch // Batch used by the node generator + genTrie *trie.StackTrie // Node generator from storage slots requested bool done bool // Flag whether the task can be removed } @@ -200,7 +190,7 @@ func (t *healRequestSort) Swap(i, j int) { // Merge merges the pathsets, so that several storage requests concerning the // same account are merged into one, to reduce bandwidth. -// OBS: This operation is moot if t has not first been sorted. +// This operation is moot if t has not first been sorted. func (t *healRequestSort) Merge() []TrieNodePathSet { var result []TrieNodePathSet for _, path := range t.syncPaths { @@ -280,7 +270,6 @@ func (t *tasks) deleteAccountTask(accountTaskIndex uint64) { if _, ok := t.accountTasks[accountTaskIndex]; ok { delete(t.accountTasks, accountTaskIndex) } - // t.accountTasks = append(t.accountTasks[:accountTaskIndex], t.accountTasks[accountTaskIndex+1:]...) } func (t *tasks) addCodeTask(h common.Hash) { @@ -375,7 +364,6 @@ type FullStateDownloadManager struct { root common.Hash // Current state trie root being synced snapped bool // Flag to signal that snap phase is done - // healer *healTask // Current state healing task being executed protocol syncProtocol scheduler *trie.Sync // State trie sync scheduler defining the tasks @@ -444,7 +432,6 @@ func (s *FullStateDownloadManager) setRootHash(root common.Hash) { s.root = root s.scheduler = state.NewStateSync(root, s.db, s.onHealState, s.scheme) s.loadSyncStatus() - // s.sched = state.NewStateSync(root, s.bc.ChainDb(), nil, rawdb.HashScheme) } func (s *FullStateDownloadManager) taskDone(taskID uint64) { @@ -554,33 +541,7 @@ func (s *FullStateDownloadManager) commitHealer(force bool) { utils.Logger().Debug().Str("type", "trienodes").Interface("bytes", common.StorageSize(batch.ValueSize())).Msg("Persisted set of healing data") } -// getNextBatch returns objects with a maximum of n state download -// tasks to send to the remote peer. -func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask, - codes []common.Hash, - storages *storageTaskBundle, - healtask *healTask, - codetask *healTask, - err error) { - - s.lock.Lock() - defer s.lock.Unlock() - - cap := StatesPerRequest - - accounts, codes, storages, healtask, codetask = s.getBatchFromRetries(cap) - nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes) - cap -= nItems - - if cap == 0 { - return - } - - if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 { - utils.Logger().Debug().Msg("Snapshot sync already completed") - return - } - +func (s *FullStateDownloadManager) SyncCompleted() { defer func() { // Persist any progress, independent of failure for _, task := range s.tasks.accountTasks { s.forwardAccountTask(task) @@ -605,27 +566,50 @@ func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask, utils.Logger().Debug().Interface("root", s.root).Msg("Terminating snapshot sync cycle") }() - // Refill available tasks from the scheduler. - if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 { - utils.Logger().Debug().Msg("Snapshot sync already completed") + utils.Logger().Debug().Msg("Snapshot sync already completed") +} + +// getNextBatch returns objects with a maximum of n state download +// tasks to send to the remote peer. +func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask, + codes []common.Hash, + storages *storageTaskBundle, + healtask *healTask, + codetask *healTask, + err error) { + + s.lock.Lock() + defer s.lock.Unlock() + + cap := StatesPerRequest + + accounts, codes, storages, healtask, codetask = s.getBatchFromRetries(cap) + nItems := len(accounts) + len(codes) + len(storages.roots) + len(healtask.hashes) + len(codetask.hashes) + cap -= nItems + + if cap == 0 { return } - // if err = s.fillTasks(cap); err != nil { - // return - // } + if len(s.tasks.accountTasks) == 0 && s.scheduler.Pending() == 0 { + if nItems == 0 { + s.SyncCompleted() + } + return + } - includeHealtasks := true + // Refill available tasks from the scheduler. + withHealTasks := true if healtask != nil || codetask != nil { - includeHealtasks = false + withHealTasks = false } - newAccounts, newCodes, newStorageTaskBundle, unprocessedHealtask, unprocessedCodetask := s.getBatchFromUnprocessed(cap, includeHealtasks) + newAccounts, newCodes, newStorageTaskBundle, newHealTask, newCodeTask := s.getBatchFromUnprocessed(cap, withHealTasks) accounts = append(accounts, newAccounts...) codes = append(codes, newCodes...) storages = newStorageTaskBundle - if includeHealtasks { - healtask = unprocessedHealtask - codetask = unprocessedCodetask + if withHealTasks { + healtask = newHealTask + codetask = newCodeTask } return @@ -690,7 +674,7 @@ func (s *FullStateDownloadManager) loadSyncStatus() { } s.tasks.accountTasks = progress.Tasks for _, task := range s.tasks.accountTasks { - // task := task // closure for task.genBatch in the stacktrie writer callback + task := task // closure for task.genBatch in the stacktrie writer callback task.genBatch = ethdb.HookedBatch{ Batch: s.db.NewBatch(), @@ -810,11 +794,8 @@ func (s *FullStateDownloadManager) cleanAccountTasks() { return } // Sync wasn't finished previously, check for any task that can be finalized - //for i := 0; i < len(s.tasks.accountTasks); i++ { for taskID, _ := range s.tasks.accountTasks { if s.tasks.accountTasks[taskID].done { - //s.tasks.accountTasks = append(s.tasks.accountTasks[:i], s.tasks.accountTasks[i+1:]...) - //i-- s.tasks.deleteAccountTask(taskID) } } @@ -953,7 +934,7 @@ func (s *FullStateDownloadManager) updateStats(written, duplicate, unexpected in // getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download // tasks to send to the remote peer. -func (s *FullStateDownloadManager) getBatchFromUnprocessed(n int, includeHealtasks bool) ( +func (s *FullStateDownloadManager) getBatchFromUnprocessed(n int, withHealTasks bool) ( accounts []*accountTask, codes []common.Hash, storages *storageTaskBundle, @@ -1093,7 +1074,7 @@ func (s *FullStateDownloadManager) getBatchFromUnprocessed(n int, includeHealtas return } - if !includeHealtasks { + if !withHealTasks { return }