diff --git a/api/service/discovery/discovery_test.go b/api/service/discovery/discovery_test.go index ef6b24be2..b69ab0e78 100644 --- a/api/service/discovery/discovery_test.go +++ b/api/service/discovery/discovery_test.go @@ -23,7 +23,7 @@ func TestDiscoveryService(t *testing.T) { t.Fatalf("unable to new host in harmony: %v", err) } - service = New(host, "rendezvous") + service = New(host, "rendezvous", nil, nil) if service == nil { t.Fatalf("unable to create new discovery service") diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index fc02b2b01..62b04b7da 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -1,19 +1,10 @@ package discovery import ( - "context" - "sync" - "github.com/ethereum/go-ethereum/log" proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" - "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" - - peerstore "github.com/libp2p/go-libp2p-peerstore" - - libp2pdis "github.com/libp2p/go-libp2p-discovery" - libp2pdht "github.com/libp2p/go-libp2p-kad-dht" ) // Constants for discovery service. @@ -24,107 +15,67 @@ const ( // Service is the struct for discovery service. type Service struct { - Host p2p.Host - DHT *libp2pdht.IpfsDHT - Rendezvous string - ctx context.Context - peerChan <-chan peerstore.PeerInfo + Host p2p.Host + Rendezvous string + peerChan chan p2p.Peer + stakingChan chan p2p.Peer + stopChan chan struct{} } // New returns discovery service. // h is the p2p host // r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network) -func New(h p2p.Host, r string) *Service { - ctx := context.Background() - dht, err := libp2pdht.New(ctx, h.GetP2PHost()) - if err != nil { - panic(err) - } - +func New(h p2p.Host, r string, peerChan chan p2p.Peer, stakingChan chan p2p.Peer) *Service { return &Service{ - Host: h, - DHT: dht, - Rendezvous: r, - ctx: ctx, - peerChan: make(<-chan peerstore.PeerInfo), + Host: h, + Rendezvous: r, + peerChan: peerChan, + stakingChan: stakingChan, + stopChan: make(chan struct{}), } } // StartService starts discovery service. func (s *Service) StartService() { log.Info("Starting discovery service.") - err := s.Init() - if err != nil { - log.Error("StartService Aborted", "Error", err) - return - } - - // We use a rendezvous point "shardID" to announce our location. - log.Info("Announcing ourselves...") - routingDiscovery := libp2pdis.NewRoutingDiscovery(s.DHT) - libp2pdis.Advertise(s.ctx, routingDiscovery, s.Rendezvous) - log.Debug("Successfully announced!") - - log.Debug("Searching for other peers...") - s.peerChan, err = routingDiscovery.FindPeers(s.ctx, s.Rendezvous) - if err != nil { - log.Error("FindPeers", "error", err) - } + s.Init() + s.Run() } // StopService shutdowns discovery service. func (s *Service) StopService() { log.Info("Shutting down discovery service.") + s.stopChan <- struct{}{} + log.Info("discovery service stopped.") } -func (s *Service) foundPeers() { +// Run is the main function of the service +func (s *Service) Run() { + go s.contactP2pPeers() +} + +func (s *Service) contactP2pPeers() { for { select { case peer, ok := <-s.peerChan: if !ok { - log.Debug("end of info", "peer", peer.ID) + log.Debug("end of info", "peer", peer.PeerID) return } - if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { - log.Debug("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "len", len(peer.ID)) - p := p2p.Peer{PeerID: peer.ID, Addrs: peer.Addrs} - s.Host.AddPeer(&p) - // TODO: stop ping if pinged before - s.pingPeer(p) - } + log.Debug("[DISCOVERY]", "peer", peer) + s.Host.AddPeer(&peer) + // TODO: stop ping if pinged before + // TODO: call staking servcie here if it is a new node + s.pingPeer(peer) + case <-s.stopChan: + return } } } // Init is to initialize for discoveryService. -func (s *Service) Init() error { +func (s *Service) Init() { log.Info("Init discovery service") - - // Bootstrap the DHT. In the default configuration, this spawns a Background - // thread that will refresh the peer table every five minutes. - log.Debug("Bootstrapping the DHT") - if err := s.DHT.Bootstrap(s.ctx); err != nil { - return ErrDHTBootstrap - } - - var wg sync.WaitGroup - for _, peerAddr := range utils.BootNodes { - peerinfo, _ := peerstore.InfoFromP2pAddr(peerAddr) - wg.Add(1) - go func() { - defer wg.Done() - if err := s.Host.GetP2PHost().Connect(s.ctx, *peerinfo); err != nil { - log.Warn("can't connect to bootnode", "error", err) - } else { - log.Info("connected to bootnode", "node", *peerinfo) - } - }() - } - wg.Wait() - - go s.foundPeers() - - return nil } func (s *Service) pingPeer(peer p2p.Peer) { @@ -134,4 +85,5 @@ func (s *Service) pingPeer(peer p2p.Peer) { content := host.ConstructP2pMessage(byte(0), buffer) s.Host.SendMessage(peer, content) log.Debug("Sent Ping Message to", "peer", peer) + s.stakingChan <- peer } diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index a39698118..625121bcd 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -1,23 +1,57 @@ package networkinfo import ( + "context" + "fmt" + "net" + "strings" + "sync" + "time" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" + "github.com/prometheus/common/log" + + peerstore "github.com/libp2p/go-libp2p-peerstore" + + libp2pdis "github.com/libp2p/go-libp2p-discovery" + libp2pdht "github.com/libp2p/go-libp2p-kad-dht" + manet "github.com/multiformats/go-multiaddr-net" ) // Service is the network info service. type Service struct { + Host p2p.Host + Rendezvous string + dht *libp2pdht.IpfsDHT + ctx context.Context + cancel context.CancelFunc stopChan chan struct{} stoppedChan chan struct{} - peerChan chan *p2p.Peer + peerChan chan p2p.Peer + peerInfo <-chan peerstore.PeerInfo + discovery *libp2pdis.RoutingDiscovery } -// New returns network info service. -func New(peerChan chan *p2p.Peer) *Service { +// NewService returns role conversion service. +func NewService(h p2p.Host, rendezvous string, peerChan chan p2p.Peer) *Service { + timeout := 30 * time.Minute + ctx, cancel := context.WithTimeout(context.Background(), timeout) + dht, err := libp2pdht.New(ctx, h.GetP2PHost()) + if err != nil { + panic(err) + } + return &Service{ + Host: h, + dht: dht, + Rendezvous: rendezvous, + ctx: ctx, + cancel: cancel, stopChan: make(chan struct{}), stoppedChan: make(chan struct{}), peerChan: peerChan, + peerInfo: make(<-chan peerstore.PeerInfo), } } @@ -27,37 +61,95 @@ func (s *Service) StartService() { s.Run() } -// Init initializes network info service. -func (s *Service) Init() { +// Init initializes role conversion service. +func (s *Service) Init() error { + log.Info("Init networkinfo service") + + // Bootstrap the DHT. In the default configuration, this spawns a Background + // thread that will refresh the peer table every five minutes. + log.Debug("Bootstrapping the DHT") + if err := s.dht.Bootstrap(s.ctx); err != nil { + return fmt.Errorf("error bootstrap dht") + } + + var wg sync.WaitGroup + for _, peerAddr := range utils.BootNodes { + peerinfo, _ := peerstore.InfoFromP2pAddr(peerAddr) + wg.Add(1) + go func() { + defer wg.Done() + if err := s.Host.GetP2PHost().Connect(s.ctx, *peerinfo); err != nil { + log.Warn("can't connect to bootnode", "error", err) + } else { + log.Info("connected to bootnode", "node", *peerinfo) + } + }() + } + wg.Wait() + + // We use a rendezvous point "shardID" to announce our location. + log.Info("Announcing ourselves...") + s.discovery = libp2pdis.NewRoutingDiscovery(s.dht) + libp2pdis.Advertise(s.ctx, s.discovery, s.Rendezvous) + log.Info("Successfully announced!") + + go s.DoService() + + return nil } // Run runs network info. func (s *Service) Run() { - go func() { - defer close(s.stoppedChan) - for { - select { - default: - utils.GetLogInstance().Info("Running network info") - // TODO: Write some logic here. - s.DoService() - case <-s.stopChan: - return - } - } - }() + defer close(s.stoppedChan) + var err error + s.peerInfo, err = s.discovery.FindPeers(s.ctx, s.Rendezvous) + if err != nil { + log.Error("FindPeers", "error", err) + } } // DoService does network info. func (s *Service) DoService() { - // At the end, send Peer info to peer channel - s.peerChan <- &p2p.Peer{} + for { + select { + case peer, ok := <-s.peerInfo: + if !ok { + log.Debug("no more peer info", "peer", peer.ID) + return + } + if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { + log.Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs) + ip := "127.0.0.1" + var port string + for _, addr := range peer.Addrs { + netaddr, err := manet.ToNetAddr(addr) + if err != nil { + continue + } + nip := netaddr.(*net.TCPAddr).IP.String() + if strings.Compare(nip, "127.0.0.1") != 0 { + ip = nip + port = fmt.Sprintf("%d", netaddr.(*net.TCPAddr).Port) + break + } + } + p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs} + log.Info("Notify peerChan", "peer", p) + s.peerChan <- p + } + case <-s.ctx.Done(): + return + } + } + } // StopService stops network info service. func (s *Service) StopService() { utils.GetLogInstance().Info("Stopping network info service.") + defer s.cancel() + s.stopChan <- struct{}{} <-s.stoppedChan - utils.GetLogInstance().Info("Role conversion stopped.") + utils.GetLogInstance().Info("Network info service stopped.") } diff --git a/api/service/staking/service.go b/api/service/staking/service.go index 0c842f2ff..3ae2634b9 100644 --- a/api/service/staking/service.go +++ b/api/service/staking/service.go @@ -1,6 +1,7 @@ package staking import ( + "github.com/ethereum/go-ethereum/log" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" ) @@ -9,11 +10,11 @@ import ( type Service struct { stopChan chan struct{} stoppedChan chan struct{} - peerChan chan *p2p.Peer + peerChan <-chan p2p.Peer } -// New returns staking service. -func New(peerChan chan *p2p.Peer) *Service { +// NewService returns staking service. +func NewService(peerChan <-chan p2p.Peer) *Service { return &Service{ stopChan: make(chan struct{}), stoppedChan: make(chan struct{}), @@ -23,6 +24,7 @@ func New(peerChan chan *p2p.Peer) *Service { // StartService starts staking service. func (s *Service) StartService() { + log.Info("Start Staking Service") s.Init() s.Run() } @@ -34,15 +36,14 @@ func (s *Service) Init() { // Run runs staking. func (s *Service) Run() { // Wait until peer info of beacon chain is ready. - peer := <-s.peerChan go func() { defer close(s.stoppedChan) for { select { - default: - utils.GetLogInstance().Info("Running staking") + case peer := <-s.peerChan: + utils.GetLogInstance().Info("Running role conversion") // TODO: Write some logic here. - s.DoService(peer) + s.DoService(&peer) case <-s.stopChan: return } @@ -52,6 +53,7 @@ func (s *Service) Run() { // DoService does staking. func (s *Service) DoService(peer *p2p.Peer) { + utils.GetLogInstance().Info("Staking with Peer") } // StopService stops staking service. diff --git a/node/node.go b/node/node.go index 61e5e6e57..3bb620415 100644 --- a/node/node.go +++ b/node/node.go @@ -4,7 +4,6 @@ import ( "bytes" "crypto/ecdsa" "encoding/binary" - "encoding/gob" "encoding/hex" "fmt" "math/big" @@ -28,6 +27,7 @@ import ( blockproposal "github.com/harmony-one/harmony/api/service/blockproposal" "github.com/harmony-one/harmony/api/service/clientsupport" consensus_service "github.com/harmony-one/harmony/api/service/consensus" + "github.com/harmony-one/harmony/api/service/discovery" "github.com/harmony-one/harmony/api/service/explorer" "github.com/harmony-one/harmony/api/service/networkinfo" "github.com/harmony-one/harmony/api/service/staking" @@ -112,12 +112,6 @@ type syncConfig struct { client *downloader.Client } -// NetworkNode ... -type NetworkNode struct { - SelfPeer p2p.Peer - IDCPeer p2p.Peer -} - // Node represents a protocol-participating node in the network type Node struct { Consensus *bft.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits) @@ -132,7 +126,7 @@ type Node struct { ClientPeer *p2p.Peer // The peer for the harmony tx generator client, used for leaders to return proof-of-accept Client *client.Client // The presence of a client object means this node will also act as a client SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. - IDCPeer p2p.Peer + BCPeers []p2p.Peer // list of Beacon Chain Peers. This is needed by all nodes. Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer State State // State of the Node @@ -212,34 +206,6 @@ func (node *Node) countNumTransactionsInBlockchain() int { return count } -// SerializeNode serializes the node -// https://stackoverflow.com/questions/12854125/how-do-i-dump-the-struct-into-the-byte-array-without-reflection/12854659#12854659 -func (node *Node) SerializeNode(nnode *NetworkNode) []byte { - //Needs to escape the serialization of unexported fields - var result bytes.Buffer - encoder := gob.NewEncoder(&result) - err := encoder.Encode(nnode) - if err != nil { - fmt.Println("Could not serialize node") - fmt.Println("ERROR", err) - //utils.GetLogInstance().Error("Could not serialize node") - } - - return result.Bytes() -} - -// DeserializeNode deserializes the node -func DeserializeNode(d []byte) *NetworkNode { - var wn NetworkNode - r := bytes.NewBuffer(d) - decoder := gob.NewDecoder(r) - err := decoder.Decode(&wn) - if err != nil { - log.Error("Could not de-serialize node 1") - } - return &wn -} - // New creates a new node. func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { node := Node{} @@ -692,11 +658,16 @@ func (node *Node) setupForBeaconValidator() { } func (node *Node) setupForNewNode() { - chanPeer := make(chan *p2p.Peer) - // Add network info serivce. - node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(chanPeer)) - // Add staking service. - node.serviceManager.RegisterService(service_manager.Staking, staking.New(chanPeer)) + chanPeer := make(chan p2p.Peer) + stakingPeer := make(chan p2p.Peer) + + // Register staking service. + node.serviceManager.RegisterService(service_manager.Staking, staking.NewService(stakingPeer)) + // Register peer discovery service. + node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, fmt.Sprintf("%v", node.Consensus.ShardID), chanPeer, stakingPeer)) + // Register networkinfo service. + node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.NewService(node.host, fmt.Sprintf("%v", node.Consensus.ShardID), chanPeer)) + } // ServiceManagerSetup setups service store. diff --git a/specs/p2p/peerdiscovery.md b/specs/p2p/peerdiscovery.md index 235626285..1a8768b00 100644 --- a/specs/p2p/peerdiscovery.md +++ b/specs/p2p/peerdiscovery.md @@ -57,3 +57,14 @@ The new node are connected to the two P2P overlay networks. Harmony utilizes libp2p as the underlying networking layer for peer discovery and p2p network transportation. It is still a crucial task to understand the protocol messages and how the libp2p handles the messages. We may need to fork the libp2p to fit our requirement during the development. + +## Two Stages of Peer Discovery +Harmony uses two stages of peer discovery mechanism to form the overlay of p2p networks. +The first stage is to connect to beacon chain to stake and get the shard information. +The second stage is to create the real p2p network within the shard afterwards. +New nodes will always keep the connection to some beacon chain nodes for further communication. +The current implementation works like the following. +* beacon chain nodes bootstrap by contacting bootnodes using discovery rendezvous string "0". Then the beacon chain is formed. +* new nodes contact bootnodes using rendezvous string "0" to connect to beacon chain nodes. +* new nodes use pubsub to stake in beacon chain, and get shard information from beacon chain after the randomness and resharding algorithm. +* new nodes use the new shardID as the rendezvous string to connect to bootnodes again to form new p2p network at the shard level.