|
|
|
@ -67,7 +67,6 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.state.Debug("STATE SYNC ======================================================>", "started") |
|
|
|
|
// maxHeight := s.state.status.targetBN
|
|
|
|
|
// currentHead := s.state.CurrentBlockNumber()
|
|
|
|
|
// if currentHead >= maxHeight {
|
|
|
|
@ -106,15 +105,10 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Fetch states from neighbors
|
|
|
|
|
pivotRootHash := s.state.status.pivotBlock.Root() |
|
|
|
|
currentBlockRootHash := s.state.bc.CurrentFastBlock().Root() |
|
|
|
|
scheme := sss.configs.bc.TrieDB().Scheme() |
|
|
|
|
sdm := newFullStateDownloadManager(sss.configs.bc.ChainDb(), scheme, tx, sss.configs.bc, sss.configs.concurrency, s.state.logger) |
|
|
|
|
sdm.setRootHash(currentBlockRootHash) |
|
|
|
|
s.state.Debug("StateSync/setRootHash", pivotRootHash) |
|
|
|
|
s.state.Debug("StateSync/currentFastBlockRoot", currentBlockRootHash) |
|
|
|
|
s.state.Debug("StateSync/pivotBlockNumber", s.state.status.pivotBlock.NumberU64()) |
|
|
|
|
s.state.Debug("StateSync/currentFastBlockNumber", s.state.bc.CurrentFastBlock().NumberU64()) |
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
for i := 0; i < s.state.config.Concurrency; i++ { |
|
|
|
|
wg.Add(1) |
|
|
|
@ -127,7 +121,6 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever |
|
|
|
|
sss.configs.logger.Warn().Err(err). |
|
|
|
|
Uint64("pivot block number", s.state.status.pivotBlock.NumberU64()). |
|
|
|
|
Msg(WrapStagedSyncMsg("insert pivot block failed")) |
|
|
|
|
s.state.Debug("StateSync/pivot/insert/error", err) |
|
|
|
|
// TODO: panic("pivot block is failed to insert in chain.")
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -135,9 +128,6 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever |
|
|
|
|
// states should be fully synced in this stage
|
|
|
|
|
s.state.status.statesSynced = true |
|
|
|
|
|
|
|
|
|
s.state.Debug("StateSync/pivot/num", s.state.status.pivotBlock.NumberU64()) |
|
|
|
|
s.state.Debug("StateSync/pivot/insert", "done") |
|
|
|
|
|
|
|
|
|
/* |
|
|
|
|
gbm := s.state.gbm |
|
|
|
|
|
|
|
|
@ -171,21 +161,15 @@ func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRever |
|
|
|
|
// runStateWorkerLoop creates a work loop for download states
|
|
|
|
|
func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *FullStateDownloadManager, wg *sync.WaitGroup, loopID int, startTime time.Time, s *StageState) { |
|
|
|
|
|
|
|
|
|
s.state.Debug("runStateWorkerLoop/info", "started") |
|
|
|
|
|
|
|
|
|
defer wg.Done() |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
s.state.Debug("runStateWorkerLoop/ctx/done", "Finished") |
|
|
|
|
return |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
accountTasks, codes, storages, healtask, codetask, err := sdm.GetNextBatch() |
|
|
|
|
s.state.Debug("runStateWorkerLoop/batch/len", len(accountTasks)+len(codes)+len(storages.accounts)) |
|
|
|
|
s.state.Debug("runStateWorkerLoop/batch/heals/len", len(healtask.hashes)+len(codetask.hashes)) |
|
|
|
|
s.state.Debug("runStateWorkerLoop/batch/err", err) |
|
|
|
|
if len(accountTasks)+len(codes)+len(storages.accounts)+len(healtask.hashes)+len(codetask.hashes) == 0 || err != nil { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
@ -194,8 +178,6 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
s.state.Debug("runStateWorkerLoop/batch/accounts", accountTasks) |
|
|
|
|
s.state.Debug("runStateWorkerLoop/batch/codes", codes) |
|
|
|
|
|
|
|
|
|
if len(accountTasks) > 0 { |
|
|
|
|
|
|
|
|
@ -217,7 +199,6 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full |
|
|
|
|
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err) |
|
|
|
|
return |
|
|
|
|
} else if retAccounts == nil || len(retAccounts) == 0 { |
|
|
|
|
s.state.Debug("runStateWorkerLoop/GetAccountRange/data", "nil array") |
|
|
|
|
utils.Logger().Warn(). |
|
|
|
|
Str("stream", string(stid)). |
|
|
|
|
Msg(WrapStagedSyncMsg("GetAccountRange failed, received empty accounts")) |
|
|
|
@ -275,7 +256,6 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full |
|
|
|
|
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err) |
|
|
|
|
return |
|
|
|
|
} else if slots == nil || len(slots) == 0 { |
|
|
|
|
s.state.Debug("runStateWorkerLoop/GetStorageRanges/data", "nil array") |
|
|
|
|
utils.Logger().Warn(). |
|
|
|
|
Str("stream", string(stid)). |
|
|
|
|
Msg(WrapStagedSyncMsg("GetStorageRanges failed, received empty slots")) |
|
|
|
@ -315,7 +295,6 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full |
|
|
|
|
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err) |
|
|
|
|
return |
|
|
|
|
} else if nodes == nil || len(nodes) == 0 { |
|
|
|
|
s.state.Debug("runStateWorkerLoop/GetTrieNodes/data", "nil array") |
|
|
|
|
utils.Logger().Warn(). |
|
|
|
|
Str("stream", string(stid)). |
|
|
|
|
Msg(WrapStagedSyncMsg("GetTrieNodes failed, received empty nodes")) |
|
|
|
@ -350,7 +329,6 @@ func (sss *StageFullStateSync) runStateWorkerLoop(ctx context.Context, sdm *Full |
|
|
|
|
sdm.HandleRequestError(accountTasks, codes, storages, healtask, codetask, stid, err) |
|
|
|
|
return |
|
|
|
|
} else if retCodes == nil || len(retCodes) == 0 { |
|
|
|
|
s.state.Debug("runStateWorkerLoop/GetByteCodes/data", "nil array") |
|
|
|
|
utils.Logger().Warn(). |
|
|
|
|
Str("stream", string(stid)). |
|
|
|
|
Msg(WrapStagedSyncMsg("GetByteCodes failed, received empty codes")) |
|
|
|
|