diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index d00c56663..83c8d0dc2 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -15,7 +15,7 @@ const ( // Service is the struct for discovery service. type Service struct { - Host p2p.Host + host p2p.Host Rendezvous string peerChan chan p2p.Peer stakingChan chan p2p.Peer @@ -27,7 +27,7 @@ type Service struct { // r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network) func New(h p2p.Host, r string, peerChan chan p2p.Peer, stakingChan chan p2p.Peer) *Service { return &Service{ - Host: h, + host: h, Rendezvous: r, peerChan: peerChan, stakingChan: stakingChan, @@ -63,7 +63,7 @@ func (s *Service) contactP2pPeers() { return } log.Debug("[DISCOVERY]", "peer", peer) - s.Host.AddPeer(&peer) + s.host.AddPeer(&peer) // TODO: stop ping if pinged before // TODO: call staking servcie here if it is a new node s.pingPeer(peer) @@ -79,12 +79,18 @@ func (s *Service) Init() { } func (s *Service) pingPeer(peer p2p.Peer) { - ping := proto_discovery.NewPingMessage(s.Host.GetSelfPeer()) + ping := proto_discovery.NewPingMessage(s.host.GetSelfPeer()) buffer := ping.ConstructPingMessage() log.Debug("Sending Ping Message to", "peer", peer) content := host.ConstructP2pMessage(byte(0), buffer) - s.Host.SendMessage(peer, content) - log.Debug("Sent Ping Message to", "peer", peer) + // s.host.SendMessage(peer, content) + // log.Debug("Sent Ping Message via unicast to", "peer", peer) + err := s.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content) + if err != nil { + log.Error("Failed to send ping message", "group", p2p.GroupIDBeacon) + } else { + log.Debug("Sent Ping Message via group send to", "peer", peer) + } if s.stakingChan != nil { s.stakingChan <- peer } diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index d3d2cd70f..218090292 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -115,7 +115,18 @@ func (s *Service) DoService() { return } if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { - utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs) + utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := s.Host.GetP2PHost().Connect(s.ctx, peer); err != nil { + utils.GetLogInstance().Warn("can't connect to peer node", "error", err) + } else { + utils.GetLogInstance().Info("connected to peer node", "peer", peer) + } + }() + wg.Wait() ip := "127.0.0.1" var port string for _, addr := range peer.Addrs { diff --git a/cmd/harmony.go b/cmd/harmony.go index e62d760f1..886a55343 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -236,15 +236,17 @@ func main() { // Current node. currentNode := node.New(host, consensus, ldb) currentNode.Consensus.OfflinePeers = currentNode.OfflinePeers - if role == "leader" { - if *isBeacon { + currentNode.Role = node.NewNode + + if *isBeacon { + if role == "leader" { currentNode.Role = node.BeaconLeader } else { - currentNode.Role = node.ShardLeader + currentNode.Role = node.BeaconValidator } } else { - if *isBeacon { - currentNode.Role = node.BeaconValidator + if role == "leader" { + currentNode.Role = node.ShardLeader } else { currentNode.Role = node.ShardValidator } @@ -260,9 +262,7 @@ func main() { consensus.OnConsensusDone = currentNode.PostConsensusProcessing currentNode.State = node.NodeWaitToJoin - if *libp2pPD { - currentNode.Role = node.NewNode - } else { + if !*libp2pPD { if consensus.IsLeader { currentNode.State = node.NodeLeader } else { diff --git a/node/node.go b/node/node.go index 4838f567f..4e511f069 100644 --- a/node/node.go +++ b/node/node.go @@ -167,6 +167,9 @@ type Node struct { TestBankKeys []*ecdsa.PrivateKey ContractKeys []*ecdsa.PrivateKey ContractAddresses []common.Address + + // Group Message Receiver + groupReceiver p2p.GroupReceiver } // Blockchain returns the blockchain from node @@ -274,6 +277,9 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { node.OfflinePeers = make(chan p2p.Peer) go node.RemovePeersHandler() + // start the goroutine to receive group message + go node.ReceiveGroupMessage() + return &node } @@ -587,6 +593,13 @@ func (node *Node) setupForShardValidator() { func (node *Node) setupForBeaconLeader() { chanPeer := make(chan p2p.Peer) + var err error + node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) + if err != nil { + utils.GetLogInstance().Error("create group receiver error", "msg", err) + return + } + // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil)) // Register networkinfo service. "0" is the beacon shard ID @@ -603,6 +616,13 @@ func (node *Node) setupForBeaconLeader() { func (node *Node) setupForBeaconValidator() { chanPeer := make(chan p2p.Peer) + var err error + node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) + if err != nil { + utils.GetLogInstance().Error("create group receiver error", "msg", err) + return + } + // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil)) // Register networkinfo service. "0" is the beacon shard ID @@ -613,6 +633,13 @@ func (node *Node) setupForNewNode() { chanPeer := make(chan p2p.Peer) stakingPeer := make(chan p2p.Peer) + var err error + node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) + if err != nil { + utils.GetLogInstance().Error("create group receiver error", "msg", err) + return + } + // Register staking service. node.serviceManager.RegisterService(service_manager.Staking, staking.New(stakingPeer)) // Register peer discovery service. "0" is the beacon shard ID diff --git a/node/node_handler.go b/node/node_handler.go index 5e3716017..2504c0032 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -2,8 +2,10 @@ package node import ( "bytes" + "context" "fmt" "os" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" @@ -43,9 +45,28 @@ func (node *Node) StreamHandler(s p2p.Stream) { utils.GetLogInstance().Error("Read p2p data failed", "err", err, "node", node) return } + node.messageHandler(content) } +// ReceiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages +func (node *Node) ReceiveGroupMessage() { + ctx := context.Background() + for { + if node.groupReceiver == nil { + time.Sleep(100 * time.Millisecond) + continue + } + msg, sender, err := node.groupReceiver.Receive(ctx) + if sender != node.host.GetID() { + utils.GetLogInstance().Info("[PUBSUB]", "msg size", len(msg), "sender", sender) + if err == nil { + node.messageHandler(msg) + } + } + } +} + // messageHandler parses the message and dispatch the actions func (node *Node) messageHandler(content []byte) { node.MaybeBroadcastAsValidator(content) @@ -237,7 +258,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int { utils.GetLogInstance().Error("Can't get Ping Message") return -1 } - // utils.GetLogInstance().Info("Ping", "Msg", ping) + utils.GetLogInstance().Debug("Ping", "Msg", ping) peer := new(p2p.Peer) peer.IP = ping.Node.IP diff --git a/p2p/group.go b/p2p/group.go index 291d19a2f..5fcf6c690 100644 --- a/p2p/group.go +++ b/p2p/group.go @@ -22,6 +22,12 @@ func (id GroupID) String() string { return fmt.Sprintf("%x", string(id)) } +// Const of group ID +const ( + GroupIDBeacon GroupID = "harmony/0.0.1/beacon" + GroupIDGlobal GroupID = "harmony/0.0.1/global" +) + // GroupReceiver is a multicast group message receiver interface. type GroupReceiver interface { // Close closes this receiver.