diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 218090292..92a48ad5f 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -28,6 +28,7 @@ type Service struct { peerChan chan p2p.Peer peerInfo <-chan peerstore.PeerInfo discovery *libp2pdis.RoutingDiscovery + lock sync.Mutex } // New returns role conversion service. @@ -116,17 +117,14 @@ func (s *Service) DoService() { } if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if err := s.Host.GetP2PHost().Connect(s.ctx, peer); err != nil { - utils.GetLogInstance().Warn("can't connect to peer node", "error", err) - } else { - utils.GetLogInstance().Info("connected to peer node", "peer", peer) - } - }() - wg.Wait() + s.lock.Lock() + if err := s.Host.GetP2PHost().Connect(s.ctx, peer); err != nil { + utils.GetLogInstance().Warn("can't connect to peer node", "error", err) + } else { + utils.GetLogInstance().Info("connected to peer node", "peer", peer) + } + s.lock.Unlock() + // figure out the public ip/port ip := "127.0.0.1" var port string for _, addr := range peer.Addrs { diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index d0a8624a6..d37519c9e 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -45,6 +45,7 @@ type HostV2 struct { pubsub PubSub self p2p.Peer priKey p2p_crypto.PrivKey + lock sync.Mutex incomingPeers []p2p.Peer // list of incoming Peers. TODO: fixed number incoming outgoingPeers []p2p.Peer // list of outgoing Peers. TODO: fixed number of outgoing @@ -161,7 +162,8 @@ func New(self *p2p.Peer, priKey p2p_crypto.PrivKey, opts ...p2p_config.Option) * append(opts, libp2p.ListenAddrs(listenAddr), libp2p.Identity(priKey))..., ) catchError(err) - pubsub, err := pubsub.NewGossipSub(ctx, p2pHost) + // pubsub, err := pubsub.NewGossipSub(ctx, p2pHost) + pubsub, err := pubsub.NewFloodSub(ctx, p2pHost) catchError(err) self.PeerID = p2pHost.ID() @@ -244,15 +246,11 @@ func (host *HostV2) ConnectHostPeer(peer p2p.Peer) { utils.GetLogInstance().Error("ConnectHostPeer", "new peerinfo error", err, "peer", peer) return } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if err := host.h.Connect(ctx, *peerInfo); err != nil { - utils.GetLogInstance().Warn("can't connect to peer", "error", err, "peer", peer) - } else { - utils.GetLogInstance().Info("connected to peer host", "node", *peerInfo) - } - }() - wg.Wait() + host.lock.Lock() + defer host.lock.Unlock() + if err := host.h.Connect(ctx, *peerInfo); err != nil { + utils.GetLogInstance().Warn("can't connect to peer", "error", err, "peer", peer) + } else { + utils.GetLogInstance().Info("connected to peer host", "node", *peerInfo) + } } diff --git a/p2p/host/mock/host_mock.go b/p2p/host/mock/host_mock.go index 694459f0c..1d00a3df3 100644 --- a/p2p/host/mock/host_mock.go +++ b/p2p/host/mock/host_mock.go @@ -5,11 +5,12 @@ package mock_p2p import ( + reflect "reflect" + gomock "github.com/golang/mock/gomock" p2p "github.com/harmony-one/harmony/p2p" go_libp2p_host "github.com/libp2p/go-libp2p-host" go_libp2p_peer "github.com/libp2p/go-libp2p-peer" - reflect "reflect" ) // MockHost is a mock of Host interface @@ -141,3 +142,33 @@ func (m *MockHost) GroupReceiver(arg0 p2p.GroupID) (p2p.GroupReceiver, error) { func (mr *MockHostMockRecorder) GroupReceiver(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GroupReceiver", reflect.TypeOf((*MockHost)(nil).GroupReceiver), arg0) } + +// AddIncomingPeer mocks base method +func (m *MockHost) AddIncomingPeer(peer p2p.Peer) { + m.ctrl.Call(m, "AddIncomingPeer", peer) +} + +// AddIncomingPeer indicates an expected call of AddIncomingPeer +func (mr *MockHostMockRecorder) AddIncomingPeer(groups, msg interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddIncomingPeer", reflect.TypeOf((*MockHost)(nil).AddIncomingPeer), groups, msg) +} + +// AddOutgoingPeer mocks base method +func (m *MockHost) AddOutgoingPeer(peer p2p.Peer) { + m.ctrl.Call(m, "AddOutgoingPeer", peer) +} + +// AddOutgoingPeer indicates an expected call of AddOutgoingPeer +func (mr *MockHostMockRecorder) AddOutgoingPeer(groups, msg interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddOutgoingPeer", reflect.TypeOf((*MockHost)(nil).AddOutgoingPeer), groups, msg) +} + +// ConnectHostPeer mocks base method +func (m *MockHost) ConnectHostPeer(peer p2p.Peer) { + m.ctrl.Call(m, "ConnectHostPeer", peer) +} + +// ConnectHostPeer indicates an expected call of ConnectHostPeer +func (mr *MockHostMockRecorder) ConnectHostPeer(groups, msg interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConnectHostPeer", reflect.TypeOf((*MockHost)(nil).ConnectHostPeer), groups, msg) +}