add connecthostpeer function

pubsub requires pre-exist connection among peers before the message can
be sent via pubsub

Signed-off-by: Leo Chen <leo@harmony.one>
pull/405/head
Leo Chen 6 years ago
parent 715962ad87
commit 98596f4f21
  1. 6
      api/service/discovery/service.go
  2. 3
      cmd/harmony.go
  3. 4
      node/node.go
  4. 7
      node/node_handler.go
  5. 1
      p2p/host.go
  6. 33
      p2p/host/hostv2/hostv2.go

@ -7,12 +7,6 @@ import (
"github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/host"
) )
// Constants for discovery service.
const (
numIncoming = 128
numOutgoing = 16
)
// Service is the struct for discovery service. // Service is the struct for discovery service.
type Service struct { type Service struct {
host p2p.Host host p2p.Host

@ -275,8 +275,5 @@ func main() {
go currentNode.SupportSyncing() go currentNode.SupportSyncing()
currentNode.ServiceManagerSetup() currentNode.ServiceManagerSetup()
currentNode.RunServices() currentNode.RunServices()
if !*libp2pPD {
currentNode.StartServer() currentNode.StartServer()
} }
}

@ -206,8 +206,12 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions {
// StartServer starts a server and process the requests by a handler. // StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() { func (node *Node) StartServer() {
if node.UseLibP2P {
select {}
} else {
node.host.BindHandlerAndServe(node.StreamHandler) node.host.BindHandlerAndServe(node.StreamHandler)
} }
}
// Count the total number of transactions in the blockchain // Count the total number of transactions in the blockchain
// Currently used for stats reporting purpose // Currently used for stats reporting purpose

@ -277,6 +277,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
// add to incoming peer list // add to incoming peer list
node.host.AddIncomingPeer(*peer) node.host.AddIncomingPeer(*peer)
node.host.ConnectHostPeer(*peer)
if ping.Node.Role == proto_node.ClientRole { if ping.Node.Role == proto_node.ClientRole {
utils.GetLogInstance().Info("Add Client Peer to Node", "Node", node.Consensus.GetNodeID(), "Client", peer) utils.GetLogInstance().Info("Add Client Peer to Node", "Node", node.Consensus.GetNodeID(), "Client", peer)
@ -329,8 +330,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
return -1 return -1
} }
utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg", len(msgPayload))
peers := make([]*p2p.Peer, 0) peers := make([]*p2p.Peer, 0)
for _, p := range pong.Peers { for _, p := range pong.Peers {
@ -349,6 +348,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
peers = append(peers, peer) peers = append(peers, peer)
} }
utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg #peers", len(peers))
if len(peers) > 0 { if len(peers) > 0 {
node.AddPeers(peers) node.AddPeers(peers)
} }
@ -369,6 +370,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
publicKeys = append(publicKeys, &key) publicKeys = append(publicKeys, &key)
} }
utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg #keys", len(publicKeys))
if node.State == NodeWaitToJoin { if node.State == NodeWaitToJoin {
node.State = NodeReadyForConsensus node.State = NodeReadyForConsensus
// Notify JoinShard to stop sending Ping messages // Notify JoinShard to stop sending Ping messages

@ -19,6 +19,7 @@ type Host interface {
AddIncomingPeer(Peer) AddIncomingPeer(Peer)
AddOutgoingPeer(Peer) AddOutgoingPeer(Peer)
ConnectHostPeer(Peer)
// SendMessageToGroups sends a message to one or more multicast groups. // SendMessageToGroups sends a message to one or more multicast groups.
SendMessageToGroups(groups []GroupID, msg []byte) error SendMessageToGroups(groups []GroupID, msg []byte) error

@ -4,8 +4,10 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"sync"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
libp2p "github.com/libp2p/go-libp2p" libp2p "github.com/libp2p/go-libp2p"
@ -25,6 +27,10 @@ const (
BatchSizeInByte = 1 << 16 BatchSizeInByte = 1 << 16
// ProtocolID The ID of protocol used in stream handling. // ProtocolID The ID of protocol used in stream handling.
ProtocolID = "/harmony/0.0.1" ProtocolID = "/harmony/0.0.1"
// Constants for discovery service.
numIncoming = 128
numOutgoing = 16
) )
// PubSub captures the pubsub interface we expect from libp2p. // PubSub captures the pubsub interface we expect from libp2p.
@ -223,3 +229,30 @@ func (host *HostV2) Close() error {
func (host *HostV2) GetP2PHost() p2p_host.Host { func (host *HostV2) GetP2PHost() p2p_host.Host {
return host.h return host.h
} }
// ConnectHostPeer connects to peer host
func (host *HostV2) ConnectHostPeer(peer p2p.Peer) {
ctx := context.Background()
addr := fmt.Sprintf("/ip4/%s/tcp/%s/ipfs/%s", peer.IP, peer.Port, peer.PeerID.Pretty())
peerAddr, err := ma.NewMultiaddr(addr)
if err != nil {
utils.GetLogInstance().Error("ConnectHostPeer", "new ma error", err, "peer", peer)
return
}
peerInfo, err := peerstore.InfoFromP2pAddr(peerAddr)
if err != nil {
utils.GetLogInstance().Error("ConnectHostPeer", "new peerinfo error", err, "peer", peer)
return
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := host.h.Connect(ctx, *peerInfo); err != nil {
utils.GetLogInstance().Warn("can't connect to peer", "error", err, "peer", peer)
} else {
utils.GetLogInstance().Info("connected to peer host", "node", *peerInfo)
}
}()
wg.Wait()
}

Loading…
Cancel
Save