diff --git a/api/service/discovery/discovery_test.go b/api/service/discovery/discovery_test.go index 030f83c43..e9f3043c9 100644 --- a/api/service/discovery/discovery_test.go +++ b/api/service/discovery/discovery_test.go @@ -26,7 +26,7 @@ func TestDiscoveryService(t *testing.T) { config := service.NodeConfig{} - dService = New(host, config, nil) + dService = New(host, config, nil, nil) if dService == nil { t.Fatalf("unable to create new discovery service") diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index 3e07c696c..293080ef3 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -14,27 +14,29 @@ import ( // Service is the struct for discovery service. type Service struct { - host p2p.Host - peerChan chan p2p.Peer - stopChan chan struct{} - actionChan chan p2p.GroupAction - config service.NodeConfig - actions map[p2p.GroupID]p2p.ActionType - messageChan chan *msg_pb.Message + host p2p.Host + peerChan chan p2p.Peer + stopChan chan struct{} + actionChan chan p2p.GroupAction + config service.NodeConfig + actions map[p2p.GroupID]p2p.ActionType + messageChan chan *msg_pb.Message + addBeaconPeerFunc func(*p2p.Peer) bool } // New returns discovery service. // h is the p2p host // config is the node config // (TODO: leo, build two overlays of network) -func New(h p2p.Host, config service.NodeConfig, peerChan chan p2p.Peer) *Service { +func New(h p2p.Host, config service.NodeConfig, peerChan chan p2p.Peer, addPeer func(*p2p.Peer) bool) *Service { return &Service{ - host: h, - peerChan: peerChan, - stopChan: make(chan struct{}), - actionChan: make(chan p2p.GroupAction), - config: config, - actions: make(map[p2p.GroupID]p2p.ActionType), + host: h, + peerChan: peerChan, + stopChan: make(chan struct{}), + actionChan: make(chan p2p.GroupAction), + config: config, + actions: make(map[p2p.GroupID]p2p.ActionType), + addBeaconPeerFunc: addPeer, } } @@ -83,13 +85,19 @@ func (s *Service) contactP2pPeers() { select { case peer, ok := <-s.peerChan: if !ok { - utils.GetLogInstance().Debug("end of info", "peer", peer.PeerID) + utils.GetLogInstance().Debug("[DISCOVERY] No More Peer!") break } - s.host.AddPeer(&peer) + // TODO (leo) this one assumes all peers received in the channel are beacon chain node + // It is just a temporary hack. When we work on re-sharding to regular shard, this has to be changed. + if !s.config.IsBeacon { + if s.addBeaconPeerFunc != nil { + s.addBeaconPeerFunc(&peer) + } + } // Add to outgoing peer list - //s.host.AddOutgoingPeer(peer) - utils.GetLogInstance().Debug("[DISCOVERY]", "add outgoing peer", peer) + // s.host.AddOutgoingPeer(peer) + // utils.GetLogInstance().Debug("[DISCOVERY]", "add outgoing peer", peer) case <-s.stopChan: utils.GetLogInstance().Debug("[DISCOVERY] stop pinging ...") return diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index e2ec4deca..336ecb813 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -112,7 +112,7 @@ func (s *Service) DoService() { select { case peer := <-s.peerInfo: if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { - utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) + // utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) if err := s.Host.GetP2PHost().Connect(s.ctx, peer); err != nil { utils.GetLogInstance().Warn("can't connect to peer node", "error", err) } else { diff --git a/node/node.go b/node/node.go index 967dacdaf..c36a03a8b 100644 --- a/node/node.go +++ b/node/node.go @@ -122,6 +122,9 @@ type Node struct { State State // State of the Node stateMutex sync.Mutex // mutex for change node state + // BeaconNeighbors store only neighbor nodes in the beacon chain shard + BeaconNeighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer + TxPool *core.TxPool Worker *worker.Worker BeaconWorker *worker.Worker // worker for beacon chain @@ -290,10 +293,10 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N func (node *Node) AddPeers(peers []*p2p.Peer) int { count := 0 for _, p := range peers { - key := fmt.Sprintf("%v", p.PubKey) - _, ok := node.Neighbors.Load(key) + key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID) + _, ok := node.Neighbors.LoadOrStore(key, *p) if !ok { - node.Neighbors.Store(key, *p) + // !ok means new peer is stored count++ node.host.AddPeer(p) continue @@ -311,6 +314,15 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int { return count } +// AddBeaconPeer adds beacon chain neighbors nodes +// Return false means new neighbor peer was added +// Return true means redundant neighbor peer wasn't added +func (node *Node) AddBeaconPeer(p *p2p.Peer) bool { + key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID) + _, ok := node.BeaconNeighbors.LoadOrStore(key, *p) + return ok +} + // CalculateResponse implements DownloadInterface on Node object. func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*downloader_pb.DownloaderResponse, error) { response := &downloader_pb.DownloaderResponse{} diff --git a/node/node_syncing.go b/node/node_syncing.go index 0cc534823..c41831106 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strconv" + "sync" "time" "github.com/ethereum/go-ethereum/rlp" @@ -32,21 +33,11 @@ func getPeerFromIPandPort(ip, port string) p2p.Peer { return p2p.Peer{IP: ip, Port: port, PeerID: peerID} } -// GetBeaconSyncingPeers returns a list of peers for beaconchain syncing -// TODO: currently hard coded -func (node *Node) GetBeaconSyncingPeers() []p2p.Peer { - p1 := getPeerFromIPandPort("127.0.0.1", "6001") - p2 := getPeerFromIPandPort("127.0.0.1", "6002") - p3 := getPeerFromIPandPort("127.0.0.1", "6003") - p4 := getPeerFromIPandPort("127.0.0.1", "6004") - p5 := getPeerFromIPandPort("127.0.0.1", "6005") - return []p2p.Peer{p1, p2, p3, p4, p5} -} - -// GetSyncingPeers returns list of peers. -func (node *Node) GetSyncingPeers() []p2p.Peer { +// getNeighborPeers is a helper function to return list of peers +// based on different neightbor map +func (node *Node) getNeighborPeers(neighbor sync.Map) []p2p.Peer { res := []p2p.Peer{} - node.Neighbors.Range(func(k, v interface{}) bool { + neighbor.Range(func(k, v interface{}) bool { res = append(res, v.(p2p.Peer)) return true }) @@ -65,6 +56,16 @@ func (node *Node) GetSyncingPeers() []p2p.Peer { return res } +// GetBeaconSyncingPeers returns a list of peers for beaconchain syncing +func (node *Node) GetBeaconSyncingPeers() []p2p.Peer { + return node.getNeighborPeers(node.BeaconNeighbors) +} + +// GetSyncingPeers returns list of peers for regular shard syncing. +func (node *Node) GetSyncingPeers() []p2p.Peer { + return node.getNeighborPeers(node.Neighbors) +} + // DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks func (node *Node) DoBeaconSyncing() { for { diff --git a/node/node_test.go b/node/node_test.go index deb221c38..1042dfcea 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -110,6 +110,53 @@ func TestAddPeers(t *testing.T) { } } +func TestAddBeaconPeer(t *testing.T) { + _, pubKey1 := utils.GenKey("127.0.0.1", "2000") + _, pubKey2 := utils.GenKey("127.0.0.1", "8000") + + peers1 := []*p2p.Peer{ + &p2p.Peer{ + IP: "127.0.0.1", + Port: "8888", + PubKey: pubKey1, + ValidatorID: 1, + PeerID: "1234", + }, + &p2p.Peer{ + IP: "127.0.0.1", + Port: "9999", + PubKey: pubKey2, + ValidatorID: 2, + PeerID: "4567", + }, + } + _, pubKey := utils.GenKey("1", "2") + leader := p2p.Peer{IP: "127.0.0.1", Port: "8982", PubKey: pubKey} + validator := p2p.Peer{IP: "127.0.0.1", Port: "8985"} + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } + consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) + dRand := drand.New(host, "0", []p2p.Peer{leader, validator}, leader, nil, true) + + node := New(host, consensus, nil) + node.DRand = dRand + for _, p := range peers1 { + ret := node.AddBeaconPeer(p) + if ret { + t.Errorf("AddBeaconPeer Failed, expecting false, got %v, peer %v", ret, p) + } + } + for _, p := range peers1 { + ret := node.AddBeaconPeer(p) + if !ret { + t.Errorf("AddBeaconPeer Failed, expecting true, got %v, peer %v", ret, p) + } + } +} + func sendPingMessage(node *Node, leader p2p.Peer) { pubKey1 := pki.GetBLSPrivateKeyFromInt(333).GetPublicKey() diff --git a/node/service_setup.go b/node/service_setup.go index abc35c275..7125b0118 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -25,7 +25,7 @@ func (node *Node) setupForShardLeader() { nodeConfig, chanPeer := node.initNodeConfiguration() // Register peer discovery service. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) @@ -45,7 +45,7 @@ func (node *Node) setupForShardValidator() { nodeConfig, chanPeer := node.initNodeConfiguration() // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) } @@ -54,7 +54,7 @@ func (node *Node) setupForBeaconLeader() { nodeConfig, chanPeer := node.initBeaconNodeConfiguration() // Register peer discovery service. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) // Register networkinfo service. node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) // Register consensus service. @@ -71,7 +71,7 @@ func (node *Node) setupForBeaconValidator() { nodeConfig, chanPeer := node.initBeaconNodeConfiguration() // Register peer discovery service. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) // Register networkinfo service. node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) } @@ -82,7 +82,7 @@ func (node *Node) setupForNewNode() { // Register staking service. node.serviceManager.RegisterService(service.Staking, staking.New(node.host, node.AccountKey, 0, node.beaconChain)) // Register peer discovery service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) @@ -93,7 +93,7 @@ func (node *Node) setupForClientNode() { nodeConfig, chanPeer := node.initNodeConfiguration() // Register peer discovery service. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) }