refactor stream peer cooldown and fix protocol beacon node field

pull/4351/head
“GheisMohammadi” 2 years ago committed by Casey Gardiner
parent cc93332b5d
commit a4a656a6d5
  1. 7
      p2p/stream/common/streammanager/cooldown.go
  2. 4
      p2p/stream/common/streammanager/streammanager.go
  3. 20
      p2p/stream/protocols/sync/protocol.go

@ -30,11 +30,16 @@ func newCoolDownCache() *coolDownCache {
func (cache *coolDownCache) Has(id peer.ID) bool { func (cache *coolDownCache) Has(id peer.ID) bool {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
has := cache.timeCache.Has(string(id))
return has
}
// Add adds the peer ID to the cache
func (cache *coolDownCache) Add(id peer.ID) {
has := cache.timeCache.Has(string(id)) has := cache.timeCache.Has(string(id))
if !has { if !has {
cache.timeCache.Add(string(id)) cache.timeCache.Add(string(id))
} }
return has
} }
// Reset the cool down cache // Reset the cool down cache

@ -315,12 +315,16 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e
// If the peer has the same ID and was just connected, skip. // If the peer has the same ID and was just connected, skip.
continue continue
} }
if _, ok := sm.streams.get(sttypes.StreamID(peer.ID)); ok {
continue
}
discoveredPeersCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc() discoveredPeersCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc()
connecting += 1 connecting += 1
go func(pid libp2p_peer.ID) { go func(pid libp2p_peer.ID) {
// The ctx here is using the module context instead of discover context // The ctx here is using the module context instead of discover context
err := sm.setupStreamWithPeer(sm.ctx, pid) err := sm.setupStreamWithPeer(sm.ctx, pid)
if err != nil { if err != nil {
sm.coolDownCache.Add(peer.ID)
sm.logger.Warn().Err(err).Str("peerID", string(pid)).Msg("failed to setup stream with peer") sm.logger.Warn().Err(err).Str("peerID", string(pid)).Msg("failed to setup stream with peer")
return return
} }

@ -8,7 +8,6 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
@ -17,7 +16,6 @@ import (
"github.com/harmony-one/harmony/p2p/stream/common/requestmanager" "github.com/harmony-one/harmony/p2p/stream/common/requestmanager"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager" "github.com/harmony-one/harmony/p2p/stream/common/streammanager"
sttypes "github.com/harmony-one/harmony/p2p/stream/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/hashicorp/go-version" "github.com/hashicorp/go-version"
libp2p_host "github.com/libp2p/go-libp2p/core/host" libp2p_host "github.com/libp2p/go-libp2p/core/host"
libp2p_network "github.com/libp2p/go-libp2p/core/network" libp2p_network "github.com/libp2p/go-libp2p/core/network"
@ -60,12 +58,12 @@ type (
// Config is the sync protocol config // Config is the sync protocol config
Config struct { Config struct {
Chain engine.ChainReader Chain engine.ChainReader
Host libp2p_host.Host Host libp2p_host.Host
Discovery discovery.Discovery Discovery discovery.Discovery
ShardID nodeconfig.ShardID ShardID nodeconfig.ShardID
Network nodeconfig.NetworkType Network nodeconfig.NetworkType
BeaconNode bool
// stream manager config // stream manager config
SmSoftLowCap int SmSoftLowCap int
SmHardLowCap int SmHardLowCap int
@ -78,13 +76,9 @@ type (
func NewProtocol(config Config) *Protocol { func NewProtocol(config Config) *Protocol {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
isBeaconNode := config.Chain.ShardID() == shard.BeaconChainShardID
if _, ok := config.Chain.(*core.EpochChain); ok {
isBeaconNode = false
}
sp := &Protocol{ sp := &Protocol{
chain: config.Chain, chain: config.Chain,
beaconNode: isBeaconNode, beaconNode: config.BeaconNode,
disc: config.Discovery, disc: config.Discovery,
config: config, config: config,
ctx: ctx, ctx: ctx,

Loading…
Cancel
Save