refactor p2p host and routing

pull/4351/head
“GheisMohammadi” 2 years ago committed by Casey Gardiner
parent a4a656a6d5
commit fecd4fdda3
  1. 6
      api/service/stagedstreamsync/beacon_helper.go
  2. 18
      api/service/stagedstreamsync/downloader.go
  3. 2
      api/service/stagedstreamsync/stage_epoch.go
  4. 3
      api/service/stagedstreamsync/stage_short_range.go
  5. 3
      api/service/stagedstreamsync/staged_stream_sync.go
  6. 2
      api/service/stagedstreamsync/syncing.go
  7. 8
      cmd/harmony/main.go
  8. 14
      p2p/discovery/discovery.go
  9. 8
      p2p/discovery/option.go
  10. 31
      p2p/host.go
  11. 9
      p2p/stream/common/streammanager/streammanager.go
  12. 32
      p2p/stream/protocols/sync/protocol.go
  13. 2
      p2p/stream/types/interface.go

@ -76,17 +76,19 @@ func (bh *beaconHelper) loop() {
case it := <-bh.insertC: case it := <-bh.insertC:
inserted, bn, err := bh.insertLastMileBlocks() inserted, bn, err := bh.insertLastMileBlocks()
numBlocksInsertedBeaconHelperCounter.Add(float64(inserted))
if err != nil { if err != nil {
bh.logger.Error().Err(err). bh.logger.Error().Err(err).
Msg(WrapStagedSyncMsg("insert last mile blocks error")) Msg(WrapStagedSyncMsg("insert last mile blocks error"))
close(it.doneC)
continue continue
} }
if inserted > 0 {
numBlocksInsertedBeaconHelperCounter.Add(float64(inserted))
bh.logger.Info().Int("inserted", inserted). bh.logger.Info().Int("inserted", inserted).
Uint64("end height", bn). Uint64("end height", bn).
Uint32("shard", bh.bc.ShardID()). Uint32("shard", bh.bc.ShardID()).
Msg(WrapStagedSyncMsg("insert last mile blocks")) Msg(WrapStagedSyncMsg("insert last mile blocks"))
}
close(it.doneC) close(it.doneC)
case <-bh.closeC: case <-bh.closeC:

@ -24,6 +24,7 @@ type (
syncProtocol syncProtocol syncProtocol syncProtocol
bh *beaconHelper bh *beaconHelper
stagedSyncInstance *StagedStreamSync stagedSyncInstance *StagedStreamSync
isBeaconNode bool
downloadC chan struct{} downloadC chan struct{}
closeC chan struct{} closeC chan struct{}
@ -67,7 +68,7 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
//TODO: use mem db should be in config file //TODO: use mem db should be in config file
stagedSyncInstance, err := CreateStagedSync(ctx, bc, false, sp, config, logger, config.LogProgress) stagedSyncInstance, err := CreateStagedSync(ctx, bc, false, isBeaconNode, sp, config, logger, config.LogProgress)
if err != nil { if err != nil {
cancel() cancel()
return nil return nil
@ -78,6 +79,7 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config
syncProtocol: sp, syncProtocol: sp,
bh: bh, bh: bh,
stagedSyncInstance: stagedSyncInstance, stagedSyncInstance: stagedSyncInstance,
isBeaconNode: isBeaconNode,
downloadC: make(chan struct{}), downloadC: make(chan struct{}),
closeC: make(chan struct{}), closeC: make(chan struct{}),
@ -187,7 +189,11 @@ func (d *Downloader) waitForBootFinish() {
func (d *Downloader) loop() { func (d *Downloader) loop() {
ticker := time.NewTicker(10 * time.Second) ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop() defer ticker.Stop()
initSync := true // for shard chain and beacon chain node, first we start with initSync=true to
// make sure it goes through the long range sync first.
// for epoch chain we do only need to go through epoch sync process
initSync := d.isBeaconNode || d.bc.ShardID() != shard.BeaconChainShardID
trigger := func() { trigger := func() {
select { select {
case d.downloadC <- struct{}{}: case d.downloadC <- struct{}{}:
@ -217,7 +223,7 @@ func (d *Downloader) loop() {
} }
} }
// If error happens, sleep 5 seconds and retry // If any error happens, sleep 5 seconds and retry
d.logger.Error(). d.logger.Error().
Err(err). Err(err).
Bool("initSync", initSync). Bool("initSync", initSync).
@ -227,7 +233,7 @@ func (d *Downloader) loop() {
trigger() trigger()
}() }()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
continue break
} }
if initSync { if initSync {
d.logger.Info().Int("block added", addedBN). d.logger.Info().Int("block added", addedBN).
@ -239,12 +245,12 @@ func (d *Downloader) loop() {
if addedBN != 0 { if addedBN != 0 {
// If block number has been changed, trigger another sync // If block number has been changed, trigger another sync
// and try to add last mile from pub-sub (blocking)
go trigger() go trigger()
}
// try to add last mile from pub-sub (blocking)
if d.bh != nil { if d.bh != nil {
d.bh.insertSync() d.bh.insertSync()
} }
}
initSync = false initSync = false
case <-d.closeC: case <-d.closeC:

@ -49,7 +49,7 @@ func (sr *StageEpoch) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta
return nil return nil
} }
if _, ok := sr.configs.bc.(*core.EpochChain); !ok { if sr.configs.bc.ShardID() != shard.BeaconChainShardID || s.state.isBeaconNode {
return nil return nil
} }

@ -6,6 +6,7 @@ import (
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -50,7 +51,7 @@ func (sr *StageShortRange) Exec(firstCycle bool, invalidBlockRevert bool, s *Sta
return nil return nil
} }
if _, ok := sr.configs.bc.(*core.EpochChain); ok { if sr.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil return nil
} }

@ -60,6 +60,7 @@ type StagedStreamSync struct {
isExplorer bool isExplorer bool
db kv.RwDB db kv.RwDB
protocol syncProtocol protocol syncProtocol
isBeaconNode bool
gbm *blockDownloadManager // initialized when finished get block number gbm *blockDownloadManager // initialized when finished get block number
inserted int inserted int
config Config config Config
@ -254,6 +255,7 @@ func New(ctx context.Context,
stagesList []*Stage, stagesList []*Stage,
isBeacon bool, isBeacon bool,
protocol syncProtocol, protocol syncProtocol,
isBeaconNode bool,
useMemDB bool, useMemDB bool,
config Config, config Config,
logger zerolog.Logger, logger zerolog.Logger,
@ -291,6 +293,7 @@ func New(ctx context.Context,
isBeacon: isBeacon, isBeacon: isBeacon,
db: db, db: db,
protocol: protocol, protocol: protocol,
isBeaconNode: isBeaconNode,
gbm: nil, gbm: nil,
status: &status, status: &status,
inserted: 0, inserted: 0,

@ -38,6 +38,7 @@ var Buckets = []string{
func CreateStagedSync(ctx context.Context, func CreateStagedSync(ctx context.Context,
bc core.BlockChain, bc core.BlockChain,
UseMemDB bool, UseMemDB bool,
isBeaconNode bool,
protocol syncProtocol, protocol syncProtocol,
config Config, config Config,
logger zerolog.Logger, logger zerolog.Logger,
@ -87,6 +88,7 @@ func CreateStagedSync(ctx context.Context,
stages, stages,
isBeacon, isBeacon,
protocol, protocol,
isBeaconNode,
UseMemDB, UseMemDB,
config, config,
logger, logger,

@ -878,8 +878,8 @@ func setupPrometheusService(node *node.Node, hc harmonyconfig.HarmonyConfig, sid
func setupSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyConfig) { func setupSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyConfig) {
blockchains := []core.BlockChain{node.Blockchain()} blockchains := []core.BlockChain{node.Blockchain()}
if !node.IsRunningBeaconChain() { if node.Blockchain().ShardID() != shard.BeaconChainShardID {
blockchains = append(blockchains, node.Beaconchain()) blockchains = append(blockchains, node.EpochChain())
} }
dConfig := downloader.Config{ dConfig := downloader.Config{
@ -913,8 +913,8 @@ func setupSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyCo
func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyConfig) { func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyConfig) {
blockchains := []core.BlockChain{node.Blockchain()} blockchains := []core.BlockChain{node.Blockchain()}
if !node.IsRunningBeaconChain() { if node.Blockchain().ShardID() != shard.BeaconChainShardID {
blockchains = append(blockchains, node.Beaconchain()) blockchains = append(blockchains, node.EpochChain())
} }
sConfig := stagedstreamsync.Config{ sConfig := stagedstreamsync.Config{

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2p_dht "github.com/libp2p/go-libp2p-kad-dht" libp2p_dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/discovery"
libp2p_host "github.com/libp2p/go-libp2p/core/host" libp2p_host "github.com/libp2p/go-libp2p/core/host"
@ -37,19 +38,8 @@ type dhtDiscovery struct {
} }
// NewDHTDiscovery creates a new dhtDiscovery that implements Discovery interface. // NewDHTDiscovery creates a new dhtDiscovery that implements Discovery interface.
func NewDHTDiscovery(host libp2p_host.Host, opt DHTConfig) (Discovery, error) { func NewDHTDiscovery(ctx context.Context, cancel context.CancelFunc, host libp2p_host.Host, dht *dht.IpfsDHT, opt DHTConfig) (Discovery, error) {
opts, err := opt.getLibp2pRawOptions()
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
dht, err := libp2p_dht.New(ctx, host, opts...)
if err != nil {
cancel()
return nil, err
}
d := libp2p_dis.NewRoutingDiscovery(dht) d := libp2p_dis.NewRoutingDiscovery(dht)
logger := utils.Logger().With().Str("module", "discovery").Logger() logger := utils.Logger().With().Str("module", "discovery").Logger()
return &dhtDiscovery{ return &dhtDiscovery{
dht: dht, dht: dht,

@ -5,6 +5,7 @@ import (
p2ptypes "github.com/harmony-one/harmony/p2p/types" p2ptypes "github.com/harmony-one/harmony/p2p/types"
badger "github.com/ipfs/go-ds-badger" badger "github.com/ipfs/go-ds-badger"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2p_dht "github.com/libp2p/go-libp2p-kad-dht" libp2p_dht "github.com/libp2p/go-libp2p-kad-dht"
) )
@ -14,10 +15,11 @@ type DHTConfig struct {
BootNodes []string BootNodes []string
DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes. DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes.
DiscConcurrency int DiscConcurrency int
DHT *dht.IpfsDHT
} }
// getLibp2pRawOptions get the raw libp2p options as a slice. // GetLibp2pRawOptions get the raw libp2p options as a slice.
func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) { func (opt DHTConfig) GetLibp2pRawOptions() ([]libp2p_dht.Option, error) {
var opts []libp2p_dht.Option var opts []libp2p_dht.Option
bootOption, err := getBootstrapOption(opt.BootNodes) bootOption, err := getBootstrapOption(opt.BootNodes)
@ -40,6 +42,8 @@ func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) {
opts = append(opts, libp2p_dht.Concurrency(opt.DiscConcurrency)) opts = append(opts, libp2p_dht.Concurrency(opt.DiscConcurrency))
} }
opts = append(opts, libp2p_dht.DisableAutoRefresh())
return opts, nil return opts, nil
} }

@ -128,7 +128,7 @@ func NewHost(cfg HostConfig) (Host, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// TODO: move low and high to configs // TODO: move low and high to configs
connmgr, err := connmgr.NewConnManager( connmgr, err := connmgr.NewConnManager(
int(10), // Lowwater int(10), // LowWater
int(10000)*cfg.MaxConnPerIP, // HighWater, int(10000)*cfg.MaxConnPerIP, // HighWater,
connmgr.WithGracePeriod(time.Minute), connmgr.WithGracePeriod(time.Minute),
) )
@ -137,6 +137,7 @@ func NewHost(cfg HostConfig) (Host, error) {
return nil, err return nil, err
} }
var idht *dht.IpfsDHT var idht *dht.IpfsDHT
var opt discovery.DHTConfig
p2pHost, err := libp2p.New( p2pHost, err := libp2p.New(
listenAddr, listenAddr,
libp2p.Identity(key), libp2p.Identity(key),
@ -152,33 +153,41 @@ func NewHost(cfg HostConfig) (Host, error) {
// Attempt to open ports using uPNP for NATed hosts. // Attempt to open ports using uPNP for NATed hosts.
libp2p.NATPortMap(), libp2p.NATPortMap(),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
idht, err = dht.New(ctx, h) opt = discovery.DHTConfig{
BootNodes: cfg.BootNodes,
DataStoreFile: cfg.DataStoreFile,
DiscConcurrency: cfg.DiscConcurrency,
}
opts, err := opt.GetLibp2pRawOptions()
if err != nil {
return nil, err
}
idht, err = dht.New(ctx, h, opts...)
return idht, err return idht, err
}), }),
// to help other peers to figure out if they are behind // to help other peers to figure out if they are behind
// NATs, launch the server-side of AutoNAT too (AutoRelay // NATs, launch the server-side of AutoNAT too (AutoRelay
// already runs the client) // already runs the client)
// This service is highly rate-limited and should not cause any // This service is highly rate-limited and should not cause any
// performance issues. // performance issues.
libp2p.EnableNATService(), libp2p.EnableNATService(),
// Bandwidth Reporter
libp2p.BandwidthReporter(newCounter()), libp2p.BandwidthReporter(newCounter()),
// ForceReachabilityPublic overrides automatic reachability detection in the AutoNAT subsystem, // ForceReachabilityPublic overrides automatic reachability detection in the AutoNAT subsystem,
// forcing the local node to believe it is reachable externally. // forcing the local node to believe it is reachable externally.
// libp2p.ForceReachabilityPublic(), // libp2p.ForceReachabilityPublic(),
// libp2p.DisableRelay(),
libp2p.EnableRelayService(),
// prevent dialing of public addresses // prevent dialing of public addresses
libp2p.ConnectionGater(NewGater(cfg.DisablePrivateIPScan)), // libp2p.ConnectionGater(NewGater(cfg.DisablePrivateIPScan)),
) )
if err != nil { if err != nil {
cancel() cancel()
return nil, errors.Wrapf(err, "cannot initialize libp2p host") return nil, errors.Wrapf(err, "cannot initialize libp2p host")
} }
disc, err := discovery.NewDHTDiscovery(p2pHost, discovery.DHTConfig{ disc, err := discovery.NewDHTDiscovery(ctx, cancel, p2pHost, idht, opt)
BootNodes: cfg.BootNodes,
DataStoreFile: cfg.DataStoreFile,
DiscConcurrency: cfg.DiscConcurrency,
})
if err != nil { if err != nil {
cancel() cancel()
return nil, errors.Wrap(err, "cannot create DHT discovery") return nil, errors.Wrap(err, "cannot create DHT discovery")
@ -319,6 +328,10 @@ func (host *HostV2) AddStreamProtocol(protocols ...sttypes.Protocol) {
for _, proto := range protocols { for _, proto := range protocols {
host.streamProtos = append(host.streamProtos, proto) host.streamProtos = append(host.streamProtos, proto)
host.h.SetStreamHandlerMatch(protocol.ID(proto.ProtoID()), proto.Match, proto.HandleStream) host.h.SetStreamHandlerMatch(protocol.ID(proto.ProtoID()), proto.Match, proto.HandleStream)
// TODO: do we need to add handler match for shard proto id?
// if proto.IsBeaconNode() {
// host.h.SetStreamHandlerMatch(protocol.ID(proto.ShardProtoID()), proto.Match, proto.HandleStream)
// }
} }
} }

@ -73,7 +73,6 @@ func newStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStrea
Str("protocol ID", string(pid)).Logger() Str("protocol ID", string(pid)).Logger()
protoSpec, _ := sttypes.ProtoIDToProtoSpec(pid) protoSpec, _ := sttypes.ProtoIDToProtoSpec(pid)
fmt.Println("my peer id: ", host.ID().String()) fmt.Println("my peer id: ", host.ID().String())
fmt.Println("my proto id: ", pid) fmt.Println("my proto id: ", pid)
@ -238,9 +237,6 @@ func (sm *streamManager) sanityCheckStream(st sttypes.Stream) error {
if mySpec.ShardID != rmSpec.ShardID { if mySpec.ShardID != rmSpec.ShardID {
return fmt.Errorf("unexpected shard ID: %v/%v", rmSpec.ShardID, mySpec.ShardID) return fmt.Errorf("unexpected shard ID: %v/%v", rmSpec.ShardID, mySpec.ShardID)
} }
if mySpec.ShardID == shard.BeaconChainShardID && !rmSpec.BeaconNode {
return fmt.Errorf("unexpected beacon node with shard ID: %v/%v", rmSpec.ShardID, mySpec.ShardID)
}
return nil return nil
} }
@ -311,7 +307,10 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e
connecting := 0 connecting := 0
for peer := range peers { for peer := range peers {
if peer.ID == sm.host.ID() || sm.coolDownCache.Has(peer.ID) { if peer.ID == sm.host.ID() {
continue
}
if sm.coolDownCache.Has(peer.ID) {
// If the peer has the same ID and was just connected, skip. // If the peer has the same ID and was just connected, skip.
continue continue
} }

@ -16,6 +16,7 @@ import (
"github.com/harmony-one/harmony/p2p/stream/common/requestmanager" "github.com/harmony-one/harmony/p2p/stream/common/requestmanager"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager" "github.com/harmony-one/harmony/p2p/stream/common/streammanager"
sttypes "github.com/harmony-one/harmony/p2p/stream/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/hashicorp/go-version" "github.com/hashicorp/go-version"
libp2p_host "github.com/libp2p/go-libp2p/core/host" libp2p_host "github.com/libp2p/go-libp2p/core/host"
libp2p_network "github.com/libp2p/go-libp2p/core/network" libp2p_network "github.com/libp2p/go-libp2p/core/network"
@ -107,7 +108,10 @@ func (p *Protocol) Start() {
p.sm.Start() p.sm.Start()
p.rm.Start() p.rm.Start()
p.rl.Start() p.rl.Start()
// If it's not EpochChain, advertise
if p.beaconNode || p.chain.ShardID() != shard.BeaconChainShardID {
go p.advertiseLoop() go p.advertiseLoop()
}
} }
// Close close the protocol // Close close the protocol
@ -129,11 +133,21 @@ func (p *Protocol) ProtoID() sttypes.ProtoID {
return p.protoIDByVersion(MyVersion) return p.protoIDByVersion(MyVersion)
} }
// ShardProtoID returns the ProtoID of the sync protocol for shard nodes
func (p *Protocol) ShardProtoID() sttypes.ProtoID {
return p.protoIDByVersionForShardNodes(MyVersion)
}
// Version returns the sync protocol version // Version returns the sync protocol version
func (p *Protocol) Version() *version.Version { func (p *Protocol) Version() *version.Version {
return MyVersion return MyVersion
} }
// IsBeaconNode returns true if it is a beacon chain node
func (p *Protocol) IsBeaconNode() bool {
return p.beaconNode
}
// Match checks the compatibility to the target protocol ID. // Match checks the compatibility to the target protocol ID.
func (p *Protocol) Match(targetID string) bool { func (p *Protocol) Match(targetID string) bool {
target, err := sttypes.ProtoIDToProtoSpec(sttypes.ProtoID(targetID)) target, err := sttypes.ProtoIDToProtoSpec(sttypes.ProtoID(targetID))
@ -173,9 +187,9 @@ func (p *Protocol) advertiseLoop() {
for { for {
sleep := p.advertise() sleep := p.advertise()
select { select {
case <-time.After(sleep):
case <-p.closeC: case <-p.closeC:
return return
case <-time.After(sleep):
} }
} }
} }
@ -209,6 +223,11 @@ func (p *Protocol) supportedProtoIDs() []sttypes.ProtoID {
pids := make([]sttypes.ProtoID, 0, len(vs)) pids := make([]sttypes.ProtoID, 0, len(vs))
for _, v := range vs { for _, v := range vs {
pids = append(pids, p.protoIDByVersion(v)) pids = append(pids, p.protoIDByVersion(v))
// beacon node needs to inform shard nodes about it supports them as well for EpochChain
// basically beacon node can accept connection from shard nodes to share last epoch blocks
if p.beaconNode {
pids = append(pids, p.protoIDByVersionForShardNodes(v))
}
} }
return pids return pids
} }
@ -228,6 +247,17 @@ func (p *Protocol) protoIDByVersion(v *version.Version) sttypes.ProtoID {
return spec.ToProtoID() return spec.ToProtoID()
} }
func (p *Protocol) protoIDByVersionForShardNodes(v *version.Version) sttypes.ProtoID {
spec := sttypes.ProtoSpec{
Service: serviceSpecifier,
NetworkType: p.config.Network,
ShardID: p.config.ShardID,
Version: v,
BeaconNode: false,
}
return spec.ToProtoID()
}
// RemoveStream removes the stream of the given stream ID // RemoveStream removes the stream of the given stream ID
// TODO: add reason to parameters // TODO: add reason to parameters
func (p *Protocol) RemoveStream(stID sttypes.StreamID) { func (p *Protocol) RemoveStream(stID sttypes.StreamID) {

@ -13,6 +13,8 @@ type Protocol interface {
Specifier() string Specifier() string
Version() *version.Version Version() *version.Version
ProtoID() ProtoID ProtoID() ProtoID
// ShardProtoID() ProtoID
IsBeaconNode() bool
Match(string) bool Match(string) bool
HandleStream(st libp2p_network.Stream) HandleStream(st libp2p_network.Stream)
} }

Loading…
Cancel
Save