refactor p2p host and routing

pull/4377/head
“GheisMohammadi” 2 years ago committed by Casey Gardiner
parent 390c9f99c1
commit 905ff653cc
  1. 14
      api/service/stagedstreamsync/beacon_helper.go
  2. 22
      api/service/stagedstreamsync/downloader.go
  3. 2
      api/service/stagedstreamsync/stage_epoch.go
  4. 3
      api/service/stagedstreamsync/stage_short_range.go
  5. 29
      api/service/stagedstreamsync/staged_stream_sync.go
  6. 2
      api/service/stagedstreamsync/syncing.go
  7. 8
      cmd/harmony/main.go
  8. 14
      p2p/discovery/discovery.go
  9. 8
      p2p/discovery/option.go
  10. 31
      p2p/host.go
  11. 9
      p2p/stream/common/streammanager/streammanager.go
  12. 34
      p2p/stream/protocols/sync/protocol.go
  13. 2
      p2p/stream/types/interface.go

@ -76,17 +76,19 @@ func (bh *beaconHelper) loop() {
case it := <-bh.insertC:
inserted, bn, err := bh.insertLastMileBlocks()
numBlocksInsertedBeaconHelperCounter.Add(float64(inserted))
if err != nil {
bh.logger.Error().Err(err).
Msg(WrapStagedSyncMsg("insert last mile blocks error"))
close(it.doneC)
continue
}
bh.logger.Info().Int("inserted", inserted).
Uint64("end height", bn).
Uint32("shard", bh.bc.ShardID()).
Msg(WrapStagedSyncMsg("insert last mile blocks"))
if inserted > 0 {
numBlocksInsertedBeaconHelperCounter.Add(float64(inserted))
bh.logger.Info().Int("inserted", inserted).
Uint64("end height", bn).
Uint32("shard", bh.bc.ShardID()).
Msg(WrapStagedSyncMsg("insert last mile blocks"))
}
close(it.doneC)
case <-bh.closeC:

@ -24,6 +24,7 @@ type (
syncProtocol syncProtocol
bh *beaconHelper
stagedSyncInstance *StagedStreamSync
isBeaconNode bool
downloadC chan struct{}
closeC chan struct{}
@ -67,7 +68,7 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config
ctx, cancel := context.WithCancel(context.Background())
//TODO: use mem db should be in config file
stagedSyncInstance, err := CreateStagedSync(ctx, bc, false, sp, config, logger, config.LogProgress)
stagedSyncInstance, err := CreateStagedSync(ctx, bc, false, isBeaconNode, sp, config, logger, config.LogProgress)
if err != nil {
cancel()
return nil
@ -78,6 +79,7 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config
syncProtocol: sp,
bh: bh,
stagedSyncInstance: stagedSyncInstance,
isBeaconNode: isBeaconNode,
downloadC: make(chan struct{}),
closeC: make(chan struct{}),
@ -187,7 +189,11 @@ func (d *Downloader) waitForBootFinish() {
func (d *Downloader) loop() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
initSync := true
// for shard chain and beacon chain node, first we start with initSync=true to
// make sure it goes through the long range sync first.
// for epoch chain we do only need to go through epoch sync process
initSync := d.isBeaconNode || d.bc.ShardID() != shard.BeaconChainShardID
trigger := func() {
select {
case d.downloadC <- struct{}{}:
@ -217,7 +223,7 @@ func (d *Downloader) loop() {
}
}
// If error happens, sleep 5 seconds and retry
// If any error happens, sleep 5 seconds and retry
d.logger.Error().
Err(err).
Bool("initSync", initSync).
@ -227,7 +233,7 @@ func (d *Downloader) loop() {
trigger()
}()
time.Sleep(1 * time.Second)
continue
break
}
if initSync {
d.logger.Info().Int("block added", addedBN).
@ -239,11 +245,11 @@ func (d *Downloader) loop() {
if addedBN != 0 {
// If block number has been changed, trigger another sync
// and try to add last mile from pub-sub (blocking)
go trigger()
if d.bh != nil {
d.bh.insertSync()
}
}
// try to add last mile from pub-sub (blocking)
if d.bh != nil {
d.bh.insertSync()
}
initSync = false

@ -49,7 +49,7 @@ func (sr *StageEpoch) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
return nil
}
if _, ok := sr.configs.bc.(*core.EpochChain); !ok {
if sr.configs.bc.ShardID() != shard.BeaconChainShardID || s.state.isBeaconNode {
return nil
}

@ -6,6 +6,7 @@ import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
)
@ -50,7 +51,7 @@ func (sr *StageShortRange) Exec(firstCycle bool, invalidBlockRevert bool, s *Sta
return nil
}
if _, ok := sr.configs.bc.(*core.EpochChain); ok {
if sr.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}

@ -54,19 +54,20 @@ func (ib *InvalidBlock) addBadStream(bsID sttypes.StreamID) {
}
type StagedStreamSync struct {
ctx context.Context
bc core.BlockChain
isBeacon bool
isExplorer bool
db kv.RwDB
protocol syncProtocol
gbm *blockDownloadManager // initialized when finished get block number
inserted int
config Config
logger zerolog.Logger
status *status //TODO: merge this with currentSyncCycle
initSync bool // if sets to true, node start long range syncing
UseMemDB bool
ctx context.Context
bc core.BlockChain
isBeacon bool
isExplorer bool
db kv.RwDB
protocol syncProtocol
isBeaconNode bool
gbm *blockDownloadManager // initialized when finished get block number
inserted int
config Config
logger zerolog.Logger
status *status //TODO: merge this with currentSyncCycle
initSync bool // if sets to true, node start long range syncing
UseMemDB bool
revertPoint *uint64 // used to run stages
prevRevertPoint *uint64 // used to get value from outside of staged sync after cycle (for example to notify RPCDaemon)
@ -254,6 +255,7 @@ func New(ctx context.Context,
stagesList []*Stage,
isBeacon bool,
protocol syncProtocol,
isBeaconNode bool,
useMemDB bool,
config Config,
logger zerolog.Logger,
@ -291,6 +293,7 @@ func New(ctx context.Context,
isBeacon: isBeacon,
db: db,
protocol: protocol,
isBeaconNode: isBeaconNode,
gbm: nil,
status: &status,
inserted: 0,

@ -38,6 +38,7 @@ var Buckets = []string{
func CreateStagedSync(ctx context.Context,
bc core.BlockChain,
UseMemDB bool,
isBeaconNode bool,
protocol syncProtocol,
config Config,
logger zerolog.Logger,
@ -87,6 +88,7 @@ func CreateStagedSync(ctx context.Context,
stages,
isBeacon,
protocol,
isBeaconNode,
UseMemDB,
config,
logger,

@ -878,8 +878,8 @@ func setupPrometheusService(node *node.Node, hc harmonyconfig.HarmonyConfig, sid
func setupSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyConfig) {
blockchains := []core.BlockChain{node.Blockchain()}
if !node.IsRunningBeaconChain() {
blockchains = append(blockchains, node.Beaconchain())
if node.Blockchain().ShardID() != shard.BeaconChainShardID {
blockchains = append(blockchains, node.EpochChain())
}
dConfig := downloader.Config{
@ -913,8 +913,8 @@ func setupSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyCo
func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyConfig) {
blockchains := []core.BlockChain{node.Blockchain()}
if !node.IsRunningBeaconChain() {
blockchains = append(blockchains, node.Beaconchain())
if node.Blockchain().ShardID() != shard.BeaconChainShardID {
blockchains = append(blockchains, node.EpochChain())
}
sConfig := stagedstreamsync.Config{

@ -5,6 +5,7 @@ import (
"time"
"github.com/harmony-one/harmony/internal/utils"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2p_dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/discovery"
libp2p_host "github.com/libp2p/go-libp2p/core/host"
@ -37,19 +38,8 @@ type dhtDiscovery struct {
}
// NewDHTDiscovery creates a new dhtDiscovery that implements Discovery interface.
func NewDHTDiscovery(host libp2p_host.Host, opt DHTConfig) (Discovery, error) {
opts, err := opt.getLibp2pRawOptions()
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
dht, err := libp2p_dht.New(ctx, host, opts...)
if err != nil {
cancel()
return nil, err
}
func NewDHTDiscovery(ctx context.Context, cancel context.CancelFunc, host libp2p_host.Host, dht *dht.IpfsDHT, opt DHTConfig) (Discovery, error) {
d := libp2p_dis.NewRoutingDiscovery(dht)
logger := utils.Logger().With().Str("module", "discovery").Logger()
return &dhtDiscovery{
dht: dht,

@ -5,6 +5,7 @@ import (
p2ptypes "github.com/harmony-one/harmony/p2p/types"
badger "github.com/ipfs/go-ds-badger"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2p_dht "github.com/libp2p/go-libp2p-kad-dht"
)
@ -14,10 +15,11 @@ type DHTConfig struct {
BootNodes []string
DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes.
DiscConcurrency int
DHT *dht.IpfsDHT
}
// getLibp2pRawOptions get the raw libp2p options as a slice.
func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) {
// GetLibp2pRawOptions get the raw libp2p options as a slice.
func (opt DHTConfig) GetLibp2pRawOptions() ([]libp2p_dht.Option, error) {
var opts []libp2p_dht.Option
bootOption, err := getBootstrapOption(opt.BootNodes)
@ -40,6 +42,8 @@ func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) {
opts = append(opts, libp2p_dht.Concurrency(opt.DiscConcurrency))
}
opts = append(opts, libp2p_dht.DisableAutoRefresh())
return opts, nil
}

@ -128,7 +128,7 @@ func NewHost(cfg HostConfig) (Host, error) {
ctx, cancel := context.WithCancel(context.Background())
// TODO: move low and high to configs
connmgr, err := connmgr.NewConnManager(
int(10), // Lowwater
int(10), // LowWater
int(10000)*cfg.MaxConnPerIP, // HighWater,
connmgr.WithGracePeriod(time.Minute),
)
@ -137,6 +137,7 @@ func NewHost(cfg HostConfig) (Host, error) {
return nil, err
}
var idht *dht.IpfsDHT
var opt discovery.DHTConfig
p2pHost, err := libp2p.New(
listenAddr,
libp2p.Identity(key),
@ -152,33 +153,41 @@ func NewHost(cfg HostConfig) (Host, error) {
// Attempt to open ports using uPNP for NATed hosts.
libp2p.NATPortMap(),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
idht, err = dht.New(ctx, h)
opt = discovery.DHTConfig{
BootNodes: cfg.BootNodes,
DataStoreFile: cfg.DataStoreFile,
DiscConcurrency: cfg.DiscConcurrency,
}
opts, err := opt.GetLibp2pRawOptions()
if err != nil {
return nil, err
}
idht, err = dht.New(ctx, h, opts...)
return idht, err
}),
// to help other peers to figure out if they are behind
// NATs, launch the server-side of AutoNAT too (AutoRelay
// already runs the client)
// This service is highly rate-limited and should not cause any
// performance issues.
libp2p.EnableNATService(),
// Bandwidth Reporter
libp2p.BandwidthReporter(newCounter()),
// ForceReachabilityPublic overrides automatic reachability detection in the AutoNAT subsystem,
// forcing the local node to believe it is reachable externally.
// libp2p.ForceReachabilityPublic(),
// libp2p.DisableRelay(),
libp2p.EnableRelayService(),
// prevent dialing of public addresses
libp2p.ConnectionGater(NewGater(cfg.DisablePrivateIPScan)),
// libp2p.ConnectionGater(NewGater(cfg.DisablePrivateIPScan)),
)
if err != nil {
cancel()
return nil, errors.Wrapf(err, "cannot initialize libp2p host")
}
disc, err := discovery.NewDHTDiscovery(p2pHost, discovery.DHTConfig{
BootNodes: cfg.BootNodes,
DataStoreFile: cfg.DataStoreFile,
DiscConcurrency: cfg.DiscConcurrency,
})
disc, err := discovery.NewDHTDiscovery(ctx, cancel, p2pHost, idht, opt)
if err != nil {
cancel()
return nil, errors.Wrap(err, "cannot create DHT discovery")
@ -319,6 +328,10 @@ func (host *HostV2) AddStreamProtocol(protocols ...sttypes.Protocol) {
for _, proto := range protocols {
host.streamProtos = append(host.streamProtos, proto)
host.h.SetStreamHandlerMatch(protocol.ID(proto.ProtoID()), proto.Match, proto.HandleStream)
// TODO: do we need to add handler match for shard proto id?
// if proto.IsBeaconNode() {
// host.h.SetStreamHandlerMatch(protocol.ID(proto.ShardProtoID()), proto.Match, proto.HandleStream)
// }
}
}

@ -73,7 +73,6 @@ func newStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStrea
Str("protocol ID", string(pid)).Logger()
protoSpec, _ := sttypes.ProtoIDToProtoSpec(pid)
fmt.Println("my peer id: ", host.ID().String())
fmt.Println("my proto id: ", pid)
@ -238,9 +237,6 @@ func (sm *streamManager) sanityCheckStream(st sttypes.Stream) error {
if mySpec.ShardID != rmSpec.ShardID {
return fmt.Errorf("unexpected shard ID: %v/%v", rmSpec.ShardID, mySpec.ShardID)
}
if mySpec.ShardID == shard.BeaconChainShardID && !rmSpec.BeaconNode {
return fmt.Errorf("unexpected beacon node with shard ID: %v/%v", rmSpec.ShardID, mySpec.ShardID)
}
return nil
}
@ -311,7 +307,10 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e
connecting := 0
for peer := range peers {
if peer.ID == sm.host.ID() || sm.coolDownCache.Has(peer.ID) {
if peer.ID == sm.host.ID() {
continue
}
if sm.coolDownCache.Has(peer.ID) {
// If the peer has the same ID and was just connected, skip.
continue
}

@ -16,6 +16,7 @@ import (
"github.com/harmony-one/harmony/p2p/stream/common/requestmanager"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/hashicorp/go-version"
libp2p_host "github.com/libp2p/go-libp2p/core/host"
libp2p_network "github.com/libp2p/go-libp2p/core/network"
@ -107,7 +108,10 @@ func (p *Protocol) Start() {
p.sm.Start()
p.rm.Start()
p.rl.Start()
go p.advertiseLoop()
// If it's not EpochChain, advertise
if p.beaconNode || p.chain.ShardID() != shard.BeaconChainShardID {
go p.advertiseLoop()
}
}
// Close close the protocol
@ -129,11 +133,21 @@ func (p *Protocol) ProtoID() sttypes.ProtoID {
return p.protoIDByVersion(MyVersion)
}
// ShardProtoID returns the ProtoID of the sync protocol for shard nodes
func (p *Protocol) ShardProtoID() sttypes.ProtoID {
return p.protoIDByVersionForShardNodes(MyVersion)
}
// Version returns the sync protocol version
func (p *Protocol) Version() *version.Version {
return MyVersion
}
// IsBeaconNode returns true if it is a beacon chain node
func (p *Protocol) IsBeaconNode() bool {
return p.beaconNode
}
// Match checks the compatibility to the target protocol ID.
func (p *Protocol) Match(targetID string) bool {
target, err := sttypes.ProtoIDToProtoSpec(sttypes.ProtoID(targetID))
@ -173,9 +187,9 @@ func (p *Protocol) advertiseLoop() {
for {
sleep := p.advertise()
select {
case <-time.After(sleep):
case <-p.closeC:
return
case <-time.After(sleep):
}
}
}
@ -209,6 +223,11 @@ func (p *Protocol) supportedProtoIDs() []sttypes.ProtoID {
pids := make([]sttypes.ProtoID, 0, len(vs))
for _, v := range vs {
pids = append(pids, p.protoIDByVersion(v))
// beacon node needs to inform shard nodes about it supports them as well for EpochChain
// basically beacon node can accept connection from shard nodes to share last epoch blocks
if p.beaconNode {
pids = append(pids, p.protoIDByVersionForShardNodes(v))
}
}
return pids
}
@ -228,6 +247,17 @@ func (p *Protocol) protoIDByVersion(v *version.Version) sttypes.ProtoID {
return spec.ToProtoID()
}
func (p *Protocol) protoIDByVersionForShardNodes(v *version.Version) sttypes.ProtoID {
spec := sttypes.ProtoSpec{
Service: serviceSpecifier,
NetworkType: p.config.Network,
ShardID: p.config.ShardID,
Version: v,
BeaconNode: false,
}
return spec.ToProtoID()
}
// RemoveStream removes the stream of the given stream ID
// TODO: add reason to parameters
func (p *Protocol) RemoveStream(stID sttypes.StreamID) {

@ -13,6 +13,8 @@ type Protocol interface {
Specifier() string
Version() *version.Version
ProtoID() ProtoID
// ShardProtoID() ProtoID
IsBeaconNode() bool
Match(string) bool
HandleStream(st libp2p_network.Stream)
}

Loading…
Cancel
Save