Improvements to stream sync for mainnet deployment (#4493)

* add faultRecoveryThreshold to reset stream failures after a fault-free period

* increase MaxStreamFailures so streams stay in the list longer

* set Concurrency to 2 for devnet to match MinStreams; otherwise fixValues would overwrite MinStreams

* stream sync loop checks for ErrNotEnoughStreams and waits for enough streams when there are not enough connected streams in the list

* fix fault recovery issue

* improve checkPrerequisites so syncing can continue with the minimum number of streams

* refactor the fixValues function to prioritize MinStreams over Concurrency

* drop the remote peer if it sends an empty blocks array

* run goimports to fix a build issue

* fix getReceipts array assignments

* fix getReceipts and add tests for it
Gheis Mohammadi authored 1 year ago, committed via GitHub
parent d8f122538b
commit fa99cd1959
15 changed files (lines changed in parentheses):

1. api/service/stagedstreamsync/const.go (7)
2. api/service/stagedstreamsync/downloader.go (21)
3. api/service/stagedstreamsync/errors.go (2)
4. api/service/stagedstreamsync/short_range_helper.go (5)
5. api/service/stagedstreamsync/stage_bodies.go (13)
6. api/service/stagedstreamsync/stage_epoch.go (7)
7. api/service/stagedstreamsync/stage_short_range.go (7)
8. api/service/stagedstreamsync/staged_stream_sync.go (3)
9. cmd/harmony/default.go (2)
10. p2p/stream/common/requestmanager/interface_test.go (3)
11. p2p/stream/common/streammanager/interface_test.go (3)
12. p2p/stream/protocols/sync/chain_test.go (15)
13. p2p/stream/protocols/sync/const.go (6)
14. p2p/stream/protocols/sync/protocol.go (5)
15. p2p/stream/types/stream.go (15)

@@ -23,9 +23,6 @@ const (
 	// no more request will be assigned to workers to wait for InsertChain to finish.
 	SoftQueueCap int = 100
-	// DefaultConcurrency is the default settings for concurrency
-	DefaultConcurrency int = 4
 	// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
 	// to restart automatically when stuck in `getBlockHashes`
 	ShortRangeTimeout time.Duration = 1 * time.Minute
@@ -74,10 +71,10 @@ type (
 func (c *Config) fixValues() {
 	if c.Concurrency == 0 {
-		c.Concurrency = DefaultConcurrency
+		c.Concurrency = c.MinStreams
 	}
 	if c.Concurrency > c.MinStreams {
-		c.MinStreams = c.Concurrency
+		c.Concurrency = c.MinStreams
 	}
 	if c.MinStreams > c.InitStreams {
 		c.InitStreams = c.MinStreams

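The effect of the reordered clamp in fixValues is easiest to see with concrete values. Below is a minimal, self-contained sketch (a trimmed-down Config with only the fields fixValues touches, not the real struct) comparing old and new behavior when Concurrency is configured higher than MinStreams:

```go
package main

import "fmt"

// Trimmed stand-in for the sync Config; only the fields fixValues touches.
type Config struct {
	Concurrency int
	MinStreams  int
	InitStreams int
}

// fixValues mirrors the new precedence: MinStreams wins, Concurrency is clamped down.
func (c *Config) fixValues() {
	if c.Concurrency == 0 {
		c.Concurrency = c.MinStreams
	}
	if c.Concurrency > c.MinStreams {
		c.Concurrency = c.MinStreams // the old code raised MinStreams instead
	}
	if c.MinStreams > c.InitStreams {
		c.InitStreams = c.MinStreams
	}
}

func main() {
	c := Config{Concurrency: 4, MinStreams: 2, InitStreams: 2}
	c.fixValues()
	// The old behavior would have forced MinStreams up to 4;
	// now Concurrency is lowered to 2 and MinStreams stays authoritative.
	fmt.Printf("%+v\n", c) // {Concurrency:2 MinStreams:2 InitStreams:2}
}
```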
@@ -153,6 +153,17 @@ func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscription {
 // waitForBootFinish waits for stream manager to finish the initial discovery and have
 // enough peers to start downloader
 func (d *Downloader) waitForBootFinish() {
+	bootCompleted, numStreams := d.waitForEnoughStreams(d.config.InitStreams)
+	if bootCompleted {
+		fmt.Printf("boot completed for shard %d ( %d streams are connected )\n",
+			d.bc.ShardID(), numStreams)
+	}
+}
+
+func (d *Downloader) waitForEnoughStreams(requiredStreams int) (bool, int) {
+	d.logger.Info().Int("requiredStreams", requiredStreams).
+		Msg("waiting for enough stream connections to continue syncing")
 	evtCh := make(chan streammanager.EvtStreamAdded, 1)
 	sub := d.syncProtocol.SubscribeAddStreamEvent(evtCh)
 	defer sub.Unsubscribe()
@@ -177,12 +188,11 @@ func (d *Downloader) waitForBootFinish() {
 			trigger()
 		case <-checkCh:
-			if d.syncProtocol.NumStreams() >= d.config.InitStreams {
-				fmt.Printf("boot completed for shard %d ( %d streams are connected )\n", d.bc.ShardID(), d.syncProtocol.NumStreams())
-				return
+			if d.syncProtocol.NumStreams() >= requiredStreams {
+				return true, d.syncProtocol.NumStreams()
 			}
 		case <-d.closeC:
-			return
+			return false, d.syncProtocol.NumStreams()
 		}
 	}
 }
@@ -212,6 +222,9 @@ func (d *Downloader) loop() {
 		case <-d.downloadC:
 			bnBeforeSync := d.bc.CurrentBlock().NumberU64()
 			estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
+			if err == ErrNotEnoughStreams {
+				d.waitForEnoughStreams(d.config.MinStreams)
+			}
 			if err != nil {
 				//TODO: if there is a bad block which can't be resolved
 				if d.stagedSyncInstance.invalidBlock.Active {

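waitForEnoughStreams generalizes the boot-wait machinery: block on a ticker/event loop until the stream count reaches the target, bailing out when the downloader shuts down. Below is a minimal, self-contained sketch of that wait pattern; an atomic counter and a plain close channel stand in for the real syncProtocol and closeC (assumptions for illustration), and the actual implementation additionally subscribes to stream-added events rather than only polling:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitForEnough blocks until numStreams reaches required, or closeC closes.
// It mirrors the shape of waitForEnoughStreams above: it reports whether the
// target was reached, plus the last stream count seen.
func waitForEnough(required int, numStreams *atomic.Int64, closeC <-chan struct{}) (bool, int) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if n := int(numStreams.Load()); n >= required {
				return true, n
			}
		case <-closeC:
			return false, int(numStreams.Load())
		}
	}
}

func main() {
	var n atomic.Int64
	closeC := make(chan struct{})
	go func() {
		for i := 0; i < 3; i++ {
			time.Sleep(150 * time.Millisecond)
			n.Add(1) // simulate streams connecting one by one
		}
	}()
	ok, got := waitForEnough(3, &n, closeC)
	fmt.Printf("completed=%v streams=%d\n", ok, got)
}
```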
@@ -14,7 +14,7 @@ var (
 	ErrUnexpectedNumberOfBlockHashes = WrapStagedSyncError("unexpected number of getBlocksByHashes result")
 	ErrUnexpectedBlockHashes         = WrapStagedSyncError("unexpected get block hashes result delivered")
 	ErrNilBlock                      = WrapStagedSyncError("nil block found")
-	ErrNotEnoughStreams              = WrapStagedSyncError("not enough streams")
+	ErrNotEnoughStreams              = WrapStagedSyncError("number of streams smaller than minimum required")
 	ErrParseCommitSigAndBitmapFail   = WrapStagedSyncError("parse commitSigAndBitmap failed")
 	ErrVerifyHeaderFail              = WrapStagedSyncError("verify header failed")
 	ErrInsertChainFail               = WrapStagedSyncError("insert to chain failed")

@@ -7,6 +7,7 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/harmony-one/harmony/core/types"
+	"github.com/harmony-one/harmony/internal/utils"
 	syncProto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
 	sttypes "github.com/harmony-one/harmony/p2p/stream/types"
 	"github.com/pkg/errors"

@@ -132,6 +133,10 @@ func (sh *srHelper) getBlocksByHashes(ctx context.Context, hashes []common.Hash,
 func (sh *srHelper) checkPrerequisites() error {
 	if sh.syncProtocol.NumStreams() < sh.config.Concurrency {
+		utils.Logger().Info().
+			Int("available streams", sh.syncProtocol.NumStreams()).
+			Interface("concurrency", sh.config.Concurrency).
+			Msg("not enough streams to do concurrent processes")
 		return ErrNotEnoughStreams
 	}
 	return nil

@@ -167,13 +167,22 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownloadManager,
 				Msg(WrapStagedSyncMsg("downloadRawBlocks failed"))
 			err = errors.Wrap(err, "request error")
 			gbm.HandleRequestError(batch, err, stid)
-		} else if blockBytes == nil || len(blockBytes) == 0 {
+		} else if blockBytes == nil {
 			utils.Logger().Warn().
 				Str("stream", string(stid)).
 				Interface("block numbers", batch).
-				Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes"))
+				Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received invalid (nil) blockBytes"))
+			err := errors.New("downloadRawBlocks received invalid (nil) blockBytes")
+			gbm.HandleRequestError(batch, err, stid)
+			b.configs.protocol.StreamFailed(stid, "downloadRawBlocks failed")
+		} else if len(blockBytes) == 0 {
+			utils.Logger().Warn().
+				Str("stream", string(stid)).
+				Interface("block numbers", batch).
+				Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes, remote peer is not fully synced"))
+			err := errors.New("downloadRawBlocks received empty blockBytes")
+			gbm.HandleRequestError(batch, err, stid)
+			b.configs.protocol.RemoveStream(stid)
 		} else {
 			if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil {
 				panic(ErrSaveBlocksToDbFailed)

@@ -92,7 +92,12 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *StageState)
 	}
 	if err := sh.checkPrerequisites(); err != nil {
-		return 0, errors.Wrap(err, "prerequisite")
+		// if error is ErrNotEnoughStreams but still some streams available,
+		// it can continue syncing, otherwise return error
+		// here we are not doing concurrent processes, so even 1 stream should be enough
+		if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() == 0 {
+			return 0, errors.Wrap(err, "prerequisite")
+		}
 	}
 	curBN := s.state.bc.CurrentBlock().NumberU64()
 	bns := make([]uint64, 0, BlocksPerRequest)

@@ -97,7 +97,12 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
 	}
 	if err := sh.checkPrerequisites(); err != nil {
-		return 0, errors.Wrap(err, "prerequisite")
+		// if error is ErrNotEnoughStreams but still two streams available,
+		// it can continue syncing, otherwise return error
+		// at least 2 streams are needed to do concurrent processes
+		if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() < 2 {
+			return 0, errors.Wrap(err, "prerequisite")
+		}
 	}
 	curBN := sr.configs.bc.CurrentBlock().NumberU64()
 	blkNums := sh.prepareBlockHashNumbers(curBN)

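The two stages above now tolerate ErrNotEnoughStreams differently: epoch sync runs sequentially, so any remaining stream (>= 1) lets it proceed, while short-range sync issues concurrent requests and needs at least 2. A hedged sketch of the shared decision, factored into a helper (the helper name and shape are illustrative; the commit inlines the check in each stage):

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-in for the staged sync sentinel error (assumption).
var ErrNotEnoughStreams = errors.New("number of streams smaller than minimum required")

// canProceedDegraded reports whether a stage may keep syncing after
// checkPrerequisites failed: only ErrNotEnoughStreams is tolerable, and
// only while at least minUsable streams remain. minUsable is 1 for the
// sequential epoch stage and 2 for the concurrent short-range stage.
func canProceedDegraded(err error, numStreams, minUsable int) bool {
	return errors.Is(err, ErrNotEnoughStreams) && numStreams >= minUsable
}

func main() {
	fmt.Println(canProceedDegraded(ErrNotEnoughStreams, 1, 1)) // epoch stage: true
	fmt.Println(canProceedDegraded(ErrNotEnoughStreams, 1, 2)) // short range: false
	fmt.Println(canProceedDegraded(errors.New("other"), 5, 2)) // other errors: false
}
```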
@@ -337,8 +337,9 @@ func (s *StagedStreamSync) promLabels() prometheus.Labels {
 func (s *StagedStreamSync) checkHaveEnoughStreams() error {
 	numStreams := s.protocol.NumStreams()
 	if numStreams < s.config.MinStreams {
-		return fmt.Errorf("number of streams smaller than minimum: %v < %v",
+		s.logger.Debug().Msgf("number of streams smaller than minimum: %v < %v",
 			numStreams, s.config.MinStreams)
+		return ErrNotEnoughStreams
 	}
 	return nil
 }

@@ -232,7 +232,7 @@ var (
 		Downloader:           true,
 		StagedSync:           false,
 		StagedSyncCfg:        defaultStagedSyncConfig,
-		Concurrency:          4,
+		Concurrency:          2,
 		MinPeers:             2,
 		InitStreams:          2,
 		MaxAdvertiseWaitTime: 2, //minutes

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"strconv"
 	"sync"
+	"time"

 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/rlp"

@@ -118,7 +119,7 @@ func (st *testStream) FailedTimes() int {
 	return 0
 }

-func (st *testStream) AddFailedTimes() {
+func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
 	return
 }

@@ -6,6 +6,7 @@ import (
 	"strconv"
 	"sync"
 	"sync/atomic"
+	"time"

 	sttypes "github.com/harmony-one/harmony/p2p/stream/types"
 	"github.com/libp2p/go-libp2p/core/network"

@@ -74,7 +75,7 @@ func (st *testStream) FailedTimes() int {
 	return 0
 }

-func (st *testStream) AddFailedTimes() {
+func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
 	return
 }

@@ -215,3 +215,18 @@ func checkBlocksByHashesResult(b []byte, hs []common.Hash) error {
 	}
 	return nil
 }
+
+func checkGetReceiptsResult(b []byte, hs []common.Hash) error {
+	var msg = &syncpb.Message{}
+	if err := protobuf.Unmarshal(b, msg); err != nil {
+		return err
+	}
+	bhResp, err := msg.GetReceiptsResponse()
+	if err != nil {
+		return err
+	}
+	if len(hs) != len(bhResp.Receipts) {
+		return errors.New("unexpected size")
+	}
+	return nil
+}

@@ -26,7 +26,11 @@ const (
 	GetReceiptsCap = 10

 	// MaxStreamFailures is the maximum allowed failures before stream gets removed
-	MaxStreamFailures = 3
+	MaxStreamFailures = 5

+	// FaultRecoveryThreshold is the minimum duration before it resets the previous failures
+	// So, if stream hasn't had any issue for a certain amount of time since last failure, we can still trust it
+	FaultRecoveryThreshold = 30 * time.Minute

 	// minAdvertiseInterval is the minimum advertise interval
 	minAdvertiseInterval = 1 * time.Minute

@@ -272,13 +272,16 @@ func (p *Protocol) RemoveStream(stID sttypes.StreamID) {
 		st.Close()
 		// stream manager removes this stream from the list and triggers discovery if number of streams are not enough
 		p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed
+		p.logger.Info().
+			Str("stream ID", string(stID)).
+			Msg("stream removed")
 	}
 }

 func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) {
 	st, exist := p.sm.GetStreamByID(stID)
 	if exist && st != nil {
-		st.AddFailedTimes()
+		st.AddFailedTimes(FaultRecoveryThreshold)
 		p.logger.Info().
 			Str("stream ID", string(st.ID())).
 			Int("num failures", st.FailedTimes()).

@@ -5,6 +5,7 @@ import (
 	"encoding/binary"
 	"io"
 	"sync"
+	"time"

 	libp2p_network "github.com/libp2p/go-libp2p/core/network"
 	"github.com/pkg/errors"

@@ -22,7 +23,7 @@ type Stream interface {
 	Close() error
 	CloseOnExit() error
 	FailedTimes() int
-	AddFailedTimes()
+	AddFailedTimes(faultRecoveryThreshold time.Duration)
 	ResetFailedTimes()
 }

@@ -38,7 +39,8 @@ type BaseStream struct {
 	specErr  error
 	specOnce sync.Once

-	failedTimes int
+	failedTimes     int
+	lastFailureTime time.Time
 }

 // NewBaseStream creates BaseStream as the wrapper of libp2p Stream

@@ -82,8 +84,15 @@ func (st *BaseStream) FailedTimes() int {
 	return st.failedTimes
 }

-func (st *BaseStream) AddFailedTimes() {
+func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
+	if st.failedTimes > 0 {
+		durationSinceLastFailure := time.Now().Sub(st.lastFailureTime)
+		if durationSinceLastFailure >= faultRecoveryThreshold {
+			st.ResetFailedTimes()
+		}
+	}
 	st.failedTimes++
+	st.lastFailureTime = time.Now()
 }

 func (st *BaseStream) ResetFailedTimes() {

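The interplay of MaxStreamFailures and FaultRecoveryThreshold decides when a stream is finally dropped: consecutive failures accumulate, but a long enough fault-free gap forgives the history. A minimal standalone sketch of the failure counter (reimplementing just the AddFailedTimes logic from the diff above, with a short threshold so the example runs quickly; the real threshold is 30 minutes):

```go
package main

import (
	"fmt"
	"time"
)

// counter reimplements only the failure-tracking part of BaseStream.
type counter struct {
	failedTimes     int
	lastFailureTime time.Time
}

func (c *counter) AddFailedTimes(faultRecoveryThreshold time.Duration) {
	// If the stream behaved well for long enough since its last failure,
	// forgive the old failures before counting the new one.
	if c.failedTimes > 0 && time.Since(c.lastFailureTime) >= faultRecoveryThreshold {
		c.failedTimes = 0
	}
	c.failedTimes++
	c.lastFailureTime = time.Now()
}

func main() {
	const threshold = 200 * time.Millisecond // stands in for the real 30 * time.Minute

	var c counter
	c.AddFailedTimes(threshold)
	c.AddFailedTimes(threshold)
	fmt.Println(c.failedTimes) // 2: failures in quick succession accumulate

	time.Sleep(250 * time.Millisecond) // a quiet period longer than the threshold
	c.AddFailedTimes(threshold)
	fmt.Println(c.failedTimes) // 1: old failures were reset, only the new one counts
}
```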