diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index ece4cd823..93f2169c9 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -7,12 +7,6 @@ import ( "github.com/harmony-one/harmony/p2p/host" ) -// Constants for discovery service. -const ( - numIncoming = 128 - numOutgoing = 16 -) - // Service is the struct for discovery service. type Service struct { host p2p.Host diff --git a/cmd/harmony.go b/cmd/harmony.go index 7eb76f70a..4024a25c4 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -275,8 +275,5 @@ func main() { go currentNode.SupportSyncing() currentNode.ServiceManagerSetup() currentNode.RunServices() - - if !*libp2pPD { - currentNode.StartServer() - } + currentNode.StartServer() } diff --git a/node/node.go b/node/node.go index 9b7f1b703..d0acea152 100644 --- a/node/node.go +++ b/node/node.go @@ -206,7 +206,11 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions { // StartServer starts a server and process the requests by a handler. func (node *Node) StartServer() { - node.host.BindHandlerAndServe(node.StreamHandler) + if node.UseLibP2P { + select {} + } else { + node.host.BindHandlerAndServe(node.StreamHandler) + } } // Count the total number of transactions in the blockchain diff --git a/node/node_handler.go b/node/node_handler.go index 8b4236c80..1137de8e2 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -277,6 +277,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int { // add to incoming peer list node.host.AddIncomingPeer(*peer) + node.host.ConnectHostPeer(*peer) if ping.Node.Role == proto_node.ClientRole { utils.GetLogInstance().Info("Add Client Peer to Node", "Node", node.Consensus.GetNodeID(), "Client", peer) @@ -329,8 +330,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { return -1 } - utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg", len(msgPayload)) - peers := make([]*p2p.Peer, 0) for _, p := range pong.Peers { @@ -349,6 +348,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { peers = append(peers, peer) } + utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg #peers", len(peers)) + if len(peers) > 0 { node.AddPeers(peers) } @@ -369,6 +370,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { publicKeys = append(publicKeys, &key) } + utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg #keys", len(publicKeys)) + if node.State == NodeWaitToJoin { node.State = NodeReadyForConsensus // Notify JoinShard to stop sending Ping messages diff --git a/p2p/host.go b/p2p/host.go index 03e76189a..db39f50c1 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -19,6 +19,7 @@ type Host interface { AddIncomingPeer(Peer) AddOutgoingPeer(Peer) + ConnectHostPeer(Peer) // SendMessageToGroups sends a message to one or more multicast groups. SendMessageToGroups(groups []GroupID, msg []byte) error diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index 9a8423c1c..d0a8624a6 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -4,8 +4,10 @@ import ( "context" "fmt" "io" + "sync" "github.com/ethereum/go-ethereum/log" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" libp2p "github.com/libp2p/go-libp2p" @@ -25,6 +27,10 @@ const ( BatchSizeInByte = 1 << 16 // ProtocolID The ID of protocol used in stream handling. ProtocolID = "/harmony/0.0.1" + + // Constants for discovery service. + numIncoming = 128 + numOutgoing = 16 ) // PubSub captures the pubsub interface we expect from libp2p. @@ -223,3 +229,30 @@ func (host *HostV2) Close() error { func (host *HostV2) GetP2PHost() p2p_host.Host { return host.h } + +// ConnectHostPeer connects to peer host +func (host *HostV2) ConnectHostPeer(peer p2p.Peer) { + ctx := context.Background() + addr := fmt.Sprintf("/ip4/%s/tcp/%s/ipfs/%s", peer.IP, peer.Port, peer.PeerID.Pretty()) + peerAddr, err := ma.NewMultiaddr(addr) + if err != nil { + utils.GetLogInstance().Error("ConnectHostPeer", "new ma error", err, "peer", peer) + return + } + peerInfo, err := peerstore.InfoFromP2pAddr(peerAddr) + if err != nil { + utils.GetLogInstance().Error("ConnectHostPeer", "new peerinfo error", err, "peer", peer) + return + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := host.h.Connect(ctx, *peerInfo); err != nil { + utils.GetLogInstance().Warn("can't connect to peer", "error", err, "peer", peer) + } else { + utils.GetLogInstance().Info("connected to peer host", "node", *peerInfo) + } + }() + wg.Wait() +}