add client new functions to stream sync adapter, update GetAccountRanges parameters

pull/4465/head
“GheisMohammadi” 12 months ago
parent e141f79818
commit 390bdb67d8
No known key found for this signature in database
GPG Key ID: 15073AED3829FE90
  1. 5
      api/service/stagedstreamsync/adapter.go
  2. 27
      p2p/stream/protocols/sync/client.go

@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager" "github.com/harmony-one/harmony/p2p/stream/common/streammanager"
syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
sttypes "github.com/harmony-one/harmony/p2p/stream/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types"
) )
@ -20,6 +21,10 @@ type syncProtocol interface {
GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error) GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error)
GetReceipts(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (receipts []types.Receipts, stid sttypes.StreamID, err error) GetReceipts(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (receipts []types.Receipts, stid sttypes.StreamID, err error)
GetNodeData(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (data [][]byte, stid sttypes.StreamID, err error) GetNodeData(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (data [][]byte, stid sttypes.StreamID, err error)
GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error)
GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error)
GetByteCodes(ctx context.Context, hs []common.Hash, bytes uint64, opts ...syncproto.Option) (codes [][]byte, stid sttypes.StreamID, err error)
GetTrieNodes(ctx context.Context, root common.Hash, paths []*message.TrieNodePathSet, bytes uint64, opts ...syncproto.Option) (nodes [][]byte, stid sttypes.StreamID, err error)
RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream
StreamFailed(stID sttypes.StreamID, reason string) StreamFailed(stID sttypes.StreamID, reason string)

@ -184,7 +184,7 @@ func (p *Protocol) GetNodeData(ctx context.Context, hs []common.Hash, opts ...Op
// GetAccountRange do getAccountRange through sync stream protocol. // GetAccountRange do getAccountRange through sync stream protocol.
// returns the accounts along with proofs as result, target stream id, and error // returns the accounts along with proofs as result, target stream id, and error
func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (accounts []*message.AccountData, proof []common.Hash, stid sttypes.StreamID, err error) { func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error) {
timer := p.doMetricClientRequest("getAccountRange") timer := p.doMetricClientRequest("getAccountRange")
defer p.doMetricPostClientRequest("getAccountRange", err, timer) defer p.doMetricPostClientRequest("getAccountRange", err, timer)
@ -207,7 +207,7 @@ func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin
// GetStorageRanges do getStorageRanges through sync stream protocol. // GetStorageRanges do getStorageRanges through sync stream protocol.
// returns the slots along with proofs as result, target stream id, and error // returns the slots along with proofs as result, target stream id, and error
func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (slots []*message.StorageData, proof []common.Hash, stid sttypes.StreamID, err error) { func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error) {
timer := p.doMetricClientRequest("getStorageRanges") timer := p.doMetricClientRequest("getStorageRanges")
defer p.doMetricPostClientRequest("getStorageRanges", err, timer) defer p.doMetricPostClientRequest("getStorageRanges", err, timer)
@ -233,11 +233,9 @@ func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accou
if err != nil { if err != nil {
return return
} }
slots = make([]*message.StorageData, 0) slots = make([][]*message.StorageData, 0)
for _, storage := range storages { for _, storage := range storages {
for _, data := range storage.Data { slots = append(slots, storage.Data)
slots = append(slots, data)
}
} }
return return
} }
@ -735,8 +733,7 @@ func (req *getAccountRangeRequest) Encode() ([]byte, error) {
return protobuf.Marshal(msg) return protobuf.Marshal(msg)
} }
// []*message.AccountData, []common.Hash func (req *getAccountRangeRequest) getAccountRangeFromResponse(resp sttypes.Response) ([]*message.AccountData, [][]byte, error) {
func (req *getAccountRangeRequest) getAccountRangeFromResponse(resp sttypes.Response) ([]*message.AccountData, []common.Hash, error) {
sResp, ok := resp.(*syncResponse) sResp, ok := resp.(*syncResponse)
if !ok || sResp == nil { if !ok || sResp == nil {
return nil, nil, errors.New("not sync response") return nil, nil, errors.New("not sync response")
@ -744,7 +741,7 @@ func (req *getAccountRangeRequest) getAccountRangeFromResponse(resp sttypes.Resp
return req.parseGetAccountRangeResponse(sResp) return req.parseGetAccountRangeResponse(sResp)
} }
func (req *getAccountRangeRequest) parseGetAccountRangeResponse(resp *syncResponse) ([]*message.AccountData, []common.Hash, error) { func (req *getAccountRangeRequest) parseGetAccountRangeResponse(resp *syncResponse) ([]*message.AccountData, [][]byte, error) {
if errResp := resp.pb.GetErrorResponse(); errResp != nil { if errResp := resp.pb.GetErrorResponse(); errResp != nil {
return nil, nil, errors.New(errResp.Error) return nil, nil, errors.New(errResp.Error)
} }
@ -752,9 +749,9 @@ func (req *getAccountRangeRequest) parseGetAccountRangeResponse(resp *syncRespon
if grResp == nil { if grResp == nil {
return nil, nil, errors.New("response not GetAccountRange") return nil, nil, errors.New("response not GetAccountRange")
} }
proofs := make([]common.Hash, 0) proofs := make([][]byte, 0)
for _, proofBytes := range grResp.Proof { for _, proofBytes := range grResp.Proof {
var proof common.Hash var proof []byte
if err := rlp.DecodeBytes(proofBytes, &proof); err != nil { if err := rlp.DecodeBytes(proofBytes, &proof); err != nil {
return nil, nil, errors.Wrap(err, "[GetAccountRangeResponse]") return nil, nil, errors.Wrap(err, "[GetAccountRangeResponse]")
} }
@ -817,7 +814,7 @@ func (req *getStorageRangesRequest) Encode() ([]byte, error) {
} }
// []*message.AccountData, []common.Hash // []*message.AccountData, []common.Hash
func (req *getStorageRangesRequest) getStorageRangesFromResponse(resp sttypes.Response) ([]*message.StoragesData, []common.Hash, error) { func (req *getStorageRangesRequest) getStorageRangesFromResponse(resp sttypes.Response) ([]*message.StoragesData, [][]byte, error) {
sResp, ok := resp.(*syncResponse) sResp, ok := resp.(*syncResponse)
if !ok || sResp == nil { if !ok || sResp == nil {
return nil, nil, errors.New("not sync response") return nil, nil, errors.New("not sync response")
@ -825,7 +822,7 @@ func (req *getStorageRangesRequest) getStorageRangesFromResponse(resp sttypes.Re
return req.parseGetStorageRangesResponse(sResp) return req.parseGetStorageRangesResponse(sResp)
} }
func (req *getStorageRangesRequest) parseGetStorageRangesResponse(resp *syncResponse) ([]*message.StoragesData, []common.Hash, error) { func (req *getStorageRangesRequest) parseGetStorageRangesResponse(resp *syncResponse) ([]*message.StoragesData, [][]byte, error) {
if errResp := resp.pb.GetErrorResponse(); errResp != nil { if errResp := resp.pb.GetErrorResponse(); errResp != nil {
return nil, nil, errors.New(errResp.Error) return nil, nil, errors.New(errResp.Error)
} }
@ -833,9 +830,9 @@ func (req *getStorageRangesRequest) parseGetStorageRangesResponse(resp *syncResp
if grResp == nil { if grResp == nil {
return nil, nil, errors.New("response not GetStorageRanges") return nil, nil, errors.New("response not GetStorageRanges")
} }
proofs := make([]common.Hash, 0) proofs := make([][]byte, 0)
for _, proofBytes := range grResp.Proof { for _, proofBytes := range grResp.Proof {
var proof common.Hash var proof []byte
if err := rlp.DecodeBytes(proofBytes, &proof); err != nil { if err := rlp.DecodeBytes(proofBytes, &proof); err != nil {
return nil, nil, errors.Wrap(err, "[GetStorageRangesResponse]") return nil, nil, errors.Wrap(err, "[GetStorageRangesResponse]")
} }

Loading…
Cancel
Save