From af7e0dee71dd5d0e5f4a8031d0d3521089b6dd51 Mon Sep 17 00:00:00 2001 From: MathxH Chen Date: Tue, 27 Jul 2021 06:42:54 +0800 Subject: [PATCH] Add DHT concurrency flag (#3829) --- cmd/harmony/config_migrations.go | 3 +++ cmd/harmony/default.go | 7 ++++--- cmd/harmony/flags.go | 11 ++++++++++- cmd/harmony/flags_test.go | 20 +++++++++++++++----- cmd/harmony/main.go | 9 +++++---- internal/configs/harmony/harmony.go | 9 +++++---- internal/configs/node/network.go | 2 ++ p2p/discovery/option.go | 12 +++++++++--- p2p/discovery/option_test.go | 4 ++-- p2p/host.go | 14 ++++++++------ 10 files changed, 63 insertions(+), 28 deletions(-) diff --git a/cmd/harmony/config_migrations.go b/cmd/harmony/config_migrations.go index 3a0a5676d..b0169021f 100644 --- a/cmd/harmony/config_migrations.go +++ b/cmd/harmony/config_migrations.go @@ -185,6 +185,9 @@ func init() { if confTree.Get("Pprof.ProfileDebugValues") == nil { confTree.Set("Pprof.ProfileDebugValues", defaultConfig.Pprof.ProfileDebugValues) } + if confTree.Get("P2P.DiscConcurrency") == nil { + confTree.Set("P2P.DiscConcurrency", defaultConfig.P2P.DiscConcurrency) + } confTree.Set("Version", "2.2.0") return confTree diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 395645f40..26133cde5 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -24,9 +24,10 @@ var defaultConfig = harmonyconfig.HarmonyConfig{ }, Network: getDefaultNetworkConfig(defNetworkType), P2P: harmonyconfig.P2pConfig{ - Port: nodeconfig.DefaultP2PPort, - IP: nodeconfig.DefaultPublicListenIP, - KeyFile: "./.hmykey", + Port: nodeconfig.DefaultP2PPort, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./.hmykey", + DiscConcurrency: nodeconfig.DefaultP2PConcurrency, }, HTTP: harmonyconfig.HttpConfig{ Enabled: true, diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index db249ec6c..522113d2c 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -55,7 +55,7 @@ var ( p2pIPFlag, p2pKeyFileFlag, p2pDHTDataStoreFlag, - + p2pDiscoveryConcurrencyFlag, legacyKeyFileFlag, } @@ -517,6 +517,11 @@ var ( DefValue: defaultConfig.P2P.KeyFile, Deprecated: "use --p2p.keyfile", } + p2pDiscoveryConcurrencyFlag = cli.IntFlag{ + Name: "p2p.disc.concurrency", + Usage: "the pubsub's DHT discovery concurrency num (default with raw libp2p dht option)", + DefValue: defaultConfig.P2P.DiscConcurrency, + } ) func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) { @@ -540,6 +545,10 @@ func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) { ds := cli.GetStringFlagValue(cmd, p2pDHTDataStoreFlag) config.P2P.DHTDataStore = &ds } + + if cli.IsFlagChanged(cmd, p2pDiscoveryConcurrencyFlag) { + config.P2P.DiscConcurrency = cli.GetIntFlagValue(cmd, p2pDiscoveryConcurrencyFlag) + } } // http flags diff --git a/cmd/harmony/flags_test.go b/cmd/harmony/flags_test.go index 382ed9917..2b7506317 100644 --- a/cmd/harmony/flags_test.go +++ b/cmd/harmony/flags_test.go @@ -30,7 +30,7 @@ func TestHarmonyFlags(t *testing.T) { "2p/QmRVbTpEYup8dSaURZfF6ByrMTSKa4UyUzJhSjahFzRqNj --ip 8.8.8.8 --port 9000 --network_type=mainn" + "et --dns_zone=t.hmny.io --blacklist=./.hmy/blacklist.txt --min_peers=6 --max_bls_keys_per_node=" + "10 --broadcast_invalid_tx=true --verbosity=3 --is_archival=false --shard_id=-1 --staking=true -" + - "-aws-config-source file:config.json", + "-aws-config-source file:config.json --p2p.disc.concurrency 5", expConfig: harmonyconfig.HarmonyConfig{ Version: tomlConfigVersion, General: harmonyconfig.GeneralConfig{ @@ -57,9 +57,10 @@ func TestHarmonyFlags(t *testing.T) { ServerPort: nodeconfig.DefaultDNSPort, }, P2P: harmonyconfig.P2pConfig{ - Port: 9000, - IP: defaultConfig.P2P.IP, - KeyFile: defaultConfig.P2P.KeyFile, + Port: 9000, + IP: defaultConfig.P2P.IP, + KeyFile: defaultConfig.P2P.KeyFile, + DiscConcurrency: 5, }, HTTP: harmonyconfig.HttpConfig{ Enabled: true, @@ -373,6 +374,15 @@ func TestP2PFlags(t *testing.T) { KeyFile: "./key.file", }, }, + { + args: []string{"--p2p.port", "9001", "--p2p.disc.concurrency", "5"}, + expConfig: harmonyconfig.P2pConfig{ + Port: 9001, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./.hmykey", + DiscConcurrency: 5, + }, + }, } for i, test := range tests { ts := newFlagTestSuite(t, append(p2pFlags, legacyMiscFlags...), @@ -391,7 +401,7 @@ func TestP2PFlags(t *testing.T) { continue } if !reflect.DeepEqual(got.P2P, test.expConfig) { - t.Errorf("Test %v: unexpected config: \n\t%+v\n\t%+v", i, got.Network, test.expConfig) + t.Errorf("Test %v: unexpected config: \n\t%+v\n\t%+v", i, got.P2P, test.expConfig) } ts.tearDown() } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 857dd7447..786680334 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -578,10 +578,11 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, } myHost, err = p2p.NewHost(p2p.HostConfig{ - Self: &selfPeer, - BLSKey: nodeConfig.P2PPriKey, - BootNodes: hc.Network.BootNodes, - DataStoreFile: hc.P2P.DHTDataStore, + Self: &selfPeer, + BLSKey: nodeConfig.P2PPriKey, + BootNodes: hc.Network.BootNodes, + DataStoreFile: hc.P2P.DHTDataStore, + DiscConcurrency: hc.P2P.DiscConcurrency, }) if err != nil { return nil, errors.Wrap(err, "cannot create P2P network host") diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go index f50d3c8ed..afe76f987 100644 --- a/internal/configs/harmony/harmony.go +++ b/internal/configs/harmony/harmony.go @@ -45,10 +45,11 @@ type NetworkConfig struct { } type P2pConfig struct { - Port int - IP string - KeyFile string - DHTDataStore *string `toml:",omitempty"` + Port int + IP string + KeyFile string + DHTDataStore *string `toml:",omitempty"` + DiscConcurrency int // Discovery Concurrency value } type GeneralConfig struct { diff --git a/internal/configs/node/network.go b/internal/configs/node/network.go index 9efd6500d..412d4cb53 100644 --- a/internal/configs/node/network.go +++ b/internal/configs/node/network.go @@ -54,6 +54,8 @@ const ( DefaultWSPort = 9800 // DefaultPrometheusPort is the default prometheus port. The actual port used is 9000+900 DefaultPrometheusPort = 9900 + // DefaultP2PConcurrency is the default P2P concurrency, 0 means is set the default value of P2P Discovery, the actual value is 10 + DefaultP2PConcurrency = 0 ) const ( diff --git a/p2p/discovery/option.go b/p2p/discovery/option.go index 5b44e66df..0afe6b8a2 100644 --- a/p2p/discovery/option.go +++ b/p2p/discovery/option.go @@ -11,8 +11,9 @@ import ( // DHTConfig is the configurable DHT options. // For normal nodes, only BootNodes field need to be specified. type DHTConfig struct { - BootNodes []string - DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes. + BootNodes []string + DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes. + DiscConcurrency int } // getLibp2pRawOptions get the raw libp2p options as a slice. @@ -33,7 +34,12 @@ func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) { opts = append(opts, dsOption) } - opts = append(opts, libp2p_dht.Concurrency(1)) + // if Concurrency <= 0, it uses default concurrency supplied from libp2p dht + // the concurrency num meaning you can see Section 2.3 in the KAD paper https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf + if opt.DiscConcurrency > 0 { + opts = append(opts, libp2p_dht.Concurrency(opt.DiscConcurrency)) + } + return opts, nil } diff --git a/p2p/discovery/option_test.go b/p2p/discovery/option_test.go index 8a5273087..747d7ca95 100644 --- a/p2p/discovery/option_test.go +++ b/p2p/discovery/option_test.go @@ -40,14 +40,14 @@ func TestDHTOption_getLibp2pRawOptions(t *testing.T) { opt: DHTConfig{ BootNodes: testAddrStr, }, - expLen: 2, + expLen: 1, }, { opt: DHTConfig{ BootNodes: testAddrStr, DataStoreFile: &validPath, }, - expLen: 3, + expLen: 2, }, { opt: DHTConfig{ diff --git a/p2p/host.go b/p2p/host.go index 020f45969..830d76ef3 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -74,10 +74,11 @@ const ( // HostConfig is the config structure to create a new host type HostConfig struct { - Self *Peer - BLSKey libp2p_crypto.PrivKey - BootNodes []string - DataStoreFile *string + Self *Peer + BLSKey libp2p_crypto.PrivKey + BootNodes []string + DataStoreFile *string + DiscConcurrency int } // NewHost .. @@ -104,8 +105,9 @@ func NewHost(cfg HostConfig) (Host, error) { } disc, err := discovery.NewDHTDiscovery(p2pHost, discovery.DHTConfig{ - BootNodes: cfg.BootNodes, - DataStoreFile: cfg.DataStoreFile, + BootNodes: cfg.BootNodes, + DataStoreFile: cfg.DataStoreFile, + DiscConcurrency: cfg.DiscConcurrency, }) if err != nil { return nil, errors.Wrap(err, "cannot create DHT discovery")