package p2p
import (
"context"
"encoding/binary"
"fmt"
"net"
"os"
"runtime"
"strings"
"sync"
"github.com/harmony-one/bls/ffi/go/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/libp2p/go-libp2p"
libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto"
libp2p_host "github.com/libp2p/go-libp2p-core/host"
libp2p_network "github.com/libp2p/go-libp2p-core/network"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
libp2p_peerstore "github.com/libp2p/go-libp2p-core/peerstore"
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
// Host is the client + server in p2p network.
type Host interface {
GetSelfPeer ( ) Peer
AddPeer ( * Peer ) error
GetID ( ) libp2p_peer . ID
GetP2PHost ( ) libp2p_host . Host
GetPeerCount ( ) int
ConnectHostPeer ( Peer ) error
// SendMessageToGroups sends a message to one or more multicast groups.
SendMessageToGroups ( groups [ ] nodeconfig . GroupID , msg [ ] byte ) error
PubSub ( ) * libp2p_pubsub . PubSub
C ( ) ( int , int , int )
GetOrJoin ( topic string ) ( * libp2p_pubsub . Topic , error )
ListPeer ( topic string ) [ ] libp2p_peer . ID
ListTopic ( ) [ ] string
ListBlockedPeer ( ) [ ] libp2p_peer . ID
}
// Peer is the object for a p2p peer (node)
type Peer struct {
IP string // IP address of the peer
Port string // Port number of the peer
ConsensusPubKey * bls . PublicKey // Public key of the peer, used for consensus signing
Addrs [ ] ma . Multiaddr // MultiAddress of the peer
PeerID libp2p_peer . ID // PeerID, the pubkey for communication
}
const (
// SetAsideForConsensus set the number of active validation goroutines for the consensus topic
SetAsideForConsensus = 1 << 13
// SetAsideOtherwise set the number of active validation goroutines for other topic
SetAsideOtherwise = 1 << 11
// MaxMessageHandlers ..
MaxMessageHandlers = SetAsideForConsensus + SetAsideOtherwise
// MaxMessageSize is 2Mb
MaxMessageSize = 1 << 21
)
// NewHost ..
func NewHost ( self * Peer , key libp2p_crypto . PrivKey ) ( Host , error ) {
listenAddr , err := ma . NewMultiaddr ( fmt . Sprintf ( "/ip4/%s/tcp/%s" , self . IP , self . Port ) )
if err != nil {
return nil , errors . Wrapf ( err ,
"cannot create listen multiaddr from port %#v" , self . Port )
}
ctx := context . Background ( )
p2pHost , err := libp2p . New ( ctx ,
libp2p . ListenAddrs ( listenAddr ) ,
libp2p . Identity ( key ) ,
libp2p . EnableNATService ( ) ,
libp2p . ForceReachabilityPublic ( ) ,
)
if err != nil {
return nil , errors . Wrapf ( err , "cannot initialize libp2p host" )
}
options := [ ] libp2p_pubsub . Option {
// WithValidateQueueSize sets the buffer of validate queue. Defaults to 32. When queue is full, validation is throttled and new messages are dropped.
libp2p_pubsub . WithValidateQueueSize ( 512 ) ,
// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer. We start dropping messages to a peer if the outbound queue if full.
libp2p_pubsub . WithPeerOutboundQueueSize ( 64 ) ,
// WithValidateWorkers sets the number of synchronous validation worker goroutines. Defaults to NumCPU.
libp2p_pubsub . WithValidateWorkers ( runtime . NumCPU ( ) * 2 ) ,
// WithValidateThrottle sets the upper bound on the number of active validation goroutines across all topics. The default is 8192.
libp2p_pubsub . WithValidateThrottle ( MaxMessageHandlers ) ,
libp2p_pubsub . WithMaxMessageSize ( MaxMessageSize ) ,
}
traceFile := os . Getenv ( "P2P_TRACEFILE" )
if len ( traceFile ) > 0 {
var tracer libp2p_pubsub . EventTracer
var tracerErr error
if strings . HasPrefix ( traceFile , "file:" ) {
tracer , tracerErr = libp2p_pubsub . NewJSONTracer ( strings . TrimPrefix ( traceFile , "file:" ) )
} else {
pi , err := libp2p_peer . AddrInfoFromP2pAddr ( ma . StringCast ( traceFile ) )
if err == nil {
tracer , tracerErr = libp2p_pubsub . NewRemoteTracer ( ctx , p2pHost , * pi )
}
}
if tracerErr == nil && tracer != nil {
options = append ( options , libp2p_pubsub . WithEventTracer ( tracer ) )
} else {
utils . Logger ( ) . Warn ( ) .
Str ( "Tracer" , traceFile ) .
Msg ( "can't add event tracer from P2P_TRACEFILE" )
}
}
pubsub , err := libp2p_pubsub . NewGossipSub ( ctx , p2pHost , options ... )
if err != nil {
return nil , errors . Wrapf ( err , "cannot initialize libp2p pubsub" )
}
self . PeerID = p2pHost . ID ( )
subLogger := utils . Logger ( ) . With ( ) . Str ( "hostID" , p2pHost . ID ( ) . Pretty ( ) ) . Logger ( )
// has to save the private key for host
h := & HostV2 {
h : p2pHost ,
pubsub : pubsub ,
joined : map [ string ] * libp2p_pubsub . Topic { } ,
self : * self ,
priKey : key ,
logger : & subLogger ,
}
if err != nil {
return nil , err
}
utils . Logger ( ) . Info ( ) .
Str ( "self" , net . JoinHostPort ( self . IP , self . Port ) ) .
Interface ( "PeerID" , self . PeerID ) .
Str ( "PubKey" , self . ConsensusPubKey . SerializeToHexStr ( ) ) .
Msg ( "libp2p host ready" )
return h , nil
}
// HostV2 is the version 2 p2p host
type HostV2 struct {
h libp2p_host . Host
pubsub * libp2p_pubsub . PubSub
joined map [ string ] * libp2p_pubsub . Topic
self Peer
priKey libp2p_crypto . PrivKey
lock sync . Mutex
logger * zerolog . Logger
blocklist libp2p_pubsub . Blacklist
}
// PubSub ..
func ( host * HostV2 ) PubSub ( ) * libp2p_pubsub . PubSub {
return host . pubsub
}
// C .. -> (total known peers, connected, not connected)
func ( host * HostV2 ) C ( ) ( int , int , int ) {
connected , not := 0 , 0
peers := host . h . Peerstore ( ) . Peers ( )
for _ , peer := range peers {
result := host . h . Network ( ) . Connectedness ( peer )
if result == libp2p_network . Connected {
connected ++
} else if result == libp2p_network . NotConnected {
not ++
}
}
return len ( peers ) , connected , not
}
// GetOrJoin ..
func ( host * HostV2 ) GetOrJoin ( topic string ) ( * libp2p_pubsub . Topic , error ) {
host . lock . Lock ( )
defer host . lock . Unlock ( )
if t , ok := host . joined [ topic ] ; ok {
return t , nil
} else if t , err := host . pubsub . Join ( topic ) ; err != nil {
return nil , errors . Wrapf ( err , "cannot join pubsub topic %x" , topic )
} else {
host . joined [ topic ] = t
return t , nil
}
}
// SendMessageToGroups sends a message to one or more multicast groups.
// It returns a nil error if and only if it has succeeded to schedule the given
// message for sending.
func ( host * HostV2 ) SendMessageToGroups ( groups [ ] nodeconfig . GroupID , msg [ ] byte ) ( err error ) {
if len ( msg ) == 0 {
return errors . New ( "cannot send out empty message" )
}
for _ , group := range groups {
t , e := host . GetOrJoin ( string ( group ) )
if e != nil {
err = e
continue
}
e = t . Publish ( context . Background ( ) , msg )
if e != nil {
err = e
continue
}
}
return err
}
// AddPeer add p2p.Peer into Peerstore
func ( host * HostV2 ) AddPeer ( p * Peer ) error {
if p . PeerID != "" && len ( p . Addrs ) != 0 {
host . Peerstore ( ) . AddAddrs ( p . PeerID , p . Addrs , libp2p_peerstore . PermanentAddrTTL )
return nil
}
if p . PeerID == "" {
host . logger . Error ( ) . Msg ( "AddPeer PeerID is EMPTY" )
return fmt . Errorf ( "AddPeer error: peerID is empty" )
}
// reconstruct the multiaddress based on ip/port
// PeerID has to be known for the ip/port
addr := fmt . Sprintf ( "/ip4/%s/tcp/%s" , p . IP , p . Port )
targetAddr , err := ma . NewMultiaddr ( addr )
if err != nil {
host . logger . Error ( ) . Err ( err ) . Msg ( "AddPeer NewMultiaddr error" )
return err
}
p . Addrs = append ( p . Addrs , targetAddr )
host . Peerstore ( ) . AddAddrs ( p . PeerID , p . Addrs , libp2p_peerstore . PermanentAddrTTL )
host . logger . Info ( ) . Interface ( "peer" , * p ) . Msg ( "AddPeer add to libp2p_peerstore" )
return nil
}
// Peerstore returns the peer store
func ( host * HostV2 ) Peerstore ( ) libp2p_peerstore . Peerstore {
return host . h . Peerstore ( )
}
// GetID returns ID.Pretty
func ( host * HostV2 ) GetID ( ) libp2p_peer . ID {
return host . h . ID ( )
}
// GetSelfPeer gets self peer
func ( host * HostV2 ) GetSelfPeer ( ) Peer {
return host . self
}
// GetP2PHost returns the p2p.Host
func ( host * HostV2 ) GetP2PHost ( ) libp2p_host . Host {
return host . h
}
// ListTopic returns the list of topic the node subscribed
func ( host * HostV2 ) ListTopic ( ) [ ] string {
host . lock . Lock ( )
defer host . lock . Unlock ( )
topics := make ( [ ] string , 0 )
for t := range host . joined {
topics = append ( topics , t )
}
return topics
}
// ListPeer returns list of peers in a topic
func ( host * HostV2 ) ListPeer ( topic string ) [ ] libp2p_peer . ID {
host . lock . Lock ( )
defer host . lock . Unlock ( )
return host . joined [ topic ] . ListPeers ( )
}
// ListBlockedPeer returns list of blocked peer
func ( host * HostV2 ) ListBlockedPeer ( ) [ ] libp2p_peer . ID {
// TODO: this is a place holder for now
peers := make ( [ ] libp2p_peer . ID , 0 )
return peers
}
// GetPeerCount ...
func ( host * HostV2 ) GetPeerCount ( ) int {
return host . h . Peerstore ( ) . Peers ( ) . Len ( )
}
// ConnectHostPeer connects to peer host
func ( host * HostV2 ) ConnectHostPeer ( peer Peer ) error {
ctx := context . Background ( )
addr := fmt . Sprintf ( "/ip4/%s/tcp/%s/ipfs/%s" , peer . IP , peer . Port , peer . PeerID . Pretty ( ) )
peerAddr , err := ma . NewMultiaddr ( addr )
if err != nil {
host . logger . Error ( ) . Err ( err ) . Interface ( "peer" , peer ) . Msg ( "ConnectHostPeer" )
return err
}
peerInfo , err := libp2p_peer . AddrInfoFromP2pAddr ( peerAddr )
if err != nil {
host . logger . Error ( ) . Err ( err ) . Interface ( "peer" , peer ) . Msg ( "ConnectHostPeer" )
return err
}
if err := host . h . Connect ( ctx , * peerInfo ) ; err != nil {
host . logger . Warn ( ) . Err ( err ) . Interface ( "peer" , peer ) . Msg ( "can't connect to peer" )
return err
}
host . logger . Info ( ) . Interface ( "node" , * peerInfo ) . Msg ( "connected to peer host" )
return nil
}
// NamedTopic represents pubsub topic
// Name is the human readable topic, groupID
type NamedTopic struct {
Name string
Topic * libp2p_pubsub . Topic
}
// ConstructMessage constructs the p2p message as [messageType, contentSize, content]
func ConstructMessage ( content [ ] byte ) [ ] byte {
message := make ( [ ] byte , 5 + len ( content ) )
message [ 0 ] = 17 // messageType 0x11
binary . BigEndian . PutUint32 ( message [ 1 : 5 ] , uint32 ( len ( content ) ) )
copy ( message [ 5 : ] , content )
return message
}
// AddrList is a list of multiaddress
type AddrList [ ] ma . Multiaddr
// String is a function to print a string representation of the AddrList
func ( al * AddrList ) String ( ) string {
strs := make ( [ ] string , len ( * al ) )
for i , addr := range * al {
strs [ i ] = addr . String ( )
}
return strings . Join ( strs , "," )
}
// Set is a function to set the value of AddrList based on a string
func ( al * AddrList ) Set ( value string ) error {
if len ( * al ) > 0 {
return fmt . Errorf ( "AddrList is already set" )
}
for _ , a := range strings . Split ( value , "," ) {
addr , err := ma . NewMultiaddr ( a )
if err != nil {
return err
}
* al = append ( * al , addr )
}
return nil
}
// StringsToAddrs convert a list of strings to a list of multiaddresses
func StringsToAddrs ( addrStrings [ ] string ) ( maddrs [ ] ma . Multiaddr , err error ) {
for _ , addrString := range addrStrings {
addr , err := ma . NewMultiaddr ( addrString )
if err != nil {
return maddrs , err
}
maddrs = append ( maddrs , addr )
}
return
}
// BootNodes is a list of boot nodes.
// It is populated either from default or from user CLI input.
// TODO: refactor p2p config into a config structure (now part of config is here, part is in
// nodeconfig)
var BootNodes AddrList