[libp2p] init integration of group message by pubsub

Signed-off-by: Leo Chen <leo@harmony.one>
pull/405/head
Leo Chen 6 years ago
parent f3b0432e2d
commit ec5102708e
  1. 18
      api/service/discovery/service.go
  2. 13
      api/service/networkinfo/service.go
  3. 16
      cmd/harmony.go
  4. 27
      node/node.go
  5. 23
      node/node_handler.go
  6. 6
      p2p/group.go

@ -15,7 +15,7 @@ const (
// Service is the struct for discovery service. // Service is the struct for discovery service.
type Service struct { type Service struct {
Host p2p.Host host p2p.Host
Rendezvous string Rendezvous string
peerChan chan p2p.Peer peerChan chan p2p.Peer
stakingChan chan p2p.Peer stakingChan chan p2p.Peer
@ -27,7 +27,7 @@ type Service struct {
// r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network) // r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network)
func New(h p2p.Host, r string, peerChan chan p2p.Peer, stakingChan chan p2p.Peer) *Service { func New(h p2p.Host, r string, peerChan chan p2p.Peer, stakingChan chan p2p.Peer) *Service {
return &Service{ return &Service{
Host: h, host: h,
Rendezvous: r, Rendezvous: r,
peerChan: peerChan, peerChan: peerChan,
stakingChan: stakingChan, stakingChan: stakingChan,
@ -63,7 +63,7 @@ func (s *Service) contactP2pPeers() {
return return
} }
log.Debug("[DISCOVERY]", "peer", peer) log.Debug("[DISCOVERY]", "peer", peer)
s.Host.AddPeer(&peer) s.host.AddPeer(&peer)
// TODO: stop ping if pinged before // TODO: stop ping if pinged before
// TODO: call staking servcie here if it is a new node // TODO: call staking servcie here if it is a new node
s.pingPeer(peer) s.pingPeer(peer)
@ -79,12 +79,18 @@ func (s *Service) Init() {
} }
func (s *Service) pingPeer(peer p2p.Peer) { func (s *Service) pingPeer(peer p2p.Peer) {
ping := proto_discovery.NewPingMessage(s.Host.GetSelfPeer()) ping := proto_discovery.NewPingMessage(s.host.GetSelfPeer())
buffer := ping.ConstructPingMessage() buffer := ping.ConstructPingMessage()
log.Debug("Sending Ping Message to", "peer", peer) log.Debug("Sending Ping Message to", "peer", peer)
content := host.ConstructP2pMessage(byte(0), buffer) content := host.ConstructP2pMessage(byte(0), buffer)
s.Host.SendMessage(peer, content) // s.host.SendMessage(peer, content)
log.Debug("Sent Ping Message to", "peer", peer) // log.Debug("Sent Ping Message via unicast to", "peer", peer)
err := s.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content)
if err != nil {
log.Error("Failed to send ping message", "group", p2p.GroupIDBeacon)
} else {
log.Debug("Sent Ping Message via group send to", "peer", peer)
}
if s.stakingChan != nil { if s.stakingChan != nil {
s.stakingChan <- peer s.stakingChan <- peer
} }

@ -115,7 +115,18 @@ func (s *Service) DoService() {
return return
} }
if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 {
utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs) utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := s.Host.GetP2PHost().Connect(s.ctx, peer); err != nil {
utils.GetLogInstance().Warn("can't connect to peer node", "error", err)
} else {
utils.GetLogInstance().Info("connected to peer node", "peer", peer)
}
}()
wg.Wait()
ip := "127.0.0.1" ip := "127.0.0.1"
var port string var port string
for _, addr := range peer.Addrs { for _, addr := range peer.Addrs {

@ -236,15 +236,17 @@ func main() {
// Current node. // Current node.
currentNode := node.New(host, consensus, ldb) currentNode := node.New(host, consensus, ldb)
currentNode.Consensus.OfflinePeers = currentNode.OfflinePeers currentNode.Consensus.OfflinePeers = currentNode.OfflinePeers
if role == "leader" { currentNode.Role = node.NewNode
if *isBeacon {
if *isBeacon {
if role == "leader" {
currentNode.Role = node.BeaconLeader currentNode.Role = node.BeaconLeader
} else { } else {
currentNode.Role = node.ShardLeader currentNode.Role = node.BeaconValidator
} }
} else { } else {
if *isBeacon { if role == "leader" {
currentNode.Role = node.BeaconValidator currentNode.Role = node.ShardLeader
} else { } else {
currentNode.Role = node.ShardValidator currentNode.Role = node.ShardValidator
} }
@ -260,9 +262,7 @@ func main() {
consensus.OnConsensusDone = currentNode.PostConsensusProcessing consensus.OnConsensusDone = currentNode.PostConsensusProcessing
currentNode.State = node.NodeWaitToJoin currentNode.State = node.NodeWaitToJoin
if *libp2pPD { if !*libp2pPD {
currentNode.Role = node.NewNode
} else {
if consensus.IsLeader { if consensus.IsLeader {
currentNode.State = node.NodeLeader currentNode.State = node.NodeLeader
} else { } else {

@ -167,6 +167,9 @@ type Node struct {
TestBankKeys []*ecdsa.PrivateKey TestBankKeys []*ecdsa.PrivateKey
ContractKeys []*ecdsa.PrivateKey ContractKeys []*ecdsa.PrivateKey
ContractAddresses []common.Address ContractAddresses []common.Address
// Group Message Receiver
groupReceiver p2p.GroupReceiver
} }
// Blockchain returns the blockchain from node // Blockchain returns the blockchain from node
@ -274,6 +277,9 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
node.OfflinePeers = make(chan p2p.Peer) node.OfflinePeers = make(chan p2p.Peer)
go node.RemovePeersHandler() go node.RemovePeersHandler()
// start the goroutine to receive group message
go node.ReceiveGroupMessage()
return &node return &node
} }
@ -587,6 +593,13 @@ func (node *Node) setupForShardValidator() {
func (node *Node) setupForBeaconLeader() { func (node *Node) setupForBeaconLeader() {
chanPeer := make(chan p2p.Peer) chanPeer := make(chan p2p.Peer)
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
// Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil)) node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil))
// Register networkinfo service. "0" is the beacon shard ID // Register networkinfo service. "0" is the beacon shard ID
@ -603,6 +616,13 @@ func (node *Node) setupForBeaconLeader() {
func (node *Node) setupForBeaconValidator() { func (node *Node) setupForBeaconValidator() {
chanPeer := make(chan p2p.Peer) chanPeer := make(chan p2p.Peer)
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
// Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil)) node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil))
// Register networkinfo service. "0" is the beacon shard ID // Register networkinfo service. "0" is the beacon shard ID
@ -613,6 +633,13 @@ func (node *Node) setupForNewNode() {
chanPeer := make(chan p2p.Peer) chanPeer := make(chan p2p.Peer)
stakingPeer := make(chan p2p.Peer) stakingPeer := make(chan p2p.Peer)
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
// Register staking service. // Register staking service.
node.serviceManager.RegisterService(service_manager.Staking, staking.New(stakingPeer)) node.serviceManager.RegisterService(service_manager.Staking, staking.New(stakingPeer))
// Register peer discovery service. "0" is the beacon shard ID // Register peer discovery service. "0" is the beacon shard ID

@ -2,8 +2,10 @@ package node
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"os" "os"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -43,9 +45,28 @@ func (node *Node) StreamHandler(s p2p.Stream) {
utils.GetLogInstance().Error("Read p2p data failed", "err", err, "node", node) utils.GetLogInstance().Error("Read p2p data failed", "err", err, "node", node)
return return
} }
node.messageHandler(content) node.messageHandler(content)
} }
// ReceiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages
func (node *Node) ReceiveGroupMessage() {
ctx := context.Background()
for {
if node.groupReceiver == nil {
time.Sleep(100 * time.Millisecond)
continue
}
msg, sender, err := node.groupReceiver.Receive(ctx)
if sender != node.host.GetID() {
utils.GetLogInstance().Info("[PUBSUB]", "msg size", len(msg), "sender", sender)
if err == nil {
node.messageHandler(msg)
}
}
}
}
// messageHandler parses the message and dispatch the actions // messageHandler parses the message and dispatch the actions
func (node *Node) messageHandler(content []byte) { func (node *Node) messageHandler(content []byte) {
node.MaybeBroadcastAsValidator(content) node.MaybeBroadcastAsValidator(content)
@ -237,7 +258,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
utils.GetLogInstance().Error("Can't get Ping Message") utils.GetLogInstance().Error("Can't get Ping Message")
return -1 return -1
} }
// utils.GetLogInstance().Info("Ping", "Msg", ping) utils.GetLogInstance().Debug("Ping", "Msg", ping)
peer := new(p2p.Peer) peer := new(p2p.Peer)
peer.IP = ping.Node.IP peer.IP = ping.Node.IP

@ -22,6 +22,12 @@ func (id GroupID) String() string {
return fmt.Sprintf("%x", string(id)) return fmt.Sprintf("%x", string(id))
} }
// Const of group ID
const (
GroupIDBeacon GroupID = "harmony/0.0.1/beacon"
GroupIDGlobal GroupID = "harmony/0.0.1/global"
)
// GroupReceiver is a multicast group message receiver interface. // GroupReceiver is a multicast group message receiver interface.
type GroupReceiver interface { type GroupReceiver interface {
// Close closes this receiver. // Close closes this receiver.

Loading…
Cancel
Save