Add DHT concurrency flag (#3829)

pull/3834/head
MathxH Chen 3 years ago committed by GitHub
parent 1ed74476ad
commit af7e0dee71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      cmd/harmony/config_migrations.go
  2. 7
      cmd/harmony/default.go
  3. 11
      cmd/harmony/flags.go
  4. 20
      cmd/harmony/flags_test.go
  5. 9
      cmd/harmony/main.go
  6. 9
      internal/configs/harmony/harmony.go
  7. 2
      internal/configs/node/network.go
  8. 12
      p2p/discovery/option.go
  9. 4
      p2p/discovery/option_test.go
  10. 14
      p2p/host.go

@ -185,6 +185,9 @@ func init() {
if confTree.Get("Pprof.ProfileDebugValues") == nil { if confTree.Get("Pprof.ProfileDebugValues") == nil {
confTree.Set("Pprof.ProfileDebugValues", defaultConfig.Pprof.ProfileDebugValues) confTree.Set("Pprof.ProfileDebugValues", defaultConfig.Pprof.ProfileDebugValues)
} }
if confTree.Get("P2P.DiscConcurrency") == nil {
confTree.Set("P2P.DiscConcurrency", defaultConfig.P2P.DiscConcurrency)
}
confTree.Set("Version", "2.2.0") confTree.Set("Version", "2.2.0")
return confTree return confTree

@ -24,9 +24,10 @@ var defaultConfig = harmonyconfig.HarmonyConfig{
}, },
Network: getDefaultNetworkConfig(defNetworkType), Network: getDefaultNetworkConfig(defNetworkType),
P2P: harmonyconfig.P2pConfig{ P2P: harmonyconfig.P2pConfig{
Port: nodeconfig.DefaultP2PPort, Port: nodeconfig.DefaultP2PPort,
IP: nodeconfig.DefaultPublicListenIP, IP: nodeconfig.DefaultPublicListenIP,
KeyFile: "./.hmykey", KeyFile: "./.hmykey",
DiscConcurrency: nodeconfig.DefaultP2PConcurrency,
}, },
HTTP: harmonyconfig.HttpConfig{ HTTP: harmonyconfig.HttpConfig{
Enabled: true, Enabled: true,

@ -55,7 +55,7 @@ var (
p2pIPFlag, p2pIPFlag,
p2pKeyFileFlag, p2pKeyFileFlag,
p2pDHTDataStoreFlag, p2pDHTDataStoreFlag,
p2pDiscoveryConcurrencyFlag,
legacyKeyFileFlag, legacyKeyFileFlag,
} }
@ -517,6 +517,11 @@ var (
DefValue: defaultConfig.P2P.KeyFile, DefValue: defaultConfig.P2P.KeyFile,
Deprecated: "use --p2p.keyfile", Deprecated: "use --p2p.keyfile",
} }
p2pDiscoveryConcurrencyFlag = cli.IntFlag{
Name: "p2p.disc.concurrency",
Usage: "the pubsub's DHT discovery concurrency num (default with raw libp2p dht option)",
DefValue: defaultConfig.P2P.DiscConcurrency,
}
) )
func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) { func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
@ -540,6 +545,10 @@ func applyP2PFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
ds := cli.GetStringFlagValue(cmd, p2pDHTDataStoreFlag) ds := cli.GetStringFlagValue(cmd, p2pDHTDataStoreFlag)
config.P2P.DHTDataStore = &ds config.P2P.DHTDataStore = &ds
} }
if cli.IsFlagChanged(cmd, p2pDiscoveryConcurrencyFlag) {
config.P2P.DiscConcurrency = cli.GetIntFlagValue(cmd, p2pDiscoveryConcurrencyFlag)
}
} }
// http flags // http flags

@ -30,7 +30,7 @@ func TestHarmonyFlags(t *testing.T) {
"2p/QmRVbTpEYup8dSaURZfF6ByrMTSKa4UyUzJhSjahFzRqNj --ip 8.8.8.8 --port 9000 --network_type=mainn" + "2p/QmRVbTpEYup8dSaURZfF6ByrMTSKa4UyUzJhSjahFzRqNj --ip 8.8.8.8 --port 9000 --network_type=mainn" +
"et --dns_zone=t.hmny.io --blacklist=./.hmy/blacklist.txt --min_peers=6 --max_bls_keys_per_node=" + "et --dns_zone=t.hmny.io --blacklist=./.hmy/blacklist.txt --min_peers=6 --max_bls_keys_per_node=" +
"10 --broadcast_invalid_tx=true --verbosity=3 --is_archival=false --shard_id=-1 --staking=true -" + "10 --broadcast_invalid_tx=true --verbosity=3 --is_archival=false --shard_id=-1 --staking=true -" +
"-aws-config-source file:config.json", "-aws-config-source file:config.json --p2p.disc.concurrency 5",
expConfig: harmonyconfig.HarmonyConfig{ expConfig: harmonyconfig.HarmonyConfig{
Version: tomlConfigVersion, Version: tomlConfigVersion,
General: harmonyconfig.GeneralConfig{ General: harmonyconfig.GeneralConfig{
@ -57,9 +57,10 @@ func TestHarmonyFlags(t *testing.T) {
ServerPort: nodeconfig.DefaultDNSPort, ServerPort: nodeconfig.DefaultDNSPort,
}, },
P2P: harmonyconfig.P2pConfig{ P2P: harmonyconfig.P2pConfig{
Port: 9000, Port: 9000,
IP: defaultConfig.P2P.IP, IP: defaultConfig.P2P.IP,
KeyFile: defaultConfig.P2P.KeyFile, KeyFile: defaultConfig.P2P.KeyFile,
DiscConcurrency: 5,
}, },
HTTP: harmonyconfig.HttpConfig{ HTTP: harmonyconfig.HttpConfig{
Enabled: true, Enabled: true,
@ -373,6 +374,15 @@ func TestP2PFlags(t *testing.T) {
KeyFile: "./key.file", KeyFile: "./key.file",
}, },
}, },
{
args: []string{"--p2p.port", "9001", "--p2p.disc.concurrency", "5"},
expConfig: harmonyconfig.P2pConfig{
Port: 9001,
IP: nodeconfig.DefaultPublicListenIP,
KeyFile: "./.hmykey",
DiscConcurrency: 5,
},
},
} }
for i, test := range tests { for i, test := range tests {
ts := newFlagTestSuite(t, append(p2pFlags, legacyMiscFlags...), ts := newFlagTestSuite(t, append(p2pFlags, legacyMiscFlags...),
@ -391,7 +401,7 @@ func TestP2PFlags(t *testing.T) {
continue continue
} }
if !reflect.DeepEqual(got.P2P, test.expConfig) { if !reflect.DeepEqual(got.P2P, test.expConfig) {
t.Errorf("Test %v: unexpected config: \n\t%+v\n\t%+v", i, got.Network, test.expConfig) t.Errorf("Test %v: unexpected config: \n\t%+v\n\t%+v", i, got.P2P, test.expConfig)
} }
ts.tearDown() ts.tearDown()
} }

@ -578,10 +578,11 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
} }
myHost, err = p2p.NewHost(p2p.HostConfig{ myHost, err = p2p.NewHost(p2p.HostConfig{
Self: &selfPeer, Self: &selfPeer,
BLSKey: nodeConfig.P2PPriKey, BLSKey: nodeConfig.P2PPriKey,
BootNodes: hc.Network.BootNodes, BootNodes: hc.Network.BootNodes,
DataStoreFile: hc.P2P.DHTDataStore, DataStoreFile: hc.P2P.DHTDataStore,
DiscConcurrency: hc.P2P.DiscConcurrency,
}) })
if err != nil { if err != nil {
return nil, errors.Wrap(err, "cannot create P2P network host") return nil, errors.Wrap(err, "cannot create P2P network host")

@ -45,10 +45,11 @@ type NetworkConfig struct {
} }
type P2pConfig struct { type P2pConfig struct {
Port int Port int
IP string IP string
KeyFile string KeyFile string
DHTDataStore *string `toml:",omitempty"` DHTDataStore *string `toml:",omitempty"`
DiscConcurrency int // Discovery Concurrency value
} }
type GeneralConfig struct { type GeneralConfig struct {

@ -54,6 +54,8 @@ const (
DefaultWSPort = 9800 DefaultWSPort = 9800
// DefaultPrometheusPort is the default prometheus port. The actual port used is 9000+900 // DefaultPrometheusPort is the default prometheus port. The actual port used is 9000+900
DefaultPrometheusPort = 9900 DefaultPrometheusPort = 9900
// DefaultP2PConcurrency is the default P2P concurrency, 0 means is set the default value of P2P Discovery, the actual value is 10
DefaultP2PConcurrency = 0
) )
const ( const (

@ -11,8 +11,9 @@ import (
// DHTConfig is the configurable DHT options. // DHTConfig is the configurable DHT options.
// For normal nodes, only BootNodes field need to be specified. // For normal nodes, only BootNodes field need to be specified.
type DHTConfig struct { type DHTConfig struct {
BootNodes []string BootNodes []string
DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes. DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes.
DiscConcurrency int
} }
// getLibp2pRawOptions get the raw libp2p options as a slice. // getLibp2pRawOptions get the raw libp2p options as a slice.
@ -33,7 +34,12 @@ func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) {
opts = append(opts, dsOption) opts = append(opts, dsOption)
} }
opts = append(opts, libp2p_dht.Concurrency(1)) // if Concurrency <= 0, it uses default concurrency supplied from libp2p dht
// the concurrency num meaning you can see Section 2.3 in the KAD paper https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf
if opt.DiscConcurrency > 0 {
opts = append(opts, libp2p_dht.Concurrency(opt.DiscConcurrency))
}
return opts, nil return opts, nil
} }

@ -40,14 +40,14 @@ func TestDHTOption_getLibp2pRawOptions(t *testing.T) {
opt: DHTConfig{ opt: DHTConfig{
BootNodes: testAddrStr, BootNodes: testAddrStr,
}, },
expLen: 2, expLen: 1,
}, },
{ {
opt: DHTConfig{ opt: DHTConfig{
BootNodes: testAddrStr, BootNodes: testAddrStr,
DataStoreFile: &validPath, DataStoreFile: &validPath,
}, },
expLen: 3, expLen: 2,
}, },
{ {
opt: DHTConfig{ opt: DHTConfig{

@ -74,10 +74,11 @@ const (
// HostConfig is the config structure to create a new host // HostConfig is the config structure to create a new host
type HostConfig struct { type HostConfig struct {
Self *Peer Self *Peer
BLSKey libp2p_crypto.PrivKey BLSKey libp2p_crypto.PrivKey
BootNodes []string BootNodes []string
DataStoreFile *string DataStoreFile *string
DiscConcurrency int
} }
// NewHost .. // NewHost ..
@ -104,8 +105,9 @@ func NewHost(cfg HostConfig) (Host, error) {
} }
disc, err := discovery.NewDHTDiscovery(p2pHost, discovery.DHTConfig{ disc, err := discovery.NewDHTDiscovery(p2pHost, discovery.DHTConfig{
BootNodes: cfg.BootNodes, BootNodes: cfg.BootNodes,
DataStoreFile: cfg.DataStoreFile, DataStoreFile: cfg.DataStoreFile,
DiscConcurrency: cfg.DiscConcurrency,
}) })
if err != nil { if err != nil {
return nil, errors.Wrap(err, "cannot create DHT discovery") return nil, errors.Wrap(err, "cannot create DHT discovery")

Loading…
Cancel
Save