|
|
|
package hostv2
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"reflect"
|
|
|
|
"testing"
|
|
|
|
|
|
|
|
"github.com/golang/mock/gomock"
|
|
|
|
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
|
|
libp2p_pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
|
|
|
|
|
|
|
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestHostV2_SendMessageToGroups(t *testing.T) {
|
|
|
|
t.Run("Basic", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
okTopic := NewMocktopicHandle(mc)
|
|
|
|
newTopic := NewMocktopicHandle(mc)
|
|
|
|
groups := []nodeconfig.GroupID{"OK", "New"}
|
|
|
|
data := []byte{1, 2, 3}
|
|
|
|
joined := map[string]topicHandle{"OK": okTopic}
|
|
|
|
joiner := NewMocktopicJoiner(mc)
|
|
|
|
host := &HostV2{joiner: joiner, joined: joined}
|
|
|
|
|
|
|
|
gomock.InOrder(
|
|
|
|
// okTopic is already in joined map, JoinTopic shouldn't be called
|
|
|
|
joiner.EXPECT().JoinTopic("OK").Times(0),
|
|
|
|
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
|
|
|
|
// newTopic is not in joined map, JoinTopic should be called
|
|
|
|
joiner.EXPECT().JoinTopic("New").Return(newTopic, nil),
|
|
|
|
newTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
|
|
|
|
)
|
|
|
|
|
|
|
|
err := host.SendMessageToGroups(groups, data)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
t.Errorf("expected no error; got %v", err)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("JoinError", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
okTopic := NewMocktopicHandle(mc)
|
|
|
|
groups := []nodeconfig.GroupID{"Error", "OK"}
|
|
|
|
data := []byte{1, 2, 3}
|
|
|
|
joiner := NewMocktopicJoiner(mc)
|
|
|
|
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
|
|
|
|
|
|
|
|
gomock.InOrder(
|
|
|
|
// Make first join return an error
|
|
|
|
joiner.EXPECT().JoinTopic("Error").Return(nil, errors.New("join error")),
|
|
|
|
// Subsequent topics should still be processed after an error
|
|
|
|
joiner.EXPECT().JoinTopic("OK").Return(okTopic, nil),
|
|
|
|
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
|
|
|
|
)
|
|
|
|
|
|
|
|
err := host.SendMessageToGroups(groups, data)
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
t.Error("expected an error; got nil")
|
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("PublishError", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
okTopic := NewMocktopicHandle(mc)
|
|
|
|
erringTopic := NewMocktopicHandle(mc)
|
|
|
|
groups := []nodeconfig.GroupID{"Error", "OK"}
|
|
|
|
data := []byte{1, 2, 3}
|
|
|
|
joiner := NewMocktopicJoiner(mc)
|
|
|
|
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
|
|
|
|
|
|
|
|
gomock.InOrder(
|
|
|
|
// Make first publish return an error
|
|
|
|
joiner.EXPECT().JoinTopic("Error").Return(erringTopic, nil),
|
|
|
|
erringTopic.EXPECT().Publish(context.TODO(), data).Return(errors.New("publish error")),
|
|
|
|
// Subsequent topics should still be processed after an error
|
|
|
|
joiner.EXPECT().JoinTopic("OK").Return(okTopic, nil),
|
|
|
|
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
|
|
|
|
)
|
|
|
|
|
|
|
|
if err := host.SendMessageToGroups(groups, data); err == nil {
|
|
|
|
t.Error("expected an error; got nil")
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestGroupReceiver_Close(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
sub := NewMocksubscription(mc)
|
|
|
|
sub.EXPECT().Cancel()
|
|
|
|
receiver := GroupReceiverImpl{sub: sub}
|
|
|
|
|
|
|
|
err := receiver.Close()
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
t.Errorf("expected no error but got %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func pubsubMessage(from libp2p_peer.ID, data []byte) *libp2p_pubsub.Message {
|
|
|
|
m := libp2p_pubsub_pb.Message{From: []byte(from), Data: data}
|
|
|
|
return &libp2p_pubsub.Message{Message: &m}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestGroupReceiver_Receive(t *testing.T) {
|
|
|
|
t.Run("OK", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
sub := NewMocksubscription(mc)
|
|
|
|
receiver := GroupReceiverImpl{sub: sub}
|
|
|
|
wantSender := libp2p_peer.ID("OK")
|
|
|
|
wantMsg := []byte{1, 2, 3}
|
|
|
|
|
|
|
|
sub.EXPECT().Next(ctx).Return(pubsubMessage(wantSender, wantMsg), nil)
|
|
|
|
|
|
|
|
gotMsg, gotSender, err := receiver.Receive(ctx)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
t.Errorf("expected no error; got %v", err)
|
|
|
|
}
|
|
|
|
if gotSender != wantSender {
|
|
|
|
t.Errorf("expected sender %v; got %v", wantSender, gotSender)
|
|
|
|
}
|
|
|
|
if !reflect.DeepEqual(gotMsg, wantMsg) {
|
|
|
|
t.Errorf("expected message %v; got %v", wantMsg, gotMsg)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("Error", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
sub := NewMocksubscription(mc)
|
|
|
|
receiver := GroupReceiverImpl{sub: sub}
|
|
|
|
|
|
|
|
sub.EXPECT().Next(ctx).Return(nil, errors.New("receive error"))
|
|
|
|
|
|
|
|
msg, sender, err := receiver.Receive(ctx)
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
t.Error("expected an error; got nil")
|
|
|
|
}
|
|
|
|
if sender != "" {
|
|
|
|
t.Errorf("expected empty sender; got %v", sender)
|
|
|
|
}
|
|
|
|
if len(msg) > 0 {
|
|
|
|
t.Errorf("expected empty message; got %v", msg)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestHostV2_GroupReceiver(t *testing.T) {
|
|
|
|
t.Run("New", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
sub := &libp2p_pubsub.Subscription{}
|
|
|
|
topic := NewMocktopicHandle(mc)
|
|
|
|
joiner := NewMocktopicJoiner(mc)
|
|
|
|
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
|
|
|
|
|
|
|
|
gomock.InOrder(
|
|
|
|
joiner.EXPECT().JoinTopic("ABC").Return(topic, nil),
|
|
|
|
topic.EXPECT().Subscribe().Return(sub, nil),
|
|
|
|
)
|
|
|
|
|
|
|
|
gotReceiver, err := host.GroupReceiver("ABC")
|
|
|
|
|
|
|
|
if r, ok := gotReceiver.(*GroupReceiverImpl); !ok {
|
|
|
|
t.Errorf("expected a hostv2 GroupReceiverImpl; got %v", gotReceiver)
|
|
|
|
} else if r.sub != sub {
|
|
|
|
t.Errorf("unexpected subscriber %v", r.sub)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
t.Errorf("expected no error; got %v", err)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("JoinError", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
joiner := NewMocktopicJoiner(mc)
|
|
|
|
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
|
|
|
|
|
|
|
|
joiner.EXPECT().JoinTopic("ABC").Return(nil, errors.New("join error"))
|
|
|
|
|
|
|
|
gotReceiver, err := host.GroupReceiver("ABC")
|
|
|
|
|
|
|
|
if gotReceiver != nil {
|
|
|
|
t.Errorf("expected a nil hostv2 GroupReceiverImpl; got %v", gotReceiver)
|
|
|
|
}
|
|
|
|
if err == nil {
|
|
|
|
t.Error("expected an error; got none")
|
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("SubscribeError", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
topic := NewMocktopicHandle(mc)
|
|
|
|
joiner := NewMocktopicJoiner(mc)
|
|
|
|
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
|
|
|
|
|
|
|
|
gomock.InOrder(
|
|
|
|
joiner.EXPECT().JoinTopic("ABC").Return(topic, nil),
|
|
|
|
topic.EXPECT().Subscribe().Return(nil, errors.New("subscription error")),
|
|
|
|
)
|
|
|
|
|
|
|
|
gotReceiver, err := host.GroupReceiver("ABC")
|
|
|
|
|
|
|
|
if gotReceiver != nil {
|
|
|
|
t.Errorf("expected a nil hostv2 GroupReceiverImpl; got %v", gotReceiver)
|
|
|
|
}
|
|
|
|
if err == nil {
|
|
|
|
t.Error("expected an error; got none")
|
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("Closed", func(t *testing.T) {
|
|
|
|
var emptyReceiver GroupReceiverImpl
|
|
|
|
_, _, err := emptyReceiver.Receive(context.Background())
|
|
|
|
if err == nil {
|
|
|
|
t.Errorf("Receive() from nil/closed receiver did not return error")
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestHostV2_getTopic(t *testing.T) {
|
|
|
|
t.Run("NewOK", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
joiner := NewMocktopicJoiner(mc)
|
|
|
|
want := NewMocktopicHandle(mc)
|
|
|
|
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
|
|
|
|
|
|
|
|
joiner.EXPECT().JoinTopic("ABC").Return(want, nil)
|
|
|
|
|
|
|
|
got, err := host.getTopic("ABC")
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
t.Errorf("want nil error; got %v", err)
|
|
|
|
}
|
|
|
|
if got != want {
|
|
|
|
t.Errorf("want topic handle %v; got %v", want, got)
|
|
|
|
}
|
|
|
|
if _, ok := host.joined["ABC"]; !ok {
|
|
|
|
t.Error("topic not found in joined map")
|
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("NewError", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
joiner := NewMocktopicJoiner(mc)
|
|
|
|
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
|
|
|
|
|
|
|
|
joiner.EXPECT().JoinTopic("ABC").Return(nil, errors.New("OMG"))
|
|
|
|
|
|
|
|
got, err := host.getTopic("ABC")
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
t.Error("want non-nil error; got nil")
|
|
|
|
}
|
|
|
|
if got != nil {
|
|
|
|
t.Errorf("want nil handle; got %v", got)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("Existing", func(t *testing.T) {
|
|
|
|
mc := gomock.NewController(t)
|
|
|
|
defer mc.Finish()
|
|
|
|
|
|
|
|
joiner := NewMocktopicJoiner(mc)
|
|
|
|
want := NewMocktopicHandle(mc)
|
|
|
|
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{"ABC": want}}
|
|
|
|
|
|
|
|
joiner.EXPECT().JoinTopic("ABC").Times(0)
|
|
|
|
|
|
|
|
got, err := host.getTopic("ABC")
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
t.Errorf("want nil error; got %v", err)
|
|
|
|
}
|
|
|
|
if got != want {
|
|
|
|
t.Errorf("want topic handle %v; got %v", want, got)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|