add MaxAdvertiseWaitTime to handle advertisements interval and address stream connection issue

pull/4377/head
“GheisMohammadi” 2 years ago committed by Casey Gardiner
parent c9c57f8138
commit 7950394dc6
  1. 10
      api/service/stagedstreamsync/const.go
  2. 22
      api/service/stagedstreamsync/downloader.go
  3. 2
      cmd/harmony/config.go
  4. 8
      cmd/harmony/config_migrations.go
  5. 109
      cmd/harmony/default.go
  6. 9
      cmd/harmony/flags.go
  7. 21
      cmd/harmony/main.go
  8. 23
      internal/configs/harmony/harmony.go
  9. 17
      p2p/stream/protocols/sync/protocol.go
  10. 3
      rosetta/infra/harmony-mainnet.conf
  11. 3
      rosetta/infra/harmony-pstn.conf

@ -36,11 +36,11 @@ type (
ServerOnly bool ServerOnly bool
// parameters // parameters
Network nodeconfig.NetworkType Network nodeconfig.NetworkType
Concurrency int // Number of concurrent sync requests Concurrency int // Number of concurrent sync requests
MinStreams int // Minimum number of streams to do sync MinStreams int // Minimum number of streams to do sync
InitStreams int // Number of streams requirement for initial bootstrap InitStreams int // Number of streams requirement for initial bootstrap
MaxAdvertiseWaitTime int // maximum time duration between protocol advertisements
// stream manager config // stream manager config
SmSoftLowCap int SmSoftLowCap int
SmHardLowCap int SmHardLowCap int

@ -41,17 +41,17 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config
config.fixValues() config.fixValues()
sp := sync.NewProtocol(sync.Config{ sp := sync.NewProtocol(sync.Config{
Chain: bc, Chain: bc,
Host: host.GetP2PHost(), Host: host.GetP2PHost(),
Discovery: host.GetDiscovery(), Discovery: host.GetDiscovery(),
ShardID: nodeconfig.ShardID(bc.ShardID()), ShardID: nodeconfig.ShardID(bc.ShardID()),
Network: config.Network, Network: config.Network,
BeaconNode: isBeaconNode, BeaconNode: isBeaconNode,
MaxAdvertiseWaitTime: config.MaxAdvertiseWaitTime,
SmSoftLowCap: config.SmSoftLowCap, SmSoftLowCap: config.SmSoftLowCap,
SmHardLowCap: config.SmHardLowCap, SmHardLowCap: config.SmHardLowCap,
SmHiCap: config.SmHiCap, SmHiCap: config.SmHiCap,
DiscBatch: config.SmDiscBatch, DiscBatch: config.SmDiscBatch,
}) })
host.AddStreamProtocol(sp) host.AddStreamProtocol(sp)

@ -139,6 +139,8 @@ func getDefaultSyncConfig(nt nodeconfig.NetworkType) harmonyconfig.SyncConfig {
return defaultTestNetSyncConfig return defaultTestNetSyncConfig
case nodeconfig.Localnet: case nodeconfig.Localnet:
return defaultLocalNetSyncConfig return defaultLocalNetSyncConfig
case nodeconfig.Partner:
return defaultPartnerSyncConfig
default: default:
return defaultElseSyncConfig return defaultElseSyncConfig
} }

@ -318,6 +318,14 @@ func init() {
return confTree return confTree
} }
migrations["2.5.10"] = func(confTree *toml.Tree) *toml.Tree {
if confTree.Get("Sync.MaxAdvertiseWaitTime") == nil {
confTree.Set("Sync.MaxAdvertiseWaitTime", defaultConfig.Sync.MaxAdvertiseWaitTime)
}
confTree.Set("Version", "2.5.11")
return confTree
}
// check that the latest version here is the same as in default.go // check that the latest version here is the same as in default.go
largestKey := getNextVersion(migrations) largestKey := getNextVersion(migrations)
if largestKey != tomlConfigVersion { if largestKey != tomlConfigVersion {

@ -5,7 +5,7 @@ import (
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
) )
const tomlConfigVersion = "2.5.10" const tomlConfigVersion = "2.5.11"
const ( const (
defNetworkType = nodeconfig.Mainnet defNetworkType = nodeconfig.Mainnet
@ -159,59 +159,78 @@ var defaultStagedSyncConfig = harmonyconfig.StagedSyncConfig{
var ( var (
defaultMainnetSyncConfig = harmonyconfig.SyncConfig{ defaultMainnetSyncConfig = harmonyconfig.SyncConfig{
Enabled: false, Enabled: false,
Downloader: false, Downloader: false,
StagedSync: false, StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig, StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 6, Concurrency: 6,
MinPeers: 6, MinPeers: 6,
InitStreams: 8, InitStreams: 8,
DiscSoftLowCap: 8, MaxAdvertiseWaitTime: 60, //minutes
DiscHardLowCap: 6, DiscSoftLowCap: 8,
DiscHighCap: 128, DiscHardLowCap: 6,
DiscBatch: 8, DiscHighCap: 128,
DiscBatch: 8,
} }
defaultTestNetSyncConfig = harmonyconfig.SyncConfig{ defaultTestNetSyncConfig = harmonyconfig.SyncConfig{
Enabled: true, Enabled: true,
Downloader: false, Downloader: false,
StagedSync: false, StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig, StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 2, Concurrency: 2,
MinPeers: 2, MinPeers: 2,
InitStreams: 2, InitStreams: 2,
DiscSoftLowCap: 2, MaxAdvertiseWaitTime: 5, //minutes
DiscHardLowCap: 2, DiscSoftLowCap: 2,
DiscHighCap: 1024, DiscHardLowCap: 2,
DiscBatch: 3, DiscHighCap: 1024,
DiscBatch: 3,
} }
defaultLocalNetSyncConfig = harmonyconfig.SyncConfig{ defaultLocalNetSyncConfig = harmonyconfig.SyncConfig{
Enabled: true, Enabled: true,
Downloader: true, Downloader: true,
StagedSync: false, StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig, StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4, Concurrency: 4,
MinPeers: 5, MinPeers: 5,
InitStreams: 5, InitStreams: 5,
DiscSoftLowCap: 5, MaxAdvertiseWaitTime: 5, //minutes
DiscHardLowCap: 5, DiscSoftLowCap: 5,
DiscHighCap: 1024, DiscHardLowCap: 5,
DiscBatch: 8, DiscHighCap: 1024,
DiscBatch: 8,
}
defaultPartnerSyncConfig = harmonyconfig.SyncConfig{
Enabled: true,
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4,
MinPeers: 2,
InitStreams: 2,
MaxAdvertiseWaitTime: 2, //minutes
DiscSoftLowCap: 2,
DiscHardLowCap: 2,
DiscHighCap: 1024,
DiscBatch: 4,
} }
defaultElseSyncConfig = harmonyconfig.SyncConfig{ defaultElseSyncConfig = harmonyconfig.SyncConfig{
Enabled: true, Enabled: true,
Downloader: true, Downloader: true,
StagedSync: false, StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig, StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4, Concurrency: 4,
MinPeers: 4, MinPeers: 4,
InitStreams: 4, InitStreams: 4,
DiscSoftLowCap: 4, MaxAdvertiseWaitTime: 2, //minutes
DiscHardLowCap: 4, DiscSoftLowCap: 4,
DiscHighCap: 1024, DiscHardLowCap: 4,
DiscBatch: 8, DiscHighCap: 1024,
DiscBatch: 8,
} }
) )

@ -1692,6 +1692,11 @@ var (
Usage: "Initial shard-wise number of peers to start syncing", Usage: "Initial shard-wise number of peers to start syncing",
Hidden: true, Hidden: true,
} }
syncMaxAdvertiseWaitTimeFlag = cli.IntFlag{
Name: "sync.max-advertise-wait-time",
Usage: "The max time duration between two advertises for each p2p peer to tell other nodes what protocols it supports",
Hidden: true,
}
syncDiscSoftLowFlag = cli.IntFlag{ syncDiscSoftLowFlag = cli.IntFlag{
Name: "sync.disc.soft-low-cap", Name: "sync.disc.soft-low-cap",
Usage: "Soft low cap for sync stream management", Usage: "Soft low cap for sync stream management",
@ -1740,6 +1745,10 @@ func applySyncFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) {
config.Sync.InitStreams = cli.GetIntFlagValue(cmd, syncInitStreamsFlag) config.Sync.InitStreams = cli.GetIntFlagValue(cmd, syncInitStreamsFlag)
} }
if cli.IsFlagChanged(cmd, syncMaxAdvertiseWaitTimeFlag) {
config.Sync.MaxAdvertiseWaitTime = cli.GetIntFlagValue(cmd, syncMaxAdvertiseWaitTimeFlag)
}
if cli.IsFlagChanged(cmd, syncDiscSoftLowFlag) { if cli.IsFlagChanged(cmd, syncDiscSoftLowFlag) {
config.Sync.DiscSoftLowCap = cli.GetIntFlagValue(cmd, syncDiscSoftLowFlag) config.Sync.DiscSoftLowCap = cli.GetIntFlagValue(cmd, syncDiscSoftLowFlag)
} }

@ -918,16 +918,17 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
} }
sConfig := stagedstreamsync.Config{ sConfig := stagedstreamsync.Config{
ServerOnly: !hc.Sync.Downloader, ServerOnly: !hc.Sync.Downloader,
Network: nodeconfig.NetworkType(hc.Network.NetworkType), Network: nodeconfig.NetworkType(hc.Network.NetworkType),
Concurrency: hc.Sync.Concurrency, Concurrency: hc.Sync.Concurrency,
MinStreams: hc.Sync.MinPeers, MinStreams: hc.Sync.MinPeers,
InitStreams: hc.Sync.InitStreams, InitStreams: hc.Sync.InitStreams,
SmSoftLowCap: hc.Sync.DiscSoftLowCap, MaxAdvertiseWaitTime: hc.Sync.MaxAdvertiseWaitTime,
SmHardLowCap: hc.Sync.DiscHardLowCap, SmSoftLowCap: hc.Sync.DiscSoftLowCap,
SmHiCap: hc.Sync.DiscHighCap, SmHardLowCap: hc.Sync.DiscHardLowCap,
SmDiscBatch: hc.Sync.DiscBatch, SmHiCap: hc.Sync.DiscHighCap,
LogProgress: node.NodeConfig.LogProgress, SmDiscBatch: hc.Sync.DiscBatch,
LogProgress: node.NodeConfig.LogProgress,
} }
// If we are running side chain, we will need to do some extra works for beacon // If we are running side chain, we will need to do some extra works for beacon

@ -222,17 +222,18 @@ type PrometheusConfig struct {
type SyncConfig struct { type SyncConfig struct {
// TODO: Remove this bool after stream sync is fully up. // TODO: Remove this bool after stream sync is fully up.
Enabled bool // enable the stream sync protocol Enabled bool // enable the stream sync protocol
Downloader bool // start the sync downloader client Downloader bool // start the sync downloader client
StagedSync bool // use staged sync StagedSync bool // use staged sync
StagedSyncCfg StagedSyncConfig // staged sync configurations StagedSyncCfg StagedSyncConfig // staged sync configurations
Concurrency int // concurrency used for stream sync protocol Concurrency int // concurrency used for stream sync protocol
MinPeers int // minimum streams to start a sync task. MinPeers int // minimum streams to start a sync task.
InitStreams int // minimum streams in bootstrap to start sync loop. InitStreams int // minimum streams in bootstrap to start sync loop.
DiscSoftLowCap int // when number of streams is below this value, spin discover during check MaxAdvertiseWaitTime int // maximum time duration between advertisements
DiscHardLowCap int // when removing stream, num is below this value, spin discovery immediately DiscSoftLowCap int // when number of streams is below this value, spin discover during check
DiscHighCap int // upper limit of streams in one sync protocol DiscHardLowCap int // when removing stream, num is below this value, spin discovery immediately
DiscBatch int // size of each discovery DiscHighCap int // upper limit of streams in one sync protocol
DiscBatch int // size of each discovery
} }
type StagedSyncConfig struct { type StagedSyncConfig struct {

@ -59,12 +59,13 @@ type (
// Config is the sync protocol config // Config is the sync protocol config
Config struct { Config struct {
Chain engine.ChainReader Chain engine.ChainReader
Host libp2p_host.Host Host libp2p_host.Host
Discovery discovery.Discovery Discovery discovery.Discovery
ShardID nodeconfig.ShardID ShardID nodeconfig.ShardID
Network nodeconfig.NetworkType Network nodeconfig.NetworkType
BeaconNode bool BeaconNode bool
MaxAdvertiseWaitTime int
// stream manager config // stream manager config
SmSoftLowCap int SmSoftLowCap int
SmHardLowCap int SmHardLowCap int
@ -186,6 +187,10 @@ func (p *Protocol) HandleStream(raw libp2p_network.Stream) {
func (p *Protocol) advertiseLoop() { func (p *Protocol) advertiseLoop() {
for { for {
sleep := p.advertise() sleep := p.advertise()
maxSleepTime := time.Duration(p.config.MaxAdvertiseWaitTime) * time.Minute
if sleep > maxSleepTime {
sleep = maxSleepTime
}
select { select {
case <-p.closeC: case <-p.closeC:
return return

@ -1,4 +1,4 @@
Version = "2.5.10" Version = "2.5.11"
[BLSKeys] [BLSKeys]
KMSConfigFile = "" KMSConfigFile = ""
@ -105,6 +105,7 @@ Version = "2.5.10"
Enabled = false Enabled = false
InitStreams = 8 InitStreams = 8
MinPeers = 5 MinPeers = 5
MaxAdvertiseWaitTime = 30
[TxPool] [TxPool]
AccountSlots = 16 AccountSlots = 16

@ -1,4 +1,4 @@
Version = "2.5.10" Version = "2.5.11"
[BLSKeys] [BLSKeys]
KMSConfigFile = "" KMSConfigFile = ""
@ -105,6 +105,7 @@ Version = "2.5.10"
Enabled = false Enabled = false
InitStreams = 8 InitStreams = 8
MinPeers = 2 MinPeers = 2
MaxAdvertiseWaitTime = 30
[TxPool] [TxPool]
AccountSlots = 16 AccountSlots = 16

Loading…
Cancel
Save