diff --git a/api/service/manager.go b/api/service/manager.go index baaaba29b..1bb53137b 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -21,6 +21,7 @@ const ( BlockProposal NetworkInfo Prometheus + Synchronize ) func (t Type) String() string { @@ -37,6 +38,8 @@ func (t Type) String() string { return "NetworkInfo" case Prometheus: return "Prometheus" + case Synchronize: + return "Synchronize" default: return "Unknown" } @@ -82,6 +85,11 @@ func (m *Manager) GetServices() []Service { return m.services } +// GetService get the specified service +func (m *Manager) GetService(t Type) Service { + return m.serviceMap[t] +} + // StartServices run all registered services. If one of the starting service returns // an error, closing all started services. func (m *Manager) StartServices() (err error) { diff --git a/api/service/synchronize/service.go b/api/service/synchronize/service.go new file mode 100644 index 000000000..92aac3206 --- /dev/null +++ b/api/service/synchronize/service.go @@ -0,0 +1,37 @@ +package synchronize + +import ( + "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/hmy/downloader" + "github.com/harmony-one/harmony/p2p" +) + +// Service is simply a adapter of Downloaders, which support block synchronization +type Service struct { + Downloaders *downloader.Downloaders +} + +// NewService creates the a new downloader service +func NewService(host p2p.Host, bcs []*core.BlockChain, config downloader.Config) *Service { + return &Service{ + Downloaders: downloader.NewDownloaders(host, bcs, config), + } +} + +// Start start the service +func (s *Service) Start() error { + s.Downloaders.Start() + return nil +} + +// Stop stop the service +func (s *Service) Stop() error { + s.Downloaders.Close() + return nil +} + +// APIs return all APIs of the service +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/cmd/harmony/config.go b/cmd/harmony/config.go index 5923f7197..f569e9ea1 100644 --- a/cmd/harmony/config.go +++ b/cmd/harmony/config.go @@ -28,6 +28,7 @@ type harmonyConfig struct { TxPool txPoolConfig Pprof pprofConfig Log logConfig + Sync syncConfig Sys *sysConfig `toml:",omitempty"` Consensus *consensusConfig `toml:",omitempty"` Devnet *devnetConfig `toml:",omitempty"` @@ -152,6 +153,20 @@ type prometheusConfig struct { Gateway string } +type syncConfig struct { + // TODO: Remove this bool after stream sync is fully up. + Downloader bool // start the sync downloader client + LegacyServer bool // provide the gRPC sync protocol server + LegacyClient bool // aside from stream sync protocol, also run gRPC client to get blocks + Concurrency int // concurrency used for stream sync protocol + MinPeers int // minimum streams to start a sync task. + InitStreams int // minimum streams in bootstrap to start sync loop. + DiscSoftLowCap int // when number of streams is below this value, spin discover during check + DiscHardLowCap int // when removing stream, num is below this value, spin discovery immediately + DiscHighCap int // upper limit of streams in one sync protocol + DiscBatch int // size of each discovery +} + // TODO: use specific type wise validation instead of general string types assertion. func validateHarmonyConfig(config harmonyConfig) error { var accepts []string @@ -188,6 +203,11 @@ func validateHarmonyConfig(config harmonyConfig) error { return fmt.Errorf("flag --run.offline must have p2p IP be %v", nodeconfig.DefaultLocalListenIP) } + if !config.Sync.Downloader && !config.Sync.LegacyClient { + // There is no module up for sync + return errors.New("either --sync.downloader or --sync.legacy.client shall be enabled") + } + return nil } @@ -235,6 +255,17 @@ func parseNetworkType(nt string) nodeconfig.NetworkType { } } +func getDefaultSyncConfig(nt nodeconfig.NetworkType) syncConfig { + switch nt { + case nodeconfig.Mainnet: + return defaultMainnetSyncConfig + case nodeconfig.Testnet: + return defaultTestNetSyncConfig + default: + return defaultElseSyncConfig + } +} + var dumpConfigCmd = &cobra.Command{ Use: "dumpconfig [config_file]", Short: "dump the config file for harmony binary configurations", diff --git a/cmd/harmony/config_test.go b/cmd/harmony/config_test.go index 75aa71ef5..b0aecc939 100644 --- a/cmd/harmony/config_test.go +++ b/cmd/harmony/config_test.go @@ -30,7 +30,7 @@ func init() { } func TestV1_0_0Config(t *testing.T) { - testConfig := `Version = "1.0.3" + testConfig := `Version = "1.0.4" [BLSKeys] KMSConfigFile = "" @@ -80,6 +80,18 @@ func TestV1_0_0Config(t *testing.T) { [TxPool] BlacklistFile = "./.hmy/blacklist.txt" +[Sync] + Downloader = false + Concurrency = 6 + DiscBatch = 8 + DiscHardLowCap = 6 + DiscHighCap = 128 + DiscSoftLowCap = 8 + InitStreams = 8 + LegacyClient = true + LegacyServer = true + MinPeers = 6 + [WS] Enabled = true IP = "127.0.0.1" @@ -96,20 +108,21 @@ func TestV1_0_0Config(t *testing.T) { if err != nil { t.Fatal(err) } + defConf := getDefaultHmyConfigCopy(nodeconfig.Mainnet) if config.HTTP.RosettaEnabled { t.Errorf("Expected rosetta http server to be disabled when loading old config") } if config.General.IsOffline { t.Errorf("Expect node to de online when loading old config") } - if config.P2P.IP != defaultConfig.P2P.IP { + if config.P2P.IP != defConf.P2P.IP { t.Errorf("Expect default p2p IP if old config is provided") } - if config.Version != "1.0.3" { - t.Errorf("Expected config version: 1.0.3, not %v", config.Version) + if config.Version != "1.0.4" { + t.Errorf("Expected config version: 1.0.4, not %v", config.Version) } - config.Version = defaultConfig.Version // Shortcut for testing, value checked above - if !reflect.DeepEqual(config, defaultConfig) { + config.Version = defConf.Version // Shortcut for testing, value checked above + if !reflect.DeepEqual(config, defConf) { t.Errorf("Unexpected config \n\t%+v \n\t%+v", config, defaultConfig) } } diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 4c04b826f..a6be5c8d1 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -2,7 +2,7 @@ package main import nodeconfig "github.com/harmony-one/harmony/internal/configs/node" -const tomlConfigVersion = "1.0.3" +const tomlConfigVersion = "1.0.4" const ( defNetworkType = nodeconfig.Mainnet @@ -102,6 +102,47 @@ var defaultPrometheusConfig = prometheusConfig{ Gateway: "https://gateway.harmony.one", } +var ( + defaultMainnetSyncConfig = syncConfig{ + Downloader: false, + LegacyServer: true, + LegacyClient: true, + Concurrency: 6, + MinPeers: 6, + InitStreams: 8, + DiscSoftLowCap: 8, + DiscHardLowCap: 6, + DiscHighCap: 128, + DiscBatch: 8, + } + + defaultTestNetSyncConfig = syncConfig{ + Downloader: false, + LegacyServer: true, + LegacyClient: true, + Concurrency: 4, + MinPeers: 4, + InitStreams: 4, + DiscSoftLowCap: 4, + DiscHardLowCap: 4, + DiscHighCap: 1024, + DiscBatch: 8, + } + + defaultElseSyncConfig = syncConfig{ + Downloader: true, + LegacyServer: true, + LegacyClient: false, + Concurrency: 4, + MinPeers: 4, + InitStreams: 4, + DiscSoftLowCap: 4, + DiscHardLowCap: 4, + DiscHighCap: 1024, + DiscBatch: 8, + } +) + const ( defaultBroadcastInvalidTx = true ) @@ -114,6 +155,8 @@ func getDefaultHmyConfigCopy(nt nodeconfig.NetworkType) harmonyConfig { devnet := getDefaultDevnetConfigCopy() config.Devnet = &devnet } + config.Sync = getDefaultSyncConfig(nt) + return config } diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index 5be21de29..34ffd516a 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -175,6 +175,19 @@ var ( prometheusGatewayFlag, prometheusEnablePushFlag, } + + syncFlags = []cli.Flag{ + syncDownloaderFlag, + syncLegacyClientFlag, + syncLegacyServerFlag, + syncConcurrencyFlag, + syncMinPeersFlag, + syncInitStreamsFlag, + syncDiscSoftLowFlag, + syncDiscHardLowFlag, + syncDiscHighFlag, + syncDiscBatchFlag, + } ) var ( @@ -267,6 +280,7 @@ func getRootFlags() []cli.Flag { flags = append(flags, revertFlags...) flags = append(flags, legacyMiscFlags...) flags = append(flags, prometheusFlags...) + flags = append(flags, syncFlags...) return flags } @@ -1269,3 +1283,108 @@ func applyPrometheusFlags(cmd *cobra.Command, config *harmonyConfig) { config.Prometheus.EnablePush = cli.GetBoolFlagValue(cmd, prometheusEnablePushFlag) } } + +var ( + // TODO: Deprecate this flag, and always set to true after stream sync is fully up. + syncDownloaderFlag = cli.BoolFlag{ + Name: "sync.downloader", + Usage: "Enable the downloader module to sync through stream sync protocol", + Hidden: true, + DefValue: false, + } + syncLegacyServerFlag = cli.BoolFlag{ + Name: "sync.legacy.server", + Usage: "Enable the gRPC sync server for backward compatibility", + Hidden: true, + DefValue: true, + } + syncLegacyClientFlag = cli.BoolFlag{ + Name: "sync.legacy.client", + Usage: "Enable the legacy centralized sync service for block synchronization", + Hidden: true, + DefValue: false, + } + syncConcurrencyFlag = cli.IntFlag{ + Name: "sync.concurrency", + Usage: "Concurrency when doing p2p sync requests", + Hidden: true, + } + syncMinPeersFlag = cli.IntFlag{ + Name: "sync.min-peers", + Usage: "Minimum peers check for each shard-wise sync loop", + Hidden: true, + } + syncInitStreamsFlag = cli.IntFlag{ + Name: "sync.init-peers", + Usage: "Initial shard-wise number of peers to start syncing", + Hidden: true, + } + syncDiscSoftLowFlag = cli.IntFlag{ + Name: "sync.disc.soft-low-cap", + Usage: "Soft low cap for sync stream management", + Hidden: true, + } + syncDiscHardLowFlag = cli.IntFlag{ + Name: "sync.disc.hard-low-cap", + Usage: "Hard low cap for sync stream management", + Hidden: true, + } + syncDiscHighFlag = cli.IntFlag{ + Name: "sync.disc.hi-cap", + Usage: "High cap for sync stream management", + Hidden: true, + } + syncDiscBatchFlag = cli.IntFlag{ + Name: "sync.disc.batch", + Usage: "batch size of the sync discovery", + Hidden: true, + } +) + +// applySyncFlags apply the sync flags. +func applySyncFlags(cmd *cobra.Command, config *harmonyConfig) { + if config.Sync == (syncConfig{}) { + nt := nodeconfig.NetworkType(config.Network.NetworkType) + config.Sync = getDefaultSyncConfig(nt) + } + + if cli.IsFlagChanged(cmd, syncDownloaderFlag) { + config.Sync.Downloader = cli.GetBoolFlagValue(cmd, syncDownloaderFlag) + } + + if cli.IsFlagChanged(cmd, syncLegacyServerFlag) { + config.Sync.LegacyServer = cli.GetBoolFlagValue(cmd, syncLegacyServerFlag) + } + + if cli.IsFlagChanged(cmd, syncLegacyClientFlag) { + config.Sync.LegacyClient = cli.GetBoolFlagValue(cmd, syncLegacyClientFlag) + } + + if cli.IsFlagChanged(cmd, syncConcurrencyFlag) { + config.Sync.Concurrency = cli.GetIntFlagValue(cmd, syncConcurrencyFlag) + } + + if cli.IsFlagChanged(cmd, syncMinPeersFlag) { + config.Sync.MinPeers = cli.GetIntFlagValue(cmd, syncMinPeersFlag) + } + + if cli.IsFlagChanged(cmd, syncInitStreamsFlag) { + config.Sync.InitStreams = cli.GetIntFlagValue(cmd, syncInitStreamsFlag) + } + + if cli.IsFlagChanged(cmd, syncDiscSoftLowFlag) { + config.Sync.DiscSoftLowCap = cli.GetIntFlagValue(cmd, syncDiscSoftLowFlag) + } + + if cli.IsFlagChanged(cmd, syncDiscHardLowFlag) { + config.Sync.DiscHardLowCap = cli.GetIntFlagValue(cmd, syncDiscHardLowFlag) + } + + if cli.IsFlagChanged(cmd, syncDiscHighFlag) { + config.Sync.DiscHighCap = cli.GetIntFlagValue(cmd, syncDiscHighFlag) + } + + if cli.IsFlagChanged(cmd, syncDiscBatchFlag) { + config.Sync.DiscBatch = cli.GetIntFlagValue(cmd, syncDiscBatchFlag) + } +} diff --git a/cmd/harmony/flags_test.go b/cmd/harmony/flags_test.go index 2f9d0ddf0..2f2738057 100644 --- a/cmd/harmony/flags_test.go +++ b/cmd/harmony/flags_test.go @@ -112,6 +112,7 @@ func TestHarmonyFlags(t *testing.T) { EnablePush: true, Gateway: "https://gateway.harmony.one", }, + Sync: defaultMainnetSyncConfig, }, }, } @@ -947,6 +948,79 @@ func TestRevertFlags(t *testing.T) { } } +func TestSyncFlags(t *testing.T) { + tests := []struct { + args []string + network string + expConfig syncConfig + expErr error + }{ + { + args: []string{}, + network: "mainnet", + expConfig: defaultMainnetSyncConfig, + }, + { + args: []string{"--sync.legacy.server", "--sync.legacy.client"}, + network: "mainnet", + expConfig: func() syncConfig { + cfg := defaultMainnetSyncConfig + cfg.LegacyClient = true + cfg.LegacyServer = true + return cfg + }(), + }, + { + args: []string{"--sync.legacy.server", "--sync.legacy.client"}, + network: "testnet", + expConfig: func() syncConfig { + cfg := defaultTestNetSyncConfig + cfg.LegacyClient = true + cfg.LegacyServer = true + return cfg + }(), + }, + { + args: []string{"--sync.downloader", "--sync.concurrency", "10", "--sync.min-peers", "10", + "--sync.init-peers", "10", "--sync.disc.soft-low-cap", "10", + "--sync.disc.hard-low-cap", "10", "--sync.disc.hi-cap", "10", + "--sync.disc.batch", "10", + }, + network: "mainnet", + expConfig: func() syncConfig { + cfg := defaultMainnetSyncConfig + cfg.Downloader = true + cfg.Concurrency = 10 + cfg.MinPeers = 10 + cfg.InitStreams = 10 + cfg.DiscSoftLowCap = 10 + cfg.DiscHardLowCap = 10 + cfg.DiscHighCap = 10 + cfg.DiscBatch = 10 + return cfg + }(), + }, + } + for i, test := range tests { + ts := newFlagTestSuite(t, syncFlags, func(command *cobra.Command, config *harmonyConfig) { + config.Network.NetworkType = test.network + applySyncFlags(command, config) + }) + hc, err := ts.run(test.args) + + if assErr := assertError(err, test.expErr); assErr != nil { + t.Fatalf("Test %v: %v", i, assErr) + } + if err != nil || test.expErr != nil { + continue + } + if !reflect.DeepEqual(hc.Sync, test.expConfig) { + t.Errorf("Test %v:\n\t%+v\n\t%+v", i, hc.Sync, test.expConfig) + } + ts.tearDown() + } +} + type flagTestSuite struct { t *testing.T diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index c6905d351..388aa8fed 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -16,6 +16,9 @@ import ( "syscall" "time" + "github.com/harmony-one/harmony/api/service/synchronize" + "github.com/harmony-one/harmony/hmy/downloader" + ethCommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/harmony-one/bls/ffi/go/bls" @@ -204,6 +207,7 @@ func applyRootFlags(cmd *cobra.Command, config *harmonyConfig) { applyDevnetFlags(cmd, config) applyRevertFlags(cmd, config) applyPrometheusFlags(cmd, config) + applySyncFlags(cmd, config) } func setupNodeLog(config harmonyConfig) { @@ -278,6 +282,7 @@ func setupNodeAndRun(hc harmonyConfig) { currentNode := setupConsensusAndNode(hc, nodeConfig) nodeconfig.GetDefaultConfig().ShardID = nodeConfig.ShardID nodeconfig.GetDefaultConfig().IsOffline = nodeConfig.IsOffline + nodeconfig.GetDefaultConfig().Downloader = nodeConfig.Downloader // Check NTP configuration accurate, err := ntp.CheckLocalTimeAccurate(nodeConfig.NtpServer) @@ -304,12 +309,6 @@ func setupNodeAndRun(hc harmonyConfig) { WSPort: hc.WS.Port, DebugEnabled: hc.RPCOpt.DebugEnabled, } - if nodeConfig.ShardID != shard.BeaconChainShardID { - utils.Logger().Info(). - Uint32("shardID", currentNode.Blockchain().ShardID()). - Uint32("shardID", nodeConfig.ShardID).Msg("SupportBeaconSyncing") - currentNode.SupportBeaconSyncing() - } // Parse rosetta config nodeConfig.RosettaServer = nodeconfig.RosettaServerConfig{ @@ -360,6 +359,9 @@ func setupNodeAndRun(hc harmonyConfig) { nodeconfig.SetPeerID(myHost.GetID()) + // Setup services + setupSyncService(currentNode, myHost, hc) + if currentNode.NodeConfig.Role() == nodeconfig.Validator { currentNode.RegisterValidatorServices() } else if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode { @@ -369,8 +371,14 @@ func setupNodeAndRun(hc harmonyConfig) { setupPrometheusService(currentNode, hc, nodeConfig.ShardID) } - // TODO: replace this legacy syncing - currentNode.SupportSyncing() + if hc.Sync.LegacyServer && !hc.General.IsOffline { + utils.Logger().Info().Msg("support gRPC sync server") + currentNode.SupportGRPCSyncServer() + } + if hc.Sync.LegacyClient && !hc.General.IsOffline { + utils.Logger().Info().Msg("go with gRPC sync client") + currentNode.StartGRPCSyncClient() + } if err := currentNode.StartServices(); err != nil { fmt.Fprint(os.Stderr, err.Error()) @@ -391,22 +399,24 @@ func setupNodeAndRun(hc harmonyConfig) { go listenOSSigAndShutDown(currentNode) - if err := myHost.Start(); err != nil { - utils.Logger().Fatal(). - Err(err). - Msg("Start p2p host failed") - } + if !hc.General.IsOffline { + if err := myHost.Start(); err != nil { + utils.Logger().Fatal(). + Err(err). + Msg("Start p2p host failed") + } - if err := currentNode.BootstrapConsensus(); err != nil { - fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error()) - if !currentNode.NodeConfig.IsOffline { - os.Exit(-1) + if err := currentNode.BootstrapConsensus(); err != nil { + fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error()) + if !currentNode.NodeConfig.IsOffline { + os.Exit(-1) + } } - } - if err := currentNode.StartPubSub(); err != nil { - fmt.Fprint(os.Stderr, "could not begin network message handling for node", err.Error()) - os.Exit(-1) + if err := currentNode.StartPubSub(); err != nil { + fmt.Fprint(os.Stderr, "could not begin network message handling for node", err.Error()) + os.Exit(-1) + } } select {} @@ -529,6 +539,7 @@ func createGlobalConfig(hc harmonyConfig) (*nodeconfig.ConfigType, error) { nodeConfig.SetShardID(initialAccounts[0].ShardID) // sets shard ID nodeConfig.SetArchival(hc.General.IsBeaconArchival, hc.General.IsArchival) nodeConfig.IsOffline = hc.General.IsOffline + nodeConfig.Downloader = hc.Sync.Downloader // P2P private key is used for secure message transfer between p2p nodes. nodeConfig.P2PPriKey, _, err = utils.LoadKeyFromFile(hc.P2P.KeyFile) @@ -699,6 +710,39 @@ func setupPrometheusService(node *node.Node, hc harmonyConfig, sid uint32) { node.RegisterService(service.Prometheus, p) } +func setupSyncService(node *node.Node, host p2p.Host, hc harmonyConfig) { + blockchains := []*core.BlockChain{node.Blockchain()} + if !node.IsRunningBeaconChain() { + blockchains = append(blockchains, node.Beaconchain()) + } + + dConfig := downloader.Config{ + ServerOnly: !hc.Sync.Downloader, + Network: nodeconfig.NetworkType(hc.Network.NetworkType), + Concurrency: hc.Sync.Concurrency, + MinStreams: hc.Sync.MinPeers, + InitStreams: hc.Sync.InitStreams, + SmSoftLowCap: hc.Sync.DiscSoftLowCap, + SmHardLowCap: hc.Sync.DiscHardLowCap, + SmHiCap: hc.Sync.DiscHighCap, + SmDiscBatch: hc.Sync.DiscBatch, + } + // If we are running side chain, we will need to do some extra works for beacon + // sync + if !node.IsRunningBeaconChain() { + dConfig.BHConfig = &downloader.BeaconHelperConfig{ + BlockC: node.BeaconBlockChannel, + InsertHook: node.BeaconSyncHook, + } + } + s := synchronize.NewService(host, blockchains, dConfig) + + node.RegisterService(service.Synchronize, s) + + d := s.Downloaders.GetShardDownloader(node.Blockchain().ShardID()) + node.Consensus.SetDownloader(d) +} + func setupBlacklist(hc harmonyConfig) (map[ethCommon.Address]struct{}, error) { utils.Logger().Debug().Msgf("Using blacklist file at `%s`", hc.TxPool.BlacklistFile) dat, err := ioutil.ReadFile(hc.TxPool.BlacklistFile) diff --git a/hmy/downloader/beaconhelper.go b/hmy/downloader/beaconhelper.go index 4b0e643e6..9d689534c 100644 --- a/hmy/downloader/beaconhelper.go +++ b/hmy/downloader/beaconhelper.go @@ -120,7 +120,9 @@ func (bh *beaconHelper) insertLastMileBlocks() (inserted int, bn uint64, err err bn-- return } - if err = bh.ih.verifyAndInsertBlock(b); err != nil { + // TODO: Instruct the beacon helper to verify signatures. This may require some forks + // in pub-sub message (add commit sigs in node.block.sync messages) + if _, err = bh.bc.InsertChain(types.Blocks{b}, true); err != nil { bn-- return } diff --git a/hmy/downloader/const.go b/hmy/downloader/const.go index c7246c64c..06da5c742 100644 --- a/hmy/downloader/const.go +++ b/hmy/downloader/const.go @@ -25,6 +25,10 @@ const ( type ( // Config is the downloader config Config struct { + // Only run stream sync protocol as a server. + // TODO: remove this when stream sync is fully up. + ServerOnly bool + // parameters Network nodeconfig.NetworkType Concurrency int // Number of concurrent sync requests diff --git a/hmy/downloader/downloader.go b/hmy/downloader/downloader.go index a4e8d23f3..de967a61a 100644 --- a/hmy/downloader/downloader.go +++ b/hmy/downloader/downloader.go @@ -82,6 +82,10 @@ func NewDownloader(host p2p.Host, bc *core.BlockChain, config Config) *Downloade // Start start the downloader func (d *Downloader) Start() { + if d.config.ServerOnly { + return + } + go d.run() if d.bh != nil { @@ -91,6 +95,10 @@ func (d *Downloader) Start() { // Close close the downloader func (d *Downloader) Close() { + if d.config.ServerOnly { + return + } + close(d.closeC) d.cancel() diff --git a/hmy/downloader/shortrange.go b/hmy/downloader/shortrange.go index 2c0e52e8c..f898f20fd 100644 --- a/hmy/downloader/shortrange.go +++ b/hmy/downloader/shortrange.go @@ -15,6 +15,8 @@ import ( "github.com/rs/zerolog" ) +var emptySigVerifyError *sigVerifyError + // doShortRangeSync does the short range sync. // Compared with long range sync, short range sync is more focused on syncing to the latest block. // It consist of 3 steps: @@ -56,7 +58,7 @@ func (d *Downloader) doShortRangeSync() (int, error) { } n, err := d.ih.verifyAndInsertBlocks(blocks) if err != nil { - if !errors.As(err, &sigVerifyError{}) { + if !errors.As(err, &emptySigVerifyError) { sh.removeStreams(whitelist) // Data provided by remote nodes is corrupted } return n, errors.Wrap(err, "InsertChain") diff --git a/hmy/hmy.go b/hmy/hmy.go index 8962c51b5..d2f4c992b 100644 --- a/hmy/hmy.go +++ b/hmy/hmy.go @@ -87,8 +87,9 @@ type NodeAPI interface { GetTransactionsCount(address, txType string) (uint64, error) GetStakingTransactionsCount(address, txType string) (uint64, error) IsCurrentlyLeader() bool - IsOutOfSync(*core.BlockChain) bool - GetMaxPeerHeight() uint64 + IsOutOfSync(shardID uint32) bool + SyncStatus(shardID uint32) (bool, uint64) + SyncPeers() map[string]int ReportStakingErrorSink() types.TransactionErrorReports ReportPlainErrorSink() types.TransactionErrorReports PendingCXReceipts() []*types.CXReceiptsProof @@ -186,6 +187,7 @@ func (hmy *Harmony) GetNodeMetadata() commonRPC.NodeMetadata { c := commonRPC.C{} c.TotalKnownPeers, c.Connected, c.NotConnected = hmy.NodeAPI.PeerConnectivity() + syncPeers := hmy.NodeAPI.SyncPeers() consensusInternal := hmy.NodeAPI.GetConsensusInternal() return commonRPC.NodeMetadata{ @@ -204,6 +206,7 @@ func (hmy *Harmony) GetNodeMetadata() commonRPC.NodeMetadata { PeerID: nodeconfig.GetPeerID(), Consensus: consensusInternal, C: c, + SyncPeers: syncPeers, } } diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index bebc515b6..4897c6e43 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -80,6 +80,7 @@ type ConfigType struct { RPCServer RPCServerConfig // RPC server port and ip RosettaServer RosettaServerConfig // rosetta server port and ip IsOffline bool + Downloader bool // Whether stream downloader is running; TODO: remove this after sync up NtpServer string StringRole string P2PPriKey p2p_crypto.PrivKey diff --git a/node/double_signing.go b/node/double_signing.go index a8c40eda8..dd9787941 100644 --- a/node/double_signing.go +++ b/node/double_signing.go @@ -3,13 +3,12 @@ package node import ( "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/staking/slash" ) // ProcessSlashCandidateMessage .. func (node *Node) processSlashCandidateMessage(msgPayload []byte) { - if node.NodeConfig.ShardID != shard.BeaconChainShardID { + if !node.IsRunningBeaconChain() { return } candidates := slash.Records{} diff --git a/node/node.go b/node/node.go index bb479ebf4..2de5dc463 100644 --- a/node/node.go +++ b/node/node.go @@ -217,7 +217,7 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) []error { // Add new staking transactions to the pending staking transaction list. func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) []error { - if node.NodeConfig.ShardID == shard.BeaconChainShardID { + if node.IsRunningBeaconChain() { if node.Blockchain().Config().IsPreStaking(node.Blockchain().CurrentHeader().Epoch()) { poolTxs := types.PoolTransactions{} for _, tx := range newStakingTxs { @@ -245,7 +245,7 @@ func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTra func (node *Node) AddPendingStakingTransaction( newStakingTx *staking.StakingTransaction, ) error { - if node.NodeConfig.ShardID == shard.BeaconChainShardID { + if node.IsRunningBeaconChain() { errs := node.addPendingStakingTransactions(staking.StakingTransactions{newStakingTx}) var err error for i := range errs { @@ -404,7 +404,7 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) ( case proto_node.SlashCandidate: nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "slash"}).Inc() // only beacon chain node process slash candidate messages - if node.NodeConfig.ShardID != shard.BeaconChainShardID { + if !node.IsRunningBeaconChain() { return nil, 0, errIgnoreBeaconMsg } case proto_node.Receipt: @@ -412,7 +412,7 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) ( case proto_node.CrossLink: nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "crosslink"}).Inc() // only beacon chain node process crosslink messages - if node.NodeConfig.ShardID != shard.BeaconChainShardID || + if !node.IsRunningBeaconChain() || node.NodeConfig.Role() == nodeconfig.ExplorerNode { return nil, 0, errIgnoreBeaconMsg } @@ -1016,7 +1016,7 @@ func New( go func() { webhooks.DoPost(url, &doubleSign) }() } } - if node.NodeConfig.ShardID != shard.BeaconChainShardID { + if !node.IsRunningBeaconChain() { go node.BroadcastSlash(&doubleSign) } else { records := slash.Records{doubleSign} @@ -1282,3 +1282,8 @@ func (node *Node) GetAddresses(epoch *big.Int) map[string]common.Address { // self addresses map can never be nil return node.KeysToAddrs } + +// IsRunningBeaconChain returns whether the node is running on beacon chain. +func (node *Node) IsRunningBeaconChain() bool { + return node.NodeConfig.ShardID == shard.BeaconChainShardID +} diff --git a/node/node_cross_link.go b/node/node_cross_link.go index f951807ce..0d52ccd46 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -61,7 +61,7 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error { // ProcessCrossLinkMessage verify and process Node/CrossLink message into crosslink when it's valid func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { - if node.NodeConfig.ShardID == shard.BeaconChainShardID { + if node.IsRunningBeaconChain() { pendingCLs, err := node.Blockchain().ReadPendingCrossLinks() if err == nil && len(pendingCLs) >= maxPendingCrossLinkSize { utils.Logger().Debug(). diff --git a/node/node_handler.go b/node/node_handler.go index 8a86a8fa9..3e02a6b94 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -171,7 +171,7 @@ func (node *Node) BroadcastCrossLink() { return } - if node.NodeConfig.ShardID == shard.BeaconChainShardID || + if node.IsRunningBeaconChain() || !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) { // no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch return @@ -308,7 +308,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error { // Verify cross links // TODO: move into ValidateNewBlock - if node.NodeConfig.ShardID == shard.BeaconChainShardID { + if node.IsRunningBeaconChain() { err := node.VerifyBlockCrossLinks(newBlock) if err != nil { utils.Logger().Debug().Err(err).Msg("ops2 VerifyBlockCrossLinks Failed") @@ -336,7 +336,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error { // 3. [leader] send cross shard tx receipts to destination shard func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { if node.Consensus.IsLeader() { - if node.NodeConfig.ShardID == shard.BeaconChainShardID { + if node.IsRunningBeaconChain() { node.BroadcastNewBlock(newBlock) } node.BroadcastCXReceipts(newBlock) @@ -360,7 +360,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { rnd := rand.Intn(100) if rnd < 1 { // Beacon validators also broadcast new blocks to make sure beacon sync is strong. - if node.NodeConfig.ShardID == shard.BeaconChainShardID { + if node.IsRunningBeaconChain() { node.BroadcastNewBlock(newBlock) } node.BroadcastCXReceipts(newBlock) diff --git a/node/node_syncing.go b/node/node_syncing.go index 75ace9333..2f7e32cc8 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -9,17 +9,22 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" + lru "github.com/hashicorp/golang-lru" + "github.com/pkg/errors" + + "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service/legacysync" - "github.com/harmony-one/harmony/api/service/legacysync/downloader" + legdownloader "github.com/harmony-one/harmony/api/service/legacysync/downloader" downloader_pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto" + "github.com/harmony-one/harmony/api/service/synchronize" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/hmy/downloader" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" - lru "github.com/hashicorp/golang-lru" - "github.com/pkg/errors" + "github.com/harmony-one/harmony/shard" ) // Constants related to doing syncing. @@ -33,6 +38,16 @@ func init() { rand.Seed(time.Now().UnixNano()) } +// BeaconSyncHook is the hook function called after inserted beacon in downloader +// TODO: This is a small misc piece of consensus logic. Better put it to consensus module. +func (node *Node) BeaconSyncHook() { + if node.Consensus.IsLeader() { + // TODO: Instead of leader, it would better be validator do this broadcast since leader do + // not have much idle resources. + node.BroadcastCrossLink() + } +} + // GenerateRandomString generates a random string with given length func GenerateRandomString(n int) string { b := make([]rune, n) @@ -176,28 +191,31 @@ func (p *LocalSyncingPeerProvider) SyncingPeers(shardID uint32) (peers []p2p.Pee return peers, nil } -// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks -func (node *Node) DoBeaconSyncing() { +// doBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks +func (node *Node) doBeaconSyncing() { if node.NodeConfig.IsOffline { return } - go func(node *Node) { - // TODO ek – infinite loop; add shutdown/cleanup logic - for beaconBlock := range node.BeaconBlockChannel { - if node.beaconSync != nil { - err := node.beaconSync.UpdateBlockAndStatus( - beaconBlock, node.Beaconchain(), node.BeaconWorker, true, - ) - if err != nil { - node.beaconSync.AddLastMileBlock(beaconBlock) - } else if node.Consensus.IsLeader() { - // Only leader broadcast crosslink to avoid spamming p2p - node.BroadcastCrossLink() + if !node.NodeConfig.Downloader { + // If Downloader is not working, we need also deal with blocks from beaconBlockChannel + go func(node *Node) { + // TODO ek – infinite loop; add shutdown/cleanup logic + for beaconBlock := range node.BeaconBlockChannel { + if node.beaconSync != nil { + err := node.beaconSync.UpdateBlockAndStatus( + beaconBlock, node.Beaconchain(), node.BeaconWorker, true, + ) + if err != nil { + node.beaconSync.AddLastMileBlock(beaconBlock) + } else if node.Consensus.IsLeader() { + // Only leader broadcast crosslink to avoid spamming p2p + node.BroadcastCrossLink() + } } } - } - }(node) + }(node) + } // TODO ek – infinite loop; add shutdown/cleanup logic for { @@ -279,16 +297,25 @@ func (node *Node) doSync(bc *core.BlockChain, worker *worker.Worker, willJoinCon node.IsInSync.Set() } -// SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node -func (node *Node) SupportBeaconSyncing() { - go node.DoBeaconSyncing() -} - -// SupportSyncing keeps sleeping until it's doing consensus or it's a leader. -func (node *Node) SupportSyncing() { +// SupportGRPCSyncServer do gRPC sync server +func (node *Node) SupportGRPCSyncServer() { node.InitSyncingServer() node.StartSyncingServer() +} +// StartGRPCSyncClient start the legacy gRPC sync process +func (node *Node) StartGRPCSyncClient() { + if node.Blockchain().ShardID() != shard.BeaconChainShardID { + utils.Logger().Info(). + Uint32("shardID", node.Blockchain().ShardID()). + Msg("SupportBeaconSyncing") + go node.doBeaconSyncing() + } + node.supportSyncing() +} + +// supportSyncing keeps sleeping until it's doing consensus or it's a leader. +func (node *Node) supportSyncing() { joinConsensus := false // Check if the current node is explorer node. switch node.NodeConfig.Role() { @@ -313,7 +340,7 @@ func (node *Node) SupportSyncing() { // InitSyncingServer starts downloader server. func (node *Node) InitSyncingServer() { if node.downloaderServer == nil { - node.downloaderServer = downloader.NewServer(node) + node.downloaderServer = legdownloader.NewServer(node) } } @@ -466,7 +493,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in } else { response.Type = downloader_pb.DownloaderResponse_FAIL syncPort := legacysync.GetSyncingPort(port) - client := downloader.ClientSetup(ip, syncPort) + client := legdownloader.ClientSetup(ip, syncPort) if client == nil { utils.Logger().Warn(). Str("ip", ip). @@ -540,12 +567,49 @@ func (node *Node) getEncodedBlockByHash(hash common.Hash) ([]byte, error) { return b, nil } -// GetMaxPeerHeight ... -func (node *Node) GetMaxPeerHeight() uint64 { - return node.stateSync.GetMaxPeerHeight() +// SyncStatus return the syncing status, including whether node is syncing +// and the target block number. +func (node *Node) SyncStatus(shardID uint32) (bool, uint64) { + ds := node.getDownloaders() + if ds == nil { + return false, 0 + } + return ds.SyncStatus(shardID) +} + +// IsOutOfSync return whether the node is out of sync of the given hsardID +func (node *Node) IsOutOfSync(shardID uint32) bool { + ds := node.getDownloaders() + if ds == nil { + return false + } + isSyncing, _ := ds.SyncStatus(shardID) + return !isSyncing } -// IsOutOfSync ... -func (node *Node) IsOutOfSync(bc *core.BlockChain) bool { - return node.stateSync.IsOutOfSync(bc, false) +// SyncPeers return connected sync peers for each shard +func (node *Node) SyncPeers() map[string]int { + ds := node.getDownloaders() + if ds == nil { + return nil + } + nums := ds.NumPeers() + res := make(map[string]int) + for sid, num := range nums { + s := fmt.Sprintf("shard-%v", sid) + res[s] = num + } + return res +} + +func (node *Node) getDownloaders() *downloader.Downloaders { + syncService := node.serviceManager.GetService(service.Synchronize) + if syncService == nil { + return nil + } + dsService, ok := syncService.(*synchronize.Service) + if !ok { + return nil + } + return dsService.Downloaders } diff --git a/p2p/host.go b/p2p/host.go index ba8b1ad77..020f45969 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -10,6 +10,8 @@ import ( "strings" "sync" + "github.com/libp2p/go-libp2p-core/protocol" + "github.com/harmony-one/bls/ffi/go/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" @@ -230,8 +232,9 @@ func (host *HostV2) C() (int, int, int) { // AddStreamProtocol adds the stream protocols to the host to be started and closed // when the host starts or close func (host *HostV2) AddStreamProtocol(protocols ...sttypes.Protocol) { - for _, protocol := range protocols { - host.streamProtos = append(host.streamProtos, protocol) + for _, proto := range protocols { + host.streamProtos = append(host.streamProtos, proto) + host.h.SetStreamHandlerMatch(protocol.ID(proto.ProtoID()), proto.Match, proto.HandleStream) } } diff --git a/rosetta/services/network.go b/rosetta/services/network.go index 707feb24a..2b4d5709a 100644 --- a/rosetta/services/network.go +++ b/rosetta/services/network.go @@ -82,12 +82,12 @@ func (s *NetworkAPI) NetworkStatus( if rosettaError != nil { return nil, rosettaError } - targetHeight := int64(s.hmy.NodeAPI.GetMaxPeerHeight()) + isSyncing, targetHeight := s.hmy.NodeAPI.SyncStatus(s.hmy.BlockChain.ShardID()) syncStatus := common.SyncingFinish - if s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain) { - syncStatus = common.SyncingNewBlock - } else if targetHeight == 0 { + if targetHeight == 0 { syncStatus = common.SyncingUnknown + } else if isSyncing { + syncStatus = common.SyncingNewBlock } stage := syncStatus.String() @@ -116,6 +116,13 @@ func (s *NetworkAPI) NetworkStatus( } } + targetInt := int64(targetHeight) + ss := &types.SyncStatus{ + CurrentIndex: currentHeader.Number().Int64(), + TargetIndex: &targetInt, + Stage: &stage, + } + return &types.NetworkStatusResponse{ CurrentBlockIdentifier: currentBlockIdentifier, OldestBlockIdentifier: oldestBlockIdentifier, @@ -124,12 +131,8 @@ func (s *NetworkAPI) NetworkStatus( Index: genesisHeader.Number().Int64(), Hash: genesisHeader.Hash().String(), }, - Peers: peers, - SyncStatus: &types.SyncStatus{ - CurrentIndex: currentHeader.Number().Int64(), - TargetIndex: &targetHeight, - Stage: &stage, - }, + Peers: peers, + SyncStatus: ss, }, nil } diff --git a/rpc/blockchain.go b/rpc/blockchain.go index 88032e41d..b27667165 100644 --- a/rpc/blockchain.go +++ b/rpc/blockchain.go @@ -685,12 +685,12 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo( // InSync returns if shard chain is syncing func (s *PublicBlockchainService) InSync(ctx context.Context) (bool, error) { - return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain), nil + return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain.ShardID()), nil } // BeaconInSync returns if beacon chain is syncing func (s *PublicBlockchainService) BeaconInSync(ctx context.Context) (bool, error) { - return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BeaconChain), nil + return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BeaconChain.ShardID()), nil } func isBlockGreaterThanLatest(hmy *hmy.Harmony, blockNum rpc.BlockNumber) bool { diff --git a/rpc/common/types.go b/rpc/common/types.go index 69e418d02..253b6bd14 100644 --- a/rpc/common/types.go +++ b/rpc/common/types.go @@ -64,6 +64,7 @@ type NodeMetadata struct { PeerID peer.ID `json:"peerid"` Consensus ConsensusInternal `json:"consensus"` C C `json:"p2p-connectivity"` + SyncPeers map[string]int `json:"sync-peers",omitempty` } // P captures the connected peers per topic