@ -30,12 +30,23 @@ type Service struct {
peerInfo <- chan peerstore . PeerInfo
peerInfo <- chan peerstore . PeerInfo
discovery * libp2pdis . RoutingDiscovery
discovery * libp2pdis . RoutingDiscovery
messageChan chan * msg_pb . Message
messageChan chan * msg_pb . Message
started bool
}
}
// ConnectionRetry set the number of retry of connection to bootnode in case the initial connection is failed
var (
// retry for 10 minutes and give up then
ConnectionRetry = 300
)
const (
waitInRetry = 2 * time . Second
connectionTimeout = 3 * time . Minute
)
// New returns role conversion service.
// New returns role conversion service.
func New ( h p2p . Host , rendezvous p2p . GroupID , peerChan chan p2p . Peer , bootnodes utils . AddrList ) * Service {
func New ( h p2p . Host , rendezvous p2p . GroupID , peerChan chan p2p . Peer , bootnodes utils . AddrList ) * Service {
timeout := 30 * time . Minute
ctx , cancel := context . WithTimeout ( context . Background ( ) , connectionTimeout )
ctx , cancel := context . WithTimeout ( context . Background ( ) , timeout )
dht , err := libp2pdht . New ( ctx , h . GetP2PHost ( ) )
dht , err := libp2pdht . New ( ctx , h . GetP2PHost ( ) )
if err != nil {
if err != nil {
panic ( err )
panic ( err )
@ -51,13 +62,20 @@ func New(h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer, bootnodes u
stoppedChan : make ( chan struct { } ) ,
stoppedChan : make ( chan struct { } ) ,
peerChan : peerChan ,
peerChan : peerChan ,
bootnodes : bootnodes ,
bootnodes : bootnodes ,
discovery : nil ,
started : false ,
}
}
}
}
// StartService starts network info service.
// StartService starts network info service.
func ( s * Service ) StartService ( ) {
func ( s * Service ) StartService ( ) {
s . Init ( )
err := s . Init ( )
if err != nil {
utils . GetLogInstance ( ) . Error ( "Service Init Failed" , "error" , err )
return
}
s . Run ( )
s . Run ( )
s . started = true
}
}
// Init initializes role conversion service.
// Init initializes role conversion service.
@ -76,20 +94,31 @@ func (s *Service) Init() error {
s . bootnodes = utils . BootNodes
s . bootnodes = utils . BootNodes
}
}
connected := false
for _ , peerAddr := range s . bootnodes {
for _ , peerAddr := range s . bootnodes {
peerinfo , _ := peerstore . InfoFromP2pAddr ( peerAddr )
peerinfo , _ := peerstore . InfoFromP2pAddr ( peerAddr )
wg . Add ( 1 )
wg . Add ( 1 )
go func ( ) {
go func ( ) {
defer wg . Done ( )
defer wg . Done ( )
if err := s . Host . GetP2PHost ( ) . Connect ( s . ctx , * peerinfo ) ; err != nil {
for i := 0 ; i < ConnectionRetry ; i ++ {
utils . GetLogInstance ( ) . Warn ( "can't connect to bootnode" , "error" , err )
if err := s . Host . GetP2PHost ( ) . Connect ( s . ctx , * peerinfo ) ; err != nil {
} else {
utils . GetLogInstance ( ) . Warn ( "can't connect to bootnode" , "error" , err , "try" , i )
utils . GetLogInstance ( ) . Info ( "connected to bootnode" , "node" , * peerinfo )
time . Sleep ( waitInRetry )
} else {
utils . GetLogInstance ( ) . Info ( "connected to bootnode" , "node" , * peerinfo , "try" , i )
// it is okay if any bootnode is connected
connected = true
break
}
}
}
} ( )
} ( )
}
}
wg . Wait ( )
wg . Wait ( )
if ! connected {
return fmt . Errorf ( "[FATAL] error connecting to bootnodes" )
}
// We use a rendezvous point "shardID" to announce our location.
// We use a rendezvous point "shardID" to announce our location.
utils . GetLogInstance ( ) . Info ( "Announcing ourselves..." )
utils . GetLogInstance ( ) . Info ( "Announcing ourselves..." )
s . discovery = libp2pdis . NewRoutingDiscovery ( s . dht )
s . discovery = libp2pdis . NewRoutingDiscovery ( s . dht )
@ -102,10 +131,16 @@ func (s *Service) Init() error {
// Run runs network info.
// Run runs network info.
func ( s * Service ) Run ( ) {
func ( s * Service ) Run ( ) {
defer close ( s . stoppedChan )
defer close ( s . stoppedChan )
if s . discovery == nil {
utils . GetLogInstance ( ) . Error ( "discovery is not initialized" )
return
}
var err error
var err error
s . peerInfo , err = s . discovery . FindPeers ( s . ctx , string ( s . Rendezvous ) )
s . peerInfo , err = s . discovery . FindPeers ( s . ctx , string ( s . Rendezvous ) )
if err != nil {
if err != nil {
utils . GetLogInstance ( ) . Error ( "FindPeers" , "error" , err )
utils . GetLogInstance ( ) . Error ( "FindPeers" , "error" , err )
return
}
}
go s . DoService ( )
go s . DoService ( )
@ -160,6 +195,11 @@ func (s *Service) StopService() {
utils . GetLogInstance ( ) . Info ( "Stopping network info service." )
utils . GetLogInstance ( ) . Info ( "Stopping network info service." )
defer s . cancel ( )
defer s . cancel ( )
if ! s . started {
utils . GetLogInstance ( ) . Info ( "Service didn't started. Exit." )
return
}
s . stopChan <- struct { } { }
s . stopChan <- struct { } { }
<- s . stoppedChan
<- s . stoppedChan
utils . GetLogInstance ( ) . Info ( "Network info service stopped." )
utils . GetLogInstance ( ) . Info ( "Network info service stopped." )