add watermark low/high options for p2p connection manager

pull/4366/head
GheisMohammadi authored 2 years ago; committed by Soph
parent f07dd5681a
commit 4b8bfabf28
12 changed files (changed lines in parentheses):

1. cmd/harmony/config_migrations.go (6)
2. cmd/harmony/default.go (2)
3. cmd/harmony/flags.go (20)
4. cmd/harmony/flags_test.go (12)
5. cmd/harmony/main.go (2)
6. internal/configs/harmony/harmony.go (2)
7. internal/configs/node/network.go (8)
8. node/node_handler.go (2)
9. p2p/discovery/option_test.go (4)
10. p2p/host.go (64)
11. rosetta/infra/harmony-mainnet.conf (2)
12. rosetta/infra/harmony-pstn.conf (2)

cmd/harmony/config_migrations.go
@@ -319,6 +319,12 @@ func init() {
 	}
 	migrations["2.5.10"] = func(confTree *toml.Tree) *toml.Tree {
+		if confTree.Get("P2P.ConnManagerLowWatermark") == nil {
+			confTree.Set("P2P.ConnManagerLowWatermark", defaultConfig.P2P.ConnManagerLowWatermark)
+		}
+		if confTree.Get("P2P.ConnManagerHighWatermark") == nil {
+			confTree.Set("P2P.ConnManagerHighWatermark", defaultConfig.P2P.ConnManagerHighWatermark)
+		}
 		if confTree.Get("Sync.MaxAdvertiseWaitTime") == nil {
 			confTree.Set("Sync.MaxAdvertiseWaitTime", defaultConfig.Sync.MaxAdvertiseWaitTime)
 		}
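
For reference, a node config predating 2.5.10 gains the two keys on migration. A sketch of the resulting [P2P] section, assuming the defaults introduced below (compare the rosetta .conf diffs at the end of this commit):

  [P2P]
    MaxConnsPerIP = 10
    MaxPeers = 0
    Port = 9000
    ConnManagerLowWatermark = 160
    ConnManagerHighWatermark = 192
    WaitForEachPeerToConnect = false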

cmd/harmony/default.go
@@ -32,6 +32,8 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
 		MaxConnsPerIP:            nodeconfig.DefaultMaxConnPerIP,
 		DisablePrivateIPScan:     false,
 		MaxPeers:                 nodeconfig.DefaultMaxPeers,
+		ConnManagerLowWatermark:  nodeconfig.DefaultConnManagerLowWatermark,
+		ConnManagerHighWatermark: nodeconfig.DefaultConnManagerHighWatermark,
 		WaitForEachPeerToConnect: nodeconfig.DefaultWaitForEachPeerToConnect,
 	},
 	HTTP: harmonyconfig.HttpConfig{

@ -63,6 +63,8 @@ var (
p2pDisablePrivateIPScanFlag, p2pDisablePrivateIPScanFlag,
maxConnPerIPFlag, maxConnPerIPFlag,
maxPeersFlag, maxPeersFlag,
connManagerLowWatermarkFlag,
connManagerHighWatermarkFlag,
} }
httpFlags = []cli.Flag{ httpFlags = []cli.Flag{
@ -579,6 +581,16 @@ var (
Usage: "maximum number of peers allowed, 0 means no limit", Usage: "maximum number of peers allowed, 0 means no limit",
DefValue: defaultConfig.P2P.MaxConnsPerIP, DefValue: defaultConfig.P2P.MaxConnsPerIP,
} }
connManagerLowWatermarkFlag = cli.IntFlag{
Name: "p2p.connmgr-low",
Usage: "lowest number of connections that'll be maintained in connection manager",
DefValue: defaultConfig.P2P.ConnManagerLowWatermark,
}
connManagerHighWatermarkFlag = cli.IntFlag{
Name: "p2p.connmgr-high",
Usage: "highest number of connections that'll be maintained in connection manager",
DefValue: defaultConfig.P2P.ConnManagerHighWatermark,
}
waitForEachPeerToConnectFlag = cli.BoolFlag{ waitForEachPeerToConnectFlag = cli.BoolFlag{
Name: "p2p.wait-for-connections", Name: "p2p.wait-for-connections",
Usage: "node waits for each single peer to connect and it doesn't add them to peers list after timeout", Usage: "node waits for each single peer to connect and it doesn't add them to peers list after timeout",
@ -624,6 +636,14 @@ func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
config.P2P.WaitForEachPeerToConnect = cli.GetBoolFlagValue(cmd, waitForEachPeerToConnectFlag) config.P2P.WaitForEachPeerToConnect = cli.GetBoolFlagValue(cmd, waitForEachPeerToConnectFlag)
} }
if cli.IsFlagChanged(cmd, connManagerLowWatermarkFlag) {
config.P2P.ConnManagerLowWatermark = cli.GetIntFlagValue(cmd, connManagerLowWatermarkFlag)
}
if cli.IsFlagChanged(cmd, connManagerHighWatermarkFlag) {
config.P2P.ConnManagerHighWatermark = cli.GetIntFlagValue(cmd, connManagerHighWatermarkFlag)
}
if cli.IsFlagChanged(cmd, p2pDisablePrivateIPScanFlag) { if cli.IsFlagChanged(cmd, p2pDisablePrivateIPScanFlag) {
config.P2P.DisablePrivateIPScan = cli.GetBoolFlagValue(cmd, p2pDisablePrivateIPScanFlag) config.P2P.DisablePrivateIPScan = cli.GetBoolFlagValue(cmd, p2pDisablePrivateIPScanFlag)
} }
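
With the wiring above, the watermarks can also be overridden per run. A hypothetical invocation (binary name illustrative; the two --p2p.connmgr-* flags are the ones added here):

  ./harmony --p2p.connmgr-low 160 --p2p.connmgr-high 192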

cmd/harmony/flags_test.go
@@ -65,6 +65,8 @@ func TestHarmonyFlags(t *testing.T) {
 				MaxConnsPerIP:            5,
 				DisablePrivateIPScan:     false,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				ConnManagerLowWatermark:  defaultConfig.P2P.ConnManagerLowWatermark,
+				ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark,
 				WaitForEachPeerToConnect: false,
 			},
 			HTTP: harmonyconfig.HttpConfig{
@@ -374,6 +376,8 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            10,
 				DisablePrivateIPScan:     false,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				ConnManagerLowWatermark:  defaultConfig.P2P.ConnManagerLowWatermark,
+				ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark,
 				WaitForEachPeerToConnect: false,
 			},
 		},
@@ -386,6 +390,8 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            10,
 				DisablePrivateIPScan:     false,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				ConnManagerLowWatermark:  defaultConfig.P2P.ConnManagerLowWatermark,
+				ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark,
 				WaitForEachPeerToConnect: false,
 			},
 		},
@@ -399,6 +405,8 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            5,
 				DisablePrivateIPScan:     false,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				ConnManagerLowWatermark:  defaultConfig.P2P.ConnManagerLowWatermark,
+				ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark,
 				WaitForEachPeerToConnect: false,
 			},
 		},
@@ -412,6 +420,8 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            nodeconfig.DefaultMaxConnPerIP,
 				DisablePrivateIPScan:     true,
 				MaxPeers:                 defaultConfig.P2P.MaxPeers,
+				ConnManagerLowWatermark:  defaultConfig.P2P.ConnManagerLowWatermark,
+				ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark,
 				WaitForEachPeerToConnect: false,
 			},
 		},
@@ -425,6 +435,8 @@ func TestP2PFlags(t *testing.T) {
 				MaxConnsPerIP:            nodeconfig.DefaultMaxConnPerIP,
 				DisablePrivateIPScan:     defaultConfig.P2P.DisablePrivateIPScan,
 				MaxPeers:                 100,
+				ConnManagerLowWatermark:  defaultConfig.P2P.ConnManagerLowWatermark,
+				ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark,
 				WaitForEachPeerToConnect: false,
 			},
 		},

cmd/harmony/main.go
@@ -645,6 +645,8 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
 		MaxConnPerIP:             hc.P2P.MaxConnsPerIP,
 		DisablePrivateIPScan:     hc.P2P.DisablePrivateIPScan,
 		MaxPeers:                 hc.P2P.MaxPeers,
+		ConnManagerLowWatermark:  hc.P2P.ConnManagerLowWatermark,
+		ConnManagerHighWatermark: hc.P2P.ConnManagerHighWatermark,
 		WaitForEachPeerToConnect: hc.P2P.WaitForEachPeerToConnect,
 		ForceReachabilityPublic:  forceReachabilityPublic,
 	})

internal/configs/harmony/harmony.go
@@ -54,6 +54,8 @@ type P2pConfig struct {
 	MaxConnsPerIP            int
 	DisablePrivateIPScan     bool
 	MaxPeers                 int64
+	ConnManagerLowWatermark  int
+	ConnManagerHighWatermark int
 	WaitForEachPeerToConnect bool
 }

internal/configs/node/network.go
@@ -63,7 +63,13 @@ const (
 	DefaultMaxConnPerIP = 10
 	// DefaultMaxPeers is the maximum number of remote peers, with 0 representing no limit
 	DefaultMaxPeers = 0
-	// DefaultWaitForEachPeerToConnect sets the sync configs to connect to neighbor peers one by one and waits for each peer to connect
+	// DefaultConnManagerLowWatermark is the low watermark of connections for the connection manager to maintain
+	DefaultConnManagerLowWatermark = 160
+	// DefaultConnManagerHighWatermark is the high watermark of connections for the connection manager to maintain.
+	// When the peer count exceeds the high watermark, peers are pruned (and their
+	// connections terminated) until the count falls back to the low watermark.
+	DefaultConnManagerHighWatermark = 192
+	// DefaultWaitForEachPeerToConnect sets the sync configs to connect to neighbor peers one by one and wait for each peer to connect.
 	DefaultWaitForEachPeerToConnect = false
 )
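
As a standalone illustration (not part of this commit) of what those defaults feed into, here is a minimal Go sketch using go-libp2p's connection manager directly; the import path matches recent go-libp2p releases and may differ for the version pinned by the repo:

package main

import (
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
)

func main() {
	// With low=160 and high=192, pruning starts once the host holds more
	// than 192 connections and trims back down toward 160.
	cm, err := connmgr.NewConnManager(
		160, // low watermark
		192, // high watermark
		// Newly opened connections are exempt from pruning for a minute.
		connmgr.WithGracePeriod(time.Minute),
	)
	if err != nil {
		panic(err)
	}
	defer cm.Close()
	fmt.Println("connection manager ready")
}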

node/node_handler.go
@@ -3,6 +3,7 @@ package node
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"math/rand"
 	"time"
@@ -447,6 +448,7 @@ func (node *Node) BootstrapConsensus() error {
 		if numPeersNow >= min {
 			utils.Logger().Info().Msg("[bootstrap] StartConsensus")
 			enoughMinPeers <- struct{}{}
+			fmt.Println("Bootstrap consensus done.", numPeersNow, "peers are connected")
 			return
 		}
 		utils.Logger().Info().

p2p/discovery/option_test.go
@@ -40,14 +40,14 @@ func TestDHTOption_getLibp2pRawOptions(t *testing.T) {
 			opt: DHTConfig{
 				BootNodes: testAddrStr,
 			},
-			expLen: 2,
+			expLen: 1,
 		},
 		{
 			opt: DHTConfig{
 				BootNodes:     testAddrStr,
 				DataStoreFile: &validPath,
 			},
-			expLen: 3,
+			expLen: 2,
 		},
 		{
 			opt: DHTConfig{

p2p/host.go
@@ -96,6 +96,8 @@ type HostConfig struct {
 	MaxConnPerIP             int
 	DisablePrivateIPScan     bool
 	MaxPeers                 int64
+	ConnManagerLowWatermark  int
+	ConnManagerHighWatermark int
 	WaitForEachPeerToConnect bool
 	ForceReachabilityPublic  bool
 }
@@ -114,19 +116,6 @@ func init() {
 	libp2p_pubsub.GossipSubMaxIHaveLength = 1000
 }
-func forceReachabilityPublic(f bool) libp2p_config.Option {
-	if f {
-		return func(cfg *libp2p_config.Config) error {
-			public := libp2p_network.Reachability(libp2p_network.ReachabilityPublic)
-			cfg.AutoNATConfig.ForceReachability = &public
-			return nil
-		}
-	}
-	return func(p2pConfig *libp2p_config.Config) error {
-		return nil
-	}
-}
-
 // NewHost ..
 func NewHost(cfg HostConfig) (Host, error) {
 	var (
@@ -141,16 +130,20 @@ func NewHost(cfg HostConfig) (Host, error) {
 	)
 	ctx, cancel := context.WithCancel(context.Background())
-	// TODO: move low and high to configs
-	connmgr, err := connmgr.NewConnManager(
-		int(cfg.MaxConnPerIP),      // LowWater
-		int(1024)*cfg.MaxConnPerIP, // HighWater,
-		connmgr.WithGracePeriod(time.Minute),
-	)
-	if err != nil {
-		cancel()
-		return nil, err
+	// create connection manager
+	low := cfg.ConnManagerLowWatermark
+	high := cfg.ConnManagerHighWatermark
+	if high < low {
+		cancel()
+		utils.Logger().Error().
+			Int("low", cfg.ConnManagerLowWatermark).
+			Int("high", cfg.ConnManagerHighWatermark).
+			Msg("connection manager watermarks are invalid")
+		return nil, errors.New("invalid connection manager watermarks")
 	}
+	// prepare host options
 	var idht *dht.IpfsDHT
 	var opt discovery.DHTConfig
 	p2pHostConfig := []libp2p.Option{
@@ -164,7 +157,7 @@ func NewHost(cfg HostConfig) (Host, error) {
 		libp2p.DefaultTransports,
 		// Prevent the peer from having too many
 		// connections by attaching a connection manager.
-		libp2p.ConnectionManager(connmgr),
+		connectionManager(low, high),
 		// Attempt to open ports using uPNP for NATed hosts.
 		libp2p.NATPortMap(),
 		libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
@@ -286,6 +279,33 @@ func NewHost(cfg HostConfig) (Host, error) {
 	return h, nil
 }
+// connectionManager creates a new connection manager and configures libp2p to use it.
+// low and high are watermarks governing the number of connections that will be maintained.
+// When the peer count exceeds the high watermark, peers are pruned (and their
+// connections terminated) until the count falls back to the low watermark.
+func connectionManager(low int, high int) libp2p_config.Option {
+	if low > 0 && high > low {
+		connmgr, err := connmgr.NewConnManager(
+			low,  // low watermark
+			high, // high watermark
+			connmgr.WithGracePeriod(time.Minute),
+		)
+		if err != nil {
+			utils.Logger().Error().
+				Err(err).
+				Int("low", low).
+				Int("high", high).
+				Msg("create connection manager failed")
+			return nil
+		}
+		return libp2p.ConnectionManager(connmgr)
+	}
+	return func(p2pConfig *libp2p_config.Config) error {
+		return nil
+	}
+}
+
 // HostV2 is the version 2 p2p host
 type HostV2 struct {
 	h libp2p_host.Host
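
For context, a hedged sketch of how such an option composes into host construction, simplified relative to NewHost above (the helper name is illustrative and not Harmony's; import paths assume a recent go-libp2p). Returning nil from connectionManager on error relies on go-libp2p skipping nil options when applying its config, which is worth verifying against the pinned version:

package p2p

import (
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
)

// newManagedHost is a hypothetical helper: it builds a bare libp2p host whose
// connection count is kept between the two watermarks.
func newManagedHost(low, high int) (host.Host, error) {
	cm, err := connmgr.NewConnManager(low, high, connmgr.WithGracePeriod(time.Minute))
	if err != nil {
		return nil, err
	}
	return libp2p.New(libp2p.ConnectionManager(cm))
}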

rosetta/infra/harmony-mainnet.conf
@@ -68,6 +68,8 @@ Version = "2.5.11"
 	MaxConnsPerIP = 10
 	MaxPeers = 0
 	Port = 9000
+	ConnManagerLowWatermark = 160
+	ConnManagerHighWatermark = 192
 	WaitForEachPeerToConnect = false
 [Pprof]

rosetta/infra/harmony-pstn.conf
@@ -68,6 +68,8 @@ Version = "2.5.11"
 	MaxConnsPerIP = 10
 	MaxPeers = 0
 	Port = 9000
+	ConnManagerLowWatermark = 160
+	ConnManagerHighWatermark = 192
 	WaitForEachPeerToConnect = false
 [Pprof]
