diff --git a/p2p/stream/common/streammanager/cooldown.go b/p2p/stream/common/streammanager/cooldown.go index d8b58346c..f22f4956f 100644 --- a/p2p/stream/common/streammanager/cooldown.go +++ b/p2p/stream/common/streammanager/cooldown.go @@ -30,11 +30,16 @@ func newCoolDownCache() *coolDownCache { func (cache *coolDownCache) Has(id peer.ID) bool { cache.mu.Lock() defer cache.mu.Unlock() + has := cache.timeCache.Has(string(id)) + return has +} + +// Add adds the peer ID to the cache +func (cache *coolDownCache) Add(id peer.ID) { has := cache.timeCache.Has(string(id)) if !has { cache.timeCache.Add(string(id)) } - return has } // Reset the cool down cache diff --git a/p2p/stream/common/streammanager/streammanager.go b/p2p/stream/common/streammanager/streammanager.go index fe4505029..60d3a5521 100644 --- a/p2p/stream/common/streammanager/streammanager.go +++ b/p2p/stream/common/streammanager/streammanager.go @@ -315,12 +315,16 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e // If the peer has the same ID and was just connected, skip. continue } + if _, ok := sm.streams.get(sttypes.StreamID(peer.ID)); ok { + continue + } discoveredPeersCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc() connecting += 1 go func(pid libp2p_peer.ID) { // The ctx here is using the module context instead of discover context err := sm.setupStreamWithPeer(sm.ctx, pid) if err != nil { + sm.coolDownCache.Add(peer.ID) sm.logger.Warn().Err(err).Str("peerID", string(pid)).Msg("failed to setup stream with peer") return } diff --git a/p2p/stream/protocols/sync/protocol.go b/p2p/stream/protocols/sync/protocol.go index 68f92bb8e..7b5a1ff2d 100644 --- a/p2p/stream/protocols/sync/protocol.go +++ b/p2p/stream/protocols/sync/protocol.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/harmony-one/harmony/consensus/engine" - "github.com/harmony-one/harmony/core" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" "github.com/harmony-one/harmony/internal/utils" @@ -17,7 +16,6 @@ import ( "github.com/harmony-one/harmony/p2p/stream/common/requestmanager" "github.com/harmony-one/harmony/p2p/stream/common/streammanager" sttypes "github.com/harmony-one/harmony/p2p/stream/types" - "github.com/harmony-one/harmony/shard" "github.com/hashicorp/go-version" libp2p_host "github.com/libp2p/go-libp2p/core/host" libp2p_network "github.com/libp2p/go-libp2p/core/network" @@ -60,12 +58,12 @@ type ( // Config is the sync protocol config Config struct { - Chain engine.ChainReader - Host libp2p_host.Host - Discovery discovery.Discovery - ShardID nodeconfig.ShardID - Network nodeconfig.NetworkType - + Chain engine.ChainReader + Host libp2p_host.Host + Discovery discovery.Discovery + ShardID nodeconfig.ShardID + Network nodeconfig.NetworkType + BeaconNode bool // stream manager config SmSoftLowCap int SmHardLowCap int @@ -78,13 +76,9 @@ type ( func NewProtocol(config Config) *Protocol { ctx, cancel := context.WithCancel(context.Background()) - isBeaconNode := config.Chain.ShardID() == shard.BeaconChainShardID - if _, ok := config.Chain.(*core.EpochChain); ok { - isBeaconNode = false - } sp := &Protocol{ chain: config.Chain, - beaconNode: isBeaconNode, + beaconNode: config.BeaconNode, disc: config.Discovery, config: config, ctx: ctx,