[P2P] Added functionality to Host for stream, replaced networkInfo service. (#3534)
1. Added some functionality to HostV2 for stream support. 2. Added new module discovery under p2p host to replace the networkInfo service. 3. Make dht datastore badger only active on bootstrap nodes. For normal nodes, badger is disabled by default. It can also be enabled by flags. Co-authored-by: Rongjian Lan <rongjian.lan@gmail.com>pull/3551/head
parent
84616a7cfc
commit
3d6858aba5
@ -1,300 +0,0 @@ |
||||
package networkinfo |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"net" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/rpc" |
||||
msg_pb "github.com/harmony-one/harmony/api/proto/message" |
||||
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
badger "github.com/ipfs/go-ds-badger" |
||||
coredis "github.com/libp2p/go-libp2p-core/discovery" |
||||
libp2p_peer "github.com/libp2p/go-libp2p-core/peer" |
||||
libp2pdis "github.com/libp2p/go-libp2p-discovery" |
||||
libp2pdht "github.com/libp2p/go-libp2p-kad-dht" |
||||
libp2pdhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" |
||||
madns "github.com/multiformats/go-multiaddr-dns" |
||||
manet "github.com/multiformats/go-multiaddr-net" |
||||
"github.com/pkg/errors" |
||||
) |
||||
|
||||
// Service is the network info service.
|
||||
type Service struct { |
||||
Host p2p.Host |
||||
Rendezvous nodeconfig.GroupID |
||||
bootnodes p2p.AddrList |
||||
dht *libp2pdht.IpfsDHT |
||||
cancel context.CancelFunc |
||||
stopChan chan struct{} |
||||
stoppedChan chan struct{} |
||||
peerChan chan p2p.Peer |
||||
peerInfo <-chan libp2p_peer.AddrInfo |
||||
discovery *libp2pdis.RoutingDiscovery |
||||
messageChan chan *msg_pb.Message |
||||
started bool |
||||
} |
||||
|
||||
// ConnectionRetry set the number of retry of connection to bootnode in case the initial connection is failed
|
||||
var ( |
||||
// retry for 30s and give up then
|
||||
ConnectionRetry = 15 |
||||
) |
||||
|
||||
const ( |
||||
waitInRetry = 5 * time.Second |
||||
connectionTimeout = 3 * time.Minute |
||||
|
||||
minFindPeerInterval = 5 // initial find peer interval during bootstrap
|
||||
maxFindPeerInterval = 1800 // max find peer interval, every 30 minutes
|
||||
|
||||
// register to bootnode every ticker
|
||||
dhtTicker = 6 * time.Hour |
||||
|
||||
discoveryLimit = 32 |
||||
) |
||||
|
||||
// New returns role conversion service. If dataStorePath is not empty, it
|
||||
// points to a persistent database directory to use.
|
||||
func New( |
||||
h p2p.Host, rendezvous nodeconfig.GroupID, peerChan chan p2p.Peer, |
||||
bootnodes p2p.AddrList, dataStorePath string, |
||||
) (*Service, error) { |
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
var dhtOpts []libp2pdhtopts.Option |
||||
if dataStorePath != "" { |
||||
dataStore, err := badger.NewDatastore(dataStorePath, nil) |
||||
if err != nil { |
||||
return nil, errors.Wrapf(err, |
||||
"cannot open Badger datastore at %s", dataStorePath) |
||||
} |
||||
utils.Logger().Info(). |
||||
Str("dataStorePath", dataStorePath). |
||||
Msg("backing DHT with Badger datastore") |
||||
dhtOpts = append(dhtOpts, libp2pdhtopts.Datastore(dataStore)) |
||||
} |
||||
|
||||
dht, err := libp2pdht.New(ctx, h.GetP2PHost(), dhtOpts...) |
||||
if err != nil { |
||||
return nil, errors.Wrapf(err, "cannot create DHT") |
||||
} |
||||
|
||||
return &Service{ |
||||
Host: h, |
||||
dht: dht, |
||||
Rendezvous: rendezvous, |
||||
cancel: cancel, |
||||
stopChan: make(chan struct{}), |
||||
stoppedChan: make(chan struct{}), |
||||
peerChan: peerChan, |
||||
bootnodes: bootnodes, |
||||
discovery: nil, |
||||
started: false, |
||||
}, nil |
||||
} |
||||
|
||||
// MustNew is a panic-on-error version of New.
|
||||
func MustNew( |
||||
h p2p.Host, rendezvous nodeconfig.GroupID, peerChan chan p2p.Peer, |
||||
bootnodes p2p.AddrList, dataStorePath string, |
||||
) *Service { |
||||
service, err := New(h, rendezvous, peerChan, bootnodes, dataStorePath) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
return service |
||||
} |
||||
|
||||
// Start starts network info service.
|
||||
func (s *Service) Start() error { |
||||
err := s.Init() |
||||
if err != nil { |
||||
utils.Logger().Error().Err(err).Msg("Service Init Failed") |
||||
return nil |
||||
} |
||||
s.Run() |
||||
s.started = true |
||||
return nil |
||||
} |
||||
|
||||
// Init initializes role conversion service.
|
||||
func (s *Service) Init() error { |
||||
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) |
||||
defer cancel() |
||||
utils.Logger().Info().Msg("Init networkinfo service") |
||||
|
||||
// Bootstrap the DHT. In the default configuration, this spawns a Background
|
||||
// thread that will refresh the peer table every five minutes.
|
||||
utils.Logger().Debug().Msg("Bootstrapping the DHT") |
||||
if err := s.dht.Bootstrap(ctx); err != nil { |
||||
return fmt.Errorf("error bootstrap dht: %s", err) |
||||
} |
||||
|
||||
var wg sync.WaitGroup |
||||
if s.bootnodes == nil { |
||||
// TODO: should've passed in bootnodes through constructor.
|
||||
s.bootnodes = p2p.BootNodes |
||||
} |
||||
|
||||
connected := false |
||||
var bnList p2p.AddrList |
||||
for _, maddr := range s.bootnodes { |
||||
if madns.Matches(maddr) { |
||||
mas, err := madns.Resolve(context.Background(), maddr) |
||||
if err != nil { |
||||
utils.Logger().Error().Err(err).Msg("Resolve bootnode") |
||||
continue |
||||
} |
||||
bnList = append(bnList, mas...) |
||||
} else { |
||||
bnList = append(bnList, maddr) |
||||
} |
||||
} |
||||
|
||||
for _, peerAddr := range bnList { |
||||
peerinfo, _ := libp2p_peer.AddrInfoFromP2pAddr(peerAddr) |
||||
wg.Add(1) |
||||
go func() { |
||||
defer wg.Done() |
||||
for i := 0; i < ConnectionRetry; i++ { |
||||
if err := s.Host.GetP2PHost().Connect(ctx, *peerinfo); err != nil { |
||||
utils.Logger().Warn().Err(err).Int("try", i).Msg("can't connect to bootnode") |
||||
time.Sleep(waitInRetry) |
||||
} else { |
||||
utils.Logger().Info().Int("try", i).Interface("node", *peerinfo).Msg("connected to bootnode") |
||||
// it is okay if any bootnode is connected
|
||||
connected = true |
||||
break |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
wg.Wait() |
||||
|
||||
if !connected { |
||||
return fmt.Errorf("[FATAL] error connecting to bootnodes") |
||||
} |
||||
|
||||
// We use a rendezvous point "shardID" to announce our location.
|
||||
utils.Logger().Info().Str("Rendezvous", string(s.Rendezvous)).Msg("Announcing ourselves...") |
||||
s.discovery = libp2pdis.NewRoutingDiscovery(s.dht) |
||||
libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous)) |
||||
utils.Logger().Info().Msg("Successfully announced!") |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Run runs network info.
|
||||
func (s *Service) Run() { |
||||
defer close(s.stoppedChan) |
||||
if s.discovery == nil { |
||||
utils.Logger().Error().Msg("discovery is not initialized") |
||||
return |
||||
} |
||||
|
||||
go s.DoService() |
||||
} |
||||
|
||||
// DoService does network info.
|
||||
func (s *Service) DoService() { |
||||
tick := time.NewTicker(dhtTicker) |
||||
defer tick.Stop() |
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
defer cancel() |
||||
peerInterval := minFindPeerInterval |
||||
intervalTick := time.NewTicker(time.Duration(peerInterval) * time.Second) |
||||
defer intervalTick.Stop() |
||||
for { |
||||
select { |
||||
case <-s.stopChan: |
||||
return |
||||
case <-tick.C: |
||||
libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous)) |
||||
utils.Logger().Info(). |
||||
Str("Rendezvous", string(s.Rendezvous)). |
||||
Msg("Successfully announced!") |
||||
case <-intervalTick.C: |
||||
var err error |
||||
s.peerInfo, err = s.discovery.FindPeers( |
||||
ctx, string(s.Rendezvous), coredis.Limit(discoveryLimit), |
||||
) |
||||
if err != nil { |
||||
utils.Logger().Error().Err(err).Msg("FindPeers") |
||||
return |
||||
} |
||||
if peerInterval < maxFindPeerInterval { |
||||
peerInterval *= 2 |
||||
intervalTick.Stop() |
||||
intervalTick = time.NewTicker(time.Duration(peerInterval) * time.Second) |
||||
} |
||||
|
||||
go s.findPeers(ctx) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (s *Service) findPeers(ctx context.Context) { |
||||
_, cgnPrefix, err := net.ParseCIDR("100.64.0.0/10") |
||||
if err != nil { |
||||
utils.Logger().Error().Err(err).Msg("can't parse CIDR") |
||||
return |
||||
} |
||||
for peer := range s.peerInfo { |
||||
if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { |
||||
if err := s.Host.GetP2PHost().Connect(ctx, peer); err != nil { |
||||
utils.Logger().Warn().Err(err).Interface("peer", peer).Msg("can't connect to peer node") |
||||
// break if the node can't connect to peers, waiting for another peer
|
||||
break |
||||
} else { |
||||
utils.Logger().Info().Interface("peer", peer).Msg("connected to peer node") |
||||
} |
||||
// figure out the public ip/port
|
||||
var ip, port string |
||||
|
||||
for _, addr := range peer.Addrs { |
||||
netaddr, err := manet.ToNetAddr(addr) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
nip := netaddr.(*net.TCPAddr).IP |
||||
if (nip.IsGlobalUnicast() && !utils.IsPrivateIP(nip)) || cgnPrefix.Contains(nip) { |
||||
ip = nip.String() |
||||
port = fmt.Sprintf("%d", netaddr.(*net.TCPAddr).Port) |
||||
break |
||||
} |
||||
} |
||||
p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs} |
||||
utils.Logger().Info().Interface("peer", p).Msg("Notify peerChan") |
||||
if s.peerChan != nil { |
||||
s.peerChan <- p |
||||
} |
||||
} |
||||
} |
||||
|
||||
utils.Logger().Info().Msg("PeerInfo Channel Closed") |
||||
} |
||||
|
||||
// Stop stops network info service.
|
||||
func (s *Service) Stop() error { |
||||
utils.Logger().Info().Msg("Stopping network info service") |
||||
defer s.cancel() |
||||
|
||||
if !s.started { |
||||
utils.Logger().Info().Msg("Service didn't started. Exit") |
||||
return nil |
||||
} |
||||
|
||||
s.stopChan <- struct{}{} |
||||
<-s.stoppedChan |
||||
utils.Logger().Info().Msg("Network info service stopped") |
||||
return nil |
||||
} |
||||
|
||||
// APIs for the services.
|
||||
func (s *Service) APIs() []rpc.API { |
||||
return nil |
||||
} |
@ -1,37 +0,0 @@ |
||||
package networkinfo |
||||
|
||||
import ( |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/harmony-one/harmony/crypto/bls" |
||||
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
func TestService(t *testing.T) { |
||||
nodePriKey, _, err := utils.LoadKeyFromFile("/tmp/127.0.0.1.12345.key") |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
peerPriKey := bls.RandPrivateKey() |
||||
peerPubKey := peerPriKey.GetPublicKey() |
||||
if peerPriKey == nil || peerPubKey == nil { |
||||
t.Fatal("generate key error") |
||||
} |
||||
selfPeer := p2p.Peer{IP: "127.0.0.1", Port: "12345", ConsensusPubKey: peerPubKey} |
||||
host, err := p2p.NewHost(&selfPeer, nodePriKey) |
||||
if err != nil { |
||||
t.Fatal("unable to new host in harmony") |
||||
} |
||||
|
||||
s, err := New(host, nodeconfig.GroupIDBeaconClient, nil, nil, "") |
||||
if err != nil { |
||||
t.Fatalf("New() failed: %s", err) |
||||
} |
||||
|
||||
s.Start() |
||||
time.Sleep(2 * time.Second) |
||||
s.Stop() |
||||
} |
@ -0,0 +1,89 @@ |
||||
package discovery |
||||
|
||||
import ( |
||||
"context" |
||||
"time" |
||||
|
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/libp2p/go-libp2p-core/discovery" |
||||
libp2p_host "github.com/libp2p/go-libp2p-core/host" |
||||
libp2p_peer "github.com/libp2p/go-libp2p-core/peer" |
||||
libp2p_dis "github.com/libp2p/go-libp2p-discovery" |
||||
libp2p_dht "github.com/libp2p/go-libp2p-kad-dht" |
||||
"github.com/rs/zerolog" |
||||
) |
||||
|
||||
// Discovery is the interface for the underlying peer discovery protocol.
|
||||
// The interface is implemented by dhtDiscovery
|
||||
type Discovery interface { |
||||
Start() error |
||||
Close() error |
||||
Advertise(ctx context.Context, ns string) (time.Duration, error) |
||||
FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error) |
||||
GetRawDiscovery() discovery.Discovery |
||||
} |
||||
|
||||
// dhtDiscovery is a wrapper of libp2p dht discovery service. It implements Discovery
|
||||
// interface.
|
||||
type dhtDiscovery struct { |
||||
dht *libp2p_dht.IpfsDHT |
||||
disc discovery.Discovery |
||||
host libp2p_host.Host |
||||
|
||||
opt DHTConfig |
||||
logger zerolog.Logger |
||||
ctx context.Context |
||||
cancel func() |
||||
} |
||||
|
||||
// NewDHTDiscovery creates a new dhtDiscovery that implements Discovery interface.
|
||||
func NewDHTDiscovery(host libp2p_host.Host, opt DHTConfig) (Discovery, error) { |
||||
opts, err := opt.getLibp2pRawOptions() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
dht, err := libp2p_dht.New(ctx, host, opts...) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
d := libp2p_dis.NewRoutingDiscovery(dht) |
||||
|
||||
logger := utils.Logger().With().Str("module", "discovery").Logger() |
||||
return &dhtDiscovery{ |
||||
dht: dht, |
||||
disc: d, |
||||
host: host, |
||||
opt: opt, |
||||
logger: logger, |
||||
ctx: ctx, |
||||
cancel: cancel, |
||||
}, nil |
||||
} |
||||
|
||||
// Start bootstrap the dht discovery service.
|
||||
func (d *dhtDiscovery) Start() error { |
||||
return d.dht.Bootstrap(d.ctx) |
||||
} |
||||
|
||||
// Stop stop the dhtDiscovery service
|
||||
func (d *dhtDiscovery) Close() error { |
||||
d.cancel() |
||||
return nil |
||||
} |
||||
|
||||
// Advertise advertises a service
|
||||
func (d *dhtDiscovery) Advertise(ctx context.Context, ns string) (time.Duration, error) { |
||||
return d.disc.Advertise(ctx, ns) |
||||
} |
||||
|
||||
// FindPeers discovers peers providing a service
|
||||
func (d *dhtDiscovery) FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error) { |
||||
opt := discovery.Limit(peerLimit) |
||||
return d.disc.FindPeers(ctx, ns, opt) |
||||
} |
||||
|
||||
// GetRawDiscovery get the raw discovery to be used for libp2p pubsub options
|
||||
func (d *dhtDiscovery) GetRawDiscovery() discovery.Discovery { |
||||
return d.disc |
||||
} |
@ -0,0 +1,21 @@ |
||||
package discovery |
||||
|
||||
// TODO: test this module
|
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
|
||||
"github.com/libp2p/go-libp2p" |
||||
) |
||||
|
||||
func TestNewDHTDiscovery(t *testing.T) { |
||||
host, err := libp2p.New(context.Background()) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
_, err = NewDHTDiscovery(host, DHTConfig{}) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
} |
@ -0,0 +1,54 @@ |
||||
package discovery |
||||
|
||||
import ( |
||||
"github.com/pkg/errors" |
||||
|
||||
p2ptypes "github.com/harmony-one/harmony/p2p/types" |
||||
badger "github.com/ipfs/go-ds-badger" |
||||
libp2p_dht "github.com/libp2p/go-libp2p-kad-dht" |
||||
) |
||||
|
||||
// DHTConfig is the configurable DHT options.
|
||||
// For normal nodes, only BootNodes field need to be specified.
|
||||
type DHTConfig struct { |
||||
BootNodes []string |
||||
DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes.
|
||||
} |
||||
|
||||
// getLibp2pRawOptions get the raw libp2p options as a slice.
|
||||
func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) { |
||||
var opts []libp2p_dht.Option |
||||
|
||||
bootOption, err := getBootstrapOption(opt.BootNodes) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
opts = append(opts, bootOption) |
||||
|
||||
if opt.DataStoreFile != nil && len(*opt.DataStoreFile) != 0 { |
||||
dsOption, err := getDataStoreOption(*opt.DataStoreFile) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
opts = append(opts, dsOption) |
||||
} |
||||
|
||||
return opts, nil |
||||
} |
||||
|
||||
func getBootstrapOption(bootNodes []string) (libp2p_dht.Option, error) { |
||||
resolved, err := p2ptypes.ResolveAndParseMultiAddrs(bootNodes) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "failed to parse boot nodes") |
||||
} |
||||
return libp2p_dht.BootstrapPeers(resolved...), nil |
||||
} |
||||
|
||||
func getDataStoreOption(dataStoreFile string) (libp2p_dht.Option, error) { |
||||
ds, err := badger.NewDatastore(dataStoreFile, nil) |
||||
if err != nil { |
||||
return nil, errors.Wrapf(err, |
||||
"cannot open Badger data store at %s", dataStoreFile) |
||||
} |
||||
return libp2p_dht.Datastore(ds), nil |
||||
} |
@ -0,0 +1,85 @@ |
||||
package discovery |
||||
|
||||
import ( |
||||
"errors" |
||||
"fmt" |
||||
"os" |
||||
"path/filepath" |
||||
"strings" |
||||
"testing" |
||||
) |
||||
|
||||
var ( |
||||
tmpDir = filepath.Join(os.TempDir(), "harmony-one", "harmony", "p2p", "discovery") |
||||
emptyFile = filepath.Join(tmpDir, "empty_file") |
||||
validPath = filepath.Join(tmpDir, "dht-1.1.1.1") |
||||
) |
||||
|
||||
var ( |
||||
testAddrStr = []string{ |
||||
"/ip4/52.40.84.2/tcp/9800/p2p/QmbPVwrqWsTYXq1RxGWcxx9SWaTUCfoo1wA6wmdbduWe29", |
||||
"/ip4/54.86.126.90/tcp/9800/p2p/Qmdfjtk6hPoyrH1zVD9PEH4zfWLo38dP2mDvvKXfh3tnEv", |
||||
} |
||||
) |
||||
|
||||
func init() { |
||||
os.RemoveAll(tmpDir) |
||||
os.MkdirAll(tmpDir, os.ModePerm) |
||||
|
||||
f, _ := os.Create(emptyFile) |
||||
f.Close() |
||||
} |
||||
|
||||
func TestDHTOption_getLibp2pRawOptions(t *testing.T) { |
||||
tests := []struct { |
||||
opt DHTConfig |
||||
expLen int |
||||
expErr error |
||||
}{ |
||||
{ |
||||
opt: DHTConfig{ |
||||
BootNodes: testAddrStr, |
||||
}, |
||||
expLen: 1, |
||||
}, |
||||
{ |
||||
opt: DHTConfig{ |
||||
BootNodes: testAddrStr, |
||||
DataStoreFile: &validPath, |
||||
}, |
||||
expLen: 2, |
||||
}, |
||||
{ |
||||
opt: DHTConfig{ |
||||
BootNodes: testAddrStr, |
||||
DataStoreFile: &emptyFile, |
||||
}, |
||||
expErr: errors.New("not a directory"), |
||||
}, |
||||
} |
||||
for i, test := range tests { |
||||
opts, err := test.opt.getLibp2pRawOptions() |
||||
if assErr := assertError(err, test.expErr); assErr != nil { |
||||
t.Errorf("Test %v: %v", i, assErr) |
||||
} |
||||
if err != nil || test.expErr != nil { |
||||
continue |
||||
} |
||||
if len(opts) != test.expLen { |
||||
t.Errorf("Test %v: unexpected option size %v / %v", i, len(opts), test.expLen) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func assertError(got, expect error) error { |
||||
if (got == nil) != (expect == nil) { |
||||
return fmt.Errorf("unexpected error [%v] / [%v]", got, expect) |
||||
} |
||||
if (got == nil) || (expect == nil) { |
||||
return nil |
||||
} |
||||
if !strings.Contains(got.Error(), expect.Error()) { |
||||
return fmt.Errorf("unexpected error [%v] / [%v]", got, expect) |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,33 @@ |
||||
package sttypes |
||||
|
||||
import ( |
||||
p2ptypes "github.com/harmony-one/harmony/p2p/types" |
||||
"github.com/hashicorp/go-version" |
||||
libp2p_network "github.com/libp2p/go-libp2p-core/network" |
||||
) |
||||
|
||||
// Protocol is the interface of protocol to be registered to libp2p.
|
||||
type Protocol interface { |
||||
p2ptypes.LifeCycle |
||||
|
||||
Specifier() string |
||||
Version() *version.Version |
||||
ProtoID() ProtoID |
||||
Match(string) bool |
||||
HandleStream(st libp2p_network.Stream) |
||||
} |
||||
|
||||
// Request is the interface of a stream request used for common stream utils.
|
||||
type Request interface { |
||||
ReqID() uint64 |
||||
SetReqID(rid uint64) |
||||
String() string |
||||
IsSupportedByProto(ProtoSpec) bool |
||||
Encode() ([]byte, error) |
||||
} |
||||
|
||||
// Response is the interface of a stream response used for common stream utils
|
||||
type Response interface { |
||||
ReqID() uint64 |
||||
String() string |
||||
} |
@ -0,0 +1,78 @@ |
||||
package sttypes |
||||
|
||||
import ( |
||||
"io/ioutil" |
||||
"sync" |
||||
|
||||
libp2p_network "github.com/libp2p/go-libp2p-core/network" |
||||
) |
||||
|
||||
// Stream is the interface for streams implemented in each service.
|
||||
// The stream interface is used for stream management as well as rate limiters
|
||||
type Stream interface { |
||||
ID() StreamID |
||||
ProtoID() ProtoID |
||||
ProtoSpec() (ProtoSpec, error) |
||||
WriteBytes([]byte) error |
||||
ReadBytes() ([]byte, error) |
||||
Close() error // Make sure streams can handle multiple calls of Close
|
||||
} |
||||
|
||||
// BaseStream is the wrapper around
|
||||
type BaseStream struct { |
||||
raw libp2p_network.Stream |
||||
|
||||
// parse protocol spec fields
|
||||
spec ProtoSpec |
||||
specErr error |
||||
specOnce sync.Once |
||||
} |
||||
|
||||
// NewBaseStream creates BaseStream as the wrapper of libp2p Stream
|
||||
func NewBaseStream(st libp2p_network.Stream) *BaseStream { |
||||
return &BaseStream{ |
||||
raw: st, |
||||
} |
||||
} |
||||
|
||||
// StreamID is the unique identifier for the stream. It has the value of
|
||||
// libp2p_network.Stream.ID()
|
||||
type StreamID string |
||||
|
||||
// Meta return the StreamID of the stream
|
||||
func (st *BaseStream) ID() StreamID { |
||||
return StreamID(st.raw.ID()) |
||||
} |
||||
|
||||
// ProtoID return the remote protocol ID of the stream
|
||||
func (st *BaseStream) ProtoID() ProtoID { |
||||
return ProtoID(st.raw.Protocol()) |
||||
} |
||||
|
||||
// ProtoSpec get the parsed protocol Specifier of the stream
|
||||
func (st *BaseStream) ProtoSpec() (ProtoSpec, error) { |
||||
st.specOnce.Do(func() { |
||||
st.spec, st.specErr = ProtoIDToProtoSpec(st.ProtoID()) |
||||
}) |
||||
return st.spec, st.specErr |
||||
} |
||||
|
||||
// Close close the stream on both sides.
|
||||
func (st *BaseStream) Close() error { |
||||
return st.raw.Reset() |
||||
} |
||||
|
||||
// WriteBytes write the bytes to the stream
|
||||
func (st *BaseStream) WriteBytes(b []byte) error { |
||||
_, err := st.raw.Write(b) |
||||
return err |
||||
} |
||||
|
||||
// ReadMsg read the bytes from the stream
|
||||
func (st *BaseStream) ReadBytes() ([]byte, error) { |
||||
b, err := ioutil.ReadAll(st.raw) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return b, nil |
||||
} |
@ -0,0 +1,91 @@ |
||||
package sttypes |
||||
|
||||
// TODO: test this file
|
||||
|
||||
import ( |
||||
"crypto/rand" |
||||
"encoding/binary" |
||||
"fmt" |
||||
"strconv" |
||||
"strings" |
||||
|
||||
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" |
||||
"github.com/hashicorp/go-version" |
||||
libp2p_proto "github.com/libp2p/go-libp2p-core/protocol" |
||||
"github.com/pkg/errors" |
||||
) |
||||
|
||||
const ( |
||||
// ProtoIDCommonPrefix is the common prefix for stream protocol
|
||||
ProtoIDCommonPrefix = "harmony" |
||||
|
||||
// ProtoIDFormat is the format of stream protocol ID
|
||||
ProtoIDFormat = "%s/%s/%s/%d/%s" |
||||
|
||||
// protoIDNumElem is the number of elements of the ProtoID. See comments in ProtoID
|
||||
protoIDNumElem = 5 |
||||
) |
||||
|
||||
// ProtoID is the protocol id for streaming, an alias of libp2p stream protocol ID。
|
||||
// The stream protocol ID is composed of following components:
|
||||
// 1. Service - Currently, only sync service is supported.
|
||||
// 2. NetworkType - mainnet, testnet, stn, e.t.c.
|
||||
// 3. ShardID - shard ID of the current protocol.
|
||||
// 4. Version - Stream protocol version for backward compatibility.
|
||||
type ProtoID libp2p_proto.ID |
||||
|
||||
// ProtoSpec is the un-serialized stream proto id specification
|
||||
// TODO: move this to service wise module since different protocol might have different
|
||||
// protoID information
|
||||
type ProtoSpec struct { |
||||
Service string |
||||
NetworkType nodeconfig.NetworkType |
||||
ShardID nodeconfig.ShardID |
||||
Version *version.Version |
||||
} |
||||
|
||||
// ToProtoID convert a ProtoSpec to ProtoID.
|
||||
func (spec ProtoSpec) ToProtoID() ProtoID { |
||||
s := fmt.Sprintf(ProtoIDFormat, ProtoIDCommonPrefix, spec.Service, |
||||
spec.NetworkType, spec.ShardID, spec.Version.String()) |
||||
return ProtoID(s) |
||||
} |
||||
|
||||
// ProtoIDToProtoSpec converts a ProtoID to ProtoSpec
|
||||
func ProtoIDToProtoSpec(id ProtoID) (ProtoSpec, error) { |
||||
comps := strings.Split(string(id), "/") |
||||
if len(comps) != protoIDNumElem { |
||||
return ProtoSpec{}, errors.New("unexpected protocol size") |
||||
} |
||||
var ( |
||||
prefix = comps[0] |
||||
service = comps[1] |
||||
networkType = comps[2] |
||||
shardIDStr = comps[3] |
||||
versionStr = comps[4] |
||||
) |
||||
shardID, err := strconv.Atoi(shardIDStr) |
||||
if err != nil { |
||||
return ProtoSpec{}, errors.Wrap(err, "invalid shard ID") |
||||
} |
||||
if prefix != ProtoIDCommonPrefix { |
||||
return ProtoSpec{}, errors.New("unexpected prefix") |
||||
} |
||||
version, err := version.NewVersion(versionStr) |
||||
if err != nil { |
||||
return ProtoSpec{}, errors.Wrap(err, "unexpected version string") |
||||
} |
||||
return ProtoSpec{ |
||||
Service: service, |
||||
NetworkType: nodeconfig.NetworkType(networkType), |
||||
ShardID: nodeconfig.ShardID(uint32(shardID)), |
||||
Version: version, |
||||
}, nil |
||||
} |
||||
|
||||
// GenReqID generates a random ReqID
|
||||
func GenReqID() uint64 { |
||||
var rnd [8]byte |
||||
rand.Read(rnd[:]) |
||||
return binary.BigEndian.Uint64(rnd[:]) |
||||
} |
@ -0,0 +1,10 @@ |
||||
package sttypes |
||||
|
||||
import "testing" |
||||
|
||||
func BenchmarkProtoIDToProtoSpec(b *testing.B) { |
||||
stid := ProtoID("harmony/sync/unitest/0/1.0.1") |
||||
for i := 0; i != b.N; i++ { |
||||
ProtoIDToProtoSpec(stid) |
||||
} |
||||
} |
@ -0,0 +1,7 @@ |
||||
package p2ptypes |
||||
|
||||
// LifeCycle is the interface of module supports Start and Close
|
||||
type LifeCycle interface { |
||||
Start() |
||||
Close() |
||||
} |
@ -0,0 +1,97 @@ |
||||
package p2ptypes |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"strings" |
||||
"time" |
||||
|
||||
libp2p_peer "github.com/libp2p/go-libp2p-core/peer" |
||||
ma "github.com/multiformats/go-multiaddr" |
||||
madns "github.com/multiformats/go-multiaddr-dns" |
||||
) |
||||
|
||||
// AddrList is a list of multi address
|
||||
type AddrList []ma.Multiaddr |
||||
|
||||
// String is a function to print a string representation of the AddrList
|
||||
func (al *AddrList) String() string { |
||||
strs := make([]string, len(*al)) |
||||
for i, addr := range *al { |
||||
strs[i] = addr.String() |
||||
} |
||||
return strings.Join(strs, ",") |
||||
} |
||||
|
||||
// Set is a function to set the value of AddrList based on a string
|
||||
func (al *AddrList) Set(value string) error { |
||||
if len(*al) > 0 { |
||||
return fmt.Errorf("AddrList is already set") |
||||
} |
||||
for _, a := range strings.Split(value, ",") { |
||||
addr, err := ma.NewMultiaddr(a) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
*al = append(*al, addr) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// StringsToMultiAddrs convert a list of strings to a list of multiaddresses
|
||||
func StringsToMultiAddrs(addrStrings []string) (maddrs []ma.Multiaddr, err error) { |
||||
for _, addrString := range addrStrings { |
||||
addr, err := ma.NewMultiaddr(addrString) |
||||
if err != nil { |
||||
return maddrs, err |
||||
} |
||||
maddrs = append(maddrs, addr) |
||||
} |
||||
return |
||||
} |
||||
|
||||
// ResolveAndParseMultiAddrs resolve the DNS multi peer and parse to libp2p AddrInfo
|
||||
func ResolveAndParseMultiAddrs(addrStrings []string) ([]libp2p_peer.AddrInfo, error) { |
||||
var res []libp2p_peer.AddrInfo |
||||
for _, addrStr := range addrStrings { |
||||
ais, err := resolveMultiAddrString(addrStr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
res = append(res, ais...) |
||||
} |
||||
return res, nil |
||||
} |
||||
|
||||
func resolveMultiAddrString(addrStr string) ([]libp2p_peer.AddrInfo, error) { |
||||
var ais []libp2p_peer.AddrInfo |
||||
|
||||
mAddr, err := ma.NewMultiaddr(addrStr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
mAddrs, err := resolveMultiAddr(mAddr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
for _, mAddr := range mAddrs { |
||||
ai, err := libp2p_peer.AddrInfoFromP2pAddr(mAddr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
ais = append(ais, *ai) |
||||
} |
||||
return ais, nil |
||||
} |
||||
|
||||
func resolveMultiAddr(raw ma.Multiaddr) ([]ma.Multiaddr, error) { |
||||
if madns.Matches(raw) { |
||||
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) |
||||
mas, err := madns.Resolve(ctx, raw) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return mas, nil |
||||
} |
||||
return []ma.Multiaddr{raw}, nil |
||||
} |
@ -0,0 +1,45 @@ |
||||
package p2ptypes |
||||
|
||||
import ( |
||||
"strings" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
var testAddrs = []string{ |
||||
"/ip4/54.86.126.90/tcp/9850/p2p/Qmdfjtk6hPoyrH1zVD9PEH4zfWLo38dP2mDvvKXfh3tnEv", |
||||
"/ip4/52.40.84.2/tcp/9850/p2p/QmbPVwrqWsTYXq1RxGWcxx9SWaTUCfoo1wA6wmdbduWe29", |
||||
} |
||||
|
||||
func TestMultiAddressParsing(t *testing.T) { |
||||
multiAddresses, err := StringsToMultiAddrs(testAddrs) |
||||
assert.NoError(t, err) |
||||
assert.Equal(t, len(testAddrs), len(multiAddresses)) |
||||
|
||||
for index, multiAddress := range multiAddresses { |
||||
assert.Equal(t, multiAddress.String(), testAddrs[index]) |
||||
} |
||||
} |
||||
|
||||
func TestAddressListConversionToString(t *testing.T) { |
||||
multiAddresses, err := StringsToMultiAddrs(testAddrs) |
||||
assert.NoError(t, err) |
||||
assert.Equal(t, len(testAddrs), len(multiAddresses)) |
||||
|
||||
expected := strings.Join(testAddrs[:], ",") |
||||
var addressList AddrList = multiAddresses |
||||
assert.Equal(t, expected, addressList.String()) |
||||
} |
||||
|
||||
func TestAddressListConversionFromString(t *testing.T) { |
||||
multiAddresses, err := StringsToMultiAddrs(testAddrs) |
||||
assert.NoError(t, err) |
||||
assert.Equal(t, len(testAddrs), len(multiAddresses)) |
||||
|
||||
addressString := strings.Join(testAddrs[:], ",") |
||||
var addressList AddrList = multiAddresses |
||||
addressList.Set(addressString) |
||||
assert.Equal(t, len(addressList), len(multiAddresses)) |
||||
assert.Equal(t, addressList[0], multiAddresses[0]) |
||||
} |
@ -0,0 +1,8 @@ |
||||
package p2ptypes |
||||
|
||||
import ( |
||||
libp2p_peer "github.com/libp2p/go-libp2p-core/peer" |
||||
) |
||||
|
||||
// PeerID is the alias for libp2p peer ID
|
||||
type PeerID libp2p_peer.ID |
Loading…
Reference in new issue