Merge pull request #513 from LeoHChen/beaconpeer

Add beacon peer for every node
pull/514/head
Leo Chen 6 years ago committed by GitHub
commit 08b4e11ba0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      api/service/discovery/discovery_test.go
  2. 44
      api/service/discovery/service.go
  3. 2
      api/service/networkinfo/service.go
  4. 18
      node/node.go
  5. 29
      node/node_syncing.go
  6. 47
      node/node_test.go
  7. 12
      node/service_setup.go

@ -26,7 +26,7 @@ func TestDiscoveryService(t *testing.T) {
config := service.NodeConfig{} config := service.NodeConfig{}
dService = New(host, config, nil) dService = New(host, config, nil, nil)
if dService == nil { if dService == nil {
t.Fatalf("unable to create new discovery service") t.Fatalf("unable to create new discovery service")

@ -14,27 +14,29 @@ import (
// Service is the struct for discovery service. // Service is the struct for discovery service.
type Service struct { type Service struct {
host p2p.Host host p2p.Host
peerChan chan p2p.Peer peerChan chan p2p.Peer
stopChan chan struct{} stopChan chan struct{}
actionChan chan p2p.GroupAction actionChan chan p2p.GroupAction
config service.NodeConfig config service.NodeConfig
actions map[p2p.GroupID]p2p.ActionType actions map[p2p.GroupID]p2p.ActionType
messageChan chan *msg_pb.Message messageChan chan *msg_pb.Message
addBeaconPeerFunc func(*p2p.Peer) bool
} }
// New returns discovery service. // New returns discovery service.
// h is the p2p host // h is the p2p host
// config is the node config // config is the node config
// (TODO: leo, build two overlays of network) // (TODO: leo, build two overlays of network)
func New(h p2p.Host, config service.NodeConfig, peerChan chan p2p.Peer) *Service { func New(h p2p.Host, config service.NodeConfig, peerChan chan p2p.Peer, addPeer func(*p2p.Peer) bool) *Service {
return &Service{ return &Service{
host: h, host: h,
peerChan: peerChan, peerChan: peerChan,
stopChan: make(chan struct{}), stopChan: make(chan struct{}),
actionChan: make(chan p2p.GroupAction), actionChan: make(chan p2p.GroupAction),
config: config, config: config,
actions: make(map[p2p.GroupID]p2p.ActionType), actions: make(map[p2p.GroupID]p2p.ActionType),
addBeaconPeerFunc: addPeer,
} }
} }
@ -83,13 +85,19 @@ func (s *Service) contactP2pPeers() {
select { select {
case peer, ok := <-s.peerChan: case peer, ok := <-s.peerChan:
if !ok { if !ok {
utils.GetLogInstance().Debug("end of info", "peer", peer.PeerID) utils.GetLogInstance().Debug("[DISCOVERY] No More Peer!")
break break
} }
s.host.AddPeer(&peer) // TODO (leo) this one assumes all peers received in the channel are beacon chain node
// It is just a temporary hack. When we work on re-sharding to regular shard, this has to be changed.
if !s.config.IsBeacon {
if s.addBeaconPeerFunc != nil {
s.addBeaconPeerFunc(&peer)
}
}
// Add to outgoing peer list // Add to outgoing peer list
//s.host.AddOutgoingPeer(peer) // s.host.AddOutgoingPeer(peer)
utils.GetLogInstance().Debug("[DISCOVERY]", "add outgoing peer", peer) // utils.GetLogInstance().Debug("[DISCOVERY]", "add outgoing peer", peer)
case <-s.stopChan: case <-s.stopChan:
utils.GetLogInstance().Debug("[DISCOVERY] stop pinging ...") utils.GetLogInstance().Debug("[DISCOVERY] stop pinging ...")
return return

@ -112,7 +112,7 @@ func (s *Service) DoService() {
select { select {
case peer := <-s.peerInfo: case peer := <-s.peerInfo:
if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 {
utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) // utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID())
if err := s.Host.GetP2PHost().Connect(s.ctx, peer); err != nil { if err := s.Host.GetP2PHost().Connect(s.ctx, peer); err != nil {
utils.GetLogInstance().Warn("can't connect to peer node", "error", err) utils.GetLogInstance().Warn("can't connect to peer node", "error", err)
} else { } else {

@ -122,6 +122,9 @@ type Node struct {
State State // State of the Node State State // State of the Node
stateMutex sync.Mutex // mutex for change node state stateMutex sync.Mutex // mutex for change node state
// BeaconNeighbors store only neighbor nodes in the beacon chain shard
BeaconNeighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
TxPool *core.TxPool TxPool *core.TxPool
Worker *worker.Worker Worker *worker.Worker
BeaconWorker *worker.Worker // worker for beacon chain BeaconWorker *worker.Worker // worker for beacon chain
@ -290,10 +293,10 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N
func (node *Node) AddPeers(peers []*p2p.Peer) int { func (node *Node) AddPeers(peers []*p2p.Peer) int {
count := 0 count := 0
for _, p := range peers { for _, p := range peers {
key := fmt.Sprintf("%v", p.PubKey) key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID)
_, ok := node.Neighbors.Load(key) _, ok := node.Neighbors.LoadOrStore(key, *p)
if !ok { if !ok {
node.Neighbors.Store(key, *p) // !ok means new peer is stored
count++ count++
node.host.AddPeer(p) node.host.AddPeer(p)
continue continue
@ -311,6 +314,15 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int {
return count return count
} }
// AddBeaconPeer adds beacon chain neighbors nodes
// Return false means new neighbor peer was added
// Return true means redundant neighbor peer wasn't added
func (node *Node) AddBeaconPeer(p *p2p.Peer) bool {
key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID)
_, ok := node.BeaconNeighbors.LoadOrStore(key, *p)
return ok
}
// CalculateResponse implements DownloadInterface on Node object. // CalculateResponse implements DownloadInterface on Node object.
func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*downloader_pb.DownloaderResponse, error) { func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*downloader_pb.DownloaderResponse, error) {
response := &downloader_pb.DownloaderResponse{} response := &downloader_pb.DownloaderResponse{}

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -32,21 +33,11 @@ func getPeerFromIPandPort(ip, port string) p2p.Peer {
return p2p.Peer{IP: ip, Port: port, PeerID: peerID} return p2p.Peer{IP: ip, Port: port, PeerID: peerID}
} }
// GetBeaconSyncingPeers returns a list of peers for beaconchain syncing // getNeighborPeers is a helper function to return list of peers
// TODO: currently hard coded // based on different neightbor map
func (node *Node) GetBeaconSyncingPeers() []p2p.Peer { func (node *Node) getNeighborPeers(neighbor sync.Map) []p2p.Peer {
p1 := getPeerFromIPandPort("127.0.0.1", "6001")
p2 := getPeerFromIPandPort("127.0.0.1", "6002")
p3 := getPeerFromIPandPort("127.0.0.1", "6003")
p4 := getPeerFromIPandPort("127.0.0.1", "6004")
p5 := getPeerFromIPandPort("127.0.0.1", "6005")
return []p2p.Peer{p1, p2, p3, p4, p5}
}
// GetSyncingPeers returns list of peers.
func (node *Node) GetSyncingPeers() []p2p.Peer {
res := []p2p.Peer{} res := []p2p.Peer{}
node.Neighbors.Range(func(k, v interface{}) bool { neighbor.Range(func(k, v interface{}) bool {
res = append(res, v.(p2p.Peer)) res = append(res, v.(p2p.Peer))
return true return true
}) })
@ -65,6 +56,16 @@ func (node *Node) GetSyncingPeers() []p2p.Peer {
return res return res
} }
// GetBeaconSyncingPeers returns a list of peers for beaconchain syncing
func (node *Node) GetBeaconSyncingPeers() []p2p.Peer {
return node.getNeighborPeers(node.BeaconNeighbors)
}
// GetSyncingPeers returns list of peers for regular shard syncing.
func (node *Node) GetSyncingPeers() []p2p.Peer {
return node.getNeighborPeers(node.Neighbors)
}
// DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks // DoBeaconSyncing update received beaconchain blocks and downloads missing beacon chain blocks
func (node *Node) DoBeaconSyncing() { func (node *Node) DoBeaconSyncing() {
for { for {

@ -110,6 +110,53 @@ func TestAddPeers(t *testing.T) {
} }
} }
func TestAddBeaconPeer(t *testing.T) {
_, pubKey1 := utils.GenKey("127.0.0.1", "2000")
_, pubKey2 := utils.GenKey("127.0.0.1", "8000")
peers1 := []*p2p.Peer{
&p2p.Peer{
IP: "127.0.0.1",
Port: "8888",
PubKey: pubKey1,
ValidatorID: 1,
PeerID: "1234",
},
&p2p.Peer{
IP: "127.0.0.1",
Port: "9999",
PubKey: pubKey2,
ValidatorID: 2,
PeerID: "4567",
},
}
_, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "127.0.0.1", Port: "8982", PubKey: pubKey}
validator := p2p.Peer{IP: "127.0.0.1", Port: "8985"}
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
host, err := p2pimpl.NewHost(&leader, priKey)
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
dRand := drand.New(host, "0", []p2p.Peer{leader, validator}, leader, nil, true)
node := New(host, consensus, nil)
node.DRand = dRand
for _, p := range peers1 {
ret := node.AddBeaconPeer(p)
if ret {
t.Errorf("AddBeaconPeer Failed, expecting false, got %v, peer %v", ret, p)
}
}
for _, p := range peers1 {
ret := node.AddBeaconPeer(p)
if !ret {
t.Errorf("AddBeaconPeer Failed, expecting true, got %v, peer %v", ret, p)
}
}
}
func sendPingMessage(node *Node, leader p2p.Peer) { func sendPingMessage(node *Node, leader p2p.Peer) {
pubKey1 := pki.GetBLSPrivateKeyFromInt(333).GetPublicKey() pubKey1 := pki.GetBLSPrivateKeyFromInt(333).GetPublicKey()

@ -25,7 +25,7 @@ func (node *Node) setupForShardLeader() {
nodeConfig, chanPeer := node.initNodeConfiguration() nodeConfig, chanPeer := node.initNodeConfiguration()
// Register peer discovery service. No need to do staking for beacon chain node. // Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer))
// Register networkinfo service. "0" is the beacon shard ID // Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
@ -45,7 +45,7 @@ func (node *Node) setupForShardValidator() {
nodeConfig, chanPeer := node.initNodeConfiguration() nodeConfig, chanPeer := node.initNodeConfiguration()
// Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer))
// Register networkinfo service. "0" is the beacon shard ID // Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
} }
@ -54,7 +54,7 @@ func (node *Node) setupForBeaconLeader() {
nodeConfig, chanPeer := node.initBeaconNodeConfiguration() nodeConfig, chanPeer := node.initBeaconNodeConfiguration()
// Register peer discovery service. No need to do staking for beacon chain node. // Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil))
// Register networkinfo service. // Register networkinfo service.
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
// Register consensus service. // Register consensus service.
@ -71,7 +71,7 @@ func (node *Node) setupForBeaconValidator() {
nodeConfig, chanPeer := node.initBeaconNodeConfiguration() nodeConfig, chanPeer := node.initBeaconNodeConfiguration()
// Register peer discovery service. No need to do staking for beacon chain node. // Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil))
// Register networkinfo service. // Register networkinfo service.
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
} }
@ -82,7 +82,7 @@ func (node *Node) setupForNewNode() {
// Register staking service. // Register staking service.
node.serviceManager.RegisterService(service.Staking, staking.New(node.host, node.AccountKey, 0, node.beaconChain)) node.serviceManager.RegisterService(service.Staking, staking.New(node.host, node.AccountKey, 0, node.beaconChain))
// Register peer discovery service. "0" is the beacon shard ID // Register peer discovery service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer))
// Register networkinfo service. "0" is the beacon shard ID // Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
@ -93,7 +93,7 @@ func (node *Node) setupForClientNode() {
nodeConfig, chanPeer := node.initNodeConfiguration() nodeConfig, chanPeer := node.initNodeConfiguration()
// Register peer discovery service. // Register peer discovery service.
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil))
// Register networkinfo service. "0" is the beacon shard ID // Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
} }

Loading…
Cancel
Save