Add pubsub-based multicast, first pass

Should be good enough for interface and strawman implementation review.
pull/392/head
Eugene Kim 6 years ago
parent 15d3693b49
commit 52f6600b39
  1. 32
      p2p/group.go
  2. 9
      p2p/host.go
  3. 58
      p2p/host/hostv2/hostv2.go

@ -0,0 +1,32 @@
package p2p
import (
"context"
"fmt"
"io"
peer "github.com/libp2p/go-libp2p-peer"
)
// GroupID is a multicast group ID.
//
// It is a binary string,
// conducive to layering and scoped generation using cryptographic hash.
//
// Applications define their own group ID, without central allocation.
// A cryptographically secure random string of enough length – 32 bytes for
// example – may be used.
type GroupID string
func (id GroupID) String() string {
return fmt.Sprintf("%x", string(id))
}
// GroupReceiver is a multicast group message receiver interface.
type GroupReceiver interface {
// Close closes this receiver.
io.Closer
// Receive a message.
Receive(ctx context.Context) (msg []byte, sender peer.ID, err error)
}

@ -14,4 +14,13 @@ type Host interface {
AddPeer(*Peer) error
GetID() peer.ID
GetP2PHost() p2p_host.Host
// SendMessageToGroups sends a message to one or more multicast groups.
SendMessageToGroups(groups []GroupID, msg []byte) error
// GroupReceiver returns a receiver of messages sent to a multicast group.
// Each call creates a new receiver.
// If multiple receivers are created for the same group,
// a message sent to the group will be delivered to all of the receivers.
GroupReceiver(GroupID) (receiver GroupReceiver, err error)
}

@ -5,6 +5,8 @@ import (
"fmt"
"io"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/p2p"
libp2p "github.com/libp2p/go-libp2p"
@ -28,10 +30,59 @@ const (
// HostV2 is the version 2 p2p host
type HostV2 struct {
h p2p_host.Host
pubsub *pubsub.PubSub
self p2p.Peer
priKey p2p_crypto.PrivKey
}
// SendMessageToGroups sends a message to one or more multicast groups.
func (host *HostV2) SendMessageToGroups(groups []p2p.GroupID, msg []byte) error {
var error error
for _, group := range groups {
err := host.pubsub.Publish(string(group), msg)
if err != nil {
error = err
}
}
return error
}
// GroupReceiver is a multicast group receiver implementation.
type GroupReceiver struct {
sub *pubsub.Subscription
}
// Close closes this receiver.
func (r *GroupReceiver) Close() error {
r.sub.Cancel()
r.sub = nil
return nil
}
// Receive receives a message.
func (r *GroupReceiver) Receive(ctx context.Context) (
msg []byte, sender peer.ID, err error,
) {
m, err := r.sub.Next(ctx)
if err == nil {
msg = m.Data
sender = peer.ID(m.From)
}
return msg, sender, err
}
// GroupReceiver returns a receiver of messages sent to a multicast group.
// See the GroupReceiver interface for details.
func (host *HostV2) GroupReceiver(group p2p.GroupID) (
receiver p2p.GroupReceiver, err error,
) {
sub, err := host.pubsub.Subscribe(string(group))
if err != nil {
return nil, err
}
return &GroupReceiver{sub: sub}, nil
}
// AddPeer add p2p.Peer into Peerstore
func (host *HostV2) AddPeer(p *p2p.Peer) error {
if p.PeerID != "" && len(p.Addrs) != 0 {
@ -73,10 +124,14 @@ func New(self *p2p.Peer, priKey p2p_crypto.PrivKey, opts ...p2p_config.Option) *
log.Error("New MA Error", "IP", self.IP, "Port", self.Port)
return nil
}
p2pHost, err := libp2p.New(context.Background(),
// TODO – use WithCancel for orderly host teardown (which we don't have yet)
ctx := context.Background()
p2pHost, err := libp2p.New(ctx,
append(opts, libp2p.ListenAddrs(listenAddr), libp2p.Identity(priKey))...,
)
catchError(err)
pubsub, err := pubsub.NewGossipSub(ctx, p2pHost)
catchError(err)
self.PeerID = p2pHost.ID()
@ -85,6 +140,7 @@ func New(self *p2p.Peer, priKey p2p_crypto.PrivKey, opts ...p2p_config.Option) *
// has to save the private key for host
h := &HostV2{
h: p2pHost,
pubsub: pubsub,
self: *self,
priKey: priKey,
}

Loading…
Cancel
Save