Merge pull request #3595 from JackyWYX/stream_downloaderflag

[Stream] Stream downloader flag
pull/3598/head
Rongjian Lan 4 years ago committed by GitHub
commit b4b4984eb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      api/service/manager.go
  2. 37
      api/service/synchronize/service.go
  3. 31
      cmd/harmony/config.go
  4. 25
      cmd/harmony/config_test.go
  5. 45
      cmd/harmony/default.go
  6. 119
      cmd/harmony/flags.go
  7. 74
      cmd/harmony/flags_test.go
  8. 86
      cmd/harmony/main.go
  9. 4
      hmy/downloader/beaconhelper.go
  10. 4
      hmy/downloader/const.go
  11. 8
      hmy/downloader/downloader.go
  12. 4
      hmy/downloader/shortrange.go
  13. 7
      hmy/hmy.go
  14. 1
      internal/configs/node/config.go
  15. 3
      node/double_signing.go
  16. 15
      node/node.go
  17. 2
      node/node_cross_link.go
  18. 8
      node/node_handler.go
  19. 132
      node/node_syncing.go
  20. 7
      p2p/host.go
  21. 23
      rosetta/services/network.go
  22. 4
      rpc/blockchain.go
  23. 1
      rpc/common/types.go

@ -21,6 +21,7 @@ const (
BlockProposal
NetworkInfo
Prometheus
Synchronize
)
func (t Type) String() string {
@ -37,6 +38,8 @@ func (t Type) String() string {
return "NetworkInfo"
case Prometheus:
return "Prometheus"
case Synchronize:
return "Synchronize"
default:
return "Unknown"
}
@ -82,6 +85,11 @@ func (m *Manager) GetServices() []Service {
return m.services
}
// GetService get the specified service
func (m *Manager) GetService(t Type) Service {
return m.serviceMap[t]
}
// StartServices run all registered services. If one of the starting service returns
// an error, closing all started services.
func (m *Manager) StartServices() (err error) {

@ -0,0 +1,37 @@
package synchronize
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/hmy/downloader"
"github.com/harmony-one/harmony/p2p"
)
// Service is simply a adapter of Downloaders, which support block synchronization
type Service struct {
Downloaders *downloader.Downloaders
}
// NewService creates the a new downloader service
func NewService(host p2p.Host, bcs []*core.BlockChain, config downloader.Config) *Service {
return &Service{
Downloaders: downloader.NewDownloaders(host, bcs, config),
}
}
// Start start the service
func (s *Service) Start() error {
s.Downloaders.Start()
return nil
}
// Stop stop the service
func (s *Service) Stop() error {
s.Downloaders.Close()
return nil
}
// APIs return all APIs of the service
func (s *Service) APIs() []rpc.API {
return nil
}

@ -28,6 +28,7 @@ type harmonyConfig struct {
TxPool txPoolConfig
Pprof pprofConfig
Log logConfig
Sync syncConfig
Sys *sysConfig `toml:",omitempty"`
Consensus *consensusConfig `toml:",omitempty"`
Devnet *devnetConfig `toml:",omitempty"`
@ -152,6 +153,20 @@ type prometheusConfig struct {
Gateway string
}
type syncConfig struct {
// TODO: Remove this bool after stream sync is fully up.
Downloader bool // start the sync downloader client
LegacyServer bool // provide the gRPC sync protocol server
LegacyClient bool // aside from stream sync protocol, also run gRPC client to get blocks
Concurrency int // concurrency used for stream sync protocol
MinPeers int // minimum streams to start a sync task.
InitStreams int // minimum streams in bootstrap to start sync loop.
DiscSoftLowCap int // when number of streams is below this value, spin discover during check
DiscHardLowCap int // when removing stream, num is below this value, spin discovery immediately
DiscHighCap int // upper limit of streams in one sync protocol
DiscBatch int // size of each discovery
}
// TODO: use specific type wise validation instead of general string types assertion.
func validateHarmonyConfig(config harmonyConfig) error {
var accepts []string
@ -188,6 +203,11 @@ func validateHarmonyConfig(config harmonyConfig) error {
return fmt.Errorf("flag --run.offline must have p2p IP be %v", nodeconfig.DefaultLocalListenIP)
}
if !config.Sync.Downloader && !config.Sync.LegacyClient {
// There is no module up for sync
return errors.New("either --sync.downloader or --sync.legacy.client shall be enabled")
}
return nil
}
@ -235,6 +255,17 @@ func parseNetworkType(nt string) nodeconfig.NetworkType {
}
}
func getDefaultSyncConfig(nt nodeconfig.NetworkType) syncConfig {
switch nt {
case nodeconfig.Mainnet:
return defaultMainnetSyncConfig
case nodeconfig.Testnet:
return defaultTestNetSyncConfig
default:
return defaultElseSyncConfig
}
}
var dumpConfigCmd = &cobra.Command{
Use: "dumpconfig [config_file]",
Short: "dump the config file for harmony binary configurations",

@ -30,7 +30,7 @@ func init() {
}
func TestV1_0_0Config(t *testing.T) {
testConfig := `Version = "1.0.3"
testConfig := `Version = "1.0.4"
[BLSKeys]
KMSConfigFile = ""
@ -80,6 +80,18 @@ func TestV1_0_0Config(t *testing.T) {
[TxPool]
BlacklistFile = "./.hmy/blacklist.txt"
[Sync]
Downloader = false
Concurrency = 6
DiscBatch = 8
DiscHardLowCap = 6
DiscHighCap = 128
DiscSoftLowCap = 8
InitStreams = 8
LegacyClient = true
LegacyServer = true
MinPeers = 6
[WS]
Enabled = true
IP = "127.0.0.1"
@ -96,20 +108,21 @@ func TestV1_0_0Config(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defConf := getDefaultHmyConfigCopy(nodeconfig.Mainnet)
if config.HTTP.RosettaEnabled {
t.Errorf("Expected rosetta http server to be disabled when loading old config")
}
if config.General.IsOffline {
t.Errorf("Expect node to de online when loading old config")
}
if config.P2P.IP != defaultConfig.P2P.IP {
if config.P2P.IP != defConf.P2P.IP {
t.Errorf("Expect default p2p IP if old config is provided")
}
if config.Version != "1.0.3" {
t.Errorf("Expected config version: 1.0.3, not %v", config.Version)
if config.Version != "1.0.4" {
t.Errorf("Expected config version: 1.0.4, not %v", config.Version)
}
config.Version = defaultConfig.Version // Shortcut for testing, value checked above
if !reflect.DeepEqual(config, defaultConfig) {
config.Version = defConf.Version // Shortcut for testing, value checked above
if !reflect.DeepEqual(config, defConf) {
t.Errorf("Unexpected config \n\t%+v \n\t%+v", config, defaultConfig)
}
}

@ -2,7 +2,7 @@ package main
import nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
const tomlConfigVersion = "1.0.3"
const tomlConfigVersion = "1.0.4"
const (
defNetworkType = nodeconfig.Mainnet
@ -102,6 +102,47 @@ var defaultPrometheusConfig = prometheusConfig{
Gateway: "https://gateway.harmony.one",
}
var (
defaultMainnetSyncConfig = syncConfig{
Downloader: false,
LegacyServer: true,
LegacyClient: true,
Concurrency: 6,
MinPeers: 6,
InitStreams: 8,
DiscSoftLowCap: 8,
DiscHardLowCap: 6,
DiscHighCap: 128,
DiscBatch: 8,
}
defaultTestNetSyncConfig = syncConfig{
Downloader: false,
LegacyServer: true,
LegacyClient: true,
Concurrency: 4,
MinPeers: 4,
InitStreams: 4,
DiscSoftLowCap: 4,
DiscHardLowCap: 4,
DiscHighCap: 1024,
DiscBatch: 8,
}
defaultElseSyncConfig = syncConfig{
Downloader: true,
LegacyServer: true,
LegacyClient: false,
Concurrency: 4,
MinPeers: 4,
InitStreams: 4,
DiscSoftLowCap: 4,
DiscHardLowCap: 4,
DiscHighCap: 1024,
DiscBatch: 8,
}
)
const (
defaultBroadcastInvalidTx = true
)
@ -114,6 +155,8 @@ func getDefaultHmyConfigCopy(nt nodeconfig.NetworkType) harmonyConfig {
devnet := getDefaultDevnetConfigCopy()
config.Devnet = &devnet
}
config.Sync = getDefaultSyncConfig(nt)
return config
}

@ -175,6 +175,19 @@ var (
prometheusGatewayFlag,
prometheusEnablePushFlag,
}
syncFlags = []cli.Flag{
syncDownloaderFlag,
syncLegacyClientFlag,
syncLegacyServerFlag,
syncConcurrencyFlag,
syncMinPeersFlag,
syncInitStreamsFlag,
syncDiscSoftLowFlag,
syncDiscHardLowFlag,
syncDiscHighFlag,
syncDiscBatchFlag,
}
)
var (
@ -267,6 +280,7 @@ func getRootFlags() []cli.Flag {
flags = append(flags, revertFlags...)
flags = append(flags, legacyMiscFlags...)
flags = append(flags, prometheusFlags...)
flags = append(flags, syncFlags...)
return flags
}
@ -1269,3 +1283,108 @@ func applyPrometheusFlags(cmd *cobra.Command, config *harmonyConfig) {
config.Prometheus.EnablePush = cli.GetBoolFlagValue(cmd, prometheusEnablePushFlag)
}
}
var (
// TODO: Deprecate this flag, and always set to true after stream sync is fully up.
syncDownloaderFlag = cli.BoolFlag{
Name: "sync.downloader",
Usage: "Enable the downloader module to sync through stream sync protocol",
Hidden: true,
DefValue: false,
}
syncLegacyServerFlag = cli.BoolFlag{
Name: "sync.legacy.server",
Usage: "Enable the gRPC sync server for backward compatibility",
Hidden: true,
DefValue: true,
}
syncLegacyClientFlag = cli.BoolFlag{
Name: "sync.legacy.client",
Usage: "Enable the legacy centralized sync service for block synchronization",
Hidden: true,
DefValue: false,
}
syncConcurrencyFlag = cli.IntFlag{
Name: "sync.concurrency",
Usage: "Concurrency when doing p2p sync requests",
Hidden: true,
}
syncMinPeersFlag = cli.IntFlag{
Name: "sync.min-peers",
Usage: "Minimum peers check for each shard-wise sync loop",
Hidden: true,
}
syncInitStreamsFlag = cli.IntFlag{
Name: "sync.init-peers",
Usage: "Initial shard-wise number of peers to start syncing",
Hidden: true,
}
syncDiscSoftLowFlag = cli.IntFlag{
Name: "sync.disc.soft-low-cap",
Usage: "Soft low cap for sync stream management",
Hidden: true,
}
syncDiscHardLowFlag = cli.IntFlag{
Name: "sync.disc.hard-low-cap",
Usage: "Hard low cap for sync stream management",
Hidden: true,
}
syncDiscHighFlag = cli.IntFlag{
Name: "sync.disc.hi-cap",
Usage: "High cap for sync stream management",
Hidden: true,
}
syncDiscBatchFlag = cli.IntFlag{
Name: "sync.disc.batch",
Usage: "batch size of the sync discovery",
Hidden: true,
}
)
// applySyncFlags apply the sync flags.
func applySyncFlags(cmd *cobra.Command, config *harmonyConfig) {
if config.Sync == (syncConfig{}) {
nt := nodeconfig.NetworkType(config.Network.NetworkType)
config.Sync = getDefaultSyncConfig(nt)
}
if cli.IsFlagChanged(cmd, syncDownloaderFlag) {
config.Sync.Downloader = cli.GetBoolFlagValue(cmd, syncDownloaderFlag)
}
if cli.IsFlagChanged(cmd, syncLegacyServerFlag) {
config.Sync.LegacyServer = cli.GetBoolFlagValue(cmd, syncLegacyServerFlag)
}
if cli.IsFlagChanged(cmd, syncLegacyClientFlag) {
config.Sync.LegacyClient = cli.GetBoolFlagValue(cmd, syncLegacyClientFlag)
}
if cli.IsFlagChanged(cmd, syncConcurrencyFlag) {
config.Sync.Concurrency = cli.GetIntFlagValue(cmd, syncConcurrencyFlag)
}
if cli.IsFlagChanged(cmd, syncMinPeersFlag) {
config.Sync.MinPeers = cli.GetIntFlagValue(cmd, syncMinPeersFlag)
}
if cli.IsFlagChanged(cmd, syncInitStreamsFlag) {
config.Sync.InitStreams = cli.GetIntFlagValue(cmd, syncInitStreamsFlag)
}
if cli.IsFlagChanged(cmd, syncDiscSoftLowFlag) {
config.Sync.DiscSoftLowCap = cli.GetIntFlagValue(cmd, syncDiscSoftLowFlag)
}
if cli.IsFlagChanged(cmd, syncDiscHardLowFlag) {
config.Sync.DiscHardLowCap = cli.GetIntFlagValue(cmd, syncDiscHardLowFlag)
}
if cli.IsFlagChanged(cmd, syncDiscHighFlag) {
config.Sync.DiscHighCap = cli.GetIntFlagValue(cmd, syncDiscHighFlag)
}
if cli.IsFlagChanged(cmd, syncDiscBatchFlag) {
config.Sync.DiscBatch = cli.GetIntFlagValue(cmd, syncDiscBatchFlag)
}
}

@ -112,6 +112,7 @@ func TestHarmonyFlags(t *testing.T) {
EnablePush: true,
Gateway: "https://gateway.harmony.one",
},
Sync: defaultMainnetSyncConfig,
},
},
}
@ -947,6 +948,79 @@ func TestRevertFlags(t *testing.T) {
}
}
func TestSyncFlags(t *testing.T) {
tests := []struct {
args []string
network string
expConfig syncConfig
expErr error
}{
{
args: []string{},
network: "mainnet",
expConfig: defaultMainnetSyncConfig,
},
{
args: []string{"--sync.legacy.server", "--sync.legacy.client"},
network: "mainnet",
expConfig: func() syncConfig {
cfg := defaultMainnetSyncConfig
cfg.LegacyClient = true
cfg.LegacyServer = true
return cfg
}(),
},
{
args: []string{"--sync.legacy.server", "--sync.legacy.client"},
network: "testnet",
expConfig: func() syncConfig {
cfg := defaultTestNetSyncConfig
cfg.LegacyClient = true
cfg.LegacyServer = true
return cfg
}(),
},
{
args: []string{"--sync.downloader", "--sync.concurrency", "10", "--sync.min-peers", "10",
"--sync.init-peers", "10", "--sync.disc.soft-low-cap", "10",
"--sync.disc.hard-low-cap", "10", "--sync.disc.hi-cap", "10",
"--sync.disc.batch", "10",
},
network: "mainnet",
expConfig: func() syncConfig {
cfg := defaultMainnetSyncConfig
cfg.Downloader = true
cfg.Concurrency = 10
cfg.MinPeers = 10
cfg.InitStreams = 10
cfg.DiscSoftLowCap = 10
cfg.DiscHardLowCap = 10
cfg.DiscHighCap = 10
cfg.DiscBatch = 10
return cfg
}(),
},
}
for i, test := range tests {
ts := newFlagTestSuite(t, syncFlags, func(command *cobra.Command, config *harmonyConfig) {
config.Network.NetworkType = test.network
applySyncFlags(command, config)
})
hc, err := ts.run(test.args)
if assErr := assertError(err, test.expErr); assErr != nil {
t.Fatalf("Test %v: %v", i, assErr)
}
if err != nil || test.expErr != nil {
continue
}
if !reflect.DeepEqual(hc.Sync, test.expConfig) {
t.Errorf("Test %v:\n\t%+v\n\t%+v", i, hc.Sync, test.expConfig)
}
ts.tearDown()
}
}
type flagTestSuite struct {
t *testing.T

@ -16,6 +16,9 @@ import (
"syscall"
"time"
"github.com/harmony-one/harmony/api/service/synchronize"
"github.com/harmony-one/harmony/hmy/downloader"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/bls/ffi/go/bls"
@ -204,6 +207,7 @@ func applyRootFlags(cmd *cobra.Command, config *harmonyConfig) {
applyDevnetFlags(cmd, config)
applyRevertFlags(cmd, config)
applyPrometheusFlags(cmd, config)
applySyncFlags(cmd, config)
}
func setupNodeLog(config harmonyConfig) {
@ -278,6 +282,7 @@ func setupNodeAndRun(hc harmonyConfig) {
currentNode := setupConsensusAndNode(hc, nodeConfig)
nodeconfig.GetDefaultConfig().ShardID = nodeConfig.ShardID
nodeconfig.GetDefaultConfig().IsOffline = nodeConfig.IsOffline
nodeconfig.GetDefaultConfig().Downloader = nodeConfig.Downloader
// Check NTP configuration
accurate, err := ntp.CheckLocalTimeAccurate(nodeConfig.NtpServer)
@ -304,12 +309,6 @@ func setupNodeAndRun(hc harmonyConfig) {
WSPort: hc.WS.Port,
DebugEnabled: hc.RPCOpt.DebugEnabled,
}
if nodeConfig.ShardID != shard.BeaconChainShardID {
utils.Logger().Info().
Uint32("shardID", currentNode.Blockchain().ShardID()).
Uint32("shardID", nodeConfig.ShardID).Msg("SupportBeaconSyncing")
currentNode.SupportBeaconSyncing()
}
// Parse rosetta config
nodeConfig.RosettaServer = nodeconfig.RosettaServerConfig{
@ -360,6 +359,9 @@ func setupNodeAndRun(hc harmonyConfig) {
nodeconfig.SetPeerID(myHost.GetID())
// Setup services
setupSyncService(currentNode, myHost, hc)
if currentNode.NodeConfig.Role() == nodeconfig.Validator {
currentNode.RegisterValidatorServices()
} else if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode {
@ -369,8 +371,14 @@ func setupNodeAndRun(hc harmonyConfig) {
setupPrometheusService(currentNode, hc, nodeConfig.ShardID)
}
// TODO: replace this legacy syncing
currentNode.SupportSyncing()
if hc.Sync.LegacyServer && !hc.General.IsOffline {
utils.Logger().Info().Msg("support gRPC sync server")
currentNode.SupportGRPCSyncServer()
}
if hc.Sync.LegacyClient && !hc.General.IsOffline {
utils.Logger().Info().Msg("go with gRPC sync client")
currentNode.StartGRPCSyncClient()
}
if err := currentNode.StartServices(); err != nil {
fmt.Fprint(os.Stderr, err.Error())
@ -391,22 +399,24 @@ func setupNodeAndRun(hc harmonyConfig) {
go listenOSSigAndShutDown(currentNode)
if err := myHost.Start(); err != nil {
utils.Logger().Fatal().
Err(err).
Msg("Start p2p host failed")
}
if !hc.General.IsOffline {
if err := myHost.Start(); err != nil {
utils.Logger().Fatal().
Err(err).
Msg("Start p2p host failed")
}
if err := currentNode.BootstrapConsensus(); err != nil {
fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error())
if !currentNode.NodeConfig.IsOffline {
os.Exit(-1)
if err := currentNode.BootstrapConsensus(); err != nil {
fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error())
if !currentNode.NodeConfig.IsOffline {
os.Exit(-1)
}
}
}
if err := currentNode.StartPubSub(); err != nil {
fmt.Fprint(os.Stderr, "could not begin network message handling for node", err.Error())
os.Exit(-1)
if err := currentNode.StartPubSub(); err != nil {
fmt.Fprint(os.Stderr, "could not begin network message handling for node", err.Error())
os.Exit(-1)
}
}
select {}
@ -529,6 +539,7 @@ func createGlobalConfig(hc harmonyConfig) (*nodeconfig.ConfigType, error) {
nodeConfig.SetShardID(initialAccounts[0].ShardID) // sets shard ID
nodeConfig.SetArchival(hc.General.IsBeaconArchival, hc.General.IsArchival)
nodeConfig.IsOffline = hc.General.IsOffline
nodeConfig.Downloader = hc.Sync.Downloader
// P2P private key is used for secure message transfer between p2p nodes.
nodeConfig.P2PPriKey, _, err = utils.LoadKeyFromFile(hc.P2P.KeyFile)
@ -699,6 +710,39 @@ func setupPrometheusService(node *node.Node, hc harmonyConfig, sid uint32) {
node.RegisterService(service.Prometheus, p)
}
func setupSyncService(node *node.Node, host p2p.Host, hc harmonyConfig) {
blockchains := []*core.BlockChain{node.Blockchain()}
if !node.IsRunningBeaconChain() {
blockchains = append(blockchains, node.Beaconchain())
}
dConfig := downloader.Config{
ServerOnly: !hc.Sync.Downloader,
Network: nodeconfig.NetworkType(hc.Network.NetworkType),
Concurrency: hc.Sync.Concurrency,
MinStreams: hc.Sync.MinPeers,
InitStreams: hc.Sync.InitStreams,
SmSoftLowCap: hc.Sync.DiscSoftLowCap,
SmHardLowCap: hc.Sync.DiscHardLowCap,
SmHiCap: hc.Sync.DiscHighCap,
SmDiscBatch: hc.Sync.DiscBatch,
}
// If we are running side chain, we will need to do some extra works for beacon
// sync
if !node.IsRunningBeaconChain() {
dConfig.BHConfig = &downloader.BeaconHelperConfig{
BlockC: node.BeaconBlockChannel,
InsertHook: node.BeaconSyncHook,
}
}
s := synchronize.NewService(host, blockchains, dConfig)
node.RegisterService(service.Synchronize, s)
d := s.Downloaders.GetShardDownloader(node.Blockchain().ShardID())
node.Consensus.SetDownloader(d)
}
func setupBlacklist(hc harmonyConfig) (map[ethCommon.Address]struct{}, error) {
utils.Logger().Debug().Msgf("Using blacklist file at `%s`", hc.TxPool.BlacklistFile)
dat, err := ioutil.ReadFile(hc.TxPool.BlacklistFile)

@ -120,7 +120,9 @@ func (bh *beaconHelper) insertLastMileBlocks() (inserted int, bn uint64, err err
bn--
return
}
if err = bh.ih.verifyAndInsertBlock(b); err != nil {
// TODO: Instruct the beacon helper to verify signatures. This may require some forks
// in pub-sub message (add commit sigs in node.block.sync messages)
if _, err = bh.bc.InsertChain(types.Blocks{b}, true); err != nil {
bn--
return
}

@ -25,6 +25,10 @@ const (
type (
// Config is the downloader config
Config struct {
// Only run stream sync protocol as a server.
// TODO: remove this when stream sync is fully up.
ServerOnly bool
// parameters
Network nodeconfig.NetworkType
Concurrency int // Number of concurrent sync requests

@ -82,6 +82,10 @@ func NewDownloader(host p2p.Host, bc *core.BlockChain, config Config) *Downloade
// Start start the downloader
func (d *Downloader) Start() {
if d.config.ServerOnly {
return
}
go d.run()
if d.bh != nil {
@ -91,6 +95,10 @@ func (d *Downloader) Start() {
// Close close the downloader
func (d *Downloader) Close() {
if d.config.ServerOnly {
return
}
close(d.closeC)
d.cancel()

@ -15,6 +15,8 @@ import (
"github.com/rs/zerolog"
)
var emptySigVerifyError *sigVerifyError
// doShortRangeSync does the short range sync.
// Compared with long range sync, short range sync is more focused on syncing to the latest block.
// It consist of 3 steps:
@ -56,7 +58,7 @@ func (d *Downloader) doShortRangeSync() (int, error) {
}
n, err := d.ih.verifyAndInsertBlocks(blocks)
if err != nil {
if !errors.As(err, &sigVerifyError{}) {
if !errors.As(err, &emptySigVerifyError) {
sh.removeStreams(whitelist) // Data provided by remote nodes is corrupted
}
return n, errors.Wrap(err, "InsertChain")

@ -87,8 +87,9 @@ type NodeAPI interface {
GetTransactionsCount(address, txType string) (uint64, error)
GetStakingTransactionsCount(address, txType string) (uint64, error)
IsCurrentlyLeader() bool
IsOutOfSync(*core.BlockChain) bool
GetMaxPeerHeight() uint64
IsOutOfSync(shardID uint32) bool
SyncStatus(shardID uint32) (bool, uint64)
SyncPeers() map[string]int
ReportStakingErrorSink() types.TransactionErrorReports
ReportPlainErrorSink() types.TransactionErrorReports
PendingCXReceipts() []*types.CXReceiptsProof
@ -186,6 +187,7 @@ func (hmy *Harmony) GetNodeMetadata() commonRPC.NodeMetadata {
c := commonRPC.C{}
c.TotalKnownPeers, c.Connected, c.NotConnected = hmy.NodeAPI.PeerConnectivity()
syncPeers := hmy.NodeAPI.SyncPeers()
consensusInternal := hmy.NodeAPI.GetConsensusInternal()
return commonRPC.NodeMetadata{
@ -204,6 +206,7 @@ func (hmy *Harmony) GetNodeMetadata() commonRPC.NodeMetadata {
PeerID: nodeconfig.GetPeerID(),
Consensus: consensusInternal,
C: c,
SyncPeers: syncPeers,
}
}

@ -80,6 +80,7 @@ type ConfigType struct {
RPCServer RPCServerConfig // RPC server port and ip
RosettaServer RosettaServerConfig // rosetta server port and ip
IsOffline bool
Downloader bool // Whether stream downloader is running; TODO: remove this after sync up
NtpServer string
StringRole string
P2PPriKey p2p_crypto.PrivKey

@ -3,13 +3,12 @@ package node
import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/staking/slash"
)
// ProcessSlashCandidateMessage ..
func (node *Node) processSlashCandidateMessage(msgPayload []byte) {
if node.NodeConfig.ShardID != shard.BeaconChainShardID {
if !node.IsRunningBeaconChain() {
return
}
candidates := slash.Records{}

@ -217,7 +217,7 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) []error {
// Add new staking transactions to the pending staking transaction list.
func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) []error {
if node.NodeConfig.ShardID == shard.BeaconChainShardID {
if node.IsRunningBeaconChain() {
if node.Blockchain().Config().IsPreStaking(node.Blockchain().CurrentHeader().Epoch()) {
poolTxs := types.PoolTransactions{}
for _, tx := range newStakingTxs {
@ -245,7 +245,7 @@ func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTra
func (node *Node) AddPendingStakingTransaction(
newStakingTx *staking.StakingTransaction,
) error {
if node.NodeConfig.ShardID == shard.BeaconChainShardID {
if node.IsRunningBeaconChain() {
errs := node.addPendingStakingTransactions(staking.StakingTransactions{newStakingTx})
var err error
for i := range errs {
@ -404,7 +404,7 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) (
case proto_node.SlashCandidate:
nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "slash"}).Inc()
// only beacon chain node process slash candidate messages
if node.NodeConfig.ShardID != shard.BeaconChainShardID {
if !node.IsRunningBeaconChain() {
return nil, 0, errIgnoreBeaconMsg
}
case proto_node.Receipt:
@ -412,7 +412,7 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) (
case proto_node.CrossLink:
nodeNodeMessageCounterVec.With(prometheus.Labels{"type": "crosslink"}).Inc()
// only beacon chain node process crosslink messages
if node.NodeConfig.ShardID != shard.BeaconChainShardID ||
if !node.IsRunningBeaconChain() ||
node.NodeConfig.Role() == nodeconfig.ExplorerNode {
return nil, 0, errIgnoreBeaconMsg
}
@ -1016,7 +1016,7 @@ func New(
go func() { webhooks.DoPost(url, &doubleSign) }()
}
}
if node.NodeConfig.ShardID != shard.BeaconChainShardID {
if !node.IsRunningBeaconChain() {
go node.BroadcastSlash(&doubleSign)
} else {
records := slash.Records{doubleSign}
@ -1282,3 +1282,8 @@ func (node *Node) GetAddresses(epoch *big.Int) map[string]common.Address {
// self addresses map can never be nil
return node.KeysToAddrs
}
// IsRunningBeaconChain returns whether the node is running on beacon chain.
func (node *Node) IsRunningBeaconChain() bool {
return node.NodeConfig.ShardID == shard.BeaconChainShardID
}

@ -61,7 +61,7 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
// ProcessCrossLinkMessage verify and process Node/CrossLink message into crosslink when it's valid
func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
if node.NodeConfig.ShardID == shard.BeaconChainShardID {
if node.IsRunningBeaconChain() {
pendingCLs, err := node.Blockchain().ReadPendingCrossLinks()
if err == nil && len(pendingCLs) >= maxPendingCrossLinkSize {
utils.Logger().Debug().

@ -171,7 +171,7 @@ func (node *Node) BroadcastCrossLink() {
return
}
if node.NodeConfig.ShardID == shard.BeaconChainShardID ||
if node.IsRunningBeaconChain() ||
!node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) {
// no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch
return
@ -308,7 +308,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
// Verify cross links
// TODO: move into ValidateNewBlock
if node.NodeConfig.ShardID == shard.BeaconChainShardID {
if node.IsRunningBeaconChain() {
err := node.VerifyBlockCrossLinks(newBlock)
if err != nil {
utils.Logger().Debug().Err(err).Msg("ops2 VerifyBlockCrossLinks Failed")
@ -336,7 +336,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
// 3. [leader] send cross shard tx receipts to destination shard
func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
if node.Consensus.IsLeader() {
if node.NodeConfig.ShardID == shard.BeaconChainShardID {
if node.IsRunningBeaconChain() {
node.BroadcastNewBlock(newBlock)
}
node.BroadcastCXReceipts(newBlock)
@ -360,7 +360,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
rnd := rand.Intn(100)
if rnd < 1 {
// Beacon validators also broadcast new blocks to make sure beacon sync is strong.
if node.NodeConfig.ShardID == shard.BeaconChainShardID {
if node.IsRunningBeaconChain() {
node.BroadcastNewBlock(newBlock)
}
node.BroadcastCXReceipts(newBlock)

@ -9,17 +9,22 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service/legacysync"
"github.com/harmony-one/harmony/api/service/legacysync/downloader"
legdownloader "github.com/harmony-one/harmony/api/service/legacysync/downloader"
downloader_pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto"
"github.com/harmony-one/harmony/api/service/synchronize"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/hmy/downloader"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/harmony-one/harmony/shard"
)
// Constants related to doing syncing.
@ -33,6 +38,16 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
// BeaconSyncHook is the hook function called after inserted beacon in downloader
// TODO: This is a small misc piece of consensus logic. Better put it to consensus module.
func (node *Node) BeaconSyncHook() {
if node.Consensus.IsLeader() {
// TODO: Instead of leader, it would better be validator do this broadcast since leader do
// not have much idle resources.
node.BroadcastCrossLink()
}
}
// GenerateRandomString generates a random string with given length
func GenerateRandomString(n int) string {
b := make([]rune, n)
@ -176,28 +191,31 @@ func (p *LocalSyncingPeerProvider) SyncingPeers(shardID uint32) (peers []p2p.Pee
return peers, nil
}
// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks
func (node *Node) DoBeaconSyncing() {
// doBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks
func (node *Node) doBeaconSyncing() {
if node.NodeConfig.IsOffline {
return
}
go func(node *Node) {
// TODO ek – infinite loop; add shutdown/cleanup logic
for beaconBlock := range node.BeaconBlockChannel {
if node.beaconSync != nil {
err := node.beaconSync.UpdateBlockAndStatus(
beaconBlock, node.Beaconchain(), node.BeaconWorker, true,
)
if err != nil {
node.beaconSync.AddLastMileBlock(beaconBlock)
} else if node.Consensus.IsLeader() {
// Only leader broadcast crosslink to avoid spamming p2p
node.BroadcastCrossLink()
if !node.NodeConfig.Downloader {
// If Downloader is not working, we need also deal with blocks from beaconBlockChannel
go func(node *Node) {
// TODO ek – infinite loop; add shutdown/cleanup logic
for beaconBlock := range node.BeaconBlockChannel {
if node.beaconSync != nil {
err := node.beaconSync.UpdateBlockAndStatus(
beaconBlock, node.Beaconchain(), node.BeaconWorker, true,
)
if err != nil {
node.beaconSync.AddLastMileBlock(beaconBlock)
} else if node.Consensus.IsLeader() {
// Only leader broadcast crosslink to avoid spamming p2p
node.BroadcastCrossLink()
}
}
}
}
}(node)
}(node)
}
// TODO ek – infinite loop; add shutdown/cleanup logic
for {
@ -279,16 +297,25 @@ func (node *Node) doSync(bc *core.BlockChain, worker *worker.Worker, willJoinCon
node.IsInSync.Set()
}
// SupportBeaconSyncing sync with beacon chain for archival node in beacon chan or non-beacon node
func (node *Node) SupportBeaconSyncing() {
go node.DoBeaconSyncing()
}
// SupportSyncing keeps sleeping until it's doing consensus or it's a leader.
func (node *Node) SupportSyncing() {
// SupportGRPCSyncServer do gRPC sync server
func (node *Node) SupportGRPCSyncServer() {
node.InitSyncingServer()
node.StartSyncingServer()
}
// StartGRPCSyncClient start the legacy gRPC sync process
func (node *Node) StartGRPCSyncClient() {
if node.Blockchain().ShardID() != shard.BeaconChainShardID {
utils.Logger().Info().
Uint32("shardID", node.Blockchain().ShardID()).
Msg("SupportBeaconSyncing")
go node.doBeaconSyncing()
}
node.supportSyncing()
}
// supportSyncing keeps sleeping until it's doing consensus or it's a leader.
func (node *Node) supportSyncing() {
joinConsensus := false
// Check if the current node is explorer node.
switch node.NodeConfig.Role() {
@ -313,7 +340,7 @@ func (node *Node) SupportSyncing() {
// InitSyncingServer starts downloader server.
func (node *Node) InitSyncingServer() {
if node.downloaderServer == nil {
node.downloaderServer = downloader.NewServer(node)
node.downloaderServer = legdownloader.NewServer(node)
}
}
@ -466,7 +493,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
} else {
response.Type = downloader_pb.DownloaderResponse_FAIL
syncPort := legacysync.GetSyncingPort(port)
client := downloader.ClientSetup(ip, syncPort)
client := legdownloader.ClientSetup(ip, syncPort)
if client == nil {
utils.Logger().Warn().
Str("ip", ip).
@ -540,12 +567,49 @@ func (node *Node) getEncodedBlockByHash(hash common.Hash) ([]byte, error) {
return b, nil
}
// GetMaxPeerHeight ...
func (node *Node) GetMaxPeerHeight() uint64 {
return node.stateSync.GetMaxPeerHeight()
// SyncStatus return the syncing status, including whether node is syncing
// and the target block number.
func (node *Node) SyncStatus(shardID uint32) (bool, uint64) {
ds := node.getDownloaders()
if ds == nil {
return false, 0
}
return ds.SyncStatus(shardID)
}
// IsOutOfSync return whether the node is out of sync of the given hsardID
func (node *Node) IsOutOfSync(shardID uint32) bool {
ds := node.getDownloaders()
if ds == nil {
return false
}
isSyncing, _ := ds.SyncStatus(shardID)
return !isSyncing
}
// IsOutOfSync ...
func (node *Node) IsOutOfSync(bc *core.BlockChain) bool {
return node.stateSync.IsOutOfSync(bc, false)
// SyncPeers return connected sync peers for each shard
func (node *Node) SyncPeers() map[string]int {
ds := node.getDownloaders()
if ds == nil {
return nil
}
nums := ds.NumPeers()
res := make(map[string]int)
for sid, num := range nums {
s := fmt.Sprintf("shard-%v", sid)
res[s] = num
}
return res
}
func (node *Node) getDownloaders() *downloader.Downloaders {
syncService := node.serviceManager.GetService(service.Synchronize)
if syncService == nil {
return nil
}
dsService, ok := syncService.(*synchronize.Service)
if !ok {
return nil
}
return dsService.Downloaders
}

@ -10,6 +10,8 @@ import (
"strings"
"sync"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/harmony-one/bls/ffi/go/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
@ -230,8 +232,9 @@ func (host *HostV2) C() (int, int, int) {
// AddStreamProtocol adds the stream protocols to the host to be started and closed
// when the host starts or close
func (host *HostV2) AddStreamProtocol(protocols ...sttypes.Protocol) {
for _, protocol := range protocols {
host.streamProtos = append(host.streamProtos, protocol)
for _, proto := range protocols {
host.streamProtos = append(host.streamProtos, proto)
host.h.SetStreamHandlerMatch(protocol.ID(proto.ProtoID()), proto.Match, proto.HandleStream)
}
}

@ -82,12 +82,12 @@ func (s *NetworkAPI) NetworkStatus(
if rosettaError != nil {
return nil, rosettaError
}
targetHeight := int64(s.hmy.NodeAPI.GetMaxPeerHeight())
isSyncing, targetHeight := s.hmy.NodeAPI.SyncStatus(s.hmy.BlockChain.ShardID())
syncStatus := common.SyncingFinish
if s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain) {
syncStatus = common.SyncingNewBlock
} else if targetHeight == 0 {
if targetHeight == 0 {
syncStatus = common.SyncingUnknown
} else if isSyncing {
syncStatus = common.SyncingNewBlock
}
stage := syncStatus.String()
@ -116,6 +116,13 @@ func (s *NetworkAPI) NetworkStatus(
}
}
targetInt := int64(targetHeight)
ss := &types.SyncStatus{
CurrentIndex: currentHeader.Number().Int64(),
TargetIndex: &targetInt,
Stage: &stage,
}
return &types.NetworkStatusResponse{
CurrentBlockIdentifier: currentBlockIdentifier,
OldestBlockIdentifier: oldestBlockIdentifier,
@ -124,12 +131,8 @@ func (s *NetworkAPI) NetworkStatus(
Index: genesisHeader.Number().Int64(),
Hash: genesisHeader.Hash().String(),
},
Peers: peers,
SyncStatus: &types.SyncStatus{
CurrentIndex: currentHeader.Number().Int64(),
TargetIndex: &targetHeight,
Stage: &stage,
},
Peers: peers,
SyncStatus: ss,
}, nil
}

@ -685,12 +685,12 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo(
// InSync returns if shard chain is syncing
func (s *PublicBlockchainService) InSync(ctx context.Context) (bool, error) {
return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain), nil
return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain.ShardID()), nil
}
// BeaconInSync returns if beacon chain is syncing
func (s *PublicBlockchainService) BeaconInSync(ctx context.Context) (bool, error) {
return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BeaconChain), nil
return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BeaconChain.ShardID()), nil
}
func isBlockGreaterThanLatest(hmy *hmy.Harmony, blockNum rpc.BlockNumber) bool {

@ -64,6 +64,7 @@ type NodeMetadata struct {
PeerID peer.ID `json:"peerid"`
Consensus ConsensusInternal `json:"consensus"`
C C `json:"p2p-connectivity"`
SyncPeers map[string]int `json:"sync-peers",omitempty`
}
// P captures the connected peers per topic

Loading…
Cancel
Save