add MaxAdvertiseWaitTime to handle advertisements interval and address stream connection issue

pull/4351/head
“GheisMohammadi” 2 years ago committed by Casey Gardiner
parent e49dfd7824
commit bd0cf8c131
  1. 10
      api/service/stagedstreamsync/const.go
  2. 22
      api/service/stagedstreamsync/downloader.go
  3. 2
      cmd/harmony/config.go
  4. 8
      cmd/harmony/config_migrations.go
  5. 109
      cmd/harmony/default.go
  6. 9
      cmd/harmony/flags.go
  7. 21
      cmd/harmony/main.go
  8. 23
      internal/configs/harmony/harmony.go
  9. 17
      p2p/stream/protocols/sync/protocol.go
  10. 3
      rosetta/infra/harmony-mainnet.conf
  11. 3
      rosetta/infra/harmony-pstn.conf

@ -36,11 +36,11 @@ type (
ServerOnly bool
// parameters
Network nodeconfig.NetworkType
Concurrency int // Number of concurrent sync requests
MinStreams int // Minimum number of streams to do sync
InitStreams int // Number of streams requirement for initial bootstrap
Network nodeconfig.NetworkType
Concurrency int // Number of concurrent sync requests
MinStreams int // Minimum number of streams to do sync
InitStreams int // Number of streams requirement for initial bootstrap
MaxAdvertiseWaitTime int // maximum time duration between protocol advertisements
// stream manager config
SmSoftLowCap int
SmHardLowCap int

@ -41,17 +41,17 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config
config.fixValues()
sp := sync.NewProtocol(sync.Config{
Chain: bc,
Host: host.GetP2PHost(),
Discovery: host.GetDiscovery(),
ShardID: nodeconfig.ShardID(bc.ShardID()),
Network: config.Network,
BeaconNode: isBeaconNode,
SmSoftLowCap: config.SmSoftLowCap,
SmHardLowCap: config.SmHardLowCap,
SmHiCap: config.SmHiCap,
DiscBatch: config.SmDiscBatch,
Chain: bc,
Host: host.GetP2PHost(),
Discovery: host.GetDiscovery(),
ShardID: nodeconfig.ShardID(bc.ShardID()),
Network: config.Network,
BeaconNode: isBeaconNode,
MaxAdvertiseWaitTime: config.MaxAdvertiseWaitTime,
SmSoftLowCap: config.SmSoftLowCap,
SmHardLowCap: config.SmHardLowCap,
SmHiCap: config.SmHiCap,
DiscBatch: config.SmDiscBatch,
})
host.AddStreamProtocol(sp)

@ -139,6 +139,8 @@ func getDefaultSyncConfig(nt nodeconfig.NetworkType) harmonyconfig.SyncConfig {
return defaultTestNetSyncConfig
case nodeconfig.Localnet:
return defaultLocalNetSyncConfig
case nodeconfig.Partner:
return defaultPartnerSyncConfig
default:
return defaultElseSyncConfig
}

@ -318,6 +318,14 @@ func init() {
return confTree
}
migrations["2.5.10"] = func(confTree *toml.Tree) *toml.Tree {
if confTree.Get("Sync.MaxAdvertiseWaitTime") == nil {
confTree.Set("Sync.MaxAdvertiseWaitTime", defaultConfig.Sync.MaxAdvertiseWaitTime)
}
confTree.Set("Version", "2.5.11")
return confTree
}
// check that the latest version here is the same as in default.go
largestKey := getNextVersion(migrations)
if largestKey != tomlConfigVersion {

@ -5,7 +5,7 @@ import (
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
)
const tomlConfigVersion = "2.5.10"
const tomlConfigVersion = "2.5.11"
const (
defNetworkType = nodeconfig.Mainnet
@ -159,59 +159,78 @@ var defaultStagedSyncConfig = harmonyconfig.StagedSyncConfig{
var (
defaultMainnetSyncConfig = harmonyconfig.SyncConfig{
Enabled: false,
Downloader: false,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 6,
MinPeers: 6,
InitStreams: 8,
DiscSoftLowCap: 8,
DiscHardLowCap: 6,
DiscHighCap: 128,
DiscBatch: 8,
Enabled: false,
Downloader: false,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 6,
MinPeers: 6,
InitStreams: 8,
MaxAdvertiseWaitTime: 60, //minutes
DiscSoftLowCap: 8,
DiscHardLowCap: 6,
DiscHighCap: 128,
DiscBatch: 8,
}
defaultTestNetSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
Downloader: false,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 2,
MinPeers: 2,
InitStreams: 2,
DiscSoftLowCap: 2,
DiscHardLowCap: 2,
DiscHighCap: 1024,
DiscBatch: 3,
Enabled: true,
Downloader: false,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 2,
MinPeers: 2,
InitStreams: 2,
MaxAdvertiseWaitTime: 5, //minutes
DiscSoftLowCap: 2,
DiscHardLowCap: 2,
DiscHighCap: 1024,
DiscBatch: 3,
}
defaultLocalNetSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4,
MinPeers: 5,
InitStreams: 5,
DiscSoftLowCap: 5,
DiscHardLowCap: 5,
DiscHighCap: 1024,
DiscBatch: 8,
Enabled: true,
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4,
MinPeers: 5,
InitStreams: 5,
MaxAdvertiseWaitTime: 5, //minutes
DiscSoftLowCap: 5,
DiscHardLowCap: 5,
DiscHighCap: 1024,
DiscBatch: 8,
}
defaultPartnerSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4,
MinPeers: 2,
InitStreams: 2,
MaxAdvertiseWaitTime: 2, //minutes
DiscSoftLowCap: 2,
DiscHardLowCap: 2,
DiscHighCap: 1024,
DiscBatch: 4,
}
defaultElseSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4,
MinPeers: 4,
InitStreams: 4,
DiscSoftLowCap: 4,
DiscHardLowCap: 4,
DiscHighCap: 1024,
DiscBatch: 8,
Enabled: true,
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4,
MinPeers: 4,
InitStreams: 4,
MaxAdvertiseWaitTime: 2, //minutes
DiscSoftLowCap: 4,
DiscHardLowCap: 4,
DiscHighCap: 1024,
DiscBatch: 8,
}
)

@ -1692,6 +1692,11 @@ var (
Usage: "Initial shard-wise number of peers to start syncing",
Hidden: true,
}
syncMaxAdvertiseWaitTimeFlag = cli.IntFlag{
Name: "sync.max-advertise-wait-time",
Usage: "The max time duration between two advertises for each p2p peer to tell other nodes what protocols it supports",
Hidden: true,
}
syncDiscSoftLowFlag = cli.IntFlag{
Name: "sync.disc.soft-low-cap",
Usage: "Soft low cap for sync stream management",
@ -1740,6 +1745,10 @@ func applySyncFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
config.Sync.InitStreams = cli.GetIntFlagValue(cmd, syncInitStreamsFlag)
}
if cli.IsFlagChanged(cmd, syncMaxAdvertiseWaitTimeFlag) {
config.Sync.MaxAdvertiseWaitTime = cli.GetIntFlagValue(cmd, syncMaxAdvertiseWaitTimeFlag)
}
if cli.IsFlagChanged(cmd, syncDiscSoftLowFlag) {
config.Sync.DiscSoftLowCap = cli.GetIntFlagValue(cmd, syncDiscSoftLowFlag)
}

@ -918,16 +918,17 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
}
sConfig := stagedstreamsync.Config{
ServerOnly: !hc.Sync.Downloader,
Network: nodeconfig.NetworkType(hc.Network.NetworkType),
Concurrency: hc.Sync.Concurrency,
MinStreams: hc.Sync.MinPeers,
InitStreams: hc.Sync.InitStreams,
SmSoftLowCap: hc.Sync.DiscSoftLowCap,
SmHardLowCap: hc.Sync.DiscHardLowCap,
SmHiCap: hc.Sync.DiscHighCap,
SmDiscBatch: hc.Sync.DiscBatch,
LogProgress: node.NodeConfig.LogProgress,
ServerOnly: !hc.Sync.Downloader,
Network: nodeconfig.NetworkType(hc.Network.NetworkType),
Concurrency: hc.Sync.Concurrency,
MinStreams: hc.Sync.MinPeers,
InitStreams: hc.Sync.InitStreams,
MaxAdvertiseWaitTime: hc.Sync.MaxAdvertiseWaitTime,
SmSoftLowCap: hc.Sync.DiscSoftLowCap,
SmHardLowCap: hc.Sync.DiscHardLowCap,
SmHiCap: hc.Sync.DiscHighCap,
SmDiscBatch: hc.Sync.DiscBatch,
LogProgress: node.NodeConfig.LogProgress,
}
// If we are running side chain, we will need to do some extra works for beacon

@ -222,17 +222,18 @@ type PrometheusConfig struct {
type SyncConfig struct {
// TODO: Remove this bool after stream sync is fully up.
Enabled bool // enable the stream sync protocol
Downloader bool // start the sync downloader client
StagedSync bool // use staged sync
StagedSyncCfg StagedSyncConfig // staged sync configurations
Concurrency int // concurrency used for stream sync protocol
MinPeers int // minimum streams to start a sync task.
InitStreams int // minimum streams in bootstrap to start sync loop.
DiscSoftLowCap int // when number of streams is below this value, spin discover during check
DiscHardLowCap int // when removing stream, num is below this value, spin discovery immediately
DiscHighCap int // upper limit of streams in one sync protocol
DiscBatch int // size of each discovery
Enabled bool // enable the stream sync protocol
Downloader bool // start the sync downloader client
StagedSync bool // use staged sync
StagedSyncCfg StagedSyncConfig // staged sync configurations
Concurrency int // concurrency used for stream sync protocol
MinPeers int // minimum streams to start a sync task.
InitStreams int // minimum streams in bootstrap to start sync loop.
MaxAdvertiseWaitTime int // maximum time duration between advertisements
DiscSoftLowCap int // when number of streams is below this value, spin discover during check
DiscHardLowCap int // when removing stream, num is below this value, spin discovery immediately
DiscHighCap int // upper limit of streams in one sync protocol
DiscBatch int // size of each discovery
}
type StagedSyncConfig struct {

@ -59,12 +59,13 @@ type (
// Config is the sync protocol config
Config struct {
Chain engine.ChainReader
Host libp2p_host.Host
Discovery discovery.Discovery
ShardID nodeconfig.ShardID
Network nodeconfig.NetworkType
BeaconNode bool
Chain engine.ChainReader
Host libp2p_host.Host
Discovery discovery.Discovery
ShardID nodeconfig.ShardID
Network nodeconfig.NetworkType
BeaconNode bool
MaxAdvertiseWaitTime int
// stream manager config
SmSoftLowCap int
SmHardLowCap int
@ -186,6 +187,10 @@ func (p *Protocol) HandleStream(raw libp2p_network.Stream) {
func (p *Protocol) advertiseLoop() {
for {
sleep := p.advertise()
maxSleepTime := time.Duration(p.config.MaxAdvertiseWaitTime) * time.Minute
if sleep > maxSleepTime {
sleep = maxSleepTime
}
select {
case <-p.closeC:
return

@ -1,4 +1,4 @@
Version = "2.5.10"
Version = "2.5.11"
[BLSKeys]
KMSConfigFile = ""
@ -105,6 +105,7 @@ Version = "2.5.10"
Enabled = false
InitStreams = 8
MinPeers = 5
MaxAdvertiseWaitTime = 30
[TxPool]
AccountSlots = 16

@ -1,4 +1,4 @@
Version = "2.5.10"
Version = "2.5.11"
[BLSKeys]
KMSConfigFile = ""
@ -105,6 +105,7 @@ Version = "2.5.10"
Enabled = false
InitStreams = 8
MinPeers = 2
MaxAdvertiseWaitTime = 30
[TxPool]
AccountSlots = 16

Loading…
Cancel
Save