From 395a87515843c8b31f36e68026e3646f2db510f1 Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Sat, 13 Mar 2021 00:57:23 -0800 Subject: [PATCH] [stream] added rpc and rosetta and metadata interface about stream sync protocol --- hmy/hmy.go | 7 ++-- node/node_syncing.go | 66 +++++++++++++++++++++++++++++-------- rosetta/services/network.go | 23 +++++++------ rpc/blockchain.go | 4 +-- rpc/common/types.go | 1 + 5 files changed, 74 insertions(+), 27 deletions(-) diff --git a/hmy/hmy.go b/hmy/hmy.go index 8962c51b5..d2f4c992b 100644 --- a/hmy/hmy.go +++ b/hmy/hmy.go @@ -87,8 +87,9 @@ type NodeAPI interface { GetTransactionsCount(address, txType string) (uint64, error) GetStakingTransactionsCount(address, txType string) (uint64, error) IsCurrentlyLeader() bool - IsOutOfSync(*core.BlockChain) bool - GetMaxPeerHeight() uint64 + IsOutOfSync(shardID uint32) bool + SyncStatus(shardID uint32) (bool, uint64) + SyncPeers() map[string]int ReportStakingErrorSink() types.TransactionErrorReports ReportPlainErrorSink() types.TransactionErrorReports PendingCXReceipts() []*types.CXReceiptsProof @@ -186,6 +187,7 @@ func (hmy *Harmony) GetNodeMetadata() commonRPC.NodeMetadata { c := commonRPC.C{} c.TotalKnownPeers, c.Connected, c.NotConnected = hmy.NodeAPI.PeerConnectivity() + syncPeers := hmy.NodeAPI.SyncPeers() consensusInternal := hmy.NodeAPI.GetConsensusInternal() return commonRPC.NodeMetadata{ @@ -204,6 +206,7 @@ func (hmy *Harmony) GetNodeMetadata() commonRPC.NodeMetadata { PeerID: nodeconfig.GetPeerID(), Consensus: consensusInternal, C: c, + SyncPeers: syncPeers, } } diff --git a/node/node_syncing.go b/node/node_syncing.go index d333e1172..dc7423013 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -7,21 +7,24 @@ import ( "sync" "time" - "github.com/harmony-one/harmony/shard" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" + lru "github.com/hashicorp/golang-lru" + "github.com/pkg/errors" + + "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service/legacysync" - "github.com/harmony-one/harmony/api/service/legacysync/downloader" + legdownloader "github.com/harmony-one/harmony/api/service/legacysync/downloader" downloader_pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto" + "github.com/harmony-one/harmony/api/service/synchronize" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/hmy/downloader" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" - lru "github.com/hashicorp/golang-lru" - "github.com/pkg/errors" + "github.com/harmony-one/harmony/shard" ) // Constants related to doing syncing. @@ -315,7 +318,7 @@ func (node *Node) supportSyncing() { // InitSyncingServer starts downloader server. func (node *Node) InitSyncingServer() { if node.downloaderServer == nil { - node.downloaderServer = downloader.NewServer(node) + node.downloaderServer = legdownloader.NewServer(node) } } @@ -468,7 +471,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in } else { response.Type = downloader_pb.DownloaderResponse_FAIL syncPort := legacysync.GetSyncingPort(port) - client := downloader.ClientSetup(ip, syncPort) + client := legdownloader.ClientSetup(ip, syncPort) if client == nil { utils.Logger().Warn(). Str("ip", ip). @@ -542,12 +545,49 @@ func (node *Node) getEncodedBlockByHash(hash common.Hash) ([]byte, error) { return b, nil } -// GetMaxPeerHeight ... -func (node *Node) GetMaxPeerHeight() uint64 { - return node.stateSync.GetMaxPeerHeight() +// SyncStatus return the syncing status, including whether node is syncing +// and the target block number. +func (node *Node) SyncStatus(shardID uint32) (bool, uint64) { + ds := node.getDownloaders() + if ds == nil { + return false, 0 + } + return ds.SyncStatus(shardID) +} + +// IsOutOfSync return whether the node is out of sync of the given hsardID +func (node *Node) IsOutOfSync(shardID uint32) bool { + ds := node.getDownloaders() + if ds == nil { + return false + } + isSyncing, _ := ds.SyncStatus(shardID) + return !isSyncing } -// IsOutOfSync ... -func (node *Node) IsOutOfSync(bc *core.BlockChain) bool { - return node.stateSync.IsOutOfSync(bc, false) +// SyncPeers return connected sync peers for each shard +func (node *Node) SyncPeers() map[string]int { + ds := node.getDownloaders() + if ds == nil { + return nil + } + nums := ds.NumPeers() + res := make(map[string]int) + for sid, num := range nums { + s := fmt.Sprintf("shard-%v", sid) + res[s] = num + } + return res +} + +func (node *Node) getDownloaders() *downloader.Downloaders { + syncService := node.serviceManager.GetService(service.Synchronize) + if syncService == nil { + return nil + } + dsService, ok := syncService.(*synchronize.Service) + if !ok { + return nil + } + return dsService.Downloaders } diff --git a/rosetta/services/network.go b/rosetta/services/network.go index 707feb24a..2b4d5709a 100644 --- a/rosetta/services/network.go +++ b/rosetta/services/network.go @@ -82,12 +82,12 @@ func (s *NetworkAPI) NetworkStatus( if rosettaError != nil { return nil, rosettaError } - targetHeight := int64(s.hmy.NodeAPI.GetMaxPeerHeight()) + isSyncing, targetHeight := s.hmy.NodeAPI.SyncStatus(s.hmy.BlockChain.ShardID()) syncStatus := common.SyncingFinish - if s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain) { - syncStatus = common.SyncingNewBlock - } else if targetHeight == 0 { + if targetHeight == 0 { syncStatus = common.SyncingUnknown + } else if isSyncing { + syncStatus = common.SyncingNewBlock } stage := syncStatus.String() @@ -116,6 +116,13 @@ func (s *NetworkAPI) NetworkStatus( } } + targetInt := int64(targetHeight) + ss := &types.SyncStatus{ + CurrentIndex: currentHeader.Number().Int64(), + TargetIndex: &targetInt, + Stage: &stage, + } + return &types.NetworkStatusResponse{ CurrentBlockIdentifier: currentBlockIdentifier, OldestBlockIdentifier: oldestBlockIdentifier, @@ -124,12 +131,8 @@ func (s *NetworkAPI) NetworkStatus( Index: genesisHeader.Number().Int64(), Hash: genesisHeader.Hash().String(), }, - Peers: peers, - SyncStatus: &types.SyncStatus{ - CurrentIndex: currentHeader.Number().Int64(), - TargetIndex: &targetHeight, - Stage: &stage, - }, + Peers: peers, + SyncStatus: ss, }, nil } diff --git a/rpc/blockchain.go b/rpc/blockchain.go index 88032e41d..b27667165 100644 --- a/rpc/blockchain.go +++ b/rpc/blockchain.go @@ -685,12 +685,12 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo( // InSync returns if shard chain is syncing func (s *PublicBlockchainService) InSync(ctx context.Context) (bool, error) { - return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain), nil + return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain.ShardID()), nil } // BeaconInSync returns if beacon chain is syncing func (s *PublicBlockchainService) BeaconInSync(ctx context.Context) (bool, error) { - return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BeaconChain), nil + return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BeaconChain.ShardID()), nil } func isBlockGreaterThanLatest(hmy *hmy.Harmony, blockNum rpc.BlockNumber) bool { diff --git a/rpc/common/types.go b/rpc/common/types.go index 69e418d02..253b6bd14 100644 --- a/rpc/common/types.go +++ b/rpc/common/types.go @@ -64,6 +64,7 @@ type NodeMetadata struct { PeerID peer.ID `json:"peerid"` Consensus ConsensusInternal `json:"consensus"` C C `json:"p2p-connectivity"` + SyncPeers map[string]int `json:"sync-peers",omitempty` } // P captures the connected peers per topic