add addPeer function support in p2p

This is mainly for hostv2 libp2p support.

addPeer will add new peer into the peerstore.

NewHost function add Identity support for libp2p after generating
keypair. The PeerID is generated from the public key, will be used to
find the right PeerInfo for libp2p communication.

Signed-off-by: Leo Chen <leo@harmony.one>
pull/280/head
Leo Chen 6 years ago
parent a93672449b
commit fcbf5c255a
  1. 2
      p2p/helper.go
  2. 3
      p2p/host/host.go
  3. 15
      p2p/host/hostv1/hostv1.go
  4. 70
      p2p/host/hostv2/hostv2.go
  5. 20
      p2p/p2p.go
  6. 46
      p2p/p2pimpl/p2pimpl.go

@ -39,7 +39,7 @@ func ReadMessageContent(s Stream) ([]byte, error) {
_, err := r.ReadByte() _, err := r.ReadByte()
switch err { switch err {
case io.EOF: case io.EOF:
log.Error("Error reading the p2p message type field", "msg", err) log.Error("Error reading the p2p message type field", "io.EOF", err)
return contentBuf.Bytes(), err return contentBuf.Bytes(), err
case nil: case nil:
//log.Printf("Received p2p message type: %x\n", msgType) //log.Printf("Received p2p message type: %x\n", msgType)

@ -2,6 +2,7 @@ package host
import ( import (
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
peer "github.com/libp2p/go-libp2p-peer"
) )
// Host is the client + server in p2p network. // Host is the client + server in p2p network.
@ -10,4 +11,6 @@ type Host interface {
SendMessage(p2p.Peer, []byte) error SendMessage(p2p.Peer, []byte) error
BindHandlerAndServe(handler p2p.StreamHandler) BindHandlerAndServe(handler p2p.StreamHandler)
Close() error Close() error
AddPeer(*p2p.Peer) error
GetID() peer.ID
} }

@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
peer "github.com/libp2p/go-libp2p-peer"
) )
// HostV1 is the version 1 p2p host, using direct socket call. // HostV1 is the version 1 p2p host, using direct socket call.
@ -17,10 +18,15 @@ type HostV1 struct {
quit chan struct{} quit chan struct{}
} }
// AddPeer do nothing
func (host *HostV1) AddPeer(p *p2p.Peer) error {
return nil
}
// New creates a HostV1 // New creates a HostV1
func New(self p2p.Peer) *HostV1 { func New(self *p2p.Peer) *HostV1 {
h := &HostV1{ h := &HostV1{
self: self, self: *self,
quit: make(chan struct{}, 1), quit: make(chan struct{}, 1),
} }
return h return h
@ -31,6 +37,11 @@ func (host *HostV1) GetSelfPeer() p2p.Peer {
return host.self return host.self
} }
// GetID return ID
func (host *HostV1) GetID() peer.ID {
return peer.ID(fmt.Sprintf("%s:%s", host.self.IP, host.self.Port))
}
// BindHandlerAndServe Version 0 p2p. Going to be deprecated. // BindHandlerAndServe Version 0 p2p. Going to be deprecated.
func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) { func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) {
port := host.self.Port port := host.self.Port

@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
libp2p "github.com/libp2p/go-libp2p" libp2p "github.com/libp2p/go-libp2p"
p2p_crypto "github.com/libp2p/go-libp2p-crypto"
libp2phost "github.com/libp2p/go-libp2p-host" libp2phost "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net" net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
@ -23,8 +24,38 @@ const (
// HostV2 is the version 2 p2p host // HostV2 is the version 2 p2p host
type HostV2 struct { type HostV2 struct {
h libp2phost.Host h libp2phost.Host
self p2p.Peer self p2p.Peer
priKey p2p_crypto.PrivKey
}
// AddPeer add p2p.Peer into Peerstore
func (host *HostV2) AddPeer(p *p2p.Peer) error {
if p.PeerID != "" && len(p.Addrs) != 0 {
host.Peerstore().AddAddrs(p.PeerID, p.Addrs, peerstore.PermanentAddrTTL)
return nil
}
if p.PeerID == "" {
log.Error("AddPeer PeerID is EMPTY")
return nil
}
// reconstruct the multiaddress based on ip/port
// PeerID has to be known for the ip/port
addr := fmt.Sprintf("/ip4/%s/tcp/%s", p.IP, p.Port)
targetAddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
log.Error("AddPeer NewMultiaddr error", "error", err)
return err
}
p.Addrs = append(p.Addrs, targetAddr)
host.Peerstore().AddAddrs(p.PeerID, p.Addrs, peerstore.PermanentAddrTTL)
fmt.Printf("AddPeer add to peerstore: %v\n", *p)
return nil
} }
// Peerstore returns the peer store // Peerstore returns the peer store
@ -33,24 +64,34 @@ func (host *HostV2) Peerstore() peerstore.Peerstore {
} }
// New creates a host for p2p communication // New creates a host for p2p communication
func New(self p2p.Peer) *HostV2 { func New(self p2p.Peer, priKey p2p_crypto.PrivKey) *HostV2 {
sourceAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", self.Port))
catchError(err) // TODO (leo), use the [0] of Addrs for now, need to find a reliable way of using listenAddr
p2pHost, err := libp2p.New(context.Background(), p2pHost, err := libp2p.New(context.Background(),
libp2p.ListenAddrs(sourceAddr), libp2p.ListenAddrs(self.Addrs[0]),
libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves. libp2p.Identity(priKey),
// TODO(ricl): Other features to probe // TODO(ricl): Other features to probe
// libp2p.EnableRelay; libp2p.Routing; // libp2p.EnableRelay; libp2p.Routing;
) )
catchError(err) catchError(err)
log.Debug("HostV2 is up!", "port", self.Port, "id", p2pHost.ID().Pretty(), "addr", sourceAddr) log.Debug("HostV2 is up!", "port", self.Port, "id", p2pHost.ID().Pretty(), "addr", self.Addrs)
// has to save the private key for host
h := &HostV2{ h := &HostV2{
h: p2pHost, h: p2pHost,
self: self, self: self,
priKey: priKey,
} }
return h return h
} }
// GetID returns ID.Pretty
func (host *HostV2) GetID() peer.ID {
return host.h.ID()
}
// GetSelfPeer gets self peer // GetSelfPeer gets self peer
func (host *HostV2) GetSelfPeer() p2p.Peer { func (host *HostV2) GetSelfPeer() p2p.Peer {
return host.self return host.self
@ -67,14 +108,9 @@ func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) {
// SendMessage a p2p message sending function with signature compatible to p2pv1. // SendMessage a p2p message sending function with signature compatible to p2pv1.
func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error {
addr := fmt.Sprintf("/ip4/%s/tcp/%s", p.IP, p.Port) s, err := host.h.NewStream(context.Background(), p.PeerID, ProtocolID)
targetAddr, err := multiaddr.NewMultiaddr(addr)
catchError(err)
peerID := peer.ID(addr)
host.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL)
s, err := host.h.NewStream(context.Background(), peerID, ProtocolID)
if err != nil { if err != nil {
log.Error("Failed to send message", "from", host.self, "to", p, "error", err) log.Error("Failed to send message", "from", host.self, "to", p, "error", err, "PeerID", p.PeerID)
return err return err
} }

@ -1,9 +1,12 @@
package p2p package p2p
import ( import (
"fmt"
"net" "net"
"github.com/dedis/kyber" "github.com/dedis/kyber"
peer "github.com/libp2p/go-libp2p-peer"
multiaddr "github.com/multiformats/go-multiaddr"
) )
// StreamHandler handles incoming p2p message. // StreamHandler handles incoming p2p message.
@ -11,12 +14,15 @@ type StreamHandler func(Stream)
// Peer is the object for a p2p peer (node) // Peer is the object for a p2p peer (node)
type Peer struct { type Peer struct {
IP string // IP address of the peer IP string // IP address of the peer
Port string // Port number of the peer Port string // Port number of the peer
PubKey kyber.Point // Public key of the peer PubKey kyber.Point // Public key of the peer, used for consensus signing
Ready bool // Ready is true if the peer is ready to join consensus. Ready bool // Ready is true if the peer is ready to join consensus. (FIXME: deprecated)
ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard
// TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available. Addrs []multiaddr.Multiaddr // MultiAddress of the peer
PeerID peer.ID // PeerID, the pubkey for communication
} }
func (p Peer) String() string { return net.JoinHostPort(p.IP, p.Port) } func (p Peer) String() string {
return fmt.Sprintf("%s/%s[%d]", net.JoinHostPort(p.IP, p.Port), p.PeerID, len(p.Addrs))
}

@ -1,10 +1,17 @@
package p2pimpl package p2pimpl
import ( import (
"fmt"
"net"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/host/hostv1" "github.com/harmony-one/harmony/p2p/host/hostv1"
"github.com/harmony-one/harmony/p2p/host/hostv2" "github.com/harmony-one/harmony/p2p/host/hostv2"
"github.com/harmony-one/harmony/internal/utils"
peer "github.com/libp2p/go-libp2p-peer"
multiaddr "github.com/multiformats/go-multiaddr"
) )
// Version The version number of p2p library // Version The version number of p2p library
@ -12,14 +19,39 @@ import (
// 2 - libp2p // 2 - libp2p
const Version = 2 const Version = 2
// NewHost starts the host // NewHost starts the host for p2p
func NewHost(peer p2p.Peer) host.Host { // for hostv2, it generates multiaddress, keypair and add PeerID to peer, add priKey to host
// log.Debug("New Host", "ip/port", net.JoinHostPort(peer.IP, peer.Port)) // TODO (leo) the PriKey of the host has to be persistent in disk, so that we don't need to regenerate it
// on the same host if the node software restarted. The peerstore has to be persistent as well.
func NewHost(self *p2p.Peer) (host.Host, error) {
if Version == 1 { if Version == 1 {
h := hostv1.New(peer) h := hostv1.New(self)
return h return h, nil
}
selfAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", self.Port))
if err != nil {
return nil, err
}
self.Addrs = append(self.Addrs, selfAddr)
// TODO (leo), change to GenKeyP2PRand() to generate random key. Right now, the key is predicable as the
// seed is fixed.
priKey, pubKey, err := utils.GenKeyP2P(self.IP, self.Port)
if err != nil {
return nil, err
} }
h := hostv2.New(peer) peerID, err := peer.IDFromPublicKey(pubKey)
return h
if err != nil {
return nil, err
}
self.PeerID = peerID
h := hostv2.New(*self, priKey)
fmt.Printf("NewHost => self:%s, PeerID: %v\n", net.JoinHostPort(self.IP, self.Port), self.PeerID)
return h, nil
} }

Loading…
Cancel
Save