refactor state download manager

pull/4465/head
GheisMohammadi, 1 year ago
parent 4629fda90b
commit 9e1249a836
1 changed file: api/service/stagedstreamsync/state_download_manager.go (48 lines changed)

@@ -143,7 +143,7 @@ func (s *StateDownloadManager) fillTasks(n int) error {
 // getNextBatch returns objects with a maximum of n state download
 // tasks to send to the remote peer.
-func (s *StateDownloadManager) GetNextBatch() (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
+func (s *StateDownloadManager) GetNextBatch() (nodes []common.Hash, paths []string, codes []common.Hash) {
     s.lock.Lock()
     defer s.lock.Unlock()
@@ -165,10 +165,10 @@ func (s *StateDownloadManager) GetNextBatch() (nodes []common.Hash, paths []trie
 // getNextBatch returns objects with a maximum of n state download
 // tasks to send to the remote peer.
-func (s *StateDownloadManager) getBatchFromUnprocessed(n int) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
+func (s *StateDownloadManager) getBatchFromUnprocessed(n int) (nodes []common.Hash, paths []string, codes []common.Hash) {
     // over trie nodes as those can be written to disk and forgotten about.
     nodes = make([]common.Hash, 0, n)
-    paths = make([]trie.SyncPath, 0, n)
+    paths = make([]string, 0, n)
     codes = make([]common.Hash, 0, n)
     for hash, t := range s.tasks.codeTasks {
@@ -186,7 +186,7 @@ func (s *StateDownloadManager) getBatchFromUnprocessed(n int) (nodes []common.Ha
             break
         }
         nodes = append(nodes, t.hash)
-        paths = append(paths, t.path)
+        paths = append(paths, path)
         s.requesting.addTrieTask(path, t)
         s.tasks.deleteTrieTask(path)
     }
@@ -194,10 +194,10 @@ func (s *StateDownloadManager) getBatchFromUnprocessed(n int) (nodes []common.Ha
 }
 // getBatchFromRetries get the block number batch to be requested from retries.
-func (s *StateDownloadManager) getBatchFromRetries(n int) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
+func (s *StateDownloadManager) getBatchFromRetries(n int) (nodes []common.Hash, paths []string, codes []common.Hash) {
     // over trie nodes as those can be written to disk and forgotten about.
     nodes = make([]common.Hash, 0, n)
-    paths = make([]trie.SyncPath, 0, n)
+    paths = make([]string, 0, n)
     codes = make([]common.Hash, 0, n)
     for hash, t := range s.retries.codeTasks {
@@ -215,7 +215,7 @@ func (s *StateDownloadManager) getBatchFromRetries(n int) (nodes []common.Hash,
             break
         }
         nodes = append(nodes, t.hash)
-        paths = append(paths, t.path)
+        paths = append(paths, path)
         s.requesting.addTrieTask(path, t)
         s.retries.deleteTrieTask(path)
     }
@@ -236,18 +236,18 @@ func (s *StateDownloadManager) HandleRequestError(codeHashes []common.Hash, trie
     }
     // add requested trie paths to retries
-    for _, p := range triePaths {
-        s.retries.trieTasks[p] = &trieTask{
-            hash:     s.requesting.trieTasks[p].hash,
-            path:     s.requesting.trieTasks[p].path,
-            attempts: s.requesting.trieTasks[p].attempts,
+    for _, path := range triePaths {
+        s.retries.trieTasks[path] = &trieTask{
+            hash:     s.requesting.trieTasks[path].hash,
+            path:     s.requesting.trieTasks[path].path,
+            attempts: s.requesting.trieTasks[path].attempts,
         }
-        delete(s.requesting.trieTasks, p)
+        delete(s.requesting.trieTasks, path)
     }
 }
 // HandleRequestResult handles get trie paths and code hashes result
-func (s *StateDownloadManager) HandleRequestResult(trieTasks map[string]*trieTask, codeTasks map[common.Hash]*codeTask, response [][]byte, loopID int, streamID sttypes.StreamID) error {
+func (s *StateDownloadManager) HandleRequestResult(codeHashes []common.Hash, triePaths []string, response [][]byte, loopID int, streamID sttypes.StreamID) error {
     s.lock.Lock()
     defer s.lock.Unlock()
@@ -255,7 +255,7 @@ func (s *StateDownloadManager) HandleRequestResult(trieTasks map[string]*trieTas
     duplicate, unexpected, successful, numUncommitted, bytesUncommitted := 0, 0, 0, 0, 0
     for _, blob := range response {
-        hash, err := s.processNodeData(trieTasks, codeTasks, blob)
+        hash, err := s.processNodeData(codeHashes, triePaths, blob)
         switch err {
         case nil:
             numUncommitted++
@@ -271,8 +271,7 @@ func (s *StateDownloadManager) HandleRequestResult(trieTasks map[string]*trieTas
     }
     //TODO: remove successful tasks from requesting
-    for path, task := range trieTasks {
+    for path, task := range s.requesting.trieTasks {
         // If the node did deliver something, missing items may be due to a protocol
         // limit or a previous timeout + delayed delivery. Both cases should permit
         // the node to retry the missing items (to avoid single-peer stalls).
@@ -285,9 +284,10 @@ func (s *StateDownloadManager) HandleRequestResult(trieTasks map[string]*trieTas
         }
         // Missing item, place into the retry queue.
         s.retries.addTrieTask(path, task)
+        s.requesting.deleteTrieTask(path)
     }
-    for hash, task := range codeTasks {
+    for hash, task := range s.requesting.codeTasks {
         // If the node did deliver something, missing items may be due to a protocol
         // limit or a previous timeout + delayed delivery. Both cases should permit
         // the node to retry the missing items (to avoid single-peer stalls).
@@ -300,6 +300,7 @@ func (s *StateDownloadManager) HandleRequestResult(trieTasks map[string]*trieTas
         }
         // Missing item, place into the retry queue.
         s.retries.addCodeTask(hash, task)
+        s.requesting.deleteCodeTask(hash)
     }
     return nil
@@ -312,28 +313,29 @@ func (s *StateDownloadManager) HandleRequestResult(trieTasks map[string]*trieTas
 // If multiple requests correspond to the same hash, this method will inject the
 // blob as a result for the first one only, leaving the remaining duplicates to
 // be fetched again.
-func (s *StateDownloadManager) processNodeData(nodeTasks map[string]*trieTask, codeTasks map[common.Hash]*codeTask, responseData []byte) (common.Hash, error) {
+func (s *StateDownloadManager) processNodeData(codeHashes []common.Hash, triePaths []string, responseData []byte) (common.Hash, error) {
     var hash common.Hash
     s.keccak.Reset()
     s.keccak.Write(responseData)
     s.keccak.Read(hash[:])
     //TODO: remove from requesting
-    if _, present := codeTasks[hash]; present {
+    if _, present := s.requesting.codeTasks[hash]; present {
         err := s.sched.ProcessCode(trie.CodeSyncResult{
             Hash: hash,
             Data: responseData,
         })
-        delete(codeTasks, hash)
+        s.requesting.deleteCodeTask(hash)
         return hash, err
     }
-    for path, task := range nodeTasks {
+    for _, path := range triePaths {
+        task := s.requesting.getTrieTask(path)
         if task.hash == hash {
             err := s.sched.ProcessNode(trie.NodeSyncResult{
                 Path: path,
                 Data: responseData,
             })
-            delete(nodeTasks, path)
+            s.requesting.deleteTrieTask(path)
             return hash, err
         }
     }
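
For orientation, here is a minimal sketch of how a download loop might drive the refactored manager, which now hands out plain code hashes and string trie paths instead of task maps. It is an illustration only: runStateSyncLoop and the sendRequest callback are hypothetical stand-ins for the real staged-stream-sync request plumbing, and HandleRequestError is left out because its full parameter list is not shown in this diff.

package stagedstreamsync

import (
    "github.com/ethereum/go-ethereum/common"
    sttypes "github.com/harmony-one/harmony/p2p/stream/types"
)

// runStateSyncLoop is a hypothetical driver showing the intended call order:
// GetNextBatch -> send the request -> HandleRequestResult. sendRequest stands
// in for the real stream request and is assumed to return the raw response
// blobs plus the ID of the stream that served them.
func runStateSyncLoop(
    m *StateDownloadManager,
    loopID int,
    sendRequest func(nodes []common.Hash, paths []string, codes []common.Hash) ([][]byte, sttypes.StreamID, error),
) error {
    for {
        // Pull the next batch of trie-node hashes, their sync paths, and
        // bytecode hashes to request from a peer.
        nodes, paths, codes := m.GetNextBatch()
        if len(nodes) == 0 && len(codes) == 0 {
            return nil // nothing left to download
        }

        response, stid, err := sendRequest(nodes, paths, codes)
        if err != nil {
            // A real caller would push codes/paths back to the retry queues
            // via HandleRequestError here; omitted because its full signature
            // is not part of this diff.
            continue
        }

        // Feed the delivered blobs back to the manager; requested items that
        // were not delivered are moved to the retries set internally.
        if err := m.HandleRequestResult(codes, paths, response, loopID, stid); err != nil {
            return err
        }
    }
}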
