[stream] added rpc and rosetta and metadata interface about stream sync protocol

pull/3588/head
Jacky Wang 4 years ago
parent bbe14e7500
commit 395a875158
No known key found for this signature in database
GPG Key ID: 1085CE5F4FF5842C
  1. 7
      hmy/hmy.go
  2. 66
      node/node_syncing.go
  3. 23
      rosetta/services/network.go
  4. 4
      rpc/blockchain.go
  5. 1
      rpc/common/types.go

@ -87,8 +87,9 @@ type NodeAPI interface {
GetTransactionsCount(address, txType string) (uint64, error) GetTransactionsCount(address, txType string) (uint64, error)
GetStakingTransactionsCount(address, txType string) (uint64, error) GetStakingTransactionsCount(address, txType string) (uint64, error)
IsCurrentlyLeader() bool IsCurrentlyLeader() bool
IsOutOfSync(*core.BlockChain) bool IsOutOfSync(shardID uint32) bool
GetMaxPeerHeight() uint64 SyncStatus(shardID uint32) (bool, uint64)
SyncPeers() map[string]int
ReportStakingErrorSink() types.TransactionErrorReports ReportStakingErrorSink() types.TransactionErrorReports
ReportPlainErrorSink() types.TransactionErrorReports ReportPlainErrorSink() types.TransactionErrorReports
PendingCXReceipts() []*types.CXReceiptsProof PendingCXReceipts() []*types.CXReceiptsProof
@ -186,6 +187,7 @@ func (hmy *Harmony) GetNodeMetadata() commonRPC.NodeMetadata {
c := commonRPC.C{} c := commonRPC.C{}
c.TotalKnownPeers, c.Connected, c.NotConnected = hmy.NodeAPI.PeerConnectivity() c.TotalKnownPeers, c.Connected, c.NotConnected = hmy.NodeAPI.PeerConnectivity()
syncPeers := hmy.NodeAPI.SyncPeers()
consensusInternal := hmy.NodeAPI.GetConsensusInternal() consensusInternal := hmy.NodeAPI.GetConsensusInternal()
return commonRPC.NodeMetadata{ return commonRPC.NodeMetadata{
@ -204,6 +206,7 @@ func (hmy *Harmony) GetNodeMetadata() commonRPC.NodeMetadata {
PeerID: nodeconfig.GetPeerID(), PeerID: nodeconfig.GetPeerID(),
Consensus: consensusInternal, Consensus: consensusInternal,
C: c, C: c,
SyncPeers: syncPeers,
} }
} }

@ -7,21 +7,24 @@ import (
"sync" "sync"
"time" "time"
"github.com/harmony-one/harmony/shard"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service/legacysync" "github.com/harmony-one/harmony/api/service/legacysync"
"github.com/harmony-one/harmony/api/service/legacysync/downloader" legdownloader "github.com/harmony-one/harmony/api/service/legacysync/downloader"
downloader_pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto" downloader_pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto"
"github.com/harmony-one/harmony/api/service/synchronize"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/hmy/downloader"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
lru "github.com/hashicorp/golang-lru" "github.com/harmony-one/harmony/shard"
"github.com/pkg/errors"
) )
// Constants related to doing syncing. // Constants related to doing syncing.
@ -315,7 +318,7 @@ func (node *Node) supportSyncing() {
// InitSyncingServer starts downloader server. // InitSyncingServer starts downloader server.
func (node *Node) InitSyncingServer() { func (node *Node) InitSyncingServer() {
if node.downloaderServer == nil { if node.downloaderServer == nil {
node.downloaderServer = downloader.NewServer(node) node.downloaderServer = legdownloader.NewServer(node)
} }
} }
@ -468,7 +471,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
} else { } else {
response.Type = downloader_pb.DownloaderResponse_FAIL response.Type = downloader_pb.DownloaderResponse_FAIL
syncPort := legacysync.GetSyncingPort(port) syncPort := legacysync.GetSyncingPort(port)
client := downloader.ClientSetup(ip, syncPort) client := legdownloader.ClientSetup(ip, syncPort)
if client == nil { if client == nil {
utils.Logger().Warn(). utils.Logger().Warn().
Str("ip", ip). Str("ip", ip).
@ -542,12 +545,49 @@ func (node *Node) getEncodedBlockByHash(hash common.Hash) ([]byte, error) {
return b, nil return b, nil
} }
// GetMaxPeerHeight ... // SyncStatus return the syncing status, including whether node is syncing
func (node *Node) GetMaxPeerHeight() uint64 { // and the target block number.
return node.stateSync.GetMaxPeerHeight() func (node *Node) SyncStatus(shardID uint32) (bool, uint64) {
ds := node.getDownloaders()
if ds == nil {
return false, 0
}
return ds.SyncStatus(shardID)
}
// IsOutOfSync return whether the node is out of sync of the given hsardID
func (node *Node) IsOutOfSync(shardID uint32) bool {
ds := node.getDownloaders()
if ds == nil {
return false
}
isSyncing, _ := ds.SyncStatus(shardID)
return !isSyncing
} }
// IsOutOfSync ... // SyncPeers return connected sync peers for each shard
func (node *Node) IsOutOfSync(bc *core.BlockChain) bool { func (node *Node) SyncPeers() map[string]int {
return node.stateSync.IsOutOfSync(bc, false) ds := node.getDownloaders()
if ds == nil {
return nil
}
nums := ds.NumPeers()
res := make(map[string]int)
for sid, num := range nums {
s := fmt.Sprintf("shard-%v", sid)
res[s] = num
}
return res
}
func (node *Node) getDownloaders() *downloader.Downloaders {
syncService := node.serviceManager.GetService(service.Synchronize)
if syncService == nil {
return nil
}
dsService, ok := syncService.(*synchronize.Service)
if !ok {
return nil
}
return dsService.Downloaders
} }

@ -82,12 +82,12 @@ func (s *NetworkAPI) NetworkStatus(
if rosettaError != nil { if rosettaError != nil {
return nil, rosettaError return nil, rosettaError
} }
targetHeight := int64(s.hmy.NodeAPI.GetMaxPeerHeight()) isSyncing, targetHeight := s.hmy.NodeAPI.SyncStatus(s.hmy.BlockChain.ShardID())
syncStatus := common.SyncingFinish syncStatus := common.SyncingFinish
if s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain) { if targetHeight == 0 {
syncStatus = common.SyncingNewBlock
} else if targetHeight == 0 {
syncStatus = common.SyncingUnknown syncStatus = common.SyncingUnknown
} else if isSyncing {
syncStatus = common.SyncingNewBlock
} }
stage := syncStatus.String() stage := syncStatus.String()
@ -116,6 +116,13 @@ func (s *NetworkAPI) NetworkStatus(
} }
} }
targetInt := int64(targetHeight)
ss := &types.SyncStatus{
CurrentIndex: currentHeader.Number().Int64(),
TargetIndex: &targetInt,
Stage: &stage,
}
return &types.NetworkStatusResponse{ return &types.NetworkStatusResponse{
CurrentBlockIdentifier: currentBlockIdentifier, CurrentBlockIdentifier: currentBlockIdentifier,
OldestBlockIdentifier: oldestBlockIdentifier, OldestBlockIdentifier: oldestBlockIdentifier,
@ -124,12 +131,8 @@ func (s *NetworkAPI) NetworkStatus(
Index: genesisHeader.Number().Int64(), Index: genesisHeader.Number().Int64(),
Hash: genesisHeader.Hash().String(), Hash: genesisHeader.Hash().String(),
}, },
Peers: peers, Peers: peers,
SyncStatus: &types.SyncStatus{ SyncStatus: ss,
CurrentIndex: currentHeader.Number().Int64(),
TargetIndex: &targetHeight,
Stage: &stage,
},
}, nil }, nil
} }

@ -685,12 +685,12 @@ func (s *PublicBlockchainService) GetStakingNetworkInfo(
// InSync returns if shard chain is syncing // InSync returns if shard chain is syncing
func (s *PublicBlockchainService) InSync(ctx context.Context) (bool, error) { func (s *PublicBlockchainService) InSync(ctx context.Context) (bool, error) {
return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain), nil return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BlockChain.ShardID()), nil
} }
// BeaconInSync returns if beacon chain is syncing // BeaconInSync returns if beacon chain is syncing
func (s *PublicBlockchainService) BeaconInSync(ctx context.Context) (bool, error) { func (s *PublicBlockchainService) BeaconInSync(ctx context.Context) (bool, error) {
return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BeaconChain), nil return !s.hmy.NodeAPI.IsOutOfSync(s.hmy.BeaconChain.ShardID()), nil
} }
func isBlockGreaterThanLatest(hmy *hmy.Harmony, blockNum rpc.BlockNumber) bool { func isBlockGreaterThanLatest(hmy *hmy.Harmony, blockNum rpc.BlockNumber) bool {

@ -64,6 +64,7 @@ type NodeMetadata struct {
PeerID peer.ID `json:"peerid"` PeerID peer.ID `json:"peerid"`
Consensus ConsensusInternal `json:"consensus"` Consensus ConsensusInternal `json:"consensus"`
C C `json:"p2p-connectivity"` C C `json:"p2p-connectivity"`
SyncPeers map[string]int `json:"sync-peers",omitempty`
} }
// P captures the connected peers per topic // P captures the connected peers per topic

Loading…
Cancel
Save