[stream][consensus] resolve stn stuck issue and apply some fixes to short range sync.

pull/3610/head
Jacky Wang 4 years ago committed by Leo Chen
parent 3295a2bf03
commit e461b85ac8
  1. 5
      consensus/consensus_v2.go
  2. 9
      hmy/downloader/const.go
  3. 29
      hmy/downloader/shortrange.go

@ -366,8 +366,13 @@ func (consensus *Consensus) Start(
consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC")
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc() consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc()
} else if consensus.Mode() == Syncing { } else if consensus.Mode() == Syncing {
// Corner case where sync is triggered before `onCommitted` and there is a race
// for block insertion between consensus and downloader.
mode := consensus.UpdateConsensusInformation() mode := consensus.UpdateConsensusInformation()
consensus.SetMode(mode) consensus.SetMode(mode)
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc()
} }
consensus.mutex.Unlock() consensus.mutex.Unlock()

@ -1,6 +1,8 @@
package downloader package downloader
import ( import (
"time"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
) )
@ -19,7 +21,12 @@ const (
// no more request will be assigned to workers to wait for InsertChain to finish. // no more request will be assigned to workers to wait for InsertChain to finish.
softQueueCap int = 100 softQueueCap int = 100
defaultConcurrency = 16 // defaultConcurrency is the default setting for concurrency
defaultConcurrency = 4
// shortRangeTimeout is the timeout for each short range sync, which allows short range sync
// to restart automatically when stuck in `getBlockHashes`
shortRangeTimeout = 1 * time.Minute
) )
type ( type (

@ -26,9 +26,10 @@ var emptySigVerifyError *sigVerifyError
func (d *Downloader) doShortRangeSync() (int, error) { func (d *Downloader) doShortRangeSync() (int, error) {
numShortRangeCounterVec.With(d.promLabels()).Inc() numShortRangeCounterVec.With(d.promLabels()).Inc()
srCtx, _ := context.WithTimeout(d.ctx, shortRangeTimeout)
sh := &srHelper{ sh := &srHelper{
syncProtocol: d.syncProtocol, syncProtocol: d.syncProtocol,
ctx: d.ctx, ctx: srCtx,
config: d.config, config: d.config,
logger: d.logger.With().Str("mode", "short range").Logger(), logger: d.logger.With().Str("mode", "short range").Logger(),
} }
@ -46,10 +47,17 @@ func (d *Downloader) doShortRangeSync() (int, error) {
return 0, nil return 0, nil
} }
expEndBN := curBN + uint64(len(hashChain))
d.logger.Info().Uint64("current number", curBN).
Uint64("target number", expEndBN).
Interface("hashChain", hashChain).
Msg("short range start syncing")
d.startSyncing() d.startSyncing()
expEndBN := curBN + uint64(len(hashChain)) - 1
d.status.setTargetBN(expEndBN) d.status.setTargetBN(expEndBN)
defer d.finishSyncing() defer func() {
d.logger.Info().Msg("short range finished syncing")
d.finishSyncing()
}()
blocks, err := sh.getBlocksByHashes(hashChain, whitelist) blocks, err := sh.getBlocksByHashes(hashChain, whitelist)
if err != nil { if err != nil {
@ -118,8 +126,13 @@ func (sh *srHelper) getBlocksByHashes(hashes []common.Hash, whitelist []sttypes.
errLock sync.Mutex errLock sync.Mutex
) )
wg.Add(sh.config.Concurrency) concurrency := sh.config.Concurrency
for i := 0; i != sh.config.Concurrency; i++ { if concurrency > m.numRequests() {
concurrency = m.numRequests()
}
wg.Add(concurrency)
for i := 0; i != concurrency; i++ {
go func() { go func() {
defer wg.Done() defer wg.Done()
defer cancel() // it's ok to cancel context more than once defer cancel() // it's ok to cancel context more than once
@ -189,6 +202,7 @@ func (sh *srHelper) doGetBlockHashesRequest(bns []uint64) ([]common.Hash, sttype
hashes, stid, err := sh.syncProtocol.GetBlockHashes(ctx, bns) hashes, stid, err := sh.syncProtocol.GetBlockHashes(ctx, bns)
if err != nil { if err != nil {
sh.logger.Warn().Err(err).Str("stream", string(stid)).Msg("failed to doGetBlockHashesRequest")
return nil, stid, err return nil, stid, err
} }
if len(hashes) != len(bns) { if len(hashes) != len(bns) {
@ -207,6 +221,7 @@ func (sh *srHelper) doGetBlocksByHashesRequest(ctx context.Context, hashes []com
blocks, stid, err := sh.syncProtocol.GetBlocksByHashes(ctx, hashes, blocks, stid, err := sh.syncProtocol.GetBlocksByHashes(ctx, hashes,
syncProto.WithWhitelist(wl)) syncProto.WithWhitelist(wl))
if err != nil { if err != nil {
sh.logger.Warn().Err(err).Str("stream", string(stid)).Msg("failed to getBlockByHashes")
return nil, stid, err return nil, stid, err
} }
if err := checkGetBlockByHashesResult(blocks, hashes); err != nil { if err := checkGetBlockByHashesResult(blocks, hashes); err != nil {
@ -385,6 +400,10 @@ func (m *getBlocksByHashManager) numBlocksPerRequest() int {
return val return val
} }
// numRequests returns how many getBlocksByHashes requests are needed to cover
// all of m.hashes, i.e. the ceiling of len(m.hashes) divided by the per-request
// block cap from numBlocksPerRequest.
func (m *getBlocksByHashManager) numRequests() int {
return divideCeil(len(m.hashes), m.numBlocksPerRequest())
}
func (m *getBlocksByHashManager) addResult(hashes []common.Hash, blocks []*types.Block, stid sttypes.StreamID) { func (m *getBlocksByHashManager) addResult(hashes []common.Hash, blocks []*types.Block, stid sttypes.StreamID) {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()

Loading…
Cancel
Save