Revert "Merge pull request #1978 from LeoHChen/revert-libp2p"

This reverts commit 003b6c6ed5, reversing
changes made to efc0ae111c.
pull/2166/head
Leo Chen 5 years ago
parent 5e7ac9910c
commit af2661022f
  1. 25
      go.mod
  2. 64
      internal/utils/gomock_reflect_069400606/prog.go
  3. 64
      internal/utils/gomock_reflect_579506979/prog.go
  4. 83
      p2p/host/hostv2/hostv2.go
  5. 87
      p2p/host/hostv2/hostv2_mock_for_test.go
  6. 249
      p2p/host/hostv2/hostv2_test.go
  7. 1
      scripts/list_harmony_go_files.sh

@ -31,18 +31,20 @@ require (
github.com/ipfs/go-log v0.0.1 github.com/ipfs/go-log v0.0.1
github.com/karalabe/hid v1.0.0 // indirect github.com/karalabe/hid v1.0.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect
github.com/libp2p/go-libp2p v0.5.0 github.com/libp2p/go-libp2p v0.3.1
github.com/libp2p/go-libp2p-core v0.3.0 github.com/libp2p/go-libp2p-core v0.2.4
github.com/libp2p/go-libp2p-crypto v0.1.0 github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-discovery v0.2.0 github.com/libp2p/go-libp2p-discovery v0.2.0
github.com/libp2p/go-libp2p-host v0.1.0 github.com/libp2p/go-libp2p-host v0.1.0
github.com/libp2p/go-libp2p-kad-dht v0.2.0 github.com/libp2p/go-libp2p-kad-dht v0.2.0
github.com/libp2p/go-libp2p-net v0.1.0 github.com/libp2p/go-libp2p-net v0.1.0
github.com/libp2p/go-libp2p-peer v0.2.0 github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.1.4 github.com/libp2p/go-libp2p-peerstore v0.1.3
github.com/libp2p/go-libp2p-pubsub v0.1.1 github.com/libp2p/go-libp2p-pubsub v0.2.3
github.com/multiformats/go-multiaddr v0.2.0 github.com/libp2p/go-ws-transport v0.1.1 // indirect
github.com/multiformats/go-multiaddr-net v0.1.1 github.com/multiformats/go-multiaddr v0.1.1
github.com/multiformats/go-multiaddr-dns v0.1.1 // indirect
github.com/multiformats/go-multiaddr-net v0.1.0
github.com/natefinch/lumberjack v2.0.0+incompatible github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pborman/uuid v1.2.0 github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1
@ -56,10 +58,15 @@ require (
github.com/spf13/cobra v0.0.5 github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.4.0 github.com/stretchr/testify v1.4.0
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/whyrusleeping/go-logging v0.0.1 github.com/uber/jaeger-client-go v2.20.1+incompatible // indirect
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc
go.uber.org/atomic v1.5.1 // indirect
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
golang.org/x/lint v0.0.0-20190409202823-959b441ac422 golang.org/x/lint v0.0.0-20190930215403-16217165b5de
golang.org/x/tools v0.0.0-20191113232020-e2727e816f5a golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 // indirect
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/grpc v1.22.0 google.golang.org/grpc v1.22.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
gopkg.in/ini.v1 v1.51.0 gopkg.in/ini.v1 v1.51.0

@ -1,64 +0,0 @@
package main
import (
"encoding/gob"
"flag"
"fmt"
"os"
"path"
"reflect"
"github.com/golang/mock/mockgen/model"
pkg_ "github.com/ethereum/go-ethereum/log"
)
var output = flag.String("output", "", "The output file name, or empty to use stdout.")
func main() {
flag.Parse()
its := []struct {
sym string
typ reflect.Type
}{
{"Logger", reflect.TypeOf((*pkg_.Logger)(nil)).Elem()},
}
pkg := &model.Package{
// NOTE: This behaves contrary to documented behaviour if the
// package name is not the final component of the import path.
// The reflect package doesn't expose the package name, though.
Name: path.Base("github.com/ethereum/go-ethereum/log"),
}
for _, it := range its {
intf, err := model.InterfaceFromInterfaceType(it.typ)
if err != nil {
fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
os.Exit(1)
}
intf.Name = it.sym
pkg.Interfaces = append(pkg.Interfaces, intf)
}
outfile := os.Stdout
if len(*output) != 0 {
var err error
outfile, err = os.Create(*output)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
}
defer func() {
if err := outfile.Close(); err != nil {
fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
os.Exit(1)
}
}()
}
if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
os.Exit(1)
}
}

@ -1,64 +0,0 @@
package main
import (
"encoding/gob"
"flag"
"fmt"
"os"
"path"
"reflect"
"github.com/golang/mock/mockgen/model"
pkg_ "github.com/ethereum/go-ethereum/log"
)
var output = flag.String("output", "", "The output file name, or empty to use stdout.")
func main() {
flag.Parse()
its := []struct {
sym string
typ reflect.Type
}{
{"Handler", reflect.TypeOf((*pkg_.Handler)(nil)).Elem()},
}
pkg := &model.Package{
// NOTE: This behaves contrary to documented behaviour if the
// package name is not the final component of the import path.
// The reflect package doesn't expose the package name, though.
Name: path.Base("github.com/ethereum/go-ethereum/log"),
}
for _, it := range its {
intf, err := model.InterfaceFromInterfaceType(it.typ)
if err != nil {
fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
os.Exit(1)
}
intf.Name = it.sym
pkg.Interfaces = append(pkg.Interfaces, intf)
}
outfile := os.Stdout
if len(*output) != 0 {
var err error
outfile, err = os.Create(*output)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
}
defer func() {
if err := outfile.Close(); err != nil {
fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
os.Exit(1)
}
}()
}
if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
os.Exit(1)
}
}

@ -1,6 +1,6 @@
package hostv2 package hostv2
//go:generate mockgen -source hostv2.go -destination=mock/hostv2_mock.go //go:generate mockgen -source=hostv2.go -package=hostv2 -destination=hostv2_mock_for_test.go
import ( import (
"context" "context"
@ -34,16 +34,45 @@ const (
//numOutgoing = 16 //numOutgoing = 16
) )
// pubsub captures the pubsub interface we expect from libp2p. // topicHandle is a pubsub topic handle.
type pubsub interface { type topicHandle interface {
Publish(topic string, data []byte) error Publish(ctx context.Context, data []byte) error
Subscribe(topic string, opts ...libp2p_pubsub.SubOpt) (*libp2p_pubsub.Subscription, error) Subscribe() (subscription, error)
}
type topicHandleImpl struct {
t *libp2p_pubsub.Topic
}
func (th topicHandleImpl) Publish(ctx context.Context, data []byte) error {
return th.t.Publish(ctx, data)
}
func (th topicHandleImpl) Subscribe() (subscription, error) {
return th.t.Subscribe()
}
type topicJoiner interface {
JoinTopic(topic string) (topicHandle, error)
}
type topicJoinerImpl struct {
pubsub *libp2p_pubsub.PubSub
}
func (tj topicJoinerImpl) JoinTopic(topic string) (topicHandle, error) {
th, err := tj.pubsub.Join(topic)
if err != nil {
return nil, err
}
return topicHandleImpl{th}, nil
} }
// HostV2 is the version 2 p2p host // HostV2 is the version 2 p2p host
type HostV2 struct { type HostV2 struct {
h libp2p_host.Host h libp2p_host.Host
pubsub pubsub joiner topicJoiner
joined map[string]topicHandle
self p2p.Peer self p2p.Peer
priKey libp2p_crypto.PrivKey priKey libp2p_crypto.PrivKey
lock sync.Mutex lock sync.Mutex
@ -55,16 +84,36 @@ type HostV2 struct {
logger *zerolog.Logger logger *zerolog.Logger
} }
func (host *HostV2) getTopic(topic string) (topicHandle, error) {
host.lock.Lock()
defer host.lock.Unlock()
if t, ok := host.joined[topic]; ok {
return t, nil
} else if t, err := host.joiner.JoinTopic(topic); err != nil {
return nil, errors.Wrapf(err, "cannot join pubsub topic %x", topic)
} else {
host.joined[topic] = t
return t, nil
}
}
// SendMessageToGroups sends a message to one or more multicast groups. // SendMessageToGroups sends a message to one or more multicast groups.
func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error { // It returns a nil error if and only if it has succeeded to schedule the given
var error error // message for sending.
func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) (err error) {
for _, group := range groups { for _, group := range groups {
err := host.pubsub.Publish(string(group), msg) t, e := host.getTopic(string(group))
if err != nil { if e != nil {
error = err err = e
continue
}
e = t.Publish(context.Background(), msg)
if e != nil {
err = e
continue
} }
} }
return error return err
} }
// subscription captures the subscription interface we expect from libp2p. // subscription captures the subscription interface we expect from libp2p.
@ -105,10 +154,14 @@ func (r *GroupReceiverImpl) Receive(ctx context.Context) (
func (host *HostV2) GroupReceiver(group nodeconfig.GroupID) ( func (host *HostV2) GroupReceiver(group nodeconfig.GroupID) (
receiver p2p.GroupReceiver, err error, receiver p2p.GroupReceiver, err error,
) { ) {
sub, err := host.pubsub.Subscribe(string(group)) t, err := host.getTopic(string(group))
if err != nil { if err != nil {
return nil, err return nil, err
} }
sub, err := t.Subscribe()
if err != nil {
return nil, errors.Wrapf(err, "cannot subscribe to topic %x", group)
}
return &GroupReceiverImpl{sub: sub}, nil return &GroupReceiverImpl{sub: sub}, nil
} }
@ -174,7 +227,6 @@ func New(self *p2p.Peer, priKey libp2p_crypto.PrivKey) (*HostV2, error) {
return nil, errors.Wrapf(err, "cannot initialize libp2p host") return nil, errors.Wrapf(err, "cannot initialize libp2p host")
} }
pubsub, err := libp2p_pubsub.NewGossipSub(ctx, p2pHost) pubsub, err := libp2p_pubsub.NewGossipSub(ctx, p2pHost)
// pubsub, err := libp2p_pubsub.NewFloodSub(ctx, p2pHost)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "cannot initialize libp2p pubsub") return nil, errors.Wrapf(err, "cannot initialize libp2p pubsub")
} }
@ -185,7 +237,8 @@ func New(self *p2p.Peer, priKey libp2p_crypto.PrivKey) (*HostV2, error) {
// has to save the private key for host // has to save the private key for host
h := &HostV2{ h := &HostV2{
h: p2pHost, h: p2pHost,
pubsub: pubsub, joiner: topicJoinerImpl{pubsub},
joined: map[string]topicHandle{},
self: *self, self: *self,
priKey: priKey, priKey: priKey,
logger: &subLogger, logger: &subLogger,

@ -1,8 +1,8 @@
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: hostv2.go // Source: hostv2.go
// Package mock_hostv2 is a generated GoMock package. // Package hostv2 is a generated GoMock package.
package mock_hostv2 package hostv2
import ( import (
context "context" context "context"
@ -11,61 +11,94 @@ import (
reflect "reflect" reflect "reflect"
) )
// Mockpubsub is a mock of pubsub interface // MocktopicHandle is a mock of topicHandle interface
type Mockpubsub struct { type MocktopicHandle struct {
ctrl *gomock.Controller ctrl *gomock.Controller
recorder *MockpubsubMockRecorder recorder *MocktopicHandleMockRecorder
} }
// MockpubsubMockRecorder is the mock recorder for Mockpubsub // MocktopicHandleMockRecorder is the mock recorder for MocktopicHandle
type MockpubsubMockRecorder struct { type MocktopicHandleMockRecorder struct {
mock *Mockpubsub mock *MocktopicHandle
} }
// NewMockpubsub creates a new mock instance // NewMocktopicHandle creates a new mock instance
func NewMockpubsub(ctrl *gomock.Controller) *Mockpubsub { func NewMocktopicHandle(ctrl *gomock.Controller) *MocktopicHandle {
mock := &Mockpubsub{ctrl: ctrl} mock := &MocktopicHandle{ctrl: ctrl}
mock.recorder = &MockpubsubMockRecorder{mock} mock.recorder = &MocktopicHandleMockRecorder{mock}
return mock return mock
} }
// EXPECT returns an object that allows the caller to indicate expected use // EXPECT returns an object that allows the caller to indicate expected use
func (m *Mockpubsub) EXPECT() *MockpubsubMockRecorder { func (m *MocktopicHandle) EXPECT() *MocktopicHandleMockRecorder {
return m.recorder return m.recorder
} }
// Publish mocks base method // Publish mocks base method
func (m *Mockpubsub) Publish(topic string, data []byte) error { func (m *MocktopicHandle) Publish(ctx context.Context, data []byte) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Publish", topic, data) ret := m.ctrl.Call(m, "Publish", ctx, data)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// Publish indicates an expected call of Publish // Publish indicates an expected call of Publish
func (mr *MockpubsubMockRecorder) Publish(topic, data interface{}) *gomock.Call { func (mr *MocktopicHandleMockRecorder) Publish(ctx, data interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*Mockpubsub)(nil).Publish), topic, data) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MocktopicHandle)(nil).Publish), ctx, data)
} }
// Subscribe mocks base method // Subscribe mocks base method
func (m *Mockpubsub) Subscribe(topic string, opts ...go_libp2p_pubsub.SubOpt) (*go_libp2p_pubsub.Subscription, error) { func (m *MocktopicHandle) Subscribe() (subscription, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{topic} ret := m.ctrl.Call(m, "Subscribe")
for _, a := range opts { ret0, _ := ret[0].(subscription)
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Subscribe", varargs...)
ret0, _ := ret[0].(*go_libp2p_pubsub.Subscription)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// Subscribe indicates an expected call of Subscribe // Subscribe indicates an expected call of Subscribe
func (mr *MockpubsubMockRecorder) Subscribe(topic interface{}, opts ...interface{}) *gomock.Call { func (mr *MocktopicHandleMockRecorder) Subscribe() *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{topic}, opts...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MocktopicHandle)(nil).Subscribe))
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*Mockpubsub)(nil).Subscribe), varargs...) }
// MocktopicJoiner is a mock of topicJoiner interface
type MocktopicJoiner struct {
ctrl *gomock.Controller
recorder *MocktopicJoinerMockRecorder
}
// MocktopicJoinerMockRecorder is the mock recorder for MocktopicJoiner
type MocktopicJoinerMockRecorder struct {
mock *MocktopicJoiner
}
// NewMocktopicJoiner creates a new mock instance
func NewMocktopicJoiner(ctrl *gomock.Controller) *MocktopicJoiner {
mock := &MocktopicJoiner{ctrl: ctrl}
mock.recorder = &MocktopicJoinerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MocktopicJoiner) EXPECT() *MocktopicJoinerMockRecorder {
return m.recorder
}
// JoinTopic mocks base method
func (m *MocktopicJoiner) JoinTopic(topic string) (topicHandle, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "JoinTopic", topic)
ret0, _ := ret[0].(topicHandle)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// JoinTopic indicates an expected call of JoinTopic
func (mr *MocktopicJoinerMockRecorder) JoinTopic(topic interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JoinTopic", reflect.TypeOf((*MocktopicJoiner)(nil).JoinTopic), topic)
} }
// Mocksubscription is a mock of subscription interface // Mocksubscription is a mock of subscription interface

@ -7,43 +7,87 @@ import (
"testing" "testing"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
libp2p_peer "github.com/libp2p/go-libp2p-peer" libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub" libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2p_pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" libp2p_pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
mock "github.com/harmony-one/harmony/p2p/host/hostv2/mock"
) )
func TestHostV2_SendMessageToGroups(t *testing.T) { func TestHostV2_SendMessageToGroups(t *testing.T) {
t.Run("Basic", func(t *testing.T) { t.Run("Basic", func(t *testing.T) {
mc := gomock.NewController(t) mc := gomock.NewController(t)
defer mc.Finish() defer mc.Finish()
groups := []nodeconfig.GroupID{"ABC", "DEF"}
okTopic := NewMocktopicHandle(mc)
newTopic := NewMocktopicHandle(mc)
groups := []nodeconfig.GroupID{"OK", "New"}
data := []byte{1, 2, 3} data := []byte{1, 2, 3}
pubsub := mock.NewMockpubsub(mc) joined := map[string]topicHandle{"OK": okTopic}
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: joined}
gomock.InOrder( gomock.InOrder(
pubsub.EXPECT().Publish("ABC", data), // okTopic is already in joined map, JoinTopic shouldn't be called
pubsub.EXPECT().Publish("DEF", data), joiner.EXPECT().JoinTopic("OK").Times(0),
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
// newTopic is not in joined map, JoinTopic should be called
joiner.EXPECT().JoinTopic("New").Return(newTopic, nil),
newTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
) )
host := &HostV2{pubsub: pubsub}
if err := host.SendMessageToGroups(groups, data); err != nil { err := host.SendMessageToGroups(groups, data)
if err != nil {
t.Errorf("expected no error; got %v", err) t.Errorf("expected no error; got %v", err)
} }
}) })
t.Run("Error", func(t *testing.T) { t.Run("JoinError", func(t *testing.T) {
mc := gomock.NewController(t) mc := gomock.NewController(t)
defer mc.Finish() defer mc.Finish()
groups := []nodeconfig.GroupID{"ABC", "DEF"}
okTopic := NewMocktopicHandle(mc)
groups := []nodeconfig.GroupID{"Error", "OK"}
data := []byte{1, 2, 3} data := []byte{1, 2, 3}
pubsub := mock.NewMockpubsub(mc) joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder( gomock.InOrder(
pubsub.EXPECT().Publish("ABC", data).Return(errors.New("FIAL")), // Make first join return an error
pubsub.EXPECT().Publish("DEF", data), // Should not early-return joiner.EXPECT().JoinTopic("Error").Return(nil, errors.New("join error")),
// Subsequent topics should still be processed after an error
joiner.EXPECT().JoinTopic("OK").Return(okTopic, nil),
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
) )
host := &HostV2{pubsub: pubsub}
err := host.SendMessageToGroups(groups, data)
if err == nil {
t.Error("expected an error; got nil")
}
})
t.Run("PublishError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
okTopic := NewMocktopicHandle(mc)
erringTopic := NewMocktopicHandle(mc)
groups := []nodeconfig.GroupID{"Error", "OK"}
data := []byte{1, 2, 3}
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
// Make first publish return an error
joiner.EXPECT().JoinTopic("Error").Return(erringTopic, nil),
erringTopic.EXPECT().Publish(context.TODO(), data).Return(errors.New("publish error")),
// Subsequent topics should still be processed after an error
joiner.EXPECT().JoinTopic("OK").Return(okTopic, nil),
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
)
if err := host.SendMessageToGroups(groups, data); err == nil { if err := host.SendMessageToGroups(groups, data); err == nil {
t.Error("expected an error but got none") t.Error("expected an error; got nil")
} }
}) })
} }
@ -51,10 +95,14 @@ func TestHostV2_SendMessageToGroups(t *testing.T) {
func TestGroupReceiver_Close(t *testing.T) { func TestGroupReceiver_Close(t *testing.T) {
mc := gomock.NewController(t) mc := gomock.NewController(t)
defer mc.Finish() defer mc.Finish()
sub := mock.NewMocksubscription(mc)
sub := NewMocksubscription(mc)
sub.EXPECT().Cancel() sub.EXPECT().Cancel()
receiver := GroupReceiverImpl{sub: sub} receiver := GroupReceiverImpl{sub: sub}
if err := receiver.Close(); err != nil {
err := receiver.Close()
if err != nil {
t.Errorf("expected no error but got %v", err) t.Errorf("expected no error but got %v", err)
} }
} }
@ -65,46 +113,71 @@ func pubsubMessage(from libp2p_peer.ID, data []byte) *libp2p_pubsub.Message {
} }
func TestGroupReceiver_Receive(t *testing.T) { func TestGroupReceiver_Receive(t *testing.T) {
t.Run("OK", func(t *testing.T) {
mc := gomock.NewController(t) mc := gomock.NewController(t)
defer mc.Finish() defer mc.Finish()
sub := mock.NewMocksubscription(mc)
ctx := context.Background() ctx := context.Background()
gomock.InOrder( sub := NewMocksubscription(mc)
sub.EXPECT().Next(ctx).Return(pubsubMessage("ABC", []byte{1, 2, 3}), nil),
sub.EXPECT().Next(ctx).Return(pubsubMessage("DEF", []byte{4, 5, 6}), nil),
sub.EXPECT().Next(ctx).Return(nil, errors.New("FIAL")),
)
receiver := GroupReceiverImpl{sub: sub} receiver := GroupReceiverImpl{sub: sub}
verify := func(sender libp2p_peer.ID, msg []byte, shouldError bool) { wantSender := libp2p_peer.ID("OK")
wantMsg := []byte{1, 2, 3}
sub.EXPECT().Next(ctx).Return(pubsubMessage(wantSender, wantMsg), nil)
gotMsg, gotSender, err := receiver.Receive(ctx) gotMsg, gotSender, err := receiver.Receive(ctx)
if (err != nil) != shouldError {
if shouldError { if err != nil {
t.Error("expected an error but got none") t.Errorf("expected no error; got %v", err)
} else {
t.Errorf("expected no error but got %v", err)
} }
if gotSender != wantSender {
t.Errorf("expected sender %v; got %v", wantSender, gotSender)
} }
if gotSender != sender { if !reflect.DeepEqual(gotMsg, wantMsg) {
t.Errorf("expected sender %v but got %v", sender, gotSender) t.Errorf("expected message %v; got %v", wantMsg, gotMsg)
} }
if !reflect.DeepEqual(gotMsg, msg) { })
t.Errorf("expected message %v but got %v", msg, gotMsg) t.Run("Error", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
ctx := context.Background()
sub := NewMocksubscription(mc)
receiver := GroupReceiverImpl{sub: sub}
sub.EXPECT().Next(ctx).Return(nil, errors.New("receive error"))
msg, sender, err := receiver.Receive(ctx)
if err == nil {
t.Error("expected an error; got nil")
} }
if sender != "" {
t.Errorf("expected empty sender; got %v", sender)
} }
verify("ABC", []byte{1, 2, 3}, false) if len(msg) > 0 {
verify("DEF", []byte{4, 5, 6}, false) t.Errorf("expected empty message; got %v", msg)
verify("", nil, true) }
})
} }
func TestHostV2_GroupReceiver(t *testing.T) { func TestHostV2_GroupReceiver(t *testing.T) {
t.Run("Basic", func(t *testing.T) { t.Run("New", func(t *testing.T) {
mc := gomock.NewController(t) mc := gomock.NewController(t)
defer mc.Finish() defer mc.Finish()
sub := &libp2p_pubsub.Subscription{} sub := &libp2p_pubsub.Subscription{}
pubsub := mock.NewMockpubsub(mc) topic := NewMocktopicHandle(mc)
pubsub.EXPECT().Subscribe("ABC").Return(sub, nil) joiner := NewMocktopicJoiner(mc)
host := &HostV2{pubsub: pubsub} host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
joiner.EXPECT().JoinTopic("ABC").Return(topic, nil),
topic.EXPECT().Subscribe().Return(sub, nil),
)
gotReceiver, err := host.GroupReceiver("ABC") gotReceiver, err := host.GroupReceiver("ABC")
if r, ok := gotReceiver.(*GroupReceiverImpl); !ok { if r, ok := gotReceiver.(*GroupReceiverImpl); !ok {
t.Errorf("expected a hostv2 GroupReceiverImpl; got %v", gotReceiver) t.Errorf("expected a hostv2 GroupReceiverImpl; got %v", gotReceiver)
} else if r.sub != sub { } else if r.sub != sub {
@ -114,13 +187,39 @@ func TestHostV2_GroupReceiver(t *testing.T) {
t.Errorf("expected no error; got %v", err) t.Errorf("expected no error; got %v", err)
} }
}) })
t.Run("Error", func(t *testing.T) { t.Run("JoinError", func(t *testing.T) {
mc := gomock.NewController(t) mc := gomock.NewController(t)
defer mc.Finish() defer mc.Finish()
pubsub := mock.NewMockpubsub(mc)
pubsub.EXPECT().Subscribe("ABC").Return(nil, errors.New("FIAL")) joiner := NewMocktopicJoiner(mc)
host := &HostV2{pubsub: pubsub} host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
joiner.EXPECT().JoinTopic("ABC").Return(nil, errors.New("join error"))
gotReceiver, err := host.GroupReceiver("ABC") gotReceiver, err := host.GroupReceiver("ABC")
if gotReceiver != nil {
t.Errorf("expected a nil hostv2 GroupReceiverImpl; got %v", gotReceiver)
}
if err == nil {
t.Error("expected an error; got none")
}
})
t.Run("SubscribeError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
topic := NewMocktopicHandle(mc)
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
joiner.EXPECT().JoinTopic("ABC").Return(topic, nil),
topic.EXPECT().Subscribe().Return(nil, errors.New("subscription error")),
)
gotReceiver, err := host.GroupReceiver("ABC")
if gotReceiver != nil { if gotReceiver != nil {
t.Errorf("expected a nil hostv2 GroupReceiverImpl; got %v", gotReceiver) t.Errorf("expected a nil hostv2 GroupReceiverImpl; got %v", gotReceiver)
} }
@ -136,3 +235,65 @@ func TestHostV2_GroupReceiver(t *testing.T) {
} }
}) })
} }
func TestHostV2_getTopic(t *testing.T) {
t.Run("NewOK", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
want := NewMocktopicHandle(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
joiner.EXPECT().JoinTopic("ABC").Return(want, nil)
got, err := host.getTopic("ABC")
if err != nil {
t.Errorf("want nil error; got %v", err)
}
if got != want {
t.Errorf("want topic handle %v; got %v", want, got)
}
if _, ok := host.joined["ABC"]; !ok {
t.Error("topic not found in joined map")
}
})
t.Run("NewError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
joiner.EXPECT().JoinTopic("ABC").Return(nil, errors.New("OMG"))
got, err := host.getTopic("ABC")
if err == nil {
t.Error("want non-nil error; got nil")
}
if got != nil {
t.Errorf("want nil handle; got %v", got)
}
})
t.Run("Existing", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
want := NewMocktopicHandle(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{"ABC": want}}
joiner.EXPECT().JoinTopic("ABC").Times(0)
got, err := host.getTopic("ABC")
if err != nil {
t.Errorf("want nil error; got %v", err)
}
if got != want {
t.Errorf("want topic handle %v; got %v", want, got)
}
})
}

@ -6,4 +6,5 @@ exec git ls-files '*.go' | grep -v \
-e '/host_mock\.go' \ -e '/host_mock\.go' \
-e '/mock/[^/]*\.go' \ -e '/mock/[^/]*\.go' \
-e '/mock_[^/]*/[^/]*\.go' \ -e '/mock_[^/]*/[^/]*\.go' \
-e '_mock_for_test\.go' \
-e '/gen_[^/]*\.go' -e '/gen_[^/]*\.go'

Loading…
Cancel
Save