The core protocol of WoopChain
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
woop/p2p/stream/protocols/sync/stream.go

379 lines
9.9 KiB

package sync
import (
"fmt"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
protobuf "github.com/golang/protobuf/proto"
syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
Release Candidate: dev -> main (#4319) * Rebase dev branch to current main branch (#4318) * add openssl compatibility on m2 chips using darwin (#4302) Adds support for OpenSSL on MacOS Ventura using m2 chips. * [dumpdb] ensure each cross link is dumped (#4311) * bump libp2p to version 0.24.0 and update its dependencies and relevant tests (#4315) * Removed legacy syncing peer provider. (#4260) * Removed legacy syncing peer provider. * Fix localnet. * Fix migrate version. * Rebased on main. * Fix formatting. * Remove blockchain dependency from engine. (#4310) * Consensus doesn't require anymore `Node` as a circular dependency. * Rebased upon main. * Removed engine beacon chain dependency. * Fixed nil error. * Fixed error. * bump libp2p to version 0.24.0 and update its dependencies and relevant tests * fix format, remove wrongly added configs * add back wrongly deleted comment * fix travis go checker Co-authored-by: Konstantin <355847+Frozen@users.noreply.github.com> Co-authored-by: “GheisMohammadi” <“Gheis.Mohammadi@gmail.com”> * bump libp2p to version 0.24.0 and update its dependencies and relevant tests (#4315) * Removed legacy syncing peer provider. (#4260) * Removed legacy syncing peer provider. * Fix localnet. * Fix migrate version. * Rebased on main. * Fix formatting. * Remove blockchain dependency from engine. (#4310) * Consensus doesn't require anymore `Node` as a circular dependency. * Rebased upon main. * Removed engine beacon chain dependency. * Fixed nil error. * Fixed error. * bump libp2p to version 0.24.0 and update its dependencies and relevant tests * fix format, remove wrongly added configs * add back wrongly deleted comment * fix travis go checker Co-authored-by: Konstantin <355847+Frozen@users.noreply.github.com> Co-authored-by: “GheisMohammadi” <“Gheis.Mohammadi@gmail.com”> * Fix for consensus stuck. (#4307) * Added check for block validity. * Starts new view change if block invalid. * Revert "Starts new view change if block invalid." This reverts commit e889fa5da2e0780f087ab7dae5106b96287706db. * staged dns sync v1.0 (#4316) * staged dns sync v1.0 * enabled stream downloader for localnet * fix code review issues * remove extra lock Co-authored-by: “GheisMohammadi” <“Gheis.Mohammadi@gmail.com”> * add description for closing client and change randomize process to ma… (#4276) * add description for closing client and change randomize process to make sure only online nodes are added to sync config * fix sync test * fix legacy limitNumPeers test * add WaitForEachPeerToConnect to node configs to make parallel peer connection optional Co-authored-by: “GheisMohammadi” <“Gheis.Mohammadi@gmail.com”> * Small fixes and code cleanup for network stack. (#4320) * staged dns sync v1.0 * enabled stream downloader for localnet * fix code review issues * remove extra lock * staged dns sync v1.0 * Fixed, code clean up and other. * Fixed, code clean up and other. * Fixed, code clean up and other. * Fix config. Co-authored-by: “GheisMohammadi” <“Gheis.Mohammadi@gmail.com”> * Fix not disable cache in archival mode (#4322) * Feature registry (#4324) * Registry for services. * Test. * Reverted comment. * Fix. * Slash fix (#4284) * Implementation of new slashing rate calculation * Write tests for then new slashing rate calculation * Add engine.applySlashing tests * fix #4059 Co-authored-by: Alex Brezas <abresas@gmail.com> Co-authored-by: Dimitris Lamprinos <pkakelas@gmail.com> * Bump github.com/aws/aws-sdk-go from 1.30.1 to 1.33.0 (#4325) (#4328) Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.1 to 1.33.0. - [Release notes](https://github.com/aws/aws-sdk-go/releases) - [Changelog](https://github.com/aws/aws-sdk-go/blob/v1.33.0/CHANGELOG.md) - [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.1...v1.33.0) --- updated-dependencies: - dependency-name: github.com/aws/aws-sdk-go dependency-type: direct:production ... Signed-off-by: dependabot[bot] <support@github.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump github.com/btcsuite/btcd from 0.21.0-beta to 0.23.2 (#4327) (#4329) Bumps [github.com/btcsuite/btcd](https://github.com/btcsuite/btcd) from 0.21.0-beta to 0.23.2. - [Release notes](https://github.com/btcsuite/btcd/releases) - [Changelog](https://github.com/btcsuite/btcd/blob/master/CHANGES) - [Commits](https://github.com/btcsuite/btcd/compare/v0.21.0-beta...v0.23.2) --- updated-dependencies: - dependency-name: github.com/btcsuite/btcd dependency-type: indirect ... Signed-off-by: dependabot[bot] <support@github.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: Max <82761650+MaxMustermann2@users.noreply.github.com> Co-authored-by: Gheis <36589218+GheisMohammadi@users.noreply.github.com> Co-authored-by: Konstantin <355847+Frozen@users.noreply.github.com> Co-authored-by: “GheisMohammadi” <“Gheis.Mohammadi@gmail.com”> Co-authored-by: Danny Willis <102543677+dannyposi@users.noreply.github.com> Co-authored-by: PeekPI <894646171@QQ.COM> Co-authored-by: Alex Brezas <abresas@gmail.com> Co-authored-by: Dimitris Lamprinos <pkakelas@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2 years ago
libp2p_network "github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
// syncStream is the structure for a stream running sync protocol.
type syncStream struct {
// Basic stream
*sttypes.BaseStream
protocol *Protocol
chain chainHelper
// pipeline channels
reqC chan *syncpb.Request
respC chan *syncpb.Response
// close related fields. Concurrent call of close is possible.
closeC chan struct{}
closeStat uint32
logger zerolog.Logger
}
// wrapStream wraps the raw libp2p stream to syncStream
func (p *Protocol) wrapStream(raw libp2p_network.Stream) *syncStream {
bs := sttypes.NewBaseStream(raw)
logger := p.logger.With().
Str("ID", string(bs.ID())).
Str("Remote Protocol", string(bs.ProtoID())).
Logger()
return &syncStream{
BaseStream: bs,
protocol: p,
chain: newChainHelper(p.chain, p.schedule),
reqC: make(chan *syncpb.Request, 100),
respC: make(chan *syncpb.Response, 100),
closeC: make(chan struct{}),
closeStat: 0,
logger: logger,
}
}
func (st *syncStream) run() {
st.logger.Info().Str("StreamID", string(st.ID())).Msg("running sync protocol on stream")
defer st.logger.Info().Str("StreamID", string(st.ID())).Msg("end running sync protocol on stream")
go st.handleReqLoop()
go st.handleRespLoop()
st.readMsgLoop()
}
// readMsgLoop is the loop
func (st *syncStream) readMsgLoop() {
for {
msg, err := st.readMsg()
if err != nil {
if err := st.Close(); err != nil {
st.logger.Err(err).Msg("failed to close sync stream")
}
return
}
st.deliverMsg(msg)
}
}
// deliverMsg process the delivered message and forward to the corresponding channel
func (st *syncStream) deliverMsg(msg protobuf.Message) {
syncMsg := msg.(*syncpb.Message)
if syncMsg == nil {
st.logger.Info().Str("message", msg.String()).Msg("received unexpected sync message")
return
}
if req := syncMsg.GetReq(); req != nil {
go func() {
select {
case st.reqC <- req:
case <-time.After(1 * time.Minute):
st.logger.Warn().Str("request", req.String()).
Msg("request handler severely jammed, message dropped")
}
}()
}
if resp := syncMsg.GetResp(); resp != nil {
go func() {
select {
case st.respC <- resp:
case <-time.After(1 * time.Minute):
st.logger.Warn().Str("response", resp.String()).
Msg("response handler severely jammed, message dropped")
}
}()
}
return
}
func (st *syncStream) handleReqLoop() {
for {
select {
case req := <-st.reqC:
st.protocol.rl.LimitRequest(st.ID())
err := st.handleReq(req)
if err != nil {
st.logger.Info().Err(err).Str("request", req.String()).
Msg("handle request error. Closing stream")
if err := st.Close(); err != nil {
st.logger.Err(err).Msg("failed to close sync stream")
}
return
}
case <-st.closeC:
return
}
}
}
func (st *syncStream) handleRespLoop() {
for {
select {
case resp := <-st.respC:
st.handleResp(resp)
case <-st.closeC:
return
}
}
}
// Close stops the stream handling and closes the underlying stream
func (st *syncStream) Close() error {
notClosed := atomic.CompareAndSwapUint32(&st.closeStat, 0, 1)
if !notClosed {
// Already closed by another goroutine. Directly return
return nil
}
if err := st.protocol.sm.RemoveStream(st.ID()); err != nil {
st.logger.Err(err).Str("stream ID", string(st.ID())).
Msg("failed to remove sync stream on close")
}
close(st.closeC)
return st.BaseStream.Close()
}
// CloseOnExit reset the stream on exiting node
func (st *syncStream) CloseOnExit() error {
notClosed := atomic.CompareAndSwapUint32(&st.closeStat, 0, 1)
if !notClosed {
// Already closed by another goroutine. Directly return
return nil
}
close(st.closeC)
return st.BaseStream.CloseOnExit()
}
func (st *syncStream) handleReq(req *syncpb.Request) error {
if gnReq := req.GetGetBlockNumberRequest(); gnReq != nil {
return st.handleGetBlockNumberRequest(req.ReqId)
}
if ghReq := req.GetGetBlockHashesRequest(); ghReq != nil {
return st.handleGetBlockHashesRequest(req.ReqId, ghReq)
}
if bnReq := req.GetGetBlocksByNumRequest(); bnReq != nil {
return st.handleGetBlocksByNumRequest(req.ReqId, bnReq)
}
if bhReq := req.GetGetBlocksByHashesRequest(); bhReq != nil {
return st.handleGetBlocksByHashesRequest(req.ReqId, bhReq)
}
// unsupported request type
return st.handleUnknownRequest(req.ReqId)
}
func (st *syncStream) handleGetBlockNumberRequest(rid uint64) error {
serverRequestCounterVec.With(prometheus.Labels{
"topic": string(st.ProtoID()),
"request_type": "getBlockNumber",
}).Inc()
resp := st.computeBlockNumberResp(rid)
if err := st.writeMsg(resp); err != nil {
return errors.Wrap(err, "[GetBlockNumber]: writeMsg")
}
return nil
}
func (st *syncStream) handleGetBlockHashesRequest(rid uint64, req *syncpb.GetBlockHashesRequest) error {
serverRequestCounterVec.With(prometheus.Labels{
"topic": string(st.ProtoID()),
"request_type": "getBlockHashes",
}).Inc()
resp, err := st.computeGetBlockHashesResp(rid, req.Nums)
if err != nil {
resp = syncpb.MakeErrorResponseMessage(rid, err)
}
if writeErr := st.writeMsg(resp); writeErr != nil {
if err == nil {
err = writeErr
} else {
err = fmt.Errorf("%v; [writeMsg] %v", err.Error(), writeErr)
}
}
return errors.Wrap(err, "[GetBlockHashes]")
}
func (st *syncStream) handleGetBlocksByNumRequest(rid uint64, req *syncpb.GetBlocksByNumRequest) error {
serverRequestCounterVec.With(prometheus.Labels{
"topic": string(st.ProtoID()),
"request_type": "getBlocksByNumber",
}).Inc()
resp, err := st.computeRespFromBlockNumber(rid, req.Nums)
if resp == nil && err != nil {
resp = syncpb.MakeErrorResponseMessage(rid, err)
}
if writeErr := st.writeMsg(resp); writeErr != nil {
if err == nil {
err = writeErr
} else {
err = fmt.Errorf("%v; [writeMsg] %v", err.Error(), writeErr)
}
}
return errors.Wrap(err, "[GetBlocksByNumber]")
}
func (st *syncStream) handleGetBlocksByHashesRequest(rid uint64, req *syncpb.GetBlocksByHashesRequest) error {
serverRequestCounterVec.With(prometheus.Labels{
"topic": string(st.ProtoID()),
"request_type": "getBlocksByHashes",
}).Inc()
hashes := bytesToHashes(req.BlockHashes)
resp, err := st.computeRespFromBlockHashes(rid, hashes)
if resp == nil && err != nil {
resp = syncpb.MakeErrorResponseMessage(rid, err)
}
if writeErr := st.writeMsg(resp); writeErr != nil {
if err == nil {
err = writeErr
} else {
err = fmt.Errorf("%v; [writeMsg] %v", err.Error(), writeErr)
}
}
return errors.Wrap(err, "[GetBlocksByHashes]")
}
func (st *syncStream) handleUnknownRequest(rid uint64) error {
serverRequestCounterVec.With(prometheus.Labels{
"topic": string(st.ProtoID()),
"request_type": "unknown",
}).Inc()
resp := syncpb.MakeErrorResponseMessage(rid, errUnknownReqType)
return st.writeMsg(resp)
}
func (st *syncStream) handleResp(resp *syncpb.Response) {
st.protocol.rm.DeliverResponse(st.ID(), &syncResponse{resp})
}
func (st *syncStream) readMsg() (*syncpb.Message, error) {
b, err := st.ReadBytes()
if err != nil {
return nil, err
}
var msg = &syncpb.Message{}
if err := protobuf.Unmarshal(b, msg); err != nil {
return nil, err
}
return msg, nil
}
func (st *syncStream) writeMsg(msg *syncpb.Message) error {
b, err := protobuf.Marshal(msg)
if err != nil {
return err
}
return st.WriteBytes(b)
}
func (st *syncStream) computeBlockNumberResp(rid uint64) *syncpb.Message {
bn := st.chain.getCurrentBlockNumber()
return syncpb.MakeGetBlockNumberResponseMessage(rid, bn)
}
func (st syncStream) computeGetBlockHashesResp(rid uint64, bns []uint64) (*syncpb.Message, error) {
if len(bns) > GetBlockHashesAmountCap {
err := fmt.Errorf("GetBlockHashes amount exceed cap: %v>%v", len(bns), GetBlockHashesAmountCap)
return nil, err
}
hashes := st.chain.getBlockHashes(bns)
return syncpb.MakeGetBlockHashesResponseMessage(rid, hashes), nil
}
func (st *syncStream) computeRespFromBlockNumber(rid uint64, bns []uint64) (*syncpb.Message, error) {
if len(bns) > GetBlocksByNumAmountCap {
err := fmt.Errorf("GetBlocksByNum amount exceed cap: %v>%v", len(bns), GetBlocksByNumAmountCap)
return nil, err
}
blocks, err := st.chain.getBlocksByNumber(bns)
if err != nil {
return nil, err
}
var (
blocksBytes = make([][]byte, 0, len(blocks))
sigs = make([][]byte, 0, len(blocks))
)
for _, block := range blocks {
bb, err := rlp.EncodeToBytes(block)
if err != nil {
return nil, err
}
blocksBytes = append(blocksBytes, bb)
var sig []byte
if block != nil {
sig = block.GetCurrentCommitSig()
}
sigs = append(sigs, sig)
}
return syncpb.MakeGetBlocksByNumResponseMessage(rid, blocksBytes, sigs), nil
}
func (st *syncStream) computeRespFromBlockHashes(rid uint64, hs []common.Hash) (*syncpb.Message, error) {
if len(hs) > GetBlocksByHashesAmountCap {
err := fmt.Errorf("GetBlockByHashes amount exceed cap: %v > %v", len(hs), GetBlocksByHashesAmountCap)
return nil, err
}
blocks, err := st.chain.getBlocksByHashes(hs)
if err != nil {
return nil, err
}
var (
blocksBytes = make([][]byte, 0, len(blocks))
sigs = make([][]byte, 0, len(blocks))
)
for _, block := range blocks {
bb, err := rlp.EncodeToBytes(block)
if err != nil {
return nil, err
}
blocksBytes = append(blocksBytes, bb)
var sig []byte
if block != nil {
sig = block.GetCurrentCommitSig()
}
sigs = append(sigs, sig)
}
return syncpb.MakeGetBlocksByHashesResponseMessage(rid, blocksBytes, sigs), nil
}
func bytesToHashes(bs [][]byte) []common.Hash {
hs := make([]common.Hash, 0, len(bs))
for _, b := range bs {
var h common.Hash
copy(h[:], b)
hs = append(hs, h)
}
return hs
}