From fecd4fdda3a8e5dfc32e57776b0a0a643fbec58e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Wed, 11 Jan 2023 22:55:15 +0800 Subject: [PATCH] refactor p2p host and routing --- api/service/stagedstreamsync/beacon_helper.go | 14 ++++---- api/service/stagedstreamsync/downloader.go | 22 +++++++----- api/service/stagedstreamsync/stage_epoch.go | 2 +- .../stagedstreamsync/stage_short_range.go | 3 +- .../stagedstreamsync/staged_stream_sync.go | 29 +++++++++------- api/service/stagedstreamsync/syncing.go | 2 ++ cmd/harmony/main.go | 8 ++--- p2p/discovery/discovery.go | 14 ++------ p2p/discovery/option.go | 8 +++-- p2p/host.go | 31 ++++++++++++----- .../common/streammanager/streammanager.go | 9 +++-- p2p/stream/protocols/sync/protocol.go | 34 +++++++++++++++++-- p2p/stream/types/interface.go | 2 ++ 13 files changed, 115 insertions(+), 63 deletions(-) diff --git a/api/service/stagedstreamsync/beacon_helper.go b/api/service/stagedstreamsync/beacon_helper.go index 2f684e0e2..a996f368b 100644 --- a/api/service/stagedstreamsync/beacon_helper.go +++ b/api/service/stagedstreamsync/beacon_helper.go @@ -76,17 +76,19 @@ func (bh *beaconHelper) loop() { case it := <-bh.insertC: inserted, bn, err := bh.insertLastMileBlocks() - numBlocksInsertedBeaconHelperCounter.Add(float64(inserted)) if err != nil { bh.logger.Error().Err(err). Msg(WrapStagedSyncMsg("insert last mile blocks error")) + close(it.doneC) continue } - bh.logger.Info().Int("inserted", inserted). - Uint64("end height", bn). - Uint32("shard", bh.bc.ShardID()). - Msg(WrapStagedSyncMsg("insert last mile blocks")) - + if inserted > 0 { + numBlocksInsertedBeaconHelperCounter.Add(float64(inserted)) + bh.logger.Info().Int("inserted", inserted). + Uint64("end height", bn). + Uint32("shard", bh.bc.ShardID()). + Msg(WrapStagedSyncMsg("insert last mile blocks")) + } close(it.doneC) case <-bh.closeC: diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index a7b7402c2..f53a5c41a 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -24,6 +24,7 @@ type ( syncProtocol syncProtocol bh *beaconHelper stagedSyncInstance *StagedStreamSync + isBeaconNode bool downloadC chan struct{} closeC chan struct{} @@ -67,7 +68,7 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config ctx, cancel := context.WithCancel(context.Background()) //TODO: use mem db should be in config file - stagedSyncInstance, err := CreateStagedSync(ctx, bc, false, sp, config, logger, config.LogProgress) + stagedSyncInstance, err := CreateStagedSync(ctx, bc, false, isBeaconNode, sp, config, logger, config.LogProgress) if err != nil { cancel() return nil @@ -78,6 +79,7 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config syncProtocol: sp, bh: bh, stagedSyncInstance: stagedSyncInstance, + isBeaconNode: isBeaconNode, downloadC: make(chan struct{}), closeC: make(chan struct{}), @@ -187,7 +189,11 @@ func (d *Downloader) waitForBootFinish() { func (d *Downloader) loop() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() - initSync := true + // for shard chain and beacon chain node, first we start with initSync=true to + // make sure it goes through the long range sync first. + // for epoch chain we do only need to go through epoch sync process + initSync := d.isBeaconNode || d.bc.ShardID() != shard.BeaconChainShardID + trigger := func() { select { case d.downloadC <- struct{}{}: @@ -217,7 +223,7 @@ func (d *Downloader) loop() { } } - // If error happens, sleep 5 seconds and retry + // If any error happens, sleep 5 seconds and retry d.logger.Error(). Err(err). Bool("initSync", initSync). @@ -227,7 +233,7 @@ func (d *Downloader) loop() { trigger() }() time.Sleep(1 * time.Second) - continue + break } if initSync { d.logger.Info().Int("block added", addedBN). @@ -239,11 +245,11 @@ func (d *Downloader) loop() { if addedBN != 0 { // If block number has been changed, trigger another sync - // and try to add last mile from pub-sub (blocking) go trigger() - if d.bh != nil { - d.bh.insertSync() - } + } + // try to add last mile from pub-sub (blocking) + if d.bh != nil { + d.bh.insertSync() } initSync = false diff --git a/api/service/stagedstreamsync/stage_epoch.go b/api/service/stagedstreamsync/stage_epoch.go index 6320d8217..77dc57bfd 100644 --- a/api/service/stagedstreamsync/stage_epoch.go +++ b/api/service/stagedstreamsync/stage_epoch.go @@ -49,7 +49,7 @@ func (sr *StageEpoch) Exec(firstCycle bool, invalidBlockRevert bool, s *StageSta return nil } - if _, ok := sr.configs.bc.(*core.EpochChain); !ok { + if sr.configs.bc.ShardID() != shard.BeaconChainShardID || s.state.isBeaconNode { return nil } diff --git a/api/service/stagedstreamsync/stage_short_range.go b/api/service/stagedstreamsync/stage_short_range.go index cfe29f39e..75f51ee1e 100644 --- a/api/service/stagedstreamsync/stage_short_range.go +++ b/api/service/stagedstreamsync/stage_short_range.go @@ -6,6 +6,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" ) @@ -50,7 +51,7 @@ func (sr *StageShortRange) Exec(firstCycle bool, invalidBlockRevert bool, s *Sta return nil } - if _, ok := sr.configs.bc.(*core.EpochChain); ok { + if sr.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { return nil } diff --git a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index 9603f5b06..e73edd622 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -54,19 +54,20 @@ func (ib *InvalidBlock) addBadStream(bsID sttypes.StreamID) { } type StagedStreamSync struct { - ctx context.Context - bc core.BlockChain - isBeacon bool - isExplorer bool - db kv.RwDB - protocol syncProtocol - gbm *blockDownloadManager // initialized when finished get block number - inserted int - config Config - logger zerolog.Logger - status *status //TODO: merge this with currentSyncCycle - initSync bool // if sets to true, node start long range syncing - UseMemDB bool + ctx context.Context + bc core.BlockChain + isBeacon bool + isExplorer bool + db kv.RwDB + protocol syncProtocol + isBeaconNode bool + gbm *blockDownloadManager // initialized when finished get block number + inserted int + config Config + logger zerolog.Logger + status *status //TODO: merge this with currentSyncCycle + initSync bool // if sets to true, node start long range syncing + UseMemDB bool revertPoint *uint64 // used to run stages prevRevertPoint *uint64 // used to get value from outside of staged sync after cycle (for example to notify RPCDaemon) @@ -254,6 +255,7 @@ func New(ctx context.Context, stagesList []*Stage, isBeacon bool, protocol syncProtocol, + isBeaconNode bool, useMemDB bool, config Config, logger zerolog.Logger, @@ -291,6 +293,7 @@ func New(ctx context.Context, isBeacon: isBeacon, db: db, protocol: protocol, + isBeaconNode: isBeaconNode, gbm: nil, status: &status, inserted: 0, diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index b793151c6..de9b88481 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -38,6 +38,7 @@ var Buckets = []string{ func CreateStagedSync(ctx context.Context, bc core.BlockChain, UseMemDB bool, + isBeaconNode bool, protocol syncProtocol, config Config, logger zerolog.Logger, @@ -87,6 +88,7 @@ func CreateStagedSync(ctx context.Context, stages, isBeacon, protocol, + isBeaconNode, UseMemDB, config, logger, diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 6c8363667..6ab3ed102 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -878,8 +878,8 @@ func setupPrometheusService(node *node.Node, hc harmonyconfig.HarmonyConfig, sid func setupSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyConfig) { blockchains := []core.BlockChain{node.Blockchain()} - if !node.IsRunningBeaconChain() { - blockchains = append(blockchains, node.Beaconchain()) + if node.Blockchain().ShardID() != shard.BeaconChainShardID { + blockchains = append(blockchains, node.EpochChain()) } dConfig := downloader.Config{ @@ -913,8 +913,8 @@ func setupSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyCo func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.HarmonyConfig) { blockchains := []core.BlockChain{node.Blockchain()} - if !node.IsRunningBeaconChain() { - blockchains = append(blockchains, node.Beaconchain()) + if node.Blockchain().ShardID() != shard.BeaconChainShardID { + blockchains = append(blockchains, node.EpochChain()) } sConfig := stagedstreamsync.Config{ diff --git a/p2p/discovery/discovery.go b/p2p/discovery/discovery.go index fb9591c26..53372edd6 100644 --- a/p2p/discovery/discovery.go +++ b/p2p/discovery/discovery.go @@ -5,6 +5,7 @@ import ( "time" "github.com/harmony-one/harmony/internal/utils" + dht "github.com/libp2p/go-libp2p-kad-dht" libp2p_dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/discovery" libp2p_host "github.com/libp2p/go-libp2p/core/host" @@ -37,19 +38,8 @@ type dhtDiscovery struct { } // NewDHTDiscovery creates a new dhtDiscovery that implements Discovery interface. -func NewDHTDiscovery(host libp2p_host.Host, opt DHTConfig) (Discovery, error) { - opts, err := opt.getLibp2pRawOptions() - if err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(context.Background()) - dht, err := libp2p_dht.New(ctx, host, opts...) - if err != nil { - cancel() - return nil, err - } +func NewDHTDiscovery(ctx context.Context, cancel context.CancelFunc, host libp2p_host.Host, dht *dht.IpfsDHT, opt DHTConfig) (Discovery, error) { d := libp2p_dis.NewRoutingDiscovery(dht) - logger := utils.Logger().With().Str("module", "discovery").Logger() return &dhtDiscovery{ dht: dht, diff --git a/p2p/discovery/option.go b/p2p/discovery/option.go index 0afe6b8a2..ce4259270 100644 --- a/p2p/discovery/option.go +++ b/p2p/discovery/option.go @@ -5,6 +5,7 @@ import ( p2ptypes "github.com/harmony-one/harmony/p2p/types" badger "github.com/ipfs/go-ds-badger" + dht "github.com/libp2p/go-libp2p-kad-dht" libp2p_dht "github.com/libp2p/go-libp2p-kad-dht" ) @@ -14,10 +15,11 @@ type DHTConfig struct { BootNodes []string DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes. DiscConcurrency int + DHT *dht.IpfsDHT } -// getLibp2pRawOptions get the raw libp2p options as a slice. -func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) { +// GetLibp2pRawOptions get the raw libp2p options as a slice. +func (opt DHTConfig) GetLibp2pRawOptions() ([]libp2p_dht.Option, error) { var opts []libp2p_dht.Option bootOption, err := getBootstrapOption(opt.BootNodes) @@ -40,6 +42,8 @@ func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) { opts = append(opts, libp2p_dht.Concurrency(opt.DiscConcurrency)) } + opts = append(opts, libp2p_dht.DisableAutoRefresh()) + return opts, nil } diff --git a/p2p/host.go b/p2p/host.go index a2bd5996b..fcd2ea3b4 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -128,7 +128,7 @@ func NewHost(cfg HostConfig) (Host, error) { ctx, cancel := context.WithCancel(context.Background()) // TODO: move low and high to configs connmgr, err := connmgr.NewConnManager( - int(10), // Lowwater + int(10), // LowWater int(10000)*cfg.MaxConnPerIP, // HighWater, connmgr.WithGracePeriod(time.Minute), ) @@ -137,6 +137,7 @@ func NewHost(cfg HostConfig) (Host, error) { return nil, err } var idht *dht.IpfsDHT + var opt discovery.DHTConfig p2pHost, err := libp2p.New( listenAddr, libp2p.Identity(key), @@ -152,33 +153,41 @@ func NewHost(cfg HostConfig) (Host, error) { // Attempt to open ports using uPNP for NATed hosts. libp2p.NATPortMap(), libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - idht, err = dht.New(ctx, h) + opt = discovery.DHTConfig{ + BootNodes: cfg.BootNodes, + DataStoreFile: cfg.DataStoreFile, + DiscConcurrency: cfg.DiscConcurrency, + } + opts, err := opt.GetLibp2pRawOptions() + if err != nil { + return nil, err + } + idht, err = dht.New(ctx, h, opts...) return idht, err }), + // to help other peers to figure out if they are behind // NATs, launch the server-side of AutoNAT too (AutoRelay // already runs the client) // This service is highly rate-limited and should not cause any // performance issues. libp2p.EnableNATService(), + // Bandwidth Reporter libp2p.BandwidthReporter(newCounter()), // ForceReachabilityPublic overrides automatic reachability detection in the AutoNAT subsystem, // forcing the local node to believe it is reachable externally. // libp2p.ForceReachabilityPublic(), - + // libp2p.DisableRelay(), + libp2p.EnableRelayService(), // prevent dialing of public addresses - libp2p.ConnectionGater(NewGater(cfg.DisablePrivateIPScan)), + // libp2p.ConnectionGater(NewGater(cfg.DisablePrivateIPScan)), ) if err != nil { cancel() return nil, errors.Wrapf(err, "cannot initialize libp2p host") } - disc, err := discovery.NewDHTDiscovery(p2pHost, discovery.DHTConfig{ - BootNodes: cfg.BootNodes, - DataStoreFile: cfg.DataStoreFile, - DiscConcurrency: cfg.DiscConcurrency, - }) + disc, err := discovery.NewDHTDiscovery(ctx, cancel, p2pHost, idht, opt) if err != nil { cancel() return nil, errors.Wrap(err, "cannot create DHT discovery") @@ -319,6 +328,10 @@ func (host *HostV2) AddStreamProtocol(protocols ...sttypes.Protocol) { for _, proto := range protocols { host.streamProtos = append(host.streamProtos, proto) host.h.SetStreamHandlerMatch(protocol.ID(proto.ProtoID()), proto.Match, proto.HandleStream) + // TODO: do we need to add handler match for shard proto id? + // if proto.IsBeaconNode() { + // host.h.SetStreamHandlerMatch(protocol.ID(proto.ShardProtoID()), proto.Match, proto.HandleStream) + // } } } diff --git a/p2p/stream/common/streammanager/streammanager.go b/p2p/stream/common/streammanager/streammanager.go index 60d3a5521..064cb0c2d 100644 --- a/p2p/stream/common/streammanager/streammanager.go +++ b/p2p/stream/common/streammanager/streammanager.go @@ -73,7 +73,6 @@ func newStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStrea Str("protocol ID", string(pid)).Logger() protoSpec, _ := sttypes.ProtoIDToProtoSpec(pid) - fmt.Println("my peer id: ", host.ID().String()) fmt.Println("my proto id: ", pid) @@ -238,9 +237,6 @@ func (sm *streamManager) sanityCheckStream(st sttypes.Stream) error { if mySpec.ShardID != rmSpec.ShardID { return fmt.Errorf("unexpected shard ID: %v/%v", rmSpec.ShardID, mySpec.ShardID) } - if mySpec.ShardID == shard.BeaconChainShardID && !rmSpec.BeaconNode { - return fmt.Errorf("unexpected beacon node with shard ID: %v/%v", rmSpec.ShardID, mySpec.ShardID) - } return nil } @@ -311,7 +307,10 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e connecting := 0 for peer := range peers { - if peer.ID == sm.host.ID() || sm.coolDownCache.Has(peer.ID) { + if peer.ID == sm.host.ID() { + continue + } + if sm.coolDownCache.Has(peer.ID) { // If the peer has the same ID and was just connected, skip. continue } diff --git a/p2p/stream/protocols/sync/protocol.go b/p2p/stream/protocols/sync/protocol.go index 7b5a1ff2d..2a984a581 100644 --- a/p2p/stream/protocols/sync/protocol.go +++ b/p2p/stream/protocols/sync/protocol.go @@ -16,6 +16,7 @@ import ( "github.com/harmony-one/harmony/p2p/stream/common/requestmanager" "github.com/harmony-one/harmony/p2p/stream/common/streammanager" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/hashicorp/go-version" libp2p_host "github.com/libp2p/go-libp2p/core/host" libp2p_network "github.com/libp2p/go-libp2p/core/network" @@ -107,7 +108,10 @@ func (p *Protocol) Start() { p.sm.Start() p.rm.Start() p.rl.Start() - go p.advertiseLoop() + // If it's not EpochChain, advertise + if p.beaconNode || p.chain.ShardID() != shard.BeaconChainShardID { + go p.advertiseLoop() + } } // Close close the protocol @@ -129,11 +133,21 @@ func (p *Protocol) ProtoID() sttypes.ProtoID { return p.protoIDByVersion(MyVersion) } +// ShardProtoID returns the ProtoID of the sync protocol for shard nodes +func (p *Protocol) ShardProtoID() sttypes.ProtoID { + return p.protoIDByVersionForShardNodes(MyVersion) +} + // Version returns the sync protocol version func (p *Protocol) Version() *version.Version { return MyVersion } +// IsBeaconNode returns true if it is a beacon chain node +func (p *Protocol) IsBeaconNode() bool { + return p.beaconNode +} + // Match checks the compatibility to the target protocol ID. func (p *Protocol) Match(targetID string) bool { target, err := sttypes.ProtoIDToProtoSpec(sttypes.ProtoID(targetID)) @@ -173,9 +187,9 @@ func (p *Protocol) advertiseLoop() { for { sleep := p.advertise() select { - case <-time.After(sleep): case <-p.closeC: return + case <-time.After(sleep): } } } @@ -209,6 +223,11 @@ func (p *Protocol) supportedProtoIDs() []sttypes.ProtoID { pids := make([]sttypes.ProtoID, 0, len(vs)) for _, v := range vs { pids = append(pids, p.protoIDByVersion(v)) + // beacon node needs to inform shard nodes about it supports them as well for EpochChain + // basically beacon node can accept connection from shard nodes to share last epoch blocks + if p.beaconNode { + pids = append(pids, p.protoIDByVersionForShardNodes(v)) + } } return pids } @@ -228,6 +247,17 @@ func (p *Protocol) protoIDByVersion(v *version.Version) sttypes.ProtoID { return spec.ToProtoID() } +func (p *Protocol) protoIDByVersionForShardNodes(v *version.Version) sttypes.ProtoID { + spec := sttypes.ProtoSpec{ + Service: serviceSpecifier, + NetworkType: p.config.Network, + ShardID: p.config.ShardID, + Version: v, + BeaconNode: false, + } + return spec.ToProtoID() +} + // RemoveStream removes the stream of the given stream ID // TODO: add reason to parameters func (p *Protocol) RemoveStream(stID sttypes.StreamID) { diff --git a/p2p/stream/types/interface.go b/p2p/stream/types/interface.go index 424382cc8..d7b60f78c 100644 --- a/p2p/stream/types/interface.go +++ b/p2p/stream/types/interface.go @@ -13,6 +13,8 @@ type Protocol interface { Specifier() string Version() *version.Version ProtoID() ProtoID + // ShardProtoID() ProtoID + IsBeaconNode() bool Match(string) bool HandleStream(st libp2p_network.Stream) }