From 4b8bfabf284134a12947265a3d23310f2960e626 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Mon, 13 Feb 2023 15:24:35 +0800 Subject: [PATCH] add watermark low/high options for p2p connection manager --- cmd/harmony/config_migrations.go | 6 +++ cmd/harmony/default.go | 2 + cmd/harmony/flags.go | 20 +++++++++ cmd/harmony/flags_test.go | 12 ++++++ cmd/harmony/main.go | 2 + internal/configs/harmony/harmony.go | 2 + internal/configs/node/network.go | 8 +++- node/node_handler.go | 2 + p2p/discovery/option_test.go | 4 +- p2p/host.go | 64 +++++++++++++++++++---------- rosetta/infra/harmony-mainnet.conf | 2 + rosetta/infra/harmony-pstn.conf | 2 + 12 files changed, 101 insertions(+), 25 deletions(-) diff --git a/cmd/harmony/config_migrations.go b/cmd/harmony/config_migrations.go index 3a256f49d..bdbfefdc9 100644 --- a/cmd/harmony/config_migrations.go +++ b/cmd/harmony/config_migrations.go @@ -319,6 +319,12 @@ func init() { } migrations["2.5.10"] = func(confTree *toml.Tree) *toml.Tree { + if confTree.Get("P2P.ConnManagerLowWatermark") == nil { + confTree.Set("P2P.ConnManagerLowWatermark", defaultConfig.P2P.ConnManagerLowWatermark) + } + if confTree.Get("P2P.ConnManagerHighWatermark") == nil { + confTree.Set("P2P.ConnManagerHighWatermark", defaultConfig.P2P.ConnManagerHighWatermark) + } if confTree.Get("Sync.MaxAdvertiseWaitTime") == nil { confTree.Set("Sync.MaxAdvertiseWaitTime", defaultConfig.Sync.MaxAdvertiseWaitTime) } diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 2a6c7a75c..78d2643b6 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -32,6 +32,8 @@ var defaultConfig = harmonyconfig.HarmonyConfig{ MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, DisablePrivateIPScan: false, MaxPeers: nodeconfig.DefaultMaxPeers, + ConnManagerLowWatermark: nodeconfig.DefaultConnManagerLowWatermark, + ConnManagerHighWatermark: nodeconfig.DefaultConnManagerHighWatermark, WaitForEachPeerToConnect: nodeconfig.DefaultWaitForEachPeerToConnect, }, HTTP: harmonyconfig.HttpConfig{ diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index 168081660..55da51f55 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -63,6 +63,8 @@ var ( p2pDisablePrivateIPScanFlag, maxConnPerIPFlag, maxPeersFlag, + connManagerLowWatermarkFlag, + connManagerHighWatermarkFlag, } httpFlags = []cli.Flag{ @@ -579,6 +581,16 @@ var ( Usage: "maximum number of peers allowed, 0 means no limit", DefValue: defaultConfig.P2P.MaxConnsPerIP, } + connManagerLowWatermarkFlag = cli.IntFlag{ + Name: "p2p.connmgr-low", + Usage: "lowest number of connections that'll be maintained in connection manager", + DefValue: defaultConfig.P2P.ConnManagerLowWatermark, + } + connManagerHighWatermarkFlag = cli.IntFlag{ + Name: "p2p.connmgr-high", + Usage: "highest number of connections that'll be maintained in connection manager", + DefValue: defaultConfig.P2P.ConnManagerHighWatermark, + } waitForEachPeerToConnectFlag = cli.BoolFlag{ Name: "p2p.wait-for-connections", Usage: "node waits for each single peer to connect and it doesn't add them to peers list after timeout", @@ -624,6 +636,14 @@ func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) { config.P2P.WaitForEachPeerToConnect = cli.GetBoolFlagValue(cmd, waitForEachPeerToConnectFlag) } + if cli.IsFlagChanged(cmd, connManagerLowWatermarkFlag) { + config.P2P.ConnManagerLowWatermark = cli.GetIntFlagValue(cmd, connManagerLowWatermarkFlag) + } + + if cli.IsFlagChanged(cmd, connManagerHighWatermarkFlag) { + config.P2P.ConnManagerHighWatermark = cli.GetIntFlagValue(cmd, connManagerHighWatermarkFlag) + } + if cli.IsFlagChanged(cmd, p2pDisablePrivateIPScanFlag) { config.P2P.DisablePrivateIPScan = cli.GetBoolFlagValue(cmd, p2pDisablePrivateIPScanFlag) } diff --git a/cmd/harmony/flags_test.go b/cmd/harmony/flags_test.go index 7bd3e7199..9ebae4bb3 100644 --- a/cmd/harmony/flags_test.go +++ b/cmd/harmony/flags_test.go @@ -65,6 +65,8 @@ func TestHarmonyFlags(t *testing.T) { MaxConnsPerIP: 5, DisablePrivateIPScan: false, MaxPeers: defaultConfig.P2P.MaxPeers, + ConnManagerLowWatermark: defaultConfig.P2P.ConnManagerLowWatermark, + ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark, WaitForEachPeerToConnect: false, }, HTTP: harmonyconfig.HttpConfig{ @@ -374,6 +376,8 @@ func TestP2PFlags(t *testing.T) { MaxConnsPerIP: 10, DisablePrivateIPScan: false, MaxPeers: defaultConfig.P2P.MaxPeers, + ConnManagerLowWatermark: defaultConfig.P2P.ConnManagerLowWatermark, + ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark, WaitForEachPeerToConnect: false, }, }, @@ -386,6 +390,8 @@ func TestP2PFlags(t *testing.T) { MaxConnsPerIP: 10, DisablePrivateIPScan: false, MaxPeers: defaultConfig.P2P.MaxPeers, + ConnManagerLowWatermark: defaultConfig.P2P.ConnManagerLowWatermark, + ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark, WaitForEachPeerToConnect: false, }, }, @@ -399,6 +405,8 @@ func TestP2PFlags(t *testing.T) { MaxConnsPerIP: 5, DisablePrivateIPScan: false, MaxPeers: defaultConfig.P2P.MaxPeers, + ConnManagerLowWatermark: defaultConfig.P2P.ConnManagerLowWatermark, + ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark, WaitForEachPeerToConnect: false, }, }, @@ -412,6 +420,8 @@ func TestP2PFlags(t *testing.T) { MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, DisablePrivateIPScan: true, MaxPeers: defaultConfig.P2P.MaxPeers, + ConnManagerLowWatermark: defaultConfig.P2P.ConnManagerLowWatermark, + ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark, WaitForEachPeerToConnect: false, }, }, @@ -425,6 +435,8 @@ func TestP2PFlags(t *testing.T) { MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, DisablePrivateIPScan: defaultConfig.P2P.DisablePrivateIPScan, MaxPeers: 100, + ConnManagerLowWatermark: defaultConfig.P2P.ConnManagerLowWatermark, + ConnManagerHighWatermark: defaultConfig.P2P.ConnManagerHighWatermark, WaitForEachPeerToConnect: false, }, }, diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index b68e75742..eacc392f1 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -645,6 +645,8 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, MaxConnPerIP: hc.P2P.MaxConnsPerIP, DisablePrivateIPScan: hc.P2P.DisablePrivateIPScan, MaxPeers: hc.P2P.MaxPeers, + ConnManagerLowWatermark: hc.P2P.ConnManagerLowWatermark, + ConnManagerHighWatermark: hc.P2P.ConnManagerHighWatermark, WaitForEachPeerToConnect: hc.P2P.WaitForEachPeerToConnect, ForceReachabilityPublic: forceReachabilityPublic, }) diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go index 73ddd442d..af3c8c9b6 100644 --- a/internal/configs/harmony/harmony.go +++ b/internal/configs/harmony/harmony.go @@ -54,6 +54,8 @@ type P2pConfig struct { MaxConnsPerIP int DisablePrivateIPScan bool MaxPeers int64 + ConnManagerLowWatermark int + ConnManagerHighWatermark int WaitForEachPeerToConnect bool } diff --git a/internal/configs/node/network.go b/internal/configs/node/network.go index 332b5cce7..8b15d3359 100644 --- a/internal/configs/node/network.go +++ b/internal/configs/node/network.go @@ -63,7 +63,13 @@ const ( DefaultMaxConnPerIP = 10 // DefaultMaxPeers is the maximum number of remote peers, with 0 representing no limit DefaultMaxPeers = 0 - // DefaultWaitForEachPeerToConnect sets the sync configs to connect to neighbor peers one by one and waits for each peer to connect + // DefaultConnManagerLowWatermark is the lowest number of connections that'll be maintained in connection manager + DefaultConnManagerLowWatermark = 160 + // DefaultConnManagerHighWatermark is the highest number of connections that'll be maintained in connection manager + // When the peer count exceeds the 'high watermark', as many peers will be pruned (and + // their connections terminated) until 'low watermark' peers remain. + DefaultConnManagerHighWatermark = 192 + // DefaultWaitForEachPeerToConnect sets the sync configs to connect to neighbor peers one by one and waits for each peer to connect. DefaultWaitForEachPeerToConnect = false ) diff --git a/node/node_handler.go b/node/node_handler.go index 3db4f8dea..a34d73e94 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -3,6 +3,7 @@ package node import ( "bytes" "context" + "fmt" "math/rand" "time" @@ -447,6 +448,7 @@ func (node *Node) BootstrapConsensus() error { if numPeersNow >= min { utils.Logger().Info().Msg("[bootstrap] StartConsensus") enoughMinPeers <- struct{}{} + fmt.Println("Bootstrap consensus done.", numPeersNow, " peers are connected") return } utils.Logger().Info(). diff --git a/p2p/discovery/option_test.go b/p2p/discovery/option_test.go index e782151f5..1829e77c8 100644 --- a/p2p/discovery/option_test.go +++ b/p2p/discovery/option_test.go @@ -40,14 +40,14 @@ func TestDHTOption_getLibp2pRawOptions(t *testing.T) { opt: DHTConfig{ BootNodes: testAddrStr, }, - expLen: 2, + expLen: 1, }, { opt: DHTConfig{ BootNodes: testAddrStr, DataStoreFile: &validPath, }, - expLen: 3, + expLen: 2, }, { opt: DHTConfig{ diff --git a/p2p/host.go b/p2p/host.go index bab1da691..9395e1df4 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -96,6 +96,8 @@ type HostConfig struct { MaxConnPerIP int DisablePrivateIPScan bool MaxPeers int64 + ConnManagerLowWatermark int + ConnManagerHighWatermark int WaitForEachPeerToConnect bool ForceReachabilityPublic bool } @@ -114,19 +116,6 @@ func init() { libp2p_pubsub.GossipSubMaxIHaveLength = 1000 } -func forceReachabilityPublic(f bool) libp2p_config.Option { - if f { - return func(cfg *libp2p_config.Config) error { - public := libp2p_network.Reachability(libp2p_network.ReachabilityPublic) - cfg.AutoNATConfig.ForceReachability = &public - return nil - } - } - return func(p2pConfig *libp2p_config.Config) error { - return nil - } -} - // NewHost .. func NewHost(cfg HostConfig) (Host, error) { var ( @@ -141,16 +130,20 @@ func NewHost(cfg HostConfig) (Host, error) { ) ctx, cancel := context.WithCancel(context.Background()) - // TODO: move low and high to configs - connmgr, err := connmgr.NewConnManager( - int(cfg.MaxConnPerIP), // LowWater - int(1024)*cfg.MaxConnPerIP, // HighWater, - connmgr.WithGracePeriod(time.Minute), - ) - if err != nil { + + // create connection manager + low := cfg.ConnManagerLowWatermark + high := cfg.ConnManagerHighWatermark + if high < low { cancel() - return nil, err + utils.Logger().Error(). + Int("low", cfg.ConnManagerLowWatermark). + Int("high", cfg.ConnManagerHighWatermark). + Msg("connection manager watermarks are invalid") + return nil, errors.New("invalid connection manager watermarks") } + + // prepare host options var idht *dht.IpfsDHT var opt discovery.DHTConfig p2pHostConfig := []libp2p.Option{ @@ -164,7 +157,7 @@ func NewHost(cfg HostConfig) (Host, error) { libp2p.DefaultTransports, // Prevent the peer from having too many // connections by attaching a connection manager. - libp2p.ConnectionManager(connmgr), + connectionManager(low, high), // Attempt to open ports using uPNP for NATed hosts. libp2p.NATPortMap(), libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { @@ -286,6 +279,33 @@ func NewHost(cfg HostConfig) (Host, error) { return h, nil } +// connectionManager creates a new connection manager and configures libp2p to use the +// given connection manager. +// lo and hi are watermarks governing the number of connections that'll be maintained. +// When the peer count exceeds the 'high watermark', as many peers will be pruned (and +// their connections terminated) until 'low watermark' peers remain. +func connectionManager(low int, high int) libp2p_config.Option { + if low > 0 && high > low { + connmgr, err := connmgr.NewConnManager( + low, // Low Watermark + high, // High Watermark + connmgr.WithGracePeriod(time.Minute), + ) + if err != nil { + utils.Logger().Error(). + Err(err). + Int("low", low). + Int("high", high). + Msg("create connection manager failed") + return nil + } + return libp2p.ConnectionManager(connmgr) + } + return func(p2pConfig *libp2p_config.Config) error { + return nil + } +} + // HostV2 is the version 2 p2p host type HostV2 struct { h libp2p_host.Host diff --git a/rosetta/infra/harmony-mainnet.conf b/rosetta/infra/harmony-mainnet.conf index 3e7085ba6..64e78c7eb 100644 --- a/rosetta/infra/harmony-mainnet.conf +++ b/rosetta/infra/harmony-mainnet.conf @@ -68,6 +68,8 @@ Version = "2.5.11" MaxConnsPerIP = 10 MaxPeers = 0 Port = 9000 + ConnManagerLowWatermark = 160 + ConnManagerHighWatermark = 192 WaitForEachPeerToConnect = false [Pprof] diff --git a/rosetta/infra/harmony-pstn.conf b/rosetta/infra/harmony-pstn.conf index 458e82635..f9ded4d38 100644 --- a/rosetta/infra/harmony-pstn.conf +++ b/rosetta/infra/harmony-pstn.conf @@ -68,6 +68,8 @@ Version = "2.5.11" MaxConnsPerIP = 10 MaxPeers = 0 Port = 9000 + ConnManagerLowWatermark = 160 + ConnManagerHighWatermark = 192 WaitForEachPeerToConnect = false [Pprof]