add discovery service in the new service framework

Signed-off-by: Leo Chen <leo@harmony.one>
pull/372/head
Leo Chen 6 years ago
parent 43e39bb52f
commit 6b46b91b0e
  1. 31
      api/service/discovery/discovery_test.go
  2. 12
      api/service/discovery/errors.go
  3. 118
      api/service/discovery/service.go

@ -0,0 +1,31 @@
package discovery
import (
"testing"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
)
var (
ip = "127.0.0.1"
port = "7099"
service *Service
)
func TestDiscoveryService(t *testing.T) {
selfPeer := p2p.Peer{IP: ip, Port: port}
priKey, _, err := utils.GenKeyP2P(ip, port)
host, err := p2pimpl.NewHost(&selfPeer, priKey)
if err != nil {
t.Fatalf("unable to new host in harmony: %v", err)
}
service = New(host, "rendezvous")
if service == nil {
t.Fatalf("unable to create new discovery service")
}
}

@ -0,0 +1,12 @@
package discovery
import "errors"
// Errors of peer discovery
var (
ErrGetPeers = errors.New("[DISCOVERY]: get peer list failed")
ErrConnectionFull = errors.New("[DISCOVERY]: node's incoming connection full")
ErrPing = errors.New("[DISCOVERY]: ping peer failed")
ErrPong = errors.New("[DISCOVERY]: pong peer failed")
ErrDHTBootstrap = errors.New("[DISCOVERY]: DHT bootstrap failed")
)

@ -0,0 +1,118 @@
package discovery
import (
"context"
"io"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
peerstore "github.com/libp2p/go-libp2p-peerstore"
libp2pdis "github.com/libp2p/go-libp2p-discovery"
libp2pdht "github.com/libp2p/go-libp2p-kad-dht"
)
// Constants for discovery service.
const (
numIncoming = 128
numOutgoing = 16
)
// Service is the struct for discovery service.
type Service struct {
Host p2p.Host
DHT *libp2pdht.IpfsDHT
Rendezvous string
reader io.Reader
writer io.Writer
ctx context.Context
}
// New returns discovery service.
// h is the p2p host
// r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network)
func New(h p2p.Host, r string) *Service {
ctx := context.Background()
dht, err := libp2pdht.New(ctx, h.GetP2PHost())
if err != nil {
panic(err)
}
return &Service{
Host: h,
DHT: dht,
Rendezvous: r,
ctx: ctx,
}
}
// StartService starts discovery service.
func (s *Service) StartService() {
log.Info("Starting discovery service.")
err := s.Init()
if err != nil {
log.Error("StartService Aborted", "Error", err)
return
}
// We use a rendezvous point "shardID" to announce our location.
log.Info("Announcing ourselves...")
routingDiscovery := libp2pdis.NewRoutingDiscovery(s.DHT)
libp2pdis.Advertise(s.ctx, routingDiscovery, s.Rendezvous)
log.Debug("Successfully announced!")
log.Debug("Searching for other peers...")
peerChan, err := routingDiscovery.FindPeers(s.ctx, s.Rendezvous)
if err != nil {
log.Error("FindPeers", "error", err)
}
for peer := range peerChan {
// skip myself
if peer.ID == s.Host.GetP2PHost().ID() {
continue
}
log.Debug("Found peer", "adding peer", peer)
p := p2p.Peer{PeerID: peer.ID, Addrs: peer.Addrs}
s.Host.AddPeer(&p)
}
select {}
}
// StopService shutdowns discovery service.
func (s *Service) StopService() {
log.Info("Shutting down discovery service.")
}
// Init is to initialize for discoveryService.
func (s *Service) Init() error {
log.Info("Init discovery service")
// Bootstrap the DHT. In the default configuration, this spawns a Background
// thread that will refresh the peer table every five minutes.
log.Debug("Bootstrapping the DHT")
if err := s.DHT.Bootstrap(s.ctx); err != nil {
return ErrDHTBootstrap
}
var wg sync.WaitGroup
for _, peerAddr := range utils.BootNodes {
peerinfo, _ := peerstore.InfoFromP2pAddr(peerAddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := s.Host.GetP2PHost().Connect(s.ctx, *peerinfo); err != nil {
log.Warn("can't connect to bootnode", "error", err)
} else {
log.Info("connected to bootnode", "node", *peerinfo)
}
}()
}
wg.Wait()
return nil
}
Loading…
Cancel
Save