You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
321 lines
8.9 KiB
321 lines
8.9 KiB
package sync
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/harmony-one/harmony/consensus/engine"
|
|
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
|
|
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
|
|
"github.com/harmony-one/harmony/internal/utils"
|
|
"github.com/harmony-one/harmony/p2p/discovery"
|
|
"github.com/harmony-one/harmony/p2p/stream/common/ratelimiter"
|
|
"github.com/harmony-one/harmony/p2p/stream/common/requestmanager"
|
|
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
|
|
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
|
|
"github.com/harmony-one/harmony/shard"
|
|
"github.com/hashicorp/go-version"
|
|
libp2p_host "github.com/libp2p/go-libp2p/core/host"
|
|
libp2p_network "github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
const (
|
|
// serviceSpecifier is the specifier for the service.
|
|
serviceSpecifier = "sync"
|
|
)
|
|
|
|
var (
|
|
version100, _ = version.NewVersion("1.0.0")
|
|
|
|
// MyVersion is the version of sync protocol of the local node
|
|
MyVersion = version100
|
|
|
|
// MinVersion is the minimum version for matching function
|
|
MinVersion = version100
|
|
)
|
|
|
|
type (
|
|
// Protocol is the protocol for sync streaming
|
|
Protocol struct {
|
|
chain engine.ChainReader // provide SYNC data
|
|
beaconNode bool // is beacon node or shard chain node
|
|
schedule shardingconfig.Schedule // provide schedule information
|
|
rl ratelimiter.RateLimiter // limit the incoming request rate
|
|
sm streammanager.StreamManager // stream management
|
|
rm requestmanager.RequestManager // deliver the response from stream
|
|
disc discovery.Discovery
|
|
|
|
config Config
|
|
logger zerolog.Logger
|
|
|
|
ctx context.Context
|
|
cancel func()
|
|
closeC chan struct{}
|
|
}
|
|
|
|
// Config is the sync protocol config
|
|
Config struct {
|
|
Chain engine.ChainReader
|
|
Host libp2p_host.Host
|
|
Discovery discovery.Discovery
|
|
ShardID nodeconfig.ShardID
|
|
Network nodeconfig.NetworkType
|
|
BeaconNode bool
|
|
MaxAdvertiseWaitTime int
|
|
// stream manager config
|
|
SmSoftLowCap int
|
|
SmHardLowCap int
|
|
SmHiCap int
|
|
DiscBatch int
|
|
}
|
|
)
|
|
|
|
// NewProtocol creates a new sync protocol
|
|
func NewProtocol(config Config) *Protocol {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
sp := &Protocol{
|
|
chain: config.Chain,
|
|
beaconNode: config.BeaconNode,
|
|
disc: config.Discovery,
|
|
config: config,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
closeC: make(chan struct{}),
|
|
}
|
|
smConfig := streammanager.Config{
|
|
SoftLoCap: config.SmSoftLowCap,
|
|
HardLoCap: config.SmHardLowCap,
|
|
HiCap: config.SmHiCap,
|
|
DiscBatch: config.DiscBatch,
|
|
}
|
|
sp.sm = streammanager.NewStreamManager(sp.ProtoID(), config.Host, config.Discovery,
|
|
sp.HandleStream, smConfig)
|
|
|
|
sp.rl = ratelimiter.NewRateLimiter(sp.sm, rateLimiterGlobalRequestPerSecond, rateLimiterSingleRequestsPerSecond)
|
|
|
|
sp.rm = requestmanager.NewRequestManager(sp.sm)
|
|
|
|
sp.logger = utils.Logger().With().Str("Protocol", string(sp.ProtoID())).Logger()
|
|
return sp
|
|
}
|
|
|
|
// Start starts the sync protocol
|
|
func (p *Protocol) Start() {
|
|
p.sm.Start()
|
|
p.rm.Start()
|
|
p.rl.Start()
|
|
// If it's not EpochChain, advertise
|
|
if p.beaconNode || p.chain.ShardID() != shard.BeaconChainShardID {
|
|
go p.advertiseLoop()
|
|
}
|
|
}
|
|
|
|
// Close close the protocol
|
|
func (p *Protocol) Close() {
|
|
p.rl.Close()
|
|
p.rm.Close()
|
|
p.sm.Close()
|
|
p.cancel()
|
|
close(p.closeC)
|
|
}
|
|
|
|
// Specifier return the specifier for the protocol
|
|
func (p *Protocol) Specifier() string {
|
|
return serviceSpecifier + "/" + strconv.Itoa(int(p.config.ShardID))
|
|
}
|
|
|
|
// ProtoID return the ProtoID of the sync protocol
|
|
func (p *Protocol) ProtoID() sttypes.ProtoID {
|
|
return p.protoIDByVersion(MyVersion)
|
|
}
|
|
|
|
// ShardProtoID returns the ProtoID of the sync protocol for shard nodes
|
|
func (p *Protocol) ShardProtoID() sttypes.ProtoID {
|
|
return p.protoIDByVersionForShardNodes(MyVersion)
|
|
}
|
|
|
|
// Version returns the sync protocol version
|
|
func (p *Protocol) Version() *version.Version {
|
|
return MyVersion
|
|
}
|
|
|
|
// IsBeaconNode returns true if it is a beacon chain node
|
|
func (p *Protocol) IsBeaconNode() bool {
|
|
return p.beaconNode
|
|
}
|
|
|
|
// Match checks the compatibility to the target protocol ID.
|
|
func (p *Protocol) Match(targetID string) bool {
|
|
target, err := sttypes.ProtoIDToProtoSpec(sttypes.ProtoID(targetID))
|
|
if err != nil {
|
|
return false
|
|
}
|
|
if target.Service != serviceSpecifier {
|
|
return false
|
|
}
|
|
if target.NetworkType != p.config.Network {
|
|
return false
|
|
}
|
|
if target.ShardID != p.config.ShardID {
|
|
return false
|
|
}
|
|
if target.Version.LessThan(MinVersion) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// HandleStream is the stream handle function being registered to libp2p.
|
|
func (p *Protocol) HandleStream(raw libp2p_network.Stream) {
|
|
p.logger.Info().Str("stream", raw.ID()).Msg("handle new sync stream")
|
|
st := p.wrapStream(raw)
|
|
if err := p.sm.NewStream(st); err != nil {
|
|
// Possibly we have reach the hard limit of the stream
|
|
p.logger.Warn().Err(err).Str("stream ID", string(st.ID())).
|
|
Msg("failed to add new stream")
|
|
return
|
|
}
|
|
fmt.Println("Connected to", raw.Conn().RemotePeer().String(), "(", st.ProtoID(), ")", "my ID: ", raw.Conn().LocalPeer().String())
|
|
st.run()
|
|
}
|
|
|
|
func (p *Protocol) advertiseLoop() {
|
|
for {
|
|
sleep := p.advertise()
|
|
maxSleepTime := time.Duration(p.config.MaxAdvertiseWaitTime) * time.Minute
|
|
if sleep > maxSleepTime {
|
|
sleep = maxSleepTime
|
|
}
|
|
select {
|
|
case <-p.closeC:
|
|
return
|
|
case <-time.After(sleep):
|
|
}
|
|
}
|
|
}
|
|
|
|
// advertise will advertise all compatible protocol versions for helping nodes running low
|
|
// version
|
|
func (p *Protocol) advertise() time.Duration {
|
|
var nextWait time.Duration
|
|
|
|
pids := p.supportedProtoIDs()
|
|
for _, pid := range pids {
|
|
w, e := p.disc.Advertise(p.ctx, string(pid))
|
|
if e != nil {
|
|
p.logger.Warn().Err(e).Str("protocol", string(pid)).
|
|
Msg("cannot advertise sync protocol")
|
|
continue
|
|
}
|
|
if nextWait == 0 || nextWait > w {
|
|
nextWait = w
|
|
}
|
|
}
|
|
if nextWait < minAdvertiseInterval {
|
|
nextWait = minAdvertiseInterval
|
|
}
|
|
return nextWait
|
|
}
|
|
|
|
func (p *Protocol) supportedProtoIDs() []sttypes.ProtoID {
|
|
vs := p.supportedVersions()
|
|
|
|
pids := make([]sttypes.ProtoID, 0, len(vs))
|
|
for _, v := range vs {
|
|
pids = append(pids, p.protoIDByVersion(v))
|
|
// beacon node needs to inform shard nodes about it supports them as well for EpochChain
|
|
// basically beacon node can accept connection from shard nodes to share last epoch blocks
|
|
if p.beaconNode {
|
|
pids = append(pids, p.protoIDByVersionForShardNodes(v))
|
|
}
|
|
}
|
|
return pids
|
|
}
|
|
|
|
func (p *Protocol) supportedVersions() []*version.Version {
|
|
return []*version.Version{version100}
|
|
}
|
|
|
|
func (p *Protocol) protoIDByVersion(v *version.Version) sttypes.ProtoID {
|
|
spec := sttypes.ProtoSpec{
|
|
Service: serviceSpecifier,
|
|
NetworkType: p.config.Network,
|
|
ShardID: p.config.ShardID,
|
|
Version: v,
|
|
BeaconNode: p.beaconNode,
|
|
}
|
|
return spec.ToProtoID()
|
|
}
|
|
|
|
func (p *Protocol) protoIDByVersionForShardNodes(v *version.Version) sttypes.ProtoID {
|
|
spec := sttypes.ProtoSpec{
|
|
Service: serviceSpecifier,
|
|
NetworkType: p.config.Network,
|
|
ShardID: p.config.ShardID,
|
|
Version: v,
|
|
BeaconNode: false,
|
|
}
|
|
return spec.ToProtoID()
|
|
}
|
|
|
|
// RemoveStream removes the stream of the given stream ID
|
|
// TODO: add reason to parameters
|
|
func (p *Protocol) RemoveStream(stID sttypes.StreamID) {
|
|
st, exist := p.sm.GetStreamByID(stID)
|
|
if exist && st != nil {
|
|
//TODO: log this incident with reason
|
|
st.Close()
|
|
// stream manager removes this stream from the list and triggers discovery if number of streams are not enough
|
|
p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed
|
|
}
|
|
}
|
|
|
|
func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) {
|
|
st, exist := p.sm.GetStreamByID(stID)
|
|
if exist && st != nil {
|
|
st.AddFailedTimes()
|
|
p.logger.Info().
|
|
Str("stream ID", string(st.ID())).
|
|
Int("num failures", st.FailedTimes()).
|
|
Str("reason", reason).
|
|
Msg("stream failed")
|
|
if st.FailedTimes() >= MaxStreamFailures {
|
|
st.Close()
|
|
// stream manager removes this stream from the list and triggers discovery if number of streams are not enough
|
|
p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed
|
|
p.logger.Warn().
|
|
Str("stream ID", string(st.ID())).
|
|
Msg("stream removed")
|
|
}
|
|
}
|
|
}
|
|
|
|
// NumStreams return the streams with minimum version.
|
|
// Note: nodes with sync version smaller than minVersion is not counted.
|
|
func (p *Protocol) NumStreams() int {
|
|
res := 0
|
|
sts := p.sm.GetStreams()
|
|
|
|
for _, st := range sts {
|
|
ps, _ := st.ProtoSpec()
|
|
if ps.Version.GreaterThanOrEqual(MinVersion) {
|
|
res++
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
// GetStreamManager get the underlying stream manager for upper level stream operations
|
|
func (p *Protocol) GetStreamManager() streammanager.StreamManager {
|
|
return p.sm
|
|
}
|
|
|
|
// SubscribeAddStreamEvent subscribe the stream add event
|
|
func (p *Protocol) SubscribeAddStreamEvent(ch chan<- streammanager.EvtStreamAdded) event.Subscription {
|
|
return p.sm.SubscribeAddStreamEvent(ch)
|
|
}
|
|
|