From cf5a264b158d270120644082c5f443112da47620 Mon Sep 17 00:00:00 2001 From: Max <82761650+MaxMustermann2@users.noreply.github.com> Date: Wed, 24 Aug 2022 18:30:40 +0000 Subject: [PATCH] [p2p] feat: add maximum peers limit (#4272) --- cmd/harmony/config_migrations.go | 34 ++++++++++- cmd/harmony/default.go | 3 +- cmd/harmony/flags.go | 12 +++- cmd/harmony/flags_test.go | 17 ++++++ cmd/harmony/main.go | 1 + internal/configs/harmony/harmony.go | 1 + internal/configs/node/network.go | 5 +- p2p/host.go | 3 +- p2p/security/security.go | 88 +++++++++++++++++++++++++---- p2p/security/security_test.go | 11 ++-- rosetta/infra/harmony-mainnet.conf | 6 +- rosetta/infra/harmony-pstn.conf | 6 +- 12 files changed, 159 insertions(+), 28 deletions(-) diff --git a/cmd/harmony/config_migrations.go b/cmd/harmony/config_migrations.go index 5098f27fe..f8e7e6111 100644 --- a/cmd/harmony/config_migrations.go +++ b/cmd/harmony/config_migrations.go @@ -3,6 +3,7 @@ package main import ( "errors" "fmt" + "sort" goversion "github.com/hashicorp/go-version" "github.com/pelletier/go-toml" @@ -273,13 +274,42 @@ func init() { confTree.Set("Version", "2.5.5") return confTree } - migrations["2.5.5"] = func(confTree *toml.Tree) *toml.Tree { if confTree.Get("Log.Console") == nil { confTree.Set("Log.Console", defaultConfig.Log.Console) } - confTree.Set("Version", "2.5.6") return confTree } + migrations["2.5.6"] = func(confTree *toml.Tree) *toml.Tree { + if confTree.Get("P2P.MaxPeers") == nil { + confTree.Set("P2P.MaxPeers", defaultConfig.P2P.MaxPeers) + } + confTree.Set("Version", "2.5.7") + return confTree + } + + // check that the latest version here is the same as in default.go + largestKey := getNextVersion(migrations) + if largestKey != tomlConfigVersion { + panic(fmt.Sprintf("next migration value: %s, toml version: %s", largestKey, tomlConfigVersion)) + } +} + +func getNextVersion(x map[string]configMigrationFunc) string { + versionMap := make(map[string]interface{}, 1) + versionMap["Version"] = "FakeVersion" + tree, _ := toml.TreeFromMap(versionMap) + + // needs to be sorted in case the order is incorrect + keys := make([]string, len(x)) + i := 0 + for k := range x { + keys[i] = k + i++ + } + sort.Strings(keys) + requiredFunc := x[keys[len(keys)-1]] + tree = requiredFunc(tree) + return tree.Get("Version").(string) } diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 827677745..58db92594 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -5,7 +5,7 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" ) -const tomlConfigVersion = "2.5.5" +const tomlConfigVersion = "2.5.7" const ( defNetworkType = nodeconfig.Mainnet @@ -31,6 +31,7 @@ var defaultConfig = harmonyconfig.HarmonyConfig{ DiscConcurrency: nodeconfig.DefaultP2PConcurrency, MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, DisablePrivateIPScan: false, + MaxPeers: nodeconfig.DefaultMaxPeers, }, HTTP: harmonyconfig.HttpConfig{ Enabled: true, diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index dd9a49929..43c0b6103 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -62,6 +62,7 @@ var ( legacyKeyFileFlag, p2pDisablePrivateIPScanFlag, maxConnPerIPFlag, + maxPeersFlag, } httpFlags = []cli.Flag{ @@ -573,7 +574,12 @@ var ( } maxConnPerIPFlag = cli.IntFlag{ Name: "p2p.security.max-conn-per-ip", - Usage: "maximum number of connections allowed per node", + Usage: "maximum number of connections allowed per remote node, 0 means no limit", + DefValue: defaultConfig.P2P.MaxConnsPerIP, + } + maxPeersFlag = cli.IntFlag{ + Name: "p2p.security.max-peers", + Usage: "maximum number of peers allowed, 0 means no limit", DefValue: defaultConfig.P2P.MaxConnsPerIP, } ) @@ -608,6 +614,10 @@ func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) { config.P2P.MaxConnsPerIP = cli.GetIntFlagValue(cmd, maxConnPerIPFlag) } + if cli.IsFlagChanged(cmd, maxPeersFlag) { + config.P2P.MaxPeers = int64(cli.GetIntFlagValue(cmd, maxPeersFlag)) + } + if cli.IsFlagChanged(cmd, p2pDisablePrivateIPScanFlag) { config.P2P.DisablePrivateIPScan = cli.GetBoolFlagValue(cmd, p2pDisablePrivateIPScanFlag) } diff --git a/cmd/harmony/flags_test.go b/cmd/harmony/flags_test.go index d269dabc6..59fa817d4 100644 --- a/cmd/harmony/flags_test.go +++ b/cmd/harmony/flags_test.go @@ -64,6 +64,7 @@ func TestHarmonyFlags(t *testing.T) { DiscConcurrency: 5, MaxConnsPerIP: 5, DisablePrivateIPScan: false, + MaxPeers: defaultConfig.P2P.MaxPeers, }, HTTP: harmonyconfig.HttpConfig{ Enabled: true, @@ -390,6 +391,7 @@ func TestP2PFlags(t *testing.T) { DHTDataStore: &defDataStore, MaxConnsPerIP: 10, DisablePrivateIPScan: false, + MaxPeers: defaultConfig.P2P.MaxPeers, }, }, { @@ -400,6 +402,7 @@ func TestP2PFlags(t *testing.T) { KeyFile: "./key.file", MaxConnsPerIP: 10, DisablePrivateIPScan: false, + MaxPeers: defaultConfig.P2P.MaxPeers, }, }, { @@ -411,6 +414,7 @@ func TestP2PFlags(t *testing.T) { DiscConcurrency: 5, MaxConnsPerIP: 5, DisablePrivateIPScan: false, + MaxPeers: defaultConfig.P2P.MaxPeers, }, }, { @@ -422,6 +426,19 @@ func TestP2PFlags(t *testing.T) { DiscConcurrency: nodeconfig.DefaultP2PConcurrency, MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, DisablePrivateIPScan: true, + MaxPeers: defaultConfig.P2P.MaxPeers, + }, + }, + { + args: []string{"--p2p.security.max-peers", "100"}, + expConfig: harmonyconfig.P2pConfig{ + Port: nodeconfig.DefaultP2PPort, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./.hmykey", + DiscConcurrency: nodeconfig.DefaultP2PConcurrency, + MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, + DisablePrivateIPScan: defaultConfig.P2P.DisablePrivateIPScan, + MaxPeers: 100, }, }, } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 695ae47dd..c8d9a3f53 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -619,6 +619,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, DiscConcurrency: hc.P2P.DiscConcurrency, MaxConnPerIP: hc.P2P.MaxConnsPerIP, DisablePrivateIPScan: hc.P2P.DisablePrivateIPScan, + MaxPeers: hc.P2P.MaxPeers, }) if err != nil { return nil, errors.Wrap(err, "cannot create P2P network host") diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go index 60265781c..c611e9422 100644 --- a/internal/configs/harmony/harmony.go +++ b/internal/configs/harmony/harmony.go @@ -54,6 +54,7 @@ type P2pConfig struct { DiscConcurrency int // Discovery Concurrency value MaxConnsPerIP int DisablePrivateIPScan bool + MaxPeers int64 } type GeneralConfig struct { diff --git a/internal/configs/node/network.go b/internal/configs/node/network.go index c36aaf7e1..0749d43e7 100644 --- a/internal/configs/node/network.go +++ b/internal/configs/node/network.go @@ -59,7 +59,10 @@ const ( DefaultPrometheusPort = 9900 // DefaultP2PConcurrency is the default P2P concurrency, 0 means is set the default value of P2P Discovery, the actual value is 10 DefaultP2PConcurrency = 0 - DefaultMaxConnPerIP = 10 + // DefaultMaxConnPerIP is the maximum number of connections to/from a remote IP + DefaultMaxConnPerIP = 10 + // DefaultMaxPeers is the maximum number of remote peers, with 0 representing no limit + DefaultMaxPeers = 0 ) const ( diff --git a/p2p/host.go b/p2p/host.go index 766a2995a..e81b50501 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -87,6 +87,7 @@ type HostConfig struct { DiscConcurrency int MaxConnPerIP int DisablePrivateIPScan bool + MaxPeers int64 } func init() { @@ -182,7 +183,7 @@ func NewHost(cfg HostConfig) (Host, error) { self.PeerID = p2pHost.ID() subLogger := utils.Logger().With().Str("hostID", p2pHost.ID().Pretty()).Logger() - security := security.NewManager(cfg.MaxConnPerIP) + security := security.NewManager(cfg.MaxConnPerIP, cfg.MaxPeers) // has to save the private key for host h := &HostV2{ h: p2pHost, diff --git a/p2p/security/security.go b/p2p/security/security.go index cdc8a3f89..02d1b963e 100644 --- a/p2p/security/security.go +++ b/p2p/security/security.go @@ -3,6 +3,7 @@ package security import ( "fmt" "sync" + "sync/atomic" "github.com/harmony-one/harmony/internal/utils" libp2p_network "github.com/libp2p/go-libp2p-core/network" @@ -17,14 +18,65 @@ type Security interface { type Manager struct { maxConnPerIP int + maxPeers int64 mutex sync.Mutex - peers sync.Map // All the connected nodes, key is the Peer's IP, value is the peer's ID array + peers peerMap // All the connected nodes, key is the Peer's IP, value is the peer's ID array } -func NewManager(maxConnPerIP int) *Manager { +type peerMap struct { + count int64 + peers sync.Map +} + +func (peerMap *peerMap) Len() int64 { + return atomic.LoadInt64(&peerMap.count) +} + +func (peerMap *peerMap) Store(key, value interface{}) { + // only increment if you didn't have this key + hasKey := peerMap.HasKey(key) + peerMap.peers.Store(key, value) + if !hasKey { + atomic.AddInt64(&peerMap.count, 1) + } +} + +func (peerMap *peerMap) HasKey(key interface{}) bool { + hasKey := false + peerMap.peers.Range(func(k, v interface{}) bool { + if k == key { + hasKey = true + return false + } + return true + }) + return hasKey +} + +func (peerMap *peerMap) Delete(key interface{}) { + peerMap.peers.Delete(key) + atomic.AddInt64(&peerMap.count, -1) +} + +func (peerMap *peerMap) Load(key interface{}) (value interface{}, ok bool) { + return peerMap.peers.Load(key) +} + +func (peerMap *peerMap) Range(f func(key, value any) bool) { + peerMap.peers.Range(f) +} + +func NewManager(maxConnPerIP int, maxPeers int64) *Manager { + if maxConnPerIP < 0 { + panic("maximum connections per IP must not be negative") + } + if maxPeers < 0 { + panic("maximum peers must not be negative") + } return &Manager{ maxConnPerIP: maxConnPerIP, + maxPeers: maxPeers, } } @@ -32,12 +84,12 @@ func (m *Manager) OnConnectCheck(net libp2p_network.Network, conn libp2p_network m.mutex.Lock() defer m.mutex.Unlock() - ip, err := getIP(conn) + remoteIp, err := getRemoteIP(conn) if err != nil { - return errors.Wrap(err, "failed on get ip") + return errors.Wrap(err, "failed on get remote ip") } - value, ok := m.peers.Load(ip) + value, ok := m.peers.Load(remoteIp) if !ok { value = []string{} } @@ -54,13 +106,24 @@ func (m *Manager) OnConnectCheck(net libp2p_network.Network, conn libp2p_network peers = append(peers, peerID) } - if len(peers) > m.maxConnPerIP { - utils.Logger().Warn().Int("len(peers)", len(peers)).Int("maxConnPerIP", m.maxConnPerIP). - Msg("Too much peers, closing") + if m.maxConnPerIP > 0 && len(peers) > m.maxConnPerIP { + utils.Logger().Warn(). + Int("len(peers)", len(peers)). + Int("maxConnPerIP", m.maxConnPerIP). + Msgf("too many connections from %s, closing", remoteIp) return net.ClosePeer(conn.RemotePeer()) } - m.peers.Store(ip, peers) + currentPeerCount := m.peers.Len() + // only limit addition if it's a new peer and not an existing peer with new connection + if m.maxPeers > 0 && currentPeerCount >= m.maxPeers && !m.peers.HasKey(remoteIp) { + utils.Logger().Warn(). + Int64("connected peers", currentPeerCount). + Str("new peer", remoteIp). + Msg("too many peers, closing") + return net.ClosePeer(conn.RemotePeer()) + } + m.peers.Store(remoteIp, peers) return nil } @@ -68,7 +131,7 @@ func (m *Manager) OnDisconnectCheck(conn libp2p_network.Conn) error { m.mutex.Lock() defer m.mutex.Unlock() - ip, err := getIP(conn) + ip, err := getRemoteIP(conn) if err != nil { return errors.Wrap(err, "failed on get ip") } @@ -87,9 +150,10 @@ func (m *Manager) OnDisconnectCheck(conn libp2p_network.Conn) error { index, ok := find(peers, peerID) if ok { peers = append(peers[:index], peers[index+1:]...) - m.peers.Store(ip, peers) if len(peers) == 0 { m.peers.Delete(ip) + } else { + m.peers.Store(ip, peers) } } @@ -106,7 +170,7 @@ func find(slice []string, val string) (int, bool) { return -1, false } -func getIP(conn libp2p_network.Conn) (string, error) { +func getRemoteIP(conn libp2p_network.Conn) (string, error) { for _, protocol := range conn.RemoteMultiaddr().Protocols() { switch protocol.Code { case ma.P_IP4: diff --git a/p2p/security/security_test.go b/p2p/security/security_test.go index 98a19696d..3f8b66e4f 100644 --- a/p2p/security/security_test.go +++ b/p2p/security/security_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/crypto" ic "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -58,7 +57,7 @@ func TestManager_OnConnectCheck(t *testing.T) { defer h1.Close() fakeHost := &fakeHost{} - security := NewManager(2) + security := NewManager(2, 1) h1.Network().Notify(fakeHost) fakeHost.SetConnectCallback(security.OnConnectCheck) fakeHost.SetDisconnectCallback(security.OnDisconnectCheck) @@ -103,7 +102,7 @@ func TestManager_OnDisconnectCheck(t *testing.T) { defer h1.Close() fakeHost := &fakeHost{} - security := NewManager(2) + security := NewManager(2, 0) h1.Network().Notify(fakeHost) fakeHost.SetConnectCallback(security.OnConnectCheck) fakeHost.SetDisconnectCallback(security.OnDisconnectCheck) @@ -130,7 +129,7 @@ func TestManager_OnDisconnectCheck(t *testing.T) { } func newPeer(port int) (host.Host, error) { - priv, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) + priv, _, err := ic.GenerateKeyPair(ic.RSA, 2048) if err != nil { return nil, err } @@ -160,8 +159,8 @@ func (conn *fakeConn) ID() string { retur func (conn *fakeConn) NewStream(context.Context) (network.Stream, error) { return nil, nil } func (conn *fakeConn) GetStreams() []network.Stream { return nil } func (conn *fakeConn) Stat() network.Stat { return network.Stat{} } -func TestGetIP(t *testing.T) { - ip, err := getIP(&fakeConn{}) +func TestGetRemoteIP(t *testing.T) { + ip, err := getRemoteIP(&fakeConn{}) assert.Nil(t, err) assert.Equal(t, "fe80::7802:31ff:fee9:c093", ip) } diff --git a/rosetta/infra/harmony-mainnet.conf b/rosetta/infra/harmony-mainnet.conf index 475dbcc59..cdbce636a 100644 --- a/rosetta/infra/harmony-mainnet.conf +++ b/rosetta/infra/harmony-mainnet.conf @@ -1,4 +1,4 @@ -Version = "2.5.5" +Version = "2.5.7" [BLSKeys] KMSConfigFile = "" @@ -33,6 +33,7 @@ Version = "2.5.5" IsOffline = false NoStaking = true NodeType = "explorer" + RunElasticMode = false ShardID = 0 TraceEnable = false @@ -66,6 +67,7 @@ Version = "2.5.5" IP = "0.0.0.0" KeyFile = "./.hmykey" MaxConnsPerIP = 10 + MaxPeers = 0 Port = 9000 [Pprof] @@ -107,9 +109,9 @@ Version = "2.5.5" AccountSlots = 16 AllowedTxsFile = "./.hmy/allowedtxs.txt" BlacklistFile = "./.hmy/blacklist.txt" + GlobalSlots = 5120 LocalAccountsFile = "./.hmy/locals.txt" RosettaFixFile = "./rosetta_local_fix.csv" - GlobalSlots = 5120 [WS] AuthPort = 9801 diff --git a/rosetta/infra/harmony-pstn.conf b/rosetta/infra/harmony-pstn.conf index 730a3278c..2e22b491a 100644 --- a/rosetta/infra/harmony-pstn.conf +++ b/rosetta/infra/harmony-pstn.conf @@ -1,4 +1,4 @@ -Version = "2.5.4" +Version = "2.5.7" [BLSKeys] KMSConfigFile = "" @@ -33,6 +33,7 @@ Version = "2.5.4" IsOffline = false NoStaking = true NodeType = "explorer" + RunElasticMode = false ShardID = 0 TraceEnable = false @@ -66,6 +67,7 @@ Version = "2.5.4" IP = "0.0.0.0" KeyFile = "./.hmykey" MaxConnsPerIP = 10 + MaxPeers = 0 Port = 9000 [Pprof] @@ -107,9 +109,9 @@ Version = "2.5.4" AccountSlots = 16 AllowedTxsFile = "./.hmy/allowedtxs.txt" BlacklistFile = "./.hmy/blacklist.txt" + GlobalSlots = 5120 LocalAccountsFile = "./.hmy/locals.txt" RosettaFixFile = "" - GlobalSlots = 5120 [WS] AuthPort = 9801