|
|
|
@ -163,7 +163,7 @@ func (s *StateDownloadManager) GetNextBatch() (nodes []common.Hash, paths []stri |
|
|
|
|
return nodes, paths, codes |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// getNextBatch returns objects with a maximum of n state download
|
|
|
|
|
// getBatchFromUnprocessed returns objects with a maximum of n unprocessed state download
|
|
|
|
|
// tasks to send to the remote peer.
|
|
|
|
|
func (s *StateDownloadManager) getBatchFromUnprocessed(n int) (nodes []common.Hash, paths []string, codes []common.Hash) { |
|
|
|
|
// over trie nodes as those can be written to disk and forgotten about.
|
|
|
|
@ -229,19 +229,13 @@ func (s *StateDownloadManager) HandleRequestError(codeHashes []common.Hash, trie |
|
|
|
|
|
|
|
|
|
// add requested code hashes to retries
|
|
|
|
|
for _, h := range codeHashes { |
|
|
|
|
s.retries.codeTasks[h] = &codeTask{ |
|
|
|
|
attempts: s.requesting.codeTasks[h].attempts, |
|
|
|
|
} |
|
|
|
|
s.retries.addCodeTask(h,s.requesting.codeTasks[h])
|
|
|
|
|
delete(s.requesting.codeTasks, h) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// add requested trie paths to retries
|
|
|
|
|
for _, path := range triePaths { |
|
|
|
|
s.retries.trieTasks[path] = &trieTask{ |
|
|
|
|
hash: s.requesting.trieTasks[path].hash, |
|
|
|
|
path: s.requesting.trieTasks[path].path, |
|
|
|
|
attempts: s.requesting.trieTasks[path].attempts, |
|
|
|
|
} |
|
|
|
|
s.retries.addTrieTask(path,s.requesting.trieTasks[path])
|
|
|
|
|
delete(s.requesting.trieTasks, path) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -270,13 +264,13 @@ func (s *StateDownloadManager) HandleRequestResult(codeHashes []common.Hash, tri |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//TODO: remove successful tasks from requesting
|
|
|
|
|
for path, task := range s.requesting.trieTasks { |
|
|
|
|
for _, path := range triePaths { |
|
|
|
|
task := s.requesting.getTrieTask(path) |
|
|
|
|
// If the node did deliver something, missing items may be due to a protocol
|
|
|
|
|
// limit or a previous timeout + delayed delivery. Both cases should permit
|
|
|
|
|
// the node to retry the missing items (to avoid single-peer stalls).
|
|
|
|
|
if len(response) > 0 { //TODO: if timeout also do same
|
|
|
|
|
delete(task.attempts, streamID) |
|
|
|
|
delete(s.requesting.trieTasks[path].attempts, streamID) |
|
|
|
|
} else if task.attempts[streamID] >= MaxTriesToFetchNodeData { |
|
|
|
|
// If we've requested the node too many times already, it may be a malicious
|
|
|
|
|
// sync where nobody has the right data. Abort.
|
|
|
|
@ -287,12 +281,13 @@ func (s *StateDownloadManager) HandleRequestResult(codeHashes []common.Hash, tri |
|
|
|
|
s.requesting.deleteTrieTask(path) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for hash, task := range s.requesting.codeTasks { |
|
|
|
|
for _, hash := range codeHashes { |
|
|
|
|
task:= s.requesting.getCodeTask(hash) |
|
|
|
|
// If the node did deliver something, missing items may be due to a protocol
|
|
|
|
|
// limit or a previous timeout + delayed delivery. Both cases should permit
|
|
|
|
|
// the node to retry the missing items (to avoid single-peer stalls).
|
|
|
|
|
if len(response) > 0 { //TODO: if timeout also do same
|
|
|
|
|
delete(task.attempts, streamID) |
|
|
|
|
delete(s.requesting.codeTasks[hash].attempts, streamID) //TODO: do we need delete attempts???
|
|
|
|
|
} else if task.attempts[streamID] >= MaxTriesToFetchNodeData { |
|
|
|
|
// If we've requested the node too many times already, it may be a malicious
|
|
|
|
|
// sync where nobody has the right data. Abort.
|
|
|
|
|