add task management logic to state download manager in stream sync

pull/4465/head
GheisMohammadi committed 1 year ago
parent 2064cfd62c
commit 702eb5e1fb
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. api/service/stagedstreamsync/const.go (8 changes)
  2. api/service/stagedstreamsync/state_download_manager.go (272 changes)

@@ -23,6 +23,14 @@ const (
 	// no more request will be assigned to workers to wait for InsertChain to finish.
 	SoftQueueCap int = 100
+	StatesPerRequest int = 10 // number of get nodes by hashes for each request
+	// DefaultConcurrency is the default settings for concurrency
+	DefaultConcurrency int = 4
+	// MaxTriesToFetchNodeData is the maximum number of tries to fetch node data
+	MaxTriesToFetchNodeData int = 5
 	// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
 	// to restart automatically when stuck in `getBlockHashes`
 	ShortRangeTimeout time.Duration = 1 * time.Minute
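To put the new constants in context: StatesPerRequest caps how many trie node and bytecode hashes go into a single request. Below is a minimal, self-contained Go sketch of that batching arithmetic; the chunk helper and its names are illustrative only and not part of this commit.

package main

import "fmt"

// statesPerRequest mirrors the StatesPerRequest constant introduced above.
const statesPerRequest = 10

// chunk splits a list of pending hashes into batches no larger than the
// per-request cap. It is a hypothetical helper, not part of this commit.
func chunk(hashes []string, size int) [][]string {
	var batches [][]string
	for len(hashes) > 0 {
		n := size
		if len(hashes) < n {
			n = len(hashes)
		}
		batches = append(batches, hashes[:n])
		hashes = hashes[n:]
	}
	return batches
}

func main() {
	pending := make([]string, 23)
	fmt.Println(len(chunk(pending, statesPerRequest))) // 3 batches: 10 + 10 + 3
}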

@@ -1,6 +1,7 @@
 package stagedstreamsync

 import (
+	"fmt"
 	"sync"

 	"github.com/ethereum/go-ethereum/common"
@@ -15,30 +16,10 @@ import (
 	"golang.org/x/crypto/sha3"
 )

-// trieTask represents a single trie node download task, containing a set of
-// peers already attempted retrieval from to detect stalled syncs and abort.
-type trieTask struct {
-	hash     common.Hash
-	path     [][]byte
-	attempts map[string]struct{}
-}
-
 // codeTask represents a single byte code download task, containing a set of
 // peers already attempted retrieval from to detect stalled syncs and abort.
 type codeTask struct {
-	attempts map[string]struct{}
+	attempts map[sttypes.StreamID]int
 }

-type task struct {
-	trieTasks map[string]*trieTask      // Set of trie node tasks currently queued for retrieval, indexed by path
-	codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval, indexed by hash
-}
-
-func newTask() *task {
-	return &task{
-		trieTasks: make(map[string]*trieTask),
-		codeTasks: make(map[common.Hash]*codeTask),
-	}
-}
-
 func (t *task) addCodeTask(h common.Hash, ct *codeTask) {
@@ -53,7 +34,7 @@ func (t *task) getCodeTask(h common.Hash) *codeTask {
 func (t *task) addNewCodeTask(h common.Hash) {
 	t.codeTasks[h] = &codeTask{
-		attempts: make(map[string]struct{}),
+		attempts: make(map[sttypes.StreamID]int),
 	}
 }
@@ -61,15 +42,15 @@ func (t *task) deleteCodeTask(hash common.Hash) {
 	delete(t.codeTasks, hash)
 }

-func (t *task) addTrieTask(hash common.Hash, path string) {
-	t.trieTasks[path] = &trieTask{
-		hash:     hash,
-		path:     trie.NewSyncPath([]byte(path)),
-		attempts: make(map[string]struct{}),
-	}
-}
+// trieTask represents a single trie node download task, containing a set of
+// peers already attempted retrieval from to detect stalled syncs and abort.
+type trieTask struct {
+	hash     common.Hash
+	path     [][]byte
+	attempts map[sttypes.StreamID]int
+}

-func (t *task) setTrieTask(path string, tt *trieTask) {
+func (t *task) addTrieTask(path string, tt *trieTask) {
 	t.trieTasks[path] = &trieTask{
 		hash: tt.hash,
 		path: tt.path,
@@ -81,10 +62,30 @@ func (t *task) getTrieTask(path string) *trieTask {
 	return t.trieTasks[path]
 }

+func (t *task) addNewTrieTask(hash common.Hash, path string) {
+	t.trieTasks[path] = &trieTask{
+		hash:     hash,
+		path:     trie.NewSyncPath([]byte(path)),
+		attempts: make(map[sttypes.StreamID]int),
+	}
+}
+
 func (t *task) deleteTrieTask(path string) {
 	delete(t.trieTasks, path)
 }

+type task struct {
+	trieTasks map[string]*trieTask      // Set of trie node tasks currently queued for retrieval, indexed by path
+	codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval, indexed by hash
+}
+
+func newTask() *task {
+	return &task{
+		trieTasks: make(map[string]*trieTask),
+		codeTasks: make(map[common.Hash]*codeTask),
+	}
+}
+
 // StateDownloadManager is the helper structure for get blocks request management
 type StateDownloadManager struct {
 	bc core.BlockChain
@@ -125,3 +126,216 @@ func newStateDownloadManager(tx kv.RwTx,
 	}
 }
// fillTasks refills the task pool from the state sync scheduler so that up to n tasks are available to hand out to remote peers.
func (s *StateDownloadManager) fillTasks(n int) error {
// Refill available tasks from the scheduler.
if fill := n - (len(s.tasks.trieTasks) + len(s.tasks.codeTasks)); fill > 0 {
paths, hashes, codes := s.sched.Missing(fill)
for i, path := range paths {
s.tasks.addNewTrieTask(hashes[i], path)
}
for _, hash := range codes {
s.tasks.addNewCodeTask(hash)
}
}
return nil
}
// GetNextBatch returns up to StatesPerRequest state download tasks
// (trie node hashes, their paths, and bytecode hashes) to send to the remote peer.
func (s *StateDownloadManager) GetNextBatch() (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
s.lock.Lock()
defer s.lock.Unlock()
cap := StatesPerRequest
nodes, paths, codes = s.getBatchFromRetries(cap)
nItems := len(nodes) + len(codes)
cap -= nItems
if cap > 0 {
newNodes, newPaths, newCodes := s.getBatchFromUnprocessed(cap)
nodes = append(nodes, newNodes...)
paths = append(paths, newPaths...)
codes = append(codes, newCodes...)
}
return nodes, paths, codes
}
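GetNextBatch fills a StatesPerRequest-sized budget from the retry queue first and tops the remainder up with unprocessed tasks. A minimal, self-contained sketch of that budget split follows; plain integers stand in for the task queues and the helper name is illustrative only, not part of this commit.

package main

import "fmt"

// nextBatchSizes mirrors the budget logic of GetNextBatch: take retried tasks
// first, then fill whatever capacity is left with fresh, unprocessed tasks.
func nextBatchSizes(retries, unprocessed, capPerRequest int) (fromRetries, fromUnprocessed int) {
	fromRetries = retries
	if fromRetries > capPerRequest {
		fromRetries = capPerRequest
	}
	remaining := capPerRequest - fromRetries
	fromUnprocessed = unprocessed
	if fromUnprocessed > remaining {
		fromUnprocessed = remaining
	}
	return fromRetries, fromUnprocessed
}

func main() {
	// 4 tasks waiting in the retry queue, plenty of new ones, cap of 10 per request.
	fmt.Println(nextBatchSizes(4, 100, 10)) // 4 6
}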
// getBatchFromUnprocessed returns up to n new state download
// tasks that have not been requested from any peer yet.
func (s *StateDownloadManager) getBatchFromUnprocessed(n int) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
// Prefer code over trie nodes, as those can be written to disk and forgotten about.
nodes = make([]common.Hash, 0, n)
paths = make([]trie.SyncPath, 0, n)
codes = make([]common.Hash, 0, n)
for hash, t := range s.tasks.codeTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
codes = append(codes, hash)
s.requesting.addCodeTask(hash, t)
s.tasks.deleteCodeTask(hash)
}
for path, t := range s.tasks.trieTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
nodes = append(nodes, t.hash)
paths = append(paths, t.path)
s.requesting.addTrieTask(path, t)
s.tasks.deleteTrieTask(path)
}
return nodes, paths, codes
}
// getBatchFromRetries returns up to n state download tasks from the retry queue to be re-requested.
func (s *StateDownloadManager) getBatchFromRetries(n int) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
// Prefer code over trie nodes, as those can be written to disk and forgotten about.
nodes = make([]common.Hash, 0, n)
paths = make([]trie.SyncPath, 0, n)
codes = make([]common.Hash, 0, n)
for hash, t := range s.retries.codeTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
codes = append(codes, hash)
s.requesting.addCodeTask(hash, t)
s.retries.deleteCodeTask(hash)
}
for path, t := range s.retries.trieTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
}
nodes = append(nodes, t.hash)
paths = append(paths, t.path)
s.requesting.addTrieTask(path, t)
s.retries.deleteTrieTask(path)
}
return nodes, paths, codes
}
// HandleRequestError handles a failed request by moving its tasks back to the retry queue.
func (s *StateDownloadManager) HandleRequestError(codeHashes []common.Hash, triePaths []string, streamID sttypes.StreamID, err error) {
s.lock.Lock()
defer s.lock.Unlock()
// add requested code hashes to retries
for _, h := range codeHashes {
s.retries.codeTasks[h] = &codeTask{
attempts: s.requesting.codeTasks[h].attempts,
}
delete(s.requesting.codeTasks, h)
}
// add requested trie paths to retries
for _, p := range triePaths {
s.retries.trieTasks[p] = &trieTask{
hash: s.requesting.trieTasks[p].hash,
path: s.requesting.trieTasks[p].path,
attempts: s.requesting.trieTasks[p].attempts,
}
delete(s.requesting.trieTasks, p)
}
}
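In other words, a failed request moves every in-flight task from the requesting set back to the retry set while keeping its per-stream attempt history. The following standalone sketch mirrors that move with simplified types (string keys stand in for common.Hash values and trie paths); it is illustrative only, not part of this commit.

package main

import "fmt"

// retryAll mirrors the requesting -> retries move in HandleRequestError: each
// failed key is copied into retries together with its per-stream attempt
// counts, then removed from the in-flight set.
func retryAll(keys []string, requesting, retries map[string]map[string]int) {
	for _, k := range keys {
		retries[k] = requesting[k] // keep the attempt history for the retry
		delete(requesting, k)
	}
}

func main() {
	requesting := map[string]map[string]int{
		"task-0xabc": {"stream-1": 2},
	}
	retries := map[string]map[string]int{}
	retryAll([]string{"task-0xabc"}, requesting, retries)
	fmt.Println(len(requesting), retries["task-0xabc"]["stream-1"]) // 0 2
}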
// HandleRequestResult handles the result of a request for trie node paths and code hashes.
func (s *StateDownloadManager) HandleRequestResult(trieTasks map[string]*trieTask, codeTasks map[common.Hash]*codeTask, response [][]byte, loopID int, streamID sttypes.StreamID) error {
s.lock.Lock()
defer s.lock.Unlock()
// Collect processing stats and update progress if valid data was received
duplicate, unexpected, successful, numUncommitted, bytesUncommitted := 0, 0, 0, 0, 0
for _, blob := range response {
hash, err := s.processNodeData(trieTasks, codeTasks, blob)
switch err {
case nil:
numUncommitted++
bytesUncommitted += len(blob)
successful++
case trie.ErrNotRequested:
unexpected++
case trie.ErrAlreadyProcessed:
duplicate++
default:
return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
}
//TODO: remove successful tasks from requesting
for path, task := range trieTasks {
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
if len(response) > 0 { //TODO: if timeout also do same
delete(task.attempts, streamID)
} else if task.attempts[streamID] >= MaxTriesToFetchNodeData {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
return fmt.Errorf("trie node %s failed with peer %s", task.hash.TerminalString(), task.attempts[streamID])
}
// Missing item, place into the retry queue.
s.retries.addTrieTask(path, task)
}
for hash, task := range codeTasks {
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
if len(response) > 0 { //TODO: if timeout also do same
delete(task.attempts, streamID)
} else if task.attempts[streamID] >= MaxTriesToFetchNodeData {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
return fmt.Errorf("byte code %s failed with peer %s", hash.TerminalString(), task.attempts[streamID])
}
// Missing item, place into the retry queue.
s.retries.addCodeTask(hash, task)
}
return nil
}
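The abort rule above gives up on a task once a stream has failed to deliver it MaxTriesToFetchNodeData times, while a successful delivery clears that stream's attempt record. A minimal, self-contained sketch of the rule follows; the attempt-increment bookkeeping is simplified here to keep the sketch complete and is not taken from the commit.

package main

import (
	"errors"
	"fmt"
)

// maxTriesToFetchNodeData mirrors the MaxTriesToFetchNodeData constant.
const maxTriesToFetchNodeData = 5

// checkDelivery mirrors the abort rule in HandleRequestResult: a delivered
// task clears the stream's attempt record, while a missing task aborts once
// the stream has reached the retry limit. The counter increment is kept only
// to make the sketch self-contained; it is not taken from the commit.
func checkDelivery(attempts map[string]int, stream string, delivered bool) error {
	if delivered {
		delete(attempts, stream)
		return nil
	}
	if attempts[stream] >= maxTriesToFetchNodeData {
		return errors.New("too many failed attempts, aborting sync")
	}
	attempts[stream]++
	return nil
}

func main() {
	attempts := map[string]int{"stream-1": 5}
	fmt.Println(checkDelivery(attempts, "stream-1", false)) // too many failed attempts, aborting sync
}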
// processNodeData tries to inject a trie node data blob delivered from a remote
// peer into the state trie, returning whether anything useful was written or any
// error occurred.
//
// If multiple requests correspond to the same hash, this method will inject the
// blob as a result for the first one only, leaving the remaining duplicates to
// be fetched again.
func (s *StateDownloadManager) processNodeData(nodeTasks map[string]*trieTask, codeTasks map[common.Hash]*codeTask, responseData []byte) (common.Hash, error) {
var hash common.Hash
s.keccak.Reset()
s.keccak.Write(responseData)
s.keccak.Read(hash[:])
//TODO: remove from requesting
if _, present := codeTasks[hash]; present {
err := s.sched.ProcessCode(trie.CodeSyncResult{
Hash: hash,
Data: responseData,
})
delete(codeTasks, hash)
return hash, err
}
for path, task := range nodeTasks {
if task.hash == hash {
err := s.sched.ProcessNode(trie.NodeSyncResult{
Path: path,
Data: responseData,
})
delete(nodeTasks, path)
return hash, err
}
}
return common.Hash{}, trie.ErrNotRequested
}
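processNodeData identifies which pending task a delivered blob fulfils by hashing the blob with Keccak-256 and looking the hash up in the task maps. Below is a self-contained sketch of that identification step using golang.org/x/crypto/sha3, with the task maps simplified to a plain set; it is illustrative only, not part of this commit.

package main

import (
	"fmt"

	"golang.org/x/crypto/sha3"
)

// keccak256 hashes a delivered blob the same way the manager does before
// matching it against the requested trie node and bytecode hashes.
func keccak256(blob []byte) [32]byte {
	var h [32]byte
	hasher := sha3.NewLegacyKeccak256()
	hasher.Write(blob)
	hasher.Sum(h[:0])
	return h
}

func main() {
	blob := []byte("example node payload")

	// A simplified "requested" set standing in for the trie and code task maps.
	requested := map[[32]byte]bool{keccak256(blob): true}
	fmt.Println(requested[keccak256(blob)]) // true: the blob matches a pending request
}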
