The core protocol of WoopChain
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
woop/hmy/downloader/downloader.go

265 lines
5.9 KiB

package downloader
import (
"context"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync"
"github.com/rs/zerolog"
)
type (
// Downloader is responsible for sync task of one shard
Downloader struct {
bc blockChain
ih insertHelper
syncProtocol syncProtocol
bh *beaconHelper
downloadC chan struct{}
closeC chan struct{}
ctx context.Context
cancel func()
evtDownloadFinished event.Feed // channel for each download task finished
evtDownloadStarted event.Feed // channel for each download has started
status status
config Config
logger zerolog.Logger
}
)
// NewDownloader creates a new downloader
func NewDownloader(host p2p.Host, bc *core.BlockChain, config Config) *Downloader {
config.fixValues()
ih := newInsertHelper(bc)
sp := sync.NewProtocol(sync.Config{
Chain: bc,
Host: host.GetP2PHost(),
Discovery: host.GetDiscovery(),
ShardID: nodeconfig.ShardID(bc.ShardID()),
Network: config.Network,
SmSoftLowCap: config.SmSoftLowCap,
SmHardLowCap: config.SmHardLowCap,
SmHiCap: config.SmHiCap,
DiscBatch: config.SmDiscBatch,
})
host.AddStreamProtocol(sp)
var bh *beaconHelper
if config.BHConfig != nil && bc.ShardID() == 0 {
bh = newBeaconHelper(bc, ih, config.BHConfig.BlockC, config.BHConfig.InsertHook)
}
ctx, cancel := context.WithCancel(context.Background())
return &Downloader{
bc: bc,
ih: ih,
syncProtocol: sp,
bh: bh,
downloadC: make(chan struct{}),
closeC: make(chan struct{}),
ctx: ctx,
cancel: cancel,
status: newStatus(),
config: config,
logger: utils.Logger().With().Str("module", "downloader").Logger(),
}
}
// Start start the downloader
func (d *Downloader) Start() {
if d.config.ServerOnly {
return
}
go d.run()
if d.bh != nil {
d.bh.start()
}
}
// Close close the downloader
func (d *Downloader) Close() {
if d.config.ServerOnly {
return
}
close(d.closeC)
d.cancel()
if d.bh != nil {
d.bh.close()
}
}
// DownloadAsync triggers the download async.
func (d *Downloader) DownloadAsync() {
select {
case d.downloadC <- struct{}{}:
consensusTriggeredDownloadCounterVec.With(d.promLabels()).Inc()
case <-time.After(100 * time.Millisecond):
}
}
// NumPeers returns the number of peers connected of a specific shard.
func (d *Downloader) NumPeers() int {
return d.syncProtocol.NumStreams()
}
// IsSyncing return the current sync status
func (d *Downloader) SyncStatus() (bool, uint64) {
syncing, target := d.status.get()
if !syncing {
target = d.bc.CurrentBlock().NumberU64()
}
return syncing, target
}
// SubscribeDownloadStarted subscribe download started
func (d *Downloader) SubscribeDownloadStarted(ch chan struct{}) event.Subscription {
return d.evtDownloadStarted.Subscribe(ch)
}
// SubscribeDownloadFinishedEvent subscribe the download finished
func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscription {
return d.evtDownloadFinished.Subscribe(ch)
}
func (d *Downloader) run() {
d.waitForBootFinish()
d.loop()
}
// waitForBootFinish wait for stream manager to finish the initial discovery and have
// enough peers to start downloader
func (d *Downloader) waitForBootFinish() {
evtCh := make(chan streammanager.EvtStreamAdded, 1)
sub := d.syncProtocol.SubscribeAddStreamEvent(evtCh)
defer sub.Unsubscribe()
checkCh := make(chan struct{}, 1)
trigger := func() {
select {
case checkCh <- struct{}{}:
default:
}
}
trigger()
t := time.NewTicker(10 * time.Second)
for {
d.logger.Info().Msg("waiting for initial bootstrap discovery")
select {
case <-t.C:
trigger()
case <-evtCh:
trigger()
case <-checkCh:
if d.syncProtocol.NumStreams() >= d.config.InitStreams {
return
}
case <-d.closeC:
return
}
}
}
func (d *Downloader) loop() {
ticker := time.NewTicker(10 * time.Second)
initSync := true
trigger := func() {
select {
case d.downloadC <- struct{}{}:
case <-time.After(100 * time.Millisecond):
}
}
go trigger()
for {
select {
case <-ticker.C:
go trigger()
case <-d.downloadC:
addedBN, err := d.doDownload(initSync)
if err != nil {
// If error happens, sleep 5 seconds and retry
d.logger.Warn().Err(err).Bool("bootstrap", initSync).Msg("failed to download")
go func() {
time.Sleep(5 * time.Second)
trigger()
}()
continue
}
d.logger.Info().Int("block added", addedBN).
Uint64("current height", d.bc.CurrentBlock().NumberU64()).
Bool("initSync", initSync).
Uint32("shard", d.bc.ShardID()).
Msg("sync finished")
if addedBN != 0 {
// If block number has been changed, trigger another sync
// and try to add last mile from pub-sub (blocking)
go trigger()
if d.bh != nil {
d.bh.insertSync()
}
}
initSync = false
case <-d.closeC:
return
}
}
}
func (d *Downloader) doDownload(initSync bool) (n int, err error) {
if initSync {
d.logger.Info().Uint64("current number", d.bc.CurrentBlock().NumberU64()).
Uint32("shard ID", d.bc.ShardID()).Msg("start long range sync")
n, err = d.doLongRangeSync()
} else {
d.logger.Info().Uint64("current number", d.bc.CurrentBlock().NumberU64()).
Uint32("shard ID", d.bc.ShardID()).Msg("start short range sync")
n, err = d.doShortRangeSync()
}
if err != nil {
pl := d.promLabels()
pl["error"] = err.Error()
numFailedDownloadCounterVec.With(pl).Inc()
return
}
return
}
func (d *Downloader) startSyncing() {
d.status.startSyncing()
d.evtDownloadStarted.Send(struct{}{})
}
func (d *Downloader) finishSyncing() {
d.status.finishSyncing()
d.evtDownloadFinished.Send(struct{}{})
}