Merge pull request #393 from harmony-one/split_discovery_service

Split discovery service
pull/394/head
Leo Chen 6 years ago committed by GitHub
commit bae52e362e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      api/service/discovery/discovery_test.go
  2. 100
      api/service/discovery/service.go
  3. 128
      api/service/networkinfo/service.go
  4. 16
      api/service/staking/service.go
  5. 53
      node/node.go
  6. 11
      specs/p2p/peerdiscovery.md

@ -23,7 +23,7 @@ func TestDiscoveryService(t *testing.T) {
t.Fatalf("unable to new host in harmony: %v", err)
}
service = New(host, "rendezvous")
service = New(host, "rendezvous", nil, nil)
if service == nil {
t.Fatalf("unable to create new discovery service")

@ -1,19 +1,10 @@
package discovery
import (
"context"
"sync"
"github.com/ethereum/go-ethereum/log"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
peerstore "github.com/libp2p/go-libp2p-peerstore"
libp2pdis "github.com/libp2p/go-libp2p-discovery"
libp2pdht "github.com/libp2p/go-libp2p-kad-dht"
)
// Constants for discovery service.
@ -25,106 +16,66 @@ const (
// Service is the struct for discovery service.
type Service struct {
Host p2p.Host
DHT *libp2pdht.IpfsDHT
Rendezvous string
ctx context.Context
peerChan <-chan peerstore.PeerInfo
peerChan chan p2p.Peer
stakingChan chan p2p.Peer
stopChan chan struct{}
}
// New returns discovery service.
// h is the p2p host
// r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network)
func New(h p2p.Host, r string) *Service {
ctx := context.Background()
dht, err := libp2pdht.New(ctx, h.GetP2PHost())
if err != nil {
panic(err)
}
func New(h p2p.Host, r string, peerChan chan p2p.Peer, stakingChan chan p2p.Peer) *Service {
return &Service{
Host: h,
DHT: dht,
Rendezvous: r,
ctx: ctx,
peerChan: make(<-chan peerstore.PeerInfo),
peerChan: peerChan,
stakingChan: stakingChan,
stopChan: make(chan struct{}),
}
}
// StartService starts discovery service.
func (s *Service) StartService() {
log.Info("Starting discovery service.")
err := s.Init()
if err != nil {
log.Error("StartService Aborted", "Error", err)
return
}
// We use a rendezvous point "shardID" to announce our location.
log.Info("Announcing ourselves...")
routingDiscovery := libp2pdis.NewRoutingDiscovery(s.DHT)
libp2pdis.Advertise(s.ctx, routingDiscovery, s.Rendezvous)
log.Debug("Successfully announced!")
log.Debug("Searching for other peers...")
s.peerChan, err = routingDiscovery.FindPeers(s.ctx, s.Rendezvous)
if err != nil {
log.Error("FindPeers", "error", err)
}
s.Init()
s.Run()
}
// StopService shutdowns discovery service.
func (s *Service) StopService() {
log.Info("Shutting down discovery service.")
s.stopChan <- struct{}{}
log.Info("discovery service stopped.")
}
func (s *Service) foundPeers() {
// Run is the main function of the service
func (s *Service) Run() {
go s.contactP2pPeers()
}
func (s *Service) contactP2pPeers() {
for {
select {
case peer, ok := <-s.peerChan:
if !ok {
log.Debug("end of info", "peer", peer.ID)
log.Debug("end of info", "peer", peer.PeerID)
return
}
if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 {
log.Debug("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "len", len(peer.ID))
p := p2p.Peer{PeerID: peer.ID, Addrs: peer.Addrs}
s.Host.AddPeer(&p)
log.Debug("[DISCOVERY]", "peer", peer)
s.Host.AddPeer(&peer)
// TODO: stop ping if pinged before
s.pingPeer(p)
}
// TODO: call staking servcie here if it is a new node
s.pingPeer(peer)
case <-s.stopChan:
return
}
}
}
// Init is to initialize for discoveryService.
func (s *Service) Init() error {
func (s *Service) Init() {
log.Info("Init discovery service")
// Bootstrap the DHT. In the default configuration, this spawns a Background
// thread that will refresh the peer table every five minutes.
log.Debug("Bootstrapping the DHT")
if err := s.DHT.Bootstrap(s.ctx); err != nil {
return ErrDHTBootstrap
}
var wg sync.WaitGroup
for _, peerAddr := range utils.BootNodes {
peerinfo, _ := peerstore.InfoFromP2pAddr(peerAddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := s.Host.GetP2PHost().Connect(s.ctx, *peerinfo); err != nil {
log.Warn("can't connect to bootnode", "error", err)
} else {
log.Info("connected to bootnode", "node", *peerinfo)
}
}()
}
wg.Wait()
go s.foundPeers()
return nil
}
func (s *Service) pingPeer(peer p2p.Peer) {
@ -134,4 +85,5 @@ func (s *Service) pingPeer(peer p2p.Peer) {
content := host.ConstructP2pMessage(byte(0), buffer)
s.Host.SendMessage(peer, content)
log.Debug("Sent Ping Message to", "peer", peer)
s.stakingChan <- peer
}

@ -1,23 +1,57 @@
package networkinfo
import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/prometheus/common/log"
peerstore "github.com/libp2p/go-libp2p-peerstore"
libp2pdis "github.com/libp2p/go-libp2p-discovery"
libp2pdht "github.com/libp2p/go-libp2p-kad-dht"
manet "github.com/multiformats/go-multiaddr-net"
)
// Service is the network info service.
type Service struct {
Host p2p.Host
Rendezvous string
dht *libp2pdht.IpfsDHT
ctx context.Context
cancel context.CancelFunc
stopChan chan struct{}
stoppedChan chan struct{}
peerChan chan *p2p.Peer
peerChan chan p2p.Peer
peerInfo <-chan peerstore.PeerInfo
discovery *libp2pdis.RoutingDiscovery
}
// New returns network info service.
func New(peerChan chan *p2p.Peer) *Service {
// NewService returns role conversion service.
func NewService(h p2p.Host, rendezvous string, peerChan chan p2p.Peer) *Service {
timeout := 30 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
dht, err := libp2pdht.New(ctx, h.GetP2PHost())
if err != nil {
panic(err)
}
return &Service{
Host: h,
dht: dht,
Rendezvous: rendezvous,
ctx: ctx,
cancel: cancel,
stopChan: make(chan struct{}),
stoppedChan: make(chan struct{}),
peerChan: peerChan,
peerInfo: make(<-chan peerstore.PeerInfo),
}
}
@ -27,37 +61,95 @@ func (s *Service) StartService() {
s.Run()
}
// Init initializes network info service.
func (s *Service) Init() {
// Init initializes role conversion service.
func (s *Service) Init() error {
log.Info("Init networkinfo service")
// Bootstrap the DHT. In the default configuration, this spawns a Background
// thread that will refresh the peer table every five minutes.
log.Debug("Bootstrapping the DHT")
if err := s.dht.Bootstrap(s.ctx); err != nil {
return fmt.Errorf("error bootstrap dht")
}
var wg sync.WaitGroup
for _, peerAddr := range utils.BootNodes {
peerinfo, _ := peerstore.InfoFromP2pAddr(peerAddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := s.Host.GetP2PHost().Connect(s.ctx, *peerinfo); err != nil {
log.Warn("can't connect to bootnode", "error", err)
} else {
log.Info("connected to bootnode", "node", *peerinfo)
}
}()
}
wg.Wait()
// We use a rendezvous point "shardID" to announce our location.
log.Info("Announcing ourselves...")
s.discovery = libp2pdis.NewRoutingDiscovery(s.dht)
libp2pdis.Advertise(s.ctx, s.discovery, s.Rendezvous)
log.Info("Successfully announced!")
go s.DoService()
return nil
}
// Run runs network info.
func (s *Service) Run() {
go func() {
defer close(s.stoppedChan)
var err error
s.peerInfo, err = s.discovery.FindPeers(s.ctx, s.Rendezvous)
if err != nil {
log.Error("FindPeers", "error", err)
}
}
// DoService does network info.
func (s *Service) DoService() {
for {
select {
default:
utils.GetLogInstance().Info("Running network info")
// TODO: Write some logic here.
s.DoService()
case <-s.stopChan:
case peer, ok := <-s.peerInfo:
if !ok {
log.Debug("no more peer info", "peer", peer.ID)
return
}
if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 {
log.Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs)
ip := "127.0.0.1"
var port string
for _, addr := range peer.Addrs {
netaddr, err := manet.ToNetAddr(addr)
if err != nil {
continue
}
nip := netaddr.(*net.TCPAddr).IP.String()
if strings.Compare(nip, "127.0.0.1") != 0 {
ip = nip
port = fmt.Sprintf("%d", netaddr.(*net.TCPAddr).Port)
break
}
}
p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs}
log.Info("Notify peerChan", "peer", p)
s.peerChan <- p
}
case <-s.ctx.Done():
return
}
}
}()
}
// DoService does network info.
func (s *Service) DoService() {
// At the end, send Peer info to peer channel
s.peerChan <- &p2p.Peer{}
}
// StopService stops network info service.
func (s *Service) StopService() {
utils.GetLogInstance().Info("Stopping network info service.")
defer s.cancel()
s.stopChan <- struct{}{}
<-s.stoppedChan
utils.GetLogInstance().Info("Role conversion stopped.")
utils.GetLogInstance().Info("Network info service stopped.")
}

@ -1,6 +1,7 @@
package staking
import (
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
)
@ -9,11 +10,11 @@ import (
type Service struct {
stopChan chan struct{}
stoppedChan chan struct{}
peerChan chan *p2p.Peer
peerChan <-chan p2p.Peer
}
// New returns staking service.
func New(peerChan chan *p2p.Peer) *Service {
// NewService returns staking service.
func NewService(peerChan <-chan p2p.Peer) *Service {
return &Service{
stopChan: make(chan struct{}),
stoppedChan: make(chan struct{}),
@ -23,6 +24,7 @@ func New(peerChan chan *p2p.Peer) *Service {
// StartService starts staking service.
func (s *Service) StartService() {
log.Info("Start Staking Service")
s.Init()
s.Run()
}
@ -34,15 +36,14 @@ func (s *Service) Init() {
// Run runs staking.
func (s *Service) Run() {
// Wait until peer info of beacon chain is ready.
peer := <-s.peerChan
go func() {
defer close(s.stoppedChan)
for {
select {
default:
utils.GetLogInstance().Info("Running staking")
case peer := <-s.peerChan:
utils.GetLogInstance().Info("Running role conversion")
// TODO: Write some logic here.
s.DoService(peer)
s.DoService(&peer)
case <-s.stopChan:
return
}
@ -52,6 +53,7 @@ func (s *Service) Run() {
// DoService does staking.
func (s *Service) DoService(peer *p2p.Peer) {
utils.GetLogInstance().Info("Staking with Peer")
}
// StopService stops staking service.

@ -4,7 +4,6 @@ import (
"bytes"
"crypto/ecdsa"
"encoding/binary"
"encoding/gob"
"encoding/hex"
"fmt"
"math/big"
@ -28,6 +27,7 @@ import (
blockproposal "github.com/harmony-one/harmony/api/service/blockproposal"
"github.com/harmony-one/harmony/api/service/clientsupport"
consensus_service "github.com/harmony-one/harmony/api/service/consensus"
"github.com/harmony-one/harmony/api/service/discovery"
"github.com/harmony-one/harmony/api/service/explorer"
"github.com/harmony-one/harmony/api/service/networkinfo"
"github.com/harmony-one/harmony/api/service/staking"
@ -112,12 +112,6 @@ type syncConfig struct {
client *downloader.Client
}
// NetworkNode ...
type NetworkNode struct {
SelfPeer p2p.Peer
IDCPeer p2p.Peer
}
// Node represents a protocol-participating node in the network
type Node struct {
Consensus *bft.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits)
@ -132,7 +126,7 @@ type Node struct {
ClientPeer *p2p.Peer // The peer for the harmony tx generator client, used for leaders to return proof-of-accept
Client *client.Client // The presence of a client object means this node will also act as a client
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
IDCPeer p2p.Peer
BCPeers []p2p.Peer // list of Beacon Chain Peers. This is needed by all nodes.
Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
State State // State of the Node
@ -212,34 +206,6 @@ func (node *Node) countNumTransactionsInBlockchain() int {
return count
}
// SerializeNode serializes the node
// https://stackoverflow.com/questions/12854125/how-do-i-dump-the-struct-into-the-byte-array-without-reflection/12854659#12854659
func (node *Node) SerializeNode(nnode *NetworkNode) []byte {
//Needs to escape the serialization of unexported fields
var result bytes.Buffer
encoder := gob.NewEncoder(&result)
err := encoder.Encode(nnode)
if err != nil {
fmt.Println("Could not serialize node")
fmt.Println("ERROR", err)
//utils.GetLogInstance().Error("Could not serialize node")
}
return result.Bytes()
}
// DeserializeNode deserializes the node
func DeserializeNode(d []byte) *NetworkNode {
var wn NetworkNode
r := bytes.NewBuffer(d)
decoder := gob.NewDecoder(r)
err := decoder.Decode(&wn)
if err != nil {
log.Error("Could not de-serialize node 1")
}
return &wn
}
// New creates a new node.
func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
node := Node{}
@ -692,11 +658,16 @@ func (node *Node) setupForBeaconValidator() {
}
func (node *Node) setupForNewNode() {
chanPeer := make(chan *p2p.Peer)
// Add network info serivce.
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(chanPeer))
// Add staking service.
node.serviceManager.RegisterService(service_manager.Staking, staking.New(chanPeer))
chanPeer := make(chan p2p.Peer)
stakingPeer := make(chan p2p.Peer)
// Register staking service.
node.serviceManager.RegisterService(service_manager.Staking, staking.NewService(stakingPeer))
// Register peer discovery service.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, fmt.Sprintf("%v", node.Consensus.ShardID), chanPeer, stakingPeer))
// Register networkinfo service.
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.NewService(node.host, fmt.Sprintf("%v", node.Consensus.ShardID), chanPeer))
}
// ServiceManagerSetup setups service store.

@ -57,3 +57,14 @@ The new node are connected to the two P2P overlay networks.
Harmony utilizes libp2p as the underlying networking layer for peer discovery and p2p network transportation.
It is still a crucial task to understand the protocol messages and how the libp2p handles the messages.
We may need to fork the libp2p to fit our requirement during the development.
## Two Stages of Peer Discovery
Harmony uses two stages of peer discovery mechanism to form the overlay of p2p networks.
The first stage is to connect to beacon chain to stake and get the shard information.
The second stage is to create the real p2p network within the shard afterwards.
New nodes will always keep the connection to some beacon chain nodes for further communication.
The current implementation works like the following.
* beacon chain nodes bootstrap by contacting bootnodes using discovery rendezvous string "0". Then the beacon chain is formed.
* new nodes contact bootnodes using rendezvous string "0" to connect to beacon chain nodes.
* new nodes use pubsub to stake in beacon chain, and get shard information from beacon chain after the randomness and resharding algorithm.
* new nodes use the new shardID as the rendezvous string to connect to bootnodes again to form new p2p network at the shard level.

Loading…
Cancel
Save