Use topic-based libp2p pubsub interface

https://github.com/libp2p/go-libp2p-pubsub/releases/tag/v0.2.0: “The old
pubsub.Publish and pubsub.Subscribe methods are still usable, but have
been deprecated”
pull/1862/head
Eugene Kim 5 years ago
parent a649369e80
commit ff2e0072c9
  1. 82
      p2p/host/hostv2/hostv2.go
  2. 83
      p2p/host/hostv2/hostv2_mock_for_test.go
  3. 258
      p2p/host/hostv2/hostv2_test.go

@ -7,6 +7,7 @@ import (
"fmt"
"sync"
"github.com/pkg/errors"
"github.com/rs/zerolog"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -33,16 +34,45 @@ const (
//numOutgoing = 16
)
// pubsub captures the pubsub interface we expect from libp2p.
type pubsub interface {
Publish(topic string, data []byte) error
Subscribe(topic string, opts ...libp2p_pubsub.SubOpt) (*libp2p_pubsub.Subscription, error)
// topicHandle is a pubsub topic handle.
type topicHandle interface {
Publish(ctx context.Context, data []byte) error
Subscribe() (subscription, error)
}
type topicHandleImpl struct {
t *libp2p_pubsub.Topic
}
func (th topicHandleImpl) Publish(ctx context.Context, data []byte) error {
return th.t.Publish(ctx, data)
}
func (th topicHandleImpl) Subscribe() (subscription, error) {
return th.t.Subscribe()
}
type topicJoiner interface {
JoinTopic(topic string) (topicHandle, error)
}
type topicJoinerImpl struct {
pubsub *libp2p_pubsub.PubSub
}
func (tj topicJoinerImpl) JoinTopic(topic string) (topicHandle, error) {
th, err := tj.pubsub.Join(topic)
if err != nil {
return nil, err
}
return topicHandleImpl{th}, nil
}
// HostV2 is the version 2 p2p host
type HostV2 struct {
h libp2p_host.Host
pubsub pubsub
joiner topicJoiner
joined map[string]topicHandle
self p2p.Peer
priKey libp2p_crypto.PrivKey
lock sync.Mutex
@ -54,16 +84,36 @@ type HostV2 struct {
logger *zerolog.Logger
}
func (host *HostV2) getTopic(topic string) (topicHandle, error) {
host.lock.Lock()
defer host.lock.Unlock()
if t, ok := host.joined[topic]; ok {
return t, nil
} else if t, err := host.joiner.JoinTopic(topic); err != nil {
return nil, errors.Wrapf(err, "cannot join pubsub topic %x", topic)
} else {
host.joined[topic] = t
return t, nil
}
}
// SendMessageToGroups sends a message to one or more multicast groups.
func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error {
var error error
// It returns a nil error if and only if it has succeeded to schedule the given
// message for sending.
func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) (err error) {
for _, group := range groups {
err := host.pubsub.Publish(string(group), msg)
if err != nil {
error = err
t, e := host.getTopic(string(group))
if e != nil {
err = e
continue
}
e = t.Publish(context.Background(), msg)
if e != nil {
err = e
continue
}
}
return error
return err
}
// subscription captures the subscription interface we expect from libp2p.
@ -104,10 +154,14 @@ func (r *GroupReceiverImpl) Receive(ctx context.Context) (
func (host *HostV2) GroupReceiver(group nodeconfig.GroupID) (
receiver p2p.GroupReceiver, err error,
) {
sub, err := host.pubsub.Subscribe(string(group))
t, err := host.getTopic(string(group))
if err != nil {
return nil, err
}
sub, err := t.Subscribe()
if err != nil {
return nil, errors.Wrapf(err, "cannot subscribe to topic %x", group)
}
return &GroupReceiverImpl{sub: sub}, nil
}
@ -171,7 +225,6 @@ func New(self *p2p.Peer, priKey libp2p_crypto.PrivKey) *HostV2 {
)
catchError(err)
pubsub, err := libp2p_pubsub.NewGossipSub(ctx, p2pHost)
// pubsub, err := libp2p_pubsub.NewFloodSub(ctx, p2pHost)
catchError(err)
self.PeerID = p2pHost.ID()
@ -180,7 +233,8 @@ func New(self *p2p.Peer, priKey libp2p_crypto.PrivKey) *HostV2 {
// has to save the private key for host
h := &HostV2{
h: p2pHost,
pubsub: pubsub,
joiner: topicJoinerImpl{pubsub},
joined: map[string]topicHandle{},
self: *self,
priKey: priKey,
logger: &subLogger,

@ -11,61 +11,94 @@ import (
reflect "reflect"
)
// Mockpubsub is a mock of pubsub interface
type Mockpubsub struct {
// MocktopicHandle is a mock of topicHandle interface
type MocktopicHandle struct {
ctrl *gomock.Controller
recorder *MockpubsubMockRecorder
recorder *MocktopicHandleMockRecorder
}
// MockpubsubMockRecorder is the mock recorder for Mockpubsub
type MockpubsubMockRecorder struct {
mock *Mockpubsub
// MocktopicHandleMockRecorder is the mock recorder for MocktopicHandle
type MocktopicHandleMockRecorder struct {
mock *MocktopicHandle
}
// NewMockpubsub creates a new mock instance
func NewMockpubsub(ctrl *gomock.Controller) *Mockpubsub {
mock := &Mockpubsub{ctrl: ctrl}
mock.recorder = &MockpubsubMockRecorder{mock}
// NewMocktopicHandle creates a new mock instance
func NewMocktopicHandle(ctrl *gomock.Controller) *MocktopicHandle {
mock := &MocktopicHandle{ctrl: ctrl}
mock.recorder = &MocktopicHandleMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *Mockpubsub) EXPECT() *MockpubsubMockRecorder {
func (m *MocktopicHandle) EXPECT() *MocktopicHandleMockRecorder {
return m.recorder
}
// Publish mocks base method
func (m *Mockpubsub) Publish(topic string, data []byte) error {
func (m *MocktopicHandle) Publish(ctx context.Context, data []byte) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Publish", topic, data)
ret := m.ctrl.Call(m, "Publish", ctx, data)
ret0, _ := ret[0].(error)
return ret0
}
// Publish indicates an expected call of Publish
func (mr *MockpubsubMockRecorder) Publish(topic, data interface{}) *gomock.Call {
func (mr *MocktopicHandleMockRecorder) Publish(ctx, data interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*Mockpubsub)(nil).Publish), topic, data)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MocktopicHandle)(nil).Publish), ctx, data)
}
// Subscribe mocks base method
func (m *Mockpubsub) Subscribe(topic string, opts ...go_libp2p_pubsub.SubOpt) (*go_libp2p_pubsub.Subscription, error) {
func (m *MocktopicHandle) Subscribe() (subscription, error) {
m.ctrl.T.Helper()
varargs := []interface{}{topic}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Subscribe", varargs...)
ret0, _ := ret[0].(*go_libp2p_pubsub.Subscription)
ret := m.ctrl.Call(m, "Subscribe")
ret0, _ := ret[0].(subscription)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Subscribe indicates an expected call of Subscribe
func (mr *MockpubsubMockRecorder) Subscribe(topic interface{}, opts ...interface{}) *gomock.Call {
func (mr *MocktopicHandleMockRecorder) Subscribe() *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{topic}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*Mockpubsub)(nil).Subscribe), varargs...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MocktopicHandle)(nil).Subscribe))
}
// MocktopicJoiner is a mock of topicJoiner interface
type MocktopicJoiner struct {
ctrl *gomock.Controller
recorder *MocktopicJoinerMockRecorder
}
// MocktopicJoinerMockRecorder is the mock recorder for MocktopicJoiner
type MocktopicJoinerMockRecorder struct {
mock *MocktopicJoiner
}
// NewMocktopicJoiner creates a new mock instance
func NewMocktopicJoiner(ctrl *gomock.Controller) *MocktopicJoiner {
mock := &MocktopicJoiner{ctrl: ctrl}
mock.recorder = &MocktopicJoinerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MocktopicJoiner) EXPECT() *MocktopicJoinerMockRecorder {
return m.recorder
}
// JoinTopic mocks base method
func (m *MocktopicJoiner) JoinTopic(topic string) (topicHandle, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "JoinTopic", topic)
ret0, _ := ret[0].(topicHandle)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// JoinTopic indicates an expected call of JoinTopic
func (mr *MocktopicJoinerMockRecorder) JoinTopic(topic interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JoinTopic", reflect.TypeOf((*MocktopicJoiner)(nil).JoinTopic), topic)
}
// Mocksubscription is a mock of subscription interface

@ -7,7 +7,7 @@ import (
"testing"
"github.com/golang/mock/gomock"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2p_pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
@ -18,31 +18,76 @@ func TestHostV2_SendMessageToGroups(t *testing.T) {
t.Run("Basic", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
groups := []nodeconfig.GroupID{"ABC", "DEF"}
okTopic := NewMocktopicHandle(mc)
newTopic := NewMocktopicHandle(mc)
groups := []nodeconfig.GroupID{"OK", "New"}
data := []byte{1, 2, 3}
pubsub := NewMockpubsub(mc)
joined := map[string]topicHandle{"OK": okTopic}
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: joined}
gomock.InOrder(
pubsub.EXPECT().Publish("ABC", data),
pubsub.EXPECT().Publish("DEF", data),
// okTopic is already in joined map, JoinTopic shouldn't be called
joiner.EXPECT().JoinTopic("OK").Times(0),
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
// newTopic is not in joined map, JoinTopic should be called
joiner.EXPECT().JoinTopic("New").Return(newTopic, nil),
newTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
)
host := &HostV2{pubsub: pubsub}
if err := host.SendMessageToGroups(groups, data); err != nil {
err := host.SendMessageToGroups(groups, data)
if err != nil {
t.Errorf("expected no error; got %v", err)
}
})
t.Run("Error", func(t *testing.T) {
t.Run("JoinError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
okTopic := NewMocktopicHandle(mc)
groups := []nodeconfig.GroupID{"Error", "OK"}
data := []byte{1, 2, 3}
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
// Make first join return an error
joiner.EXPECT().JoinTopic("Error").Return(nil, errors.New("join error")),
// Subsequent topics should still be processed after an error
joiner.EXPECT().JoinTopic("OK").Return(okTopic, nil),
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
)
err := host.SendMessageToGroups(groups, data)
if err == nil {
t.Error("expected an error; got nil")
}
})
t.Run("PublishError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
groups := []nodeconfig.GroupID{"ABC", "DEF"}
okTopic := NewMocktopicHandle(mc)
erringTopic := NewMocktopicHandle(mc)
groups := []nodeconfig.GroupID{"Error", "OK"}
data := []byte{1, 2, 3}
pubsub := NewMockpubsub(mc)
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
pubsub.EXPECT().Publish("ABC", data).Return(errors.New("FIAL")),
pubsub.EXPECT().Publish("DEF", data), // Should not early-return
// Make first publish return an error
joiner.EXPECT().JoinTopic("Error").Return(erringTopic, nil),
erringTopic.EXPECT().Publish(context.TODO(), data).Return(errors.New("publish error")),
// Subsequent topics should still be processed after an error
joiner.EXPECT().JoinTopic("OK").Return(okTopic, nil),
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
)
host := &HostV2{pubsub: pubsub}
if err := host.SendMessageToGroups(groups, data); err == nil {
t.Error("expected an error but got none")
t.Error("expected an error; got nil")
}
})
}
@ -50,10 +95,14 @@ func TestHostV2_SendMessageToGroups(t *testing.T) {
func TestGroupReceiver_Close(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
sub := NewMocksubscription(mc)
sub.EXPECT().Cancel()
receiver := GroupReceiverImpl{sub: sub}
if err := receiver.Close(); err != nil {
err := receiver.Close()
if err != nil {
t.Errorf("expected no error but got %v", err)
}
}
@ -64,46 +113,71 @@ func pubsubMessage(from libp2p_peer.ID, data []byte) *libp2p_pubsub.Message {
}
func TestGroupReceiver_Receive(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
sub := NewMocksubscription(mc)
ctx := context.Background()
gomock.InOrder(
sub.EXPECT().Next(ctx).Return(pubsubMessage("ABC", []byte{1, 2, 3}), nil),
sub.EXPECT().Next(ctx).Return(pubsubMessage("DEF", []byte{4, 5, 6}), nil),
sub.EXPECT().Next(ctx).Return(nil, errors.New("FIAL")),
)
receiver := GroupReceiverImpl{sub: sub}
verify := func(sender libp2p_peer.ID, msg []byte, shouldError bool) {
t.Run("OK", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
ctx := context.Background()
sub := NewMocksubscription(mc)
receiver := GroupReceiverImpl{sub: sub}
wantSender := libp2p_peer.ID("OK")
wantMsg := []byte{1, 2, 3}
sub.EXPECT().Next(ctx).Return(pubsubMessage(wantSender, wantMsg), nil)
gotMsg, gotSender, err := receiver.Receive(ctx)
if (err != nil) != shouldError {
if shouldError {
t.Error("expected an error but got none")
} else {
t.Errorf("expected no error but got %v", err)
}
if err != nil {
t.Errorf("expected no error; got %v", err)
}
if gotSender != sender {
t.Errorf("expected sender %v but got %v", sender, gotSender)
if gotSender != wantSender {
t.Errorf("expected sender %v; got %v", wantSender, gotSender)
}
if !reflect.DeepEqual(gotMsg, msg) {
t.Errorf("expected message %v but got %v", msg, gotMsg)
if !reflect.DeepEqual(gotMsg, wantMsg) {
t.Errorf("expected message %v; got %v", wantMsg, gotMsg)
}
}
verify("ABC", []byte{1, 2, 3}, false)
verify("DEF", []byte{4, 5, 6}, false)
verify("", nil, true)
})
t.Run("Error", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
ctx := context.Background()
sub := NewMocksubscription(mc)
receiver := GroupReceiverImpl{sub: sub}
sub.EXPECT().Next(ctx).Return(nil, errors.New("receive error"))
msg, sender, err := receiver.Receive(ctx)
if err == nil {
t.Error("expected an error; got nil")
}
if sender != "" {
t.Errorf("expected empty sender; got %v", sender)
}
if len(msg) > 0 {
t.Errorf("expected empty message; got %v", msg)
}
})
}
func TestHostV2_GroupReceiver(t *testing.T) {
t.Run("Basic", func(t *testing.T) {
t.Run("New", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
sub := &libp2p_pubsub.Subscription{}
pubsub := NewMockpubsub(mc)
pubsub.EXPECT().Subscribe("ABC").Return(sub, nil)
host := &HostV2{pubsub: pubsub}
topic := NewMocktopicHandle(mc)
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
joiner.EXPECT().JoinTopic("ABC").Return(topic, nil),
topic.EXPECT().Subscribe().Return(sub, nil),
)
gotReceiver, err := host.GroupReceiver("ABC")
if r, ok := gotReceiver.(*GroupReceiverImpl); !ok {
t.Errorf("expected a hostv2 GroupReceiverImpl; got %v", gotReceiver)
} else if r.sub != sub {
@ -113,13 +187,39 @@ func TestHostV2_GroupReceiver(t *testing.T) {
t.Errorf("expected no error; got %v", err)
}
})
t.Run("Error", func(t *testing.T) {
t.Run("JoinError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
joiner.EXPECT().JoinTopic("ABC").Return(nil, errors.New("join error"))
gotReceiver, err := host.GroupReceiver("ABC")
if gotReceiver != nil {
t.Errorf("expected a nil hostv2 GroupReceiverImpl; got %v", gotReceiver)
}
if err == nil {
t.Error("expected an error; got none")
}
})
t.Run("SubscribeError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
pubsub := NewMockpubsub(mc)
pubsub.EXPECT().Subscribe("ABC").Return(nil, errors.New("FIAL"))
host := &HostV2{pubsub: pubsub}
topic := NewMocktopicHandle(mc)
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
joiner.EXPECT().JoinTopic("ABC").Return(topic, nil),
topic.EXPECT().Subscribe().Return(nil, errors.New("subscription error")),
)
gotReceiver, err := host.GroupReceiver("ABC")
if gotReceiver != nil {
t.Errorf("expected a nil hostv2 GroupReceiverImpl; got %v", gotReceiver)
}
@ -135,3 +235,65 @@ func TestHostV2_GroupReceiver(t *testing.T) {
}
})
}
func TestHostV2_getTopic(t *testing.T) {
t.Run("NewOK", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
want := NewMocktopicHandle(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
joiner.EXPECT().JoinTopic("ABC").Return(want, nil)
got, err := host.getTopic("ABC")
if err != nil {
t.Errorf("want nil error; got %v", err)
}
if got != want {
t.Errorf("want topic handle %v; got %v", want, got)
}
if _, ok := host.joined["ABC"]; !ok {
t.Error("topic not found in joined map")
}
})
t.Run("NewError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
joiner.EXPECT().JoinTopic("ABC").Return(nil, errors.New("OMG"))
got, err := host.getTopic("ABC")
if err == nil {
t.Error("want non-nil error; got nil")
}
if got != nil {
t.Errorf("want nil handle; got %v", got)
}
})
t.Run("Existing", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
want := NewMocktopicHandle(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{"ABC": want}}
joiner.EXPECT().JoinTopic("ABC").Times(0)
got, err := host.getTopic("ABC")
if err != nil {
t.Errorf("want nil error; got %v", err)
}
if got != want {
t.Errorf("want topic handle %v; got %v", want, got)
}
})
}

Loading…
Cancel
Save