The core protocol of WoopChain
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
woop/hmy/downloader/downloader.go

311 lines
7.3 KiB

package downloader
import (
"context"
"fmt"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync"
)
// Downloader is responsible for the sync task of one shard.
type Downloader struct {
	// bc is the chain whose height is read and into which blocks are inserted.
	bc blockChain
	// syncProtocol is the stream sync protocol used to count and reach peers.
	syncProtocol syncProtocol
	// bh is optional; it stays nil unless NewDownloader creates a beacon helper.
	bh *beaconHelper

	// downloadC carries download trigger signals to the main loop.
	downloadC chan struct{}
	// closeC is closed by Close to stop the downloader's goroutines.
	closeC chan struct{}
	// ctx and cancel are created together in NewDownloader; cancel is called by Close.
	ctx    context.Context
	cancel func()

	// evtDownloadFinished is the feed notified when a download task has finished.
	evtDownloadFinished event.Feed
	// evtDownloadStarted is the feed notified when a download has started.
	evtDownloadStarted event.Feed

	status status
	config Config
	logger zerolog.Logger
}
// NewDownloader builds a Downloader for the shard served by bc.
//
// It normalizes config via fixValues, creates the stream sync protocol and
// registers it on host, and creates a beacon helper only when
// config.BHConfig is set and bc belongs to shard 0. The returned downloader
// is idle until Start is called.
func NewDownloader(host p2p.Host, bc *core.BlockChain, config Config) *Downloader {
	config.fixValues()

	shardID := bc.ShardID()

	protocolCfg := sync.Config{
		Chain:        bc,
		Host:         host.GetP2PHost(),
		Discovery:    host.GetDiscovery(),
		ShardID:      nodeconfig.ShardID(shardID),
		Network:      config.Network,
		SmSoftLowCap: config.SmSoftLowCap,
		SmHardLowCap: config.SmHardLowCap,
		SmHiCap:      config.SmHiCap,
		DiscBatch:    config.SmDiscBatch,
	}
	protocol := sync.NewProtocol(protocolCfg)
	host.AddStreamProtocol(protocol)

	// The beacon helper is only wired up for shard 0 and only when configured.
	var helper *beaconHelper
	if shardID == 0 && config.BHConfig != nil {
		helper = newBeaconHelper(bc, config.BHConfig.BlockC, config.BHConfig.InsertHook)
	}

	ctx, cancel := context.WithCancel(context.Background())
	logger := utils.Logger().With().Str("module", "downloader").Uint32("ShardID", shardID).Logger()

	return &Downloader{
		bc:           bc,
		syncProtocol: protocol,
		bh:           helper,
		downloadC:    make(chan struct{}),
		closeC:       make(chan struct{}),
		ctx:          ctx,
		cancel:       cancel,
		status:       newStatus(),
		config:       config,
		logger:       logger,
	}
}
// Start launches the downloader's run goroutine and, when a beacon helper
// is configured, starts that helper as well.
func (d *Downloader) Start() {
	go d.run()

	if d.bh == nil {
		return
	}
	d.bh.start()
}
// Close stops the downloader: it closes closeC, cancels the downloader's
// context, and closes the beacon helper when one is present.
func (d *Downloader) Close() {
	close(d.closeC)
	d.cancel()

	if helper := d.bh; helper != nil {
		helper.close()
	}
}
// DownloadAsync asks the main loop to run a download without waiting for it
// to complete. The request is dropped if the loop does not accept it within
// 100 milliseconds; the consensus-triggered counter is only incremented for
// accepted requests.
func (d *Downloader) DownloadAsync() {
	giveUp := time.After(100 * time.Millisecond)

	select {
	case <-giveUp:
		// Nobody received the signal in time; drop the request.
	case d.downloadC <- struct{}{}:
		consensusTriggeredDownloadCounterVec.With(d.promLabels()).Inc()
	}
}
// NumPeers reports the number of streams currently held by the sync
// protocol, which is used as the peer count for this downloader's shard.
func (d *Downloader) NumPeers() int {
	return d.syncProtocol.NumStreams()
}
// SyncStatus reports the current sync state.
//
// The first result tells whether a sync is in progress. The second is the
// sync target while syncing, or the current block number of the chain when
// not syncing. The third result is always 0 in this implementation.
func (d *Downloader) SyncStatus() (bool, uint64, uint64) {
	inProgress, target := d.status.get()
	if inProgress {
		return true, target, 0
	}
	// Not syncing: report the local chain head instead of a target.
	return false, d.bc.CurrentBlock().NumberU64(), 0
}
// SubscribeDownloadStarted subscribes ch to the download-started feed and
// returns the resulting subscription.
func (d *Downloader) SubscribeDownloadStarted(ch chan struct{}) event.Subscription {
	return d.evtDownloadStarted.Subscribe(ch)
}
// SubscribeDownloadFinished subscribes ch to the download-finished feed and
// returns the resulting subscription.
func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscription {
	return d.evtDownloadFinished.Subscribe(ch)
}
// run is the body of the goroutine launched by Start: it first waits for the
// initial bootstrap to finish and then enters the main download loop.
func (d *Downloader) run() {
	d.waitForBootFinish()
	d.loop()
}
// waitForBootFinish blocks until the sync protocol holds at least
// config.InitStreams streams, or until the downloader is closed.
//
// The stream count is re-checked once immediately, every time a stream-added
// event arrives, and every 10 seconds.
func (d *Downloader) waitForBootFinish() {
	streamAddedC := make(chan streammanager.EvtStreamAdded, 1)
	subscription := d.syncProtocol.SubscribeAddStreamEvent(streamAddedC)
	defer subscription.Unsubscribe()

	// recheckC has capacity 1 so that repeated requests collapse into one
	// pending check instead of blocking the requester.
	recheckC := make(chan struct{}, 1)
	requestCheck := func() {
		select {
		case recheckC <- struct{}{}:
		default:
		}
	}
	requestCheck()

	poll := time.NewTicker(10 * time.Second)
	defer poll.Stop()

	for {
		d.logger.Info().Msg("waiting for initial bootstrap discovery")

		select {
		case <-d.closeC:
			return
		case <-recheckC:
			if d.syncProtocol.NumStreams() < d.config.InitStreams {
				continue
			}
			return
		case <-streamAddedC:
			requestCheck()
		case <-poll.C:
			requestCheck()
		}
	}
}
// loop is the main download loop. It runs a download each time a signal is
// received on downloadC and returns once closeC is closed.
//
// Signals come from an initial trigger, a 10-second ticker, follow-up
// triggers issued by the loop itself, and any other sender on downloadC.
// The first download runs with initSync set to true; initSync only becomes
// false after a download completes without error.
func (d *Downloader) loop() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	initSync := true
	// trigger tries to hand a signal to downloadC and gives up after 100ms.
	// This loop is itself a receiver of downloadC, so trigger is always
	// invoked from a separate goroutine.
	trigger := func() {
		select {
		case d.downloadC <- struct{}{}:
		case <-time.After(100 * time.Millisecond):
		}
	}
	// Kick off the first download right away instead of waiting for the ticker.
	go trigger()
	for {
		select {
		case <-ticker.C:
			go trigger()
		case <-d.downloadC:
			addedBN, err := d.doDownload(initSync)
			if err != nil {
				// On failure: log, schedule a retry trigger 5 seconds from now,
				// pause this loop for 1 second, and keep initSync unchanged.
				d.logger.Warn().Err(err).Bool("bootstrap", initSync).Msg("failed to download")
				go func() {
					time.Sleep(5 * time.Second)
					trigger()
				}()
				time.Sleep(1 * time.Second)
				continue
			}
			d.logger.Info().Int("block added", addedBN).
				Uint64("current height", d.bc.CurrentBlock().NumberU64()).
				Bool("initSync", initSync).
				Uint32("shard", d.bc.ShardID()).
				Msg("sync finished")
			if addedBN != 0 {
				// Blocks were added, so request another sync round, then let the
				// beacon helper (if any) insert the last-mile blocks from pub-sub.
				// The insertSync call is made synchronously on this goroutine.
				go trigger()
				if d.bh != nil {
					d.bh.insertSync()
				}
			}
			initSync = false
		case <-d.closeC:
			return
		}
	}
}
// doDownload performs one sync round and returns the values produced by the
// underlying sync call.
//
// When initSync is true a long range sync is run, otherwise a short range
// sync. On failure the failed-download counter is incremented with the error
// text attached as the "error" label.
func (d *Downloader) doDownload(initSync bool) (n int, err error) {
	startEvt := d.logger.Info().
		Uint64("current number", d.bc.CurrentBlock().NumberU64()).
		Uint32("shard ID", d.bc.ShardID())

	if initSync {
		startEvt.Msg("start long range sync")
		n, err = d.doLongRangeSync()
	} else {
		startEvt.Msg("start short range sync")
		n, err = d.doShortRangeSync()
	}

	if err == nil {
		return n, nil
	}

	labels := d.promLabels()
	labels["error"] = err.Error()
	numFailedDownloadCounterVec.With(labels).Inc()
	return n, err
}
// startSyncing marks the status as syncing and then notifies subscribers of
// the download-started feed.
func (d *Downloader) startSyncing() {
	d.status.startSyncing()
	d.evtDownloadStarted.Send(struct{}{})
}
// finishSyncing marks the status as no longer syncing and then notifies
// subscribers of the download-finished feed.
func (d *Downloader) finishSyncing() {
	d.status.finishSyncing()
	d.evtDownloadFinished.Send(struct{}{})
}
// emptySigVerifyErr is a nil *sigVerifyErr, usable as a typed target when
// checking whether an error is a signature verification failure.
var emptySigVerifyErr *sigVerifyErr

// sigVerifyErr marks an error as coming from header signature verification.
type sigVerifyErr struct {
	err error
}

// Error implements the error interface. The message is the wrapped error's
// text prefixed with "[VerifyHeaderSignature] ". A nil receiver or a nil
// wrapped error is rendered as "<nil>" instead of panicking.
func (e *sigVerifyErr) Error() string {
	return fmt.Sprintf("[VerifyHeaderSignature] %v", e.Unwrap())
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can see
// the underlying cause. It returns nil for a nil receiver.
func (e *sigVerifyErr) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.err
}
// verifyAndInsertBlocks verifies and inserts blocks one at a time, in order.
// Each block is handed to verifyAndInsertBlock together with the blocks that
// follow it in the slice.
//
// It returns the number of blocks inserted before the first failure along
// with that error, or len(blocks) and nil when every block was inserted.
func verifyAndInsertBlocks(bc blockChain, blocks types.Blocks) (int, error) {
	for idx := 0; idx < len(blocks); idx++ {
		following := blocks[idx+1:]
		if err := verifyAndInsertBlock(bc, blocks[idx], following...); err != nil {
			return idx, err
		}
	}
	return len(blocks), nil
}
// verifyAndInsertBlock verifies a single block and inserts it into bc.
//
// The commit signature and bitmap are taken from the header of the first
// entry of nextBlocks when one is supplied; otherwise they are parsed from
// the block's own current commit signature. The header signature is verified
// first, then the header itself, and finally the block is inserted.
//
// A signature failure is returned as *sigVerifyErr; other failures are
// wrapped with a tag naming the step that failed.
func verifyAndInsertBlock(bc blockChain, block *types.Block, nextBlocks ...*types.Block) error {
	var (
		commitSig    bls.SerializedSignature
		commitBitmap []byte
	)

	if len(nextBlocks) == 0 {
		// No successor available: use the commit sig carried by the block itself.
		var parseErr error
		commitSig, commitBitmap, parseErr = chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig())
		if parseErr != nil {
			return errors.Wrap(parseErr, "parse commitSigAndBitmap")
		}
	} else {
		// The successor's header carries the commit sig for this block.
		successor := nextBlocks[0]
		commitSig = successor.Header().LastCommitSignature()
		commitBitmap = successor.Header().LastCommitBitmap()
	}

	engine := bc.Engine()

	sigErr := engine.VerifyHeaderSignature(bc, block.Header(), commitSig, commitBitmap)
	if sigErr != nil {
		return &sigVerifyErr{sigErr}
	}

	headerErr := engine.VerifyHeader(bc, block.Header(), true)
	if headerErr != nil {
		return errors.Wrap(headerErr, "[VerifyHeader]")
	}

	_, insertErr := bc.InsertChain(types.Blocks{block}, false)
	if insertErr != nil {
		return errors.Wrap(insertErr, "[InsertChain]")
	}
	return nil
}