From 390bdb67d835939bc951139d171478e3e88e0705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 7 Dec 2023 16:23:03 +0800 Subject: [PATCH] add client new functions to stream sync adapter, update GetAccountRanges parameters --- api/service/stagedstreamsync/adapter.go | 5 +++++ p2p/stream/protocols/sync/client.go | 27 +++++++++++-------------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/api/service/stagedstreamsync/adapter.go b/api/service/stagedstreamsync/adapter.go index ca9c6a678..56c42b661 100644 --- a/api/service/stagedstreamsync/adapter.go +++ b/api/service/stagedstreamsync/adapter.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/p2p/stream/common/streammanager" syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" + "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" sttypes "github.com/harmony-one/harmony/p2p/stream/types" ) @@ -20,6 +21,10 @@ type syncProtocol interface { GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error) GetReceipts(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (receipts []types.Receipts, stid sttypes.StreamID, err error) GetNodeData(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (data [][]byte, stid sttypes.StreamID, err error) + GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error) + GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error) + GetByteCodes(ctx context.Context, hs []common.Hash, bytes uint64, opts ...syncproto.Option) (codes [][]byte, stid sttypes.StreamID, err error) + GetTrieNodes(ctx context.Context, root common.Hash, paths []*message.TrieNodePathSet, bytes uint64, opts ...syncproto.Option) (nodes [][]byte, stid sttypes.StreamID, err error) RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream StreamFailed(stID sttypes.StreamID, reason string) diff --git a/p2p/stream/protocols/sync/client.go b/p2p/stream/protocols/sync/client.go index 9024142ce..45707e119 100644 --- a/p2p/stream/protocols/sync/client.go +++ b/p2p/stream/protocols/sync/client.go @@ -184,7 +184,7 @@ func (p *Protocol) GetNodeData(ctx context.Context, hs []common.Hash, opts ...Op // GetAccountRange do getAccountRange through sync stream protocol. // returns the accounts along with proofs as result, target stream id, and error -func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (accounts []*message.AccountData, proof []common.Hash, stid sttypes.StreamID, err error) { +func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getAccountRange") defer p.doMetricPostClientRequest("getAccountRange", err, timer) @@ -207,7 +207,7 @@ func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin // GetStorageRanges do getStorageRanges through sync stream protocol. // returns the slots along with proofs as result, target stream id, and error -func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (slots []*message.StorageData, proof []common.Hash, stid sttypes.StreamID, err error) { +func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getStorageRanges") defer p.doMetricPostClientRequest("getStorageRanges", err, timer) @@ -233,11 +233,9 @@ func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accou if err != nil { return } - slots = make([]*message.StorageData, 0) + slots = make([][]*message.StorageData, 0) for _, storage := range storages { - for _, data := range storage.Data { - slots = append(slots, data) - } + slots = append(slots, storage.Data) } return } @@ -735,8 +733,7 @@ func (req *getAccountRangeRequest) Encode() ([]byte, error) { return protobuf.Marshal(msg) } -// []*message.AccountData, []common.Hash -func (req *getAccountRangeRequest) getAccountRangeFromResponse(resp sttypes.Response) ([]*message.AccountData, []common.Hash, error) { +func (req *getAccountRangeRequest) getAccountRangeFromResponse(resp sttypes.Response) ([]*message.AccountData, [][]byte, error) { sResp, ok := resp.(*syncResponse) if !ok || sResp == nil { return nil, nil, errors.New("not sync response") @@ -744,7 +741,7 @@ func (req *getAccountRangeRequest) getAccountRangeFromResponse(resp sttypes.Resp return req.parseGetAccountRangeResponse(sResp) } -func (req *getAccountRangeRequest) parseGetAccountRangeResponse(resp *syncResponse) ([]*message.AccountData, []common.Hash, error) { +func (req *getAccountRangeRequest) parseGetAccountRangeResponse(resp *syncResponse) ([]*message.AccountData, [][]byte, error) { if errResp := resp.pb.GetErrorResponse(); errResp != nil { return nil, nil, errors.New(errResp.Error) } @@ -752,9 +749,9 @@ func (req *getAccountRangeRequest) parseGetAccountRangeResponse(resp *syncRespon if grResp == nil { return nil, nil, errors.New("response not GetAccountRange") } - proofs := make([]common.Hash, 0) + proofs := make([][]byte, 0) for _, proofBytes := range grResp.Proof { - var proof common.Hash + var proof []byte if err := rlp.DecodeBytes(proofBytes, &proof); err != nil { return nil, nil, errors.Wrap(err, "[GetAccountRangeResponse]") } @@ -817,7 +814,7 @@ func (req *getStorageRangesRequest) Encode() ([]byte, error) { } // []*message.AccountData, []common.Hash -func (req *getStorageRangesRequest) getStorageRangesFromResponse(resp sttypes.Response) ([]*message.StoragesData, []common.Hash, error) { +func (req *getStorageRangesRequest) getStorageRangesFromResponse(resp sttypes.Response) ([]*message.StoragesData, [][]byte, error) { sResp, ok := resp.(*syncResponse) if !ok || sResp == nil { return nil, nil, errors.New("not sync response") @@ -825,7 +822,7 @@ func (req *getStorageRangesRequest) getStorageRangesFromResponse(resp sttypes.Re return req.parseGetStorageRangesResponse(sResp) } -func (req *getStorageRangesRequest) parseGetStorageRangesResponse(resp *syncResponse) ([]*message.StoragesData, []common.Hash, error) { +func (req *getStorageRangesRequest) parseGetStorageRangesResponse(resp *syncResponse) ([]*message.StoragesData, [][]byte, error) { if errResp := resp.pb.GetErrorResponse(); errResp != nil { return nil, nil, errors.New(errResp.Error) } @@ -833,9 +830,9 @@ func (req *getStorageRangesRequest) parseGetStorageRangesResponse(resp *syncResp if grResp == nil { return nil, nil, errors.New("response not GetStorageRanges") } - proofs := make([]common.Hash, 0) + proofs := make([][]byte, 0) for _, proofBytes := range grResp.Proof { - var proof common.Hash + var proof []byte if err := rlp.DecodeBytes(proofBytes, &proof); err != nil { return nil, nil, errors.Wrap(err, "[GetStorageRangesResponse]") }