From bd0cf8c131a2281749c1d29c60c6c9b3995a9088 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Fri, 13 Jan 2023 16:17:07 +0800 Subject: [PATCH] add MaxAdvertiseWaitTime to handle advertisements interval and address stream connection issue --- api/service/stagedstreamsync/const.go | 10 +- api/service/stagedstreamsync/downloader.go | 22 ++--- cmd/harmony/config.go | 2 + cmd/harmony/config_migrations.go | 8 ++ cmd/harmony/default.go | 109 ++++++++++++--------- cmd/harmony/flags.go | 9 ++ cmd/harmony/main.go | 21 ++-- internal/configs/harmony/harmony.go | 23 ++--- p2p/stream/protocols/sync/protocol.go | 17 ++-- rosetta/infra/harmony-mainnet.conf | 3 +- rosetta/infra/harmony-pstn.conf | 3 +- 11 files changed, 137 insertions(+), 90 deletions(-) diff --git a/api/service/stagedstreamsync/const.go b/api/service/stagedstreamsync/const.go index ed68d80b1..0e6bc6e2c 100644 --- a/api/service/stagedstreamsync/const.go +++ b/api/service/stagedstreamsync/const.go @@ -36,11 +36,11 @@ type ( ServerOnly bool // parameters - Network nodeconfig.NetworkType - Concurrency int // Number of concurrent sync requests - MinStreams int // Minimum number of streams to do sync - InitStreams int // Number of streams requirement for initial bootstrap - + Network nodeconfig.NetworkType + Concurrency int // Number of concurrent sync requests + MinStreams int // Minimum number of streams to do sync + InitStreams int // Number of streams requirement for initial bootstrap + MaxAdvertiseWaitTime int // maximum time duration between protocol advertisements // stream manager config SmSoftLowCap int SmHardLowCap int diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index f53a5c41a..cfcd180c0 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -41,17 +41,17 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, isBeaconNode bool, config config.fixValues() sp := sync.NewProtocol(sync.Config{ - Chain: bc, - Host: host.GetP2PHost(), - Discovery: host.GetDiscovery(), - ShardID: nodeconfig.ShardID(bc.ShardID()), - Network: config.Network, - BeaconNode: isBeaconNode, - - SmSoftLowCap: config.SmSoftLowCap, - SmHardLowCap: config.SmHardLowCap, - SmHiCap: config.SmHiCap, - DiscBatch: config.SmDiscBatch, + Chain: bc, + Host: host.GetP2PHost(), + Discovery: host.GetDiscovery(), + ShardID: nodeconfig.ShardID(bc.ShardID()), + Network: config.Network, + BeaconNode: isBeaconNode, + MaxAdvertiseWaitTime: config.MaxAdvertiseWaitTime, + SmSoftLowCap: config.SmSoftLowCap, + SmHardLowCap: config.SmHardLowCap, + SmHiCap: config.SmHiCap, + DiscBatch: config.SmDiscBatch, }) host.AddStreamProtocol(sp) diff --git a/cmd/harmony/config.go b/cmd/harmony/config.go index dea5fb2e3..30c72c2e2 100644 --- a/cmd/harmony/config.go +++ b/cmd/harmony/config.go @@ -139,6 +139,8 @@ func getDefaultSyncConfig(nt nodeconfig.NetworkType) harmonyconfig.SyncConfig { return defaultTestNetSyncConfig case nodeconfig.Localnet: return defaultLocalNetSyncConfig + case nodeconfig.Partner: + return defaultPartnerSyncConfig default: return defaultElseSyncConfig } diff --git a/cmd/harmony/config_migrations.go b/cmd/harmony/config_migrations.go index eb98ccb79..3a256f49d 100644 --- a/cmd/harmony/config_migrations.go +++ b/cmd/harmony/config_migrations.go @@ -318,6 +318,14 @@ func init() { return confTree } + migrations["2.5.10"] = func(confTree *toml.Tree) *toml.Tree { + if confTree.Get("Sync.MaxAdvertiseWaitTime") == nil { + confTree.Set("Sync.MaxAdvertiseWaitTime", defaultConfig.Sync.MaxAdvertiseWaitTime) + } + confTree.Set("Version", "2.5.11") + return confTree + } + // check that the latest version here is the same as in default.go largestKey := getNextVersion(migrations) if largestKey != tomlConfigVersion { diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 7de12af9c..7bdcc02d0 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -5,7 +5,7 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" ) -const tomlConfigVersion = "2.5.10" +const tomlConfigVersion = "2.5.11" const ( defNetworkType = nodeconfig.Mainnet @@ -159,59 +159,78 @@ var defaultStagedSyncConfig = harmonyconfig.StagedSyncConfig{ var ( defaultMainnetSyncConfig = harmonyconfig.SyncConfig{ - Enabled: false, - Downloader: false, - StagedSync: false, - StagedSyncCfg: defaultStagedSyncConfig, - Concurrency: 6, - MinPeers: 6, - InitStreams: 8, - DiscSoftLowCap: 8, - DiscHardLowCap: 6, - DiscHighCap: 128, - DiscBatch: 8, + Enabled: false, + Downloader: false, + StagedSync: false, + StagedSyncCfg: defaultStagedSyncConfig, + Concurrency: 6, + MinPeers: 6, + InitStreams: 8, + MaxAdvertiseWaitTime: 60, //minutes + DiscSoftLowCap: 8, + DiscHardLowCap: 6, + DiscHighCap: 128, + DiscBatch: 8, } defaultTestNetSyncConfig = harmonyconfig.SyncConfig{ - Enabled: true, - Downloader: false, - StagedSync: false, - StagedSyncCfg: defaultStagedSyncConfig, - Concurrency: 2, - MinPeers: 2, - InitStreams: 2, - DiscSoftLowCap: 2, - DiscHardLowCap: 2, - DiscHighCap: 1024, - DiscBatch: 3, + Enabled: true, + Downloader: false, + StagedSync: false, + StagedSyncCfg: defaultStagedSyncConfig, + Concurrency: 2, + MinPeers: 2, + InitStreams: 2, + MaxAdvertiseWaitTime: 5, //minutes + DiscSoftLowCap: 2, + DiscHardLowCap: 2, + DiscHighCap: 1024, + DiscBatch: 3, } defaultLocalNetSyncConfig = harmonyconfig.SyncConfig{ - Enabled: true, - Downloader: true, - StagedSync: false, - StagedSyncCfg: defaultStagedSyncConfig, - Concurrency: 4, - MinPeers: 5, - InitStreams: 5, - DiscSoftLowCap: 5, - DiscHardLowCap: 5, - DiscHighCap: 1024, - DiscBatch: 8, + Enabled: true, + Downloader: true, + StagedSync: false, + StagedSyncCfg: defaultStagedSyncConfig, + Concurrency: 4, + MinPeers: 5, + InitStreams: 5, + MaxAdvertiseWaitTime: 5, //minutes + DiscSoftLowCap: 5, + DiscHardLowCap: 5, + DiscHighCap: 1024, + DiscBatch: 8, + } + + defaultPartnerSyncConfig = harmonyconfig.SyncConfig{ + Enabled: true, + Downloader: true, + StagedSync: false, + StagedSyncCfg: defaultStagedSyncConfig, + Concurrency: 4, + MinPeers: 2, + InitStreams: 2, + MaxAdvertiseWaitTime: 2, //minutes + DiscSoftLowCap: 2, + DiscHardLowCap: 2, + DiscHighCap: 1024, + DiscBatch: 4, } defaultElseSyncConfig = harmonyconfig.SyncConfig{ - Enabled: true, - Downloader: true, - StagedSync: false, - StagedSyncCfg: defaultStagedSyncConfig, - Concurrency: 4, - MinPeers: 4, - InitStreams: 4, - DiscSoftLowCap: 4, - DiscHardLowCap: 4, - DiscHighCap: 1024, - DiscBatch: 8, + Enabled: true, + Downloader: true, + StagedSync: false, + StagedSyncCfg: defaultStagedSyncConfig, + Concurrency: 4, + MinPeers: 4, + InitStreams: 4, + MaxAdvertiseWaitTime: 2, //minutes + DiscSoftLowCap: 4, + DiscHardLowCap: 4, + DiscHighCap: 1024, + DiscBatch: 8, } ) diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index 26a250d95..168081660 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -1692,6 +1692,11 @@ var ( Usage: "Initial shard-wise number of peers to start syncing", Hidden: true, } + syncMaxAdvertiseWaitTimeFlag = cli.IntFlag{ + Name: "sync.max-advertise-wait-time", + Usage: "The max time duration between two advertises for each p2p peer to tell other nodes what protocols it supports", + Hidden: true, + } syncDiscSoftLowFlag = cli.IntFlag{ Name: "sync.disc.soft-low-cap", Usage: "Soft low cap for sync stream management", @@ -1740,6 +1745,10 @@ func applySyncFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig) { config.Sync.InitStreams = cli.GetIntFlagValue(cmd, syncInitStreamsFlag) } + if cli.IsFlagChanged(cmd, syncMaxAdvertiseWaitTimeFlag) { + config.Sync.MaxAdvertiseWaitTime = cli.GetIntFlagValue(cmd, syncMaxAdvertiseWaitTimeFlag) + } + if cli.IsFlagChanged(cmd, syncDiscSoftLowFlag) { config.Sync.DiscSoftLowCap = cli.GetIntFlagValue(cmd, syncDiscSoftLowFlag) } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 6ab3ed102..dbb18227d 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -918,16 +918,17 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har } sConfig := stagedstreamsync.Config{ - ServerOnly: !hc.Sync.Downloader, - Network: nodeconfig.NetworkType(hc.Network.NetworkType), - Concurrency: hc.Sync.Concurrency, - MinStreams: hc.Sync.MinPeers, - InitStreams: hc.Sync.InitStreams, - SmSoftLowCap: hc.Sync.DiscSoftLowCap, - SmHardLowCap: hc.Sync.DiscHardLowCap, - SmHiCap: hc.Sync.DiscHighCap, - SmDiscBatch: hc.Sync.DiscBatch, - LogProgress: node.NodeConfig.LogProgress, + ServerOnly: !hc.Sync.Downloader, + Network: nodeconfig.NetworkType(hc.Network.NetworkType), + Concurrency: hc.Sync.Concurrency, + MinStreams: hc.Sync.MinPeers, + InitStreams: hc.Sync.InitStreams, + MaxAdvertiseWaitTime: hc.Sync.MaxAdvertiseWaitTime, + SmSoftLowCap: hc.Sync.DiscSoftLowCap, + SmHardLowCap: hc.Sync.DiscHardLowCap, + SmHiCap: hc.Sync.DiscHighCap, + SmDiscBatch: hc.Sync.DiscBatch, + LogProgress: node.NodeConfig.LogProgress, } // If we are running side chain, we will need to do some extra works for beacon diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go index 90569b96f..73ddd442d 100644 --- a/internal/configs/harmony/harmony.go +++ b/internal/configs/harmony/harmony.go @@ -222,17 +222,18 @@ type PrometheusConfig struct { type SyncConfig struct { // TODO: Remove this bool after stream sync is fully up. - Enabled bool // enable the stream sync protocol - Downloader bool // start the sync downloader client - StagedSync bool // use staged sync - StagedSyncCfg StagedSyncConfig // staged sync configurations - Concurrency int // concurrency used for stream sync protocol - MinPeers int // minimum streams to start a sync task. - InitStreams int // minimum streams in bootstrap to start sync loop. - DiscSoftLowCap int // when number of streams is below this value, spin discover during check - DiscHardLowCap int // when removing stream, num is below this value, spin discovery immediately - DiscHighCap int // upper limit of streams in one sync protocol - DiscBatch int // size of each discovery + Enabled bool // enable the stream sync protocol + Downloader bool // start the sync downloader client + StagedSync bool // use staged sync + StagedSyncCfg StagedSyncConfig // staged sync configurations + Concurrency int // concurrency used for stream sync protocol + MinPeers int // minimum streams to start a sync task. + InitStreams int // minimum streams in bootstrap to start sync loop. + MaxAdvertiseWaitTime int // maximum time duration between advertisements + DiscSoftLowCap int // when number of streams is below this value, spin discover during check + DiscHardLowCap int // when removing stream, num is below this value, spin discovery immediately + DiscHighCap int // upper limit of streams in one sync protocol + DiscBatch int // size of each discovery } type StagedSyncConfig struct { diff --git a/p2p/stream/protocols/sync/protocol.go b/p2p/stream/protocols/sync/protocol.go index 2a984a581..80ea0927b 100644 --- a/p2p/stream/protocols/sync/protocol.go +++ b/p2p/stream/protocols/sync/protocol.go @@ -59,12 +59,13 @@ type ( // Config is the sync protocol config Config struct { - Chain engine.ChainReader - Host libp2p_host.Host - Discovery discovery.Discovery - ShardID nodeconfig.ShardID - Network nodeconfig.NetworkType - BeaconNode bool + Chain engine.ChainReader + Host libp2p_host.Host + Discovery discovery.Discovery + ShardID nodeconfig.ShardID + Network nodeconfig.NetworkType + BeaconNode bool + MaxAdvertiseWaitTime int // stream manager config SmSoftLowCap int SmHardLowCap int @@ -186,6 +187,10 @@ func (p *Protocol) HandleStream(raw libp2p_network.Stream) { func (p *Protocol) advertiseLoop() { for { sleep := p.advertise() + maxSleepTime := time.Duration(p.config.MaxAdvertiseWaitTime) * time.Minute + if sleep > maxSleepTime { + sleep = maxSleepTime + } select { case <-p.closeC: return diff --git a/rosetta/infra/harmony-mainnet.conf b/rosetta/infra/harmony-mainnet.conf index ddb2f87ee..3e7085ba6 100644 --- a/rosetta/infra/harmony-mainnet.conf +++ b/rosetta/infra/harmony-mainnet.conf @@ -1,4 +1,4 @@ -Version = "2.5.10" +Version = "2.5.11" [BLSKeys] KMSConfigFile = "" @@ -105,6 +105,7 @@ Version = "2.5.10" Enabled = false InitStreams = 8 MinPeers = 5 + MaxAdvertiseWaitTime = 30 [TxPool] AccountSlots = 16 diff --git a/rosetta/infra/harmony-pstn.conf b/rosetta/infra/harmony-pstn.conf index 1c029f828..458e82635 100644 --- a/rosetta/infra/harmony-pstn.conf +++ b/rosetta/infra/harmony-pstn.conf @@ -1,4 +1,4 @@ -Version = "2.5.10" +Version = "2.5.11" [BLSKeys] KMSConfigFile = "" @@ -105,6 +105,7 @@ Version = "2.5.10" Enabled = false InitStreams = 8 MinPeers = 2 + MaxAdvertiseWaitTime = 30 [TxPool] AccountSlots = 16