diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go deleted file mode 100644 index 5a7f411e9..000000000 --- a/api/service/networkinfo/service.go +++ /dev/null @@ -1,300 +0,0 @@ -package networkinfo - -import ( - "context" - "fmt" - "net" - "sync" - "time" - - "github.com/ethereum/go-ethereum/rpc" - msg_pb "github.com/harmony-one/harmony/api/proto/message" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" - badger "github.com/ipfs/go-ds-badger" - coredis "github.com/libp2p/go-libp2p-core/discovery" - libp2p_peer "github.com/libp2p/go-libp2p-core/peer" - libp2pdis "github.com/libp2p/go-libp2p-discovery" - libp2pdht "github.com/libp2p/go-libp2p-kad-dht" - libp2pdhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" - madns "github.com/multiformats/go-multiaddr-dns" - manet "github.com/multiformats/go-multiaddr-net" - "github.com/pkg/errors" -) - -// Service is the network info service. -type Service struct { - Host p2p.Host - Rendezvous nodeconfig.GroupID - bootnodes p2p.AddrList - dht *libp2pdht.IpfsDHT - cancel context.CancelFunc - stopChan chan struct{} - stoppedChan chan struct{} - peerChan chan p2p.Peer - peerInfo <-chan libp2p_peer.AddrInfo - discovery *libp2pdis.RoutingDiscovery - messageChan chan *msg_pb.Message - started bool -} - -// ConnectionRetry set the number of retry of connection to bootnode in case the initial connection is failed -var ( - // retry for 30s and give up then - ConnectionRetry = 15 -) - -const ( - waitInRetry = 5 * time.Second - connectionTimeout = 3 * time.Minute - - minFindPeerInterval = 5 // initial find peer interval during bootstrap - maxFindPeerInterval = 1800 // max find peer interval, every 30 minutes - - // register to bootnode every ticker - dhtTicker = 6 * time.Hour - - discoveryLimit = 32 -) - -// New returns role conversion service. If dataStorePath is not empty, it -// points to a persistent database directory to use. -func New( - h p2p.Host, rendezvous nodeconfig.GroupID, peerChan chan p2p.Peer, - bootnodes p2p.AddrList, dataStorePath string, -) (*Service, error) { - ctx, cancel := context.WithCancel(context.Background()) - var dhtOpts []libp2pdhtopts.Option - if dataStorePath != "" { - dataStore, err := badger.NewDatastore(dataStorePath, nil) - if err != nil { - return nil, errors.Wrapf(err, - "cannot open Badger datastore at %s", dataStorePath) - } - utils.Logger().Info(). - Str("dataStorePath", dataStorePath). - Msg("backing DHT with Badger datastore") - dhtOpts = append(dhtOpts, libp2pdhtopts.Datastore(dataStore)) - } - - dht, err := libp2pdht.New(ctx, h.GetP2PHost(), dhtOpts...) - if err != nil { - return nil, errors.Wrapf(err, "cannot create DHT") - } - - return &Service{ - Host: h, - dht: dht, - Rendezvous: rendezvous, - cancel: cancel, - stopChan: make(chan struct{}), - stoppedChan: make(chan struct{}), - peerChan: peerChan, - bootnodes: bootnodes, - discovery: nil, - started: false, - }, nil -} - -// MustNew is a panic-on-error version of New. -func MustNew( - h p2p.Host, rendezvous nodeconfig.GroupID, peerChan chan p2p.Peer, - bootnodes p2p.AddrList, dataStorePath string, -) *Service { - service, err := New(h, rendezvous, peerChan, bootnodes, dataStorePath) - if err != nil { - panic(err) - } - return service -} - -// Start starts network info service. -func (s *Service) Start() error { - err := s.Init() - if err != nil { - utils.Logger().Error().Err(err).Msg("Service Init Failed") - return nil - } - s.Run() - s.started = true - return nil -} - -// Init initializes role conversion service. -func (s *Service) Init() error { - ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) - defer cancel() - utils.Logger().Info().Msg("Init networkinfo service") - - // Bootstrap the DHT. In the default configuration, this spawns a Background - // thread that will refresh the peer table every five minutes. - utils.Logger().Debug().Msg("Bootstrapping the DHT") - if err := s.dht.Bootstrap(ctx); err != nil { - return fmt.Errorf("error bootstrap dht: %s", err) - } - - var wg sync.WaitGroup - if s.bootnodes == nil { - // TODO: should've passed in bootnodes through constructor. - s.bootnodes = p2p.BootNodes - } - - connected := false - var bnList p2p.AddrList - for _, maddr := range s.bootnodes { - if madns.Matches(maddr) { - mas, err := madns.Resolve(context.Background(), maddr) - if err != nil { - utils.Logger().Error().Err(err).Msg("Resolve bootnode") - continue - } - bnList = append(bnList, mas...) - } else { - bnList = append(bnList, maddr) - } - } - - for _, peerAddr := range bnList { - peerinfo, _ := libp2p_peer.AddrInfoFromP2pAddr(peerAddr) - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < ConnectionRetry; i++ { - if err := s.Host.GetP2PHost().Connect(ctx, *peerinfo); err != nil { - utils.Logger().Warn().Err(err).Int("try", i).Msg("can't connect to bootnode") - time.Sleep(waitInRetry) - } else { - utils.Logger().Info().Int("try", i).Interface("node", *peerinfo).Msg("connected to bootnode") - // it is okay if any bootnode is connected - connected = true - break - } - } - }() - } - wg.Wait() - - if !connected { - return fmt.Errorf("[FATAL] error connecting to bootnodes") - } - - // We use a rendezvous point "shardID" to announce our location. - utils.Logger().Info().Str("Rendezvous", string(s.Rendezvous)).Msg("Announcing ourselves...") - s.discovery = libp2pdis.NewRoutingDiscovery(s.dht) - libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous)) - utils.Logger().Info().Msg("Successfully announced!") - - return nil -} - -// Run runs network info. -func (s *Service) Run() { - defer close(s.stoppedChan) - if s.discovery == nil { - utils.Logger().Error().Msg("discovery is not initialized") - return - } - - go s.DoService() -} - -// DoService does network info. -func (s *Service) DoService() { - tick := time.NewTicker(dhtTicker) - defer tick.Stop() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - peerInterval := minFindPeerInterval - intervalTick := time.NewTicker(time.Duration(peerInterval) * time.Second) - defer intervalTick.Stop() - for { - select { - case <-s.stopChan: - return - case <-tick.C: - libp2pdis.Advertise(ctx, s.discovery, string(s.Rendezvous)) - utils.Logger().Info(). - Str("Rendezvous", string(s.Rendezvous)). - Msg("Successfully announced!") - case <-intervalTick.C: - var err error - s.peerInfo, err = s.discovery.FindPeers( - ctx, string(s.Rendezvous), coredis.Limit(discoveryLimit), - ) - if err != nil { - utils.Logger().Error().Err(err).Msg("FindPeers") - return - } - if peerInterval < maxFindPeerInterval { - peerInterval *= 2 - intervalTick.Stop() - intervalTick = time.NewTicker(time.Duration(peerInterval) * time.Second) - } - - go s.findPeers(ctx) - } - } -} - -func (s *Service) findPeers(ctx context.Context) { - _, cgnPrefix, err := net.ParseCIDR("100.64.0.0/10") - if err != nil { - utils.Logger().Error().Err(err).Msg("can't parse CIDR") - return - } - for peer := range s.peerInfo { - if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { - if err := s.Host.GetP2PHost().Connect(ctx, peer); err != nil { - utils.Logger().Warn().Err(err).Interface("peer", peer).Msg("can't connect to peer node") - // break if the node can't connect to peers, waiting for another peer - break - } else { - utils.Logger().Info().Interface("peer", peer).Msg("connected to peer node") - } - // figure out the public ip/port - var ip, port string - - for _, addr := range peer.Addrs { - netaddr, err := manet.ToNetAddr(addr) - if err != nil { - continue - } - nip := netaddr.(*net.TCPAddr).IP - if (nip.IsGlobalUnicast() && !utils.IsPrivateIP(nip)) || cgnPrefix.Contains(nip) { - ip = nip.String() - port = fmt.Sprintf("%d", netaddr.(*net.TCPAddr).Port) - break - } - } - p := p2p.Peer{IP: ip, Port: port, PeerID: peer.ID, Addrs: peer.Addrs} - utils.Logger().Info().Interface("peer", p).Msg("Notify peerChan") - if s.peerChan != nil { - s.peerChan <- p - } - } - } - - utils.Logger().Info().Msg("PeerInfo Channel Closed") -} - -// Stop stops network info service. -func (s *Service) Stop() error { - utils.Logger().Info().Msg("Stopping network info service") - defer s.cancel() - - if !s.started { - utils.Logger().Info().Msg("Service didn't started. Exit") - return nil - } - - s.stopChan <- struct{}{} - <-s.stoppedChan - utils.Logger().Info().Msg("Network info service stopped") - return nil -} - -// APIs for the services. -func (s *Service) APIs() []rpc.API { - return nil -} diff --git a/api/service/networkinfo/service_test.go b/api/service/networkinfo/service_test.go deleted file mode 100644 index 08232a30c..000000000 --- a/api/service/networkinfo/service_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package networkinfo - -import ( - "testing" - "time" - - "github.com/harmony-one/harmony/crypto/bls" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" -) - -func TestService(t *testing.T) { - nodePriKey, _, err := utils.LoadKeyFromFile("/tmp/127.0.0.1.12345.key") - if err != nil { - t.Fatal(err) - } - peerPriKey := bls.RandPrivateKey() - peerPubKey := peerPriKey.GetPublicKey() - if peerPriKey == nil || peerPubKey == nil { - t.Fatal("generate key error") - } - selfPeer := p2p.Peer{IP: "127.0.0.1", Port: "12345", ConsensusPubKey: peerPubKey} - host, err := p2p.NewHost(&selfPeer, nodePriKey) - if err != nil { - t.Fatal("unable to new host in harmony") - } - - s, err := New(host, nodeconfig.GroupIDBeaconClient, nil, nil, "") - if err != nil { - t.Fatalf("New() failed: %s", err) - } - - s.Start() - time.Sleep(2 * time.Second) - s.Stop() -} diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index de783f2d0..62f6bee59 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -3,7 +3,6 @@ package main import ( - "context" "flag" "fmt" "os" @@ -12,9 +11,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" - badger "github.com/ipfs/go-ds-badger" net "github.com/libp2p/go-libp2p-core/network" - kaddht "github.com/libp2p/go-libp2p-kad-dht" ma "github.com/multiformats/go-multiaddr" ) @@ -117,8 +114,15 @@ func main() { utils.FatalErrMsg(err, "cannot load key from %s", *keyFile) } + // For bootstrap nodes, we shall keep .dht file. + dataStorePath := fmt.Sprintf(".dht-%s-%s", *ip, *port) selfPeer := p2p.Peer{IP: *ip, Port: *port} - host, err := p2p.NewHost(&selfPeer, privKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &selfPeer, + BLSKey: privKey, + BootNodes: nil, // Boot nodes have no boot nodes :) Will be connected when other nodes joined + DataStoreFile: &dataStorePath, + }) if err != nil { utils.FatalErrMsg(err, "cannot initialize network") } @@ -127,20 +131,11 @@ func main() { fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, host.GetID().Pretty()), ) + host.Start() + if *logConn { host.GetP2PHost().Network().Notify(NewConnLogger(utils.GetLogInstance())) } - dataStorePath := fmt.Sprintf(".dht-%s-%s", *ip, *port) - dataStore, err := badger.NewDatastore(dataStorePath, nil) - if err != nil { - utils.FatalErrMsg(err, "cannot initialize DHT cache at %s", dataStorePath) - } - dht := kaddht.NewDHT(context.Background(), host.GetP2PHost(), dataStore) - - if err := dht.Bootstrap(context.Background()); err != nil { - utils.FatalErrMsg(err, "cannot bootstrap DHT") - } - select {} } diff --git a/cmd/harmony/config.go b/cmd/harmony/config.go index 127ae496f..5923f7197 100644 --- a/cmd/harmony/config.go +++ b/cmd/harmony/config.go @@ -46,9 +46,10 @@ type networkConfig struct { } type p2pConfig struct { - Port int - IP string - KeyFile string + Port int + IP string + KeyFile string + DHTDataStore *string `toml:",omitempty"` } type generalConfig struct { diff --git a/cmd/harmony/flags.go b/cmd/harmony/flags.go index 4037faf62..5be21de29 100644 --- a/cmd/harmony/flags.go +++ b/cmd/harmony/flags.go @@ -42,6 +42,7 @@ var ( p2pPortFlag, p2pIPFlag, p2pKeyFileFlag, + p2pDHTDataStoreFlag, legacyKeyFileFlag, } @@ -411,6 +412,12 @@ var ( Usage: "the p2p key file of the harmony node", DefValue: defaultConfig.P2P.KeyFile, } + p2pDHTDataStoreFlag = cli.StringFlag{ + Name: "p2p.dht.datastore", + Usage: "the datastore file to persist the dht routing table", + DefValue: "", + Hidden: true, + } legacyKeyFileFlag = cli.StringFlag{ Name: "key", Usage: "the p2p key file of the harmony node", @@ -435,6 +442,11 @@ func applyP2PFlags(cmd *cobra.Command, config *harmonyConfig) { } else if cli.IsFlagChanged(cmd, legacyKeyFileFlag) { config.P2P.KeyFile = cli.GetStringFlagValue(cmd, legacyKeyFileFlag) } + + if cli.IsFlagChanged(cmd, p2pDHTDataStoreFlag) { + ds := cli.GetStringFlagValue(cmd, p2pDHTDataStoreFlag) + config.P2P.DHTDataStore = &ds + } } // http flags diff --git a/cmd/harmony/flags_test.go b/cmd/harmony/flags_test.go index 34e2754d9..2f9d0ddf0 100644 --- a/cmd/harmony/flags_test.go +++ b/cmd/harmony/flags_test.go @@ -297,6 +297,8 @@ func TestNetworkFlags(t *testing.T) { } } +var defDataStore = ".dht-127.0.0.1" + func TestP2PFlags(t *testing.T) { tests := []struct { args []string @@ -308,11 +310,13 @@ func TestP2PFlags(t *testing.T) { expConfig: defaultConfig.P2P, }, { - args: []string{"--p2p.port", "9001", "--p2p.keyfile", "./key.file"}, + args: []string{"--p2p.port", "9001", "--p2p.keyfile", "./key.file", "--p2p.dht.datastore", + defDataStore}, expConfig: p2pConfig{ - Port: 9001, - IP: nodeconfig.DefaultPublicListenIP, - KeyFile: "./key.file", + Port: 9001, + IP: nodeconfig.DefaultPublicListenIP, + KeyFile: "./key.file", + DHTDataStore: &defDataStore, }, }, { diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 0c862f306..e916b9a00 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -16,8 +16,6 @@ import ( "syscall" "time" - "github.com/harmony-one/harmony/internal/params" - ethCommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/harmony-one/bls/ffi/go/bls" @@ -34,6 +32,7 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" "github.com/harmony-one/harmony/internal/genesis" + "github.com/harmony-one/harmony/internal/params" "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/multibls" @@ -234,12 +233,7 @@ func setupPprof(config harmonyConfig) { func setupNodeAndRun(hc harmonyConfig) { var err error - bootNodes := hc.Network.BootNodes - p2p.BootNodes, err = p2p.StringsToAddrs(bootNodes) - if err != nil { - utils.FatalErrMsg(err, "cannot parse bootnode list %#v", - bootNodes) - } + nodeconfigSetShardSchedule(hc) nodeconfig.SetShardingSchedule(shard.Schedule) nodeconfig.SetVersion(getHarmonyVersion()) @@ -397,6 +391,12 @@ func setupNodeAndRun(hc harmonyConfig) { go listenOSSigAndShutDown(currentNode) + if err := myHost.Start(); err != nil { + utils.Logger().Fatal(). + Err(err). + Msg("Start p2p host failed") + } + if err := currentNode.BootstrapConsensus(); err != nil { fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error()) if !currentNode.NodeConfig.IsOffline { @@ -543,7 +543,12 @@ func createGlobalConfig(hc harmonyConfig) (*nodeconfig.ConfigType, error) { ConsensusPubKey: nodeConfig.ConsensusPriKey[0].Pub.Object, } - myHost, err = p2p.NewHost(&selfPeer, nodeConfig.P2PPriKey) + myHost, err = p2p.NewHost(p2p.HostConfig{ + Self: &selfPeer, + BLSKey: nodeConfig.P2PPriKey, + BootNodes: hc.Network.BootNodes, + DataStoreFile: hc.P2P.DHTDataStore, + }) if err != nil { return nil, errors.Wrap(err, "cannot create P2P network host") } diff --git a/consensus/consensus_service_test.go b/consensus/consensus_service_test.go index cf5a24624..e1b568b81 100644 --- a/consensus/consensus_service_test.go +++ b/consensus/consensus_service_test.go @@ -16,7 +16,10 @@ import ( func TestSignAndMarshalConsensusMessage(t *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { t.Fatalf("newhost failure: %v", err) } @@ -45,7 +48,10 @@ func TestSignAndMarshalConsensusMessage(t *testing.T) { func TestSetViewID(t *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { t.Fatalf("newhost failure: %v", err) } diff --git a/consensus/construct_test.go b/consensus/construct_test.go index 27d1e82f4..dd833095a 100644 --- a/consensus/construct_test.go +++ b/consensus/construct_test.go @@ -19,7 +19,10 @@ import ( func TestConstructAnnounceMessage(test *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { test.Fatalf("newhost failure: %v", err) } @@ -50,7 +53,10 @@ func TestConstructPreparedMessage(test *testing.T) { validatorPriKey := bls.RandPrivateKey() validatorPubKey := leaderPriKey.GetPublicKey() priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { test.Fatalf("newhost failure: %v", err) } @@ -117,7 +123,10 @@ func TestConstructPreparedMessage(test *testing.T) { func TestConstructPrepareMessage(test *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { test.Fatalf("newhost failure: %v", err) } @@ -206,7 +215,10 @@ func TestConstructPrepareMessage(test *testing.T) { func TestConstructCommitMessage(test *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { test.Fatalf("newhost failure: %v", err) } @@ -297,7 +309,10 @@ func TestConstructCommitMessage(test *testing.T) { func TestPopulateMessageFields(t *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { t.Fatalf("newhost failure: %v", err) } diff --git a/go.mod b/go.mod index 6940ba81c..d4c57f37b 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/harmony-one/bls v0.0.6 github.com/harmony-one/taggedrlp v0.1.4 github.com/harmony-one/vdf v0.0.0-20190924175951-620379da8849 + github.com/hashicorp/go-version v1.2.0 github.com/hashicorp/golang-lru v0.5.4 github.com/ipfs/go-ds-badger v0.2.4 github.com/libp2p/go-libp2p v0.13.0 @@ -44,6 +45,7 @@ require ( github.com/pelletier/go-toml v1.2.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.8.0 + github.com/prometheus/common v0.14.0 github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 github.com/rjeczalik/notify v0.9.2 github.com/rs/cors v1.7.0 // indirect diff --git a/node/node_handler_test.go b/node/node_handler_test.go index 1fd14b698..13dde23a4 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -21,7 +21,10 @@ func TestAddNewBlock(t *testing.T) { pubKey := blsKey.GetPublicKey() leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", ConsensusPubKey: pubKey} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { t.Fatalf("newhost failure: %v", err) } @@ -65,7 +68,10 @@ func TestVerifyNewBlock(t *testing.T) { pubKey := blsKey.GetPublicKey() leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", ConsensusPubKey: pubKey} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { t.Fatalf("newhost failure: %v", err) } diff --git a/node/node_newblock_test.go b/node/node_newblock_test.go index 15fc152a9..51391cf9c 100644 --- a/node/node_newblock_test.go +++ b/node/node_newblock_test.go @@ -23,7 +23,10 @@ func TestFinalizeNewBlockAsync(t *testing.T) { pubKey := blsKey.GetPublicKey() leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", ConsensusPubKey: pubKey} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { t.Fatalf("newhost failure: %v", err) } diff --git a/node/node_test.go b/node/node_test.go index da8b097cb..e2740fb45 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -23,7 +23,10 @@ func TestNewNode(t *testing.T) { pubKey := blsKey.GetPublicKey() leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", ConsensusPubKey: pubKey} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { t.Fatalf("newhost failure: %v", err) } @@ -193,7 +196,10 @@ func TestAddBeaconPeer(t *testing.T) { pubKey := blsKey.GetPublicKey() leader := p2p.Peer{IP: "127.0.0.1", Port: "8982", ConsensusPubKey: pubKey} priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2p.NewHost(&leader, priKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) if err != nil { t.Fatalf("newhost failure: %v", err) } diff --git a/node/service_setup.go b/node/service_setup.go index d4cf400c8..78f154de4 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -7,19 +7,10 @@ import ( "github.com/harmony-one/harmony/api/service/blockproposal" "github.com/harmony-one/harmony/api/service/consensus" "github.com/harmony-one/harmony/api/service/explorer" - "github.com/harmony-one/harmony/api/service/networkinfo" ) // RegisterValidatorServices register the validator services. func (node *Node) RegisterValidatorServices() { - _, chanPeer, _ := node.initNodeConfiguration() - // Register networkinfo service. "0" is the beacon shard ID - node.serviceManager.Register( - service.NetworkInfo, - networkinfo.MustNew( - node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil, node.networkInfoDHTPath(), - ), - ) // Register consensus service. node.serviceManager.Register( service.Consensus, @@ -34,14 +25,6 @@ func (node *Node) RegisterValidatorServices() { // RegisterExplorerServices register the explorer services func (node *Node) RegisterExplorerServices() { - _, chanPeer, _ := node.initNodeConfiguration() - - // Register networkinfo service. - node.serviceManager.Register( - service.NetworkInfo, - networkinfo.MustNew( - node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil, node.networkInfoDHTPath()), - ) // Register explorer service. node.serviceManager.Register( service.SupportExplorer, explorer.New(&node.SelfPeer, node.stateSync, node.Blockchain()), diff --git a/p2p/discovery/discovery.go b/p2p/discovery/discovery.go new file mode 100644 index 000000000..8456f9423 --- /dev/null +++ b/p2p/discovery/discovery.go @@ -0,0 +1,89 @@ +package discovery + +import ( + "context" + "time" + + "github.com/harmony-one/harmony/internal/utils" + "github.com/libp2p/go-libp2p-core/discovery" + libp2p_host "github.com/libp2p/go-libp2p-core/host" + libp2p_peer "github.com/libp2p/go-libp2p-core/peer" + libp2p_dis "github.com/libp2p/go-libp2p-discovery" + libp2p_dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/rs/zerolog" +) + +// Discovery is the interface for the underlying peer discovery protocol. +// The interface is implemented by dhtDiscovery +type Discovery interface { + Start() error + Close() error + Advertise(ctx context.Context, ns string) (time.Duration, error) + FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error) + GetRawDiscovery() discovery.Discovery +} + +// dhtDiscovery is a wrapper of libp2p dht discovery service. It implements Discovery +// interface. +type dhtDiscovery struct { + dht *libp2p_dht.IpfsDHT + disc discovery.Discovery + host libp2p_host.Host + + opt DHTConfig + logger zerolog.Logger + ctx context.Context + cancel func() +} + +// NewDHTDiscovery creates a new dhtDiscovery that implements Discovery interface. +func NewDHTDiscovery(host libp2p_host.Host, opt DHTConfig) (Discovery, error) { + opts, err := opt.getLibp2pRawOptions() + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + dht, err := libp2p_dht.New(ctx, host, opts...) + if err != nil { + return nil, err + } + d := libp2p_dis.NewRoutingDiscovery(dht) + + logger := utils.Logger().With().Str("module", "discovery").Logger() + return &dhtDiscovery{ + dht: dht, + disc: d, + host: host, + opt: opt, + logger: logger, + ctx: ctx, + cancel: cancel, + }, nil +} + +// Start bootstrap the dht discovery service. +func (d *dhtDiscovery) Start() error { + return d.dht.Bootstrap(d.ctx) +} + +// Stop stop the dhtDiscovery service +func (d *dhtDiscovery) Close() error { + d.cancel() + return nil +} + +// Advertise advertises a service +func (d *dhtDiscovery) Advertise(ctx context.Context, ns string) (time.Duration, error) { + return d.disc.Advertise(ctx, ns) +} + +// FindPeers discovers peers providing a service +func (d *dhtDiscovery) FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error) { + opt := discovery.Limit(peerLimit) + return d.disc.FindPeers(ctx, ns, opt) +} + +// GetRawDiscovery get the raw discovery to be used for libp2p pubsub options +func (d *dhtDiscovery) GetRawDiscovery() discovery.Discovery { + return d.disc +} diff --git a/p2p/discovery/discovery_test.go b/p2p/discovery/discovery_test.go new file mode 100644 index 000000000..901627bcb --- /dev/null +++ b/p2p/discovery/discovery_test.go @@ -0,0 +1,21 @@ +package discovery + +// TODO: test this module + +import ( + "context" + "testing" + + "github.com/libp2p/go-libp2p" +) + +func TestNewDHTDiscovery(t *testing.T) { + host, err := libp2p.New(context.Background()) + if err != nil { + t.Fatal(err) + } + _, err = NewDHTDiscovery(host, DHTConfig{}) + if err != nil { + t.Fatal(err) + } +} diff --git a/p2p/discovery/option.go b/p2p/discovery/option.go new file mode 100644 index 000000000..63634bd1b --- /dev/null +++ b/p2p/discovery/option.go @@ -0,0 +1,54 @@ +package discovery + +import ( + "github.com/pkg/errors" + + p2ptypes "github.com/harmony-one/harmony/p2p/types" + badger "github.com/ipfs/go-ds-badger" + libp2p_dht "github.com/libp2p/go-libp2p-kad-dht" +) + +// DHTConfig is the configurable DHT options. +// For normal nodes, only BootNodes field need to be specified. +type DHTConfig struct { + BootNodes []string + DataStoreFile *string // File path to store DHT data. Shall be only used for bootstrap nodes. +} + +// getLibp2pRawOptions get the raw libp2p options as a slice. +func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) { + var opts []libp2p_dht.Option + + bootOption, err := getBootstrapOption(opt.BootNodes) + if err != nil { + return nil, err + } + opts = append(opts, bootOption) + + if opt.DataStoreFile != nil && len(*opt.DataStoreFile) != 0 { + dsOption, err := getDataStoreOption(*opt.DataStoreFile) + if err != nil { + return nil, err + } + opts = append(opts, dsOption) + } + + return opts, nil +} + +func getBootstrapOption(bootNodes []string) (libp2p_dht.Option, error) { + resolved, err := p2ptypes.ResolveAndParseMultiAddrs(bootNodes) + if err != nil { + return nil, errors.Wrap(err, "failed to parse boot nodes") + } + return libp2p_dht.BootstrapPeers(resolved...), nil +} + +func getDataStoreOption(dataStoreFile string) (libp2p_dht.Option, error) { + ds, err := badger.NewDatastore(dataStoreFile, nil) + if err != nil { + return nil, errors.Wrapf(err, + "cannot open Badger data store at %s", dataStoreFile) + } + return libp2p_dht.Datastore(ds), nil +} diff --git a/p2p/discovery/option_test.go b/p2p/discovery/option_test.go new file mode 100644 index 000000000..747d7ca95 --- /dev/null +++ b/p2p/discovery/option_test.go @@ -0,0 +1,85 @@ +package discovery + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "testing" +) + +var ( + tmpDir = filepath.Join(os.TempDir(), "harmony-one", "harmony", "p2p", "discovery") + emptyFile = filepath.Join(tmpDir, "empty_file") + validPath = filepath.Join(tmpDir, "dht-1.1.1.1") +) + +var ( + testAddrStr = []string{ + "/ip4/52.40.84.2/tcp/9800/p2p/QmbPVwrqWsTYXq1RxGWcxx9SWaTUCfoo1wA6wmdbduWe29", + "/ip4/54.86.126.90/tcp/9800/p2p/Qmdfjtk6hPoyrH1zVD9PEH4zfWLo38dP2mDvvKXfh3tnEv", + } +) + +func init() { + os.RemoveAll(tmpDir) + os.MkdirAll(tmpDir, os.ModePerm) + + f, _ := os.Create(emptyFile) + f.Close() +} + +func TestDHTOption_getLibp2pRawOptions(t *testing.T) { + tests := []struct { + opt DHTConfig + expLen int + expErr error + }{ + { + opt: DHTConfig{ + BootNodes: testAddrStr, + }, + expLen: 1, + }, + { + opt: DHTConfig{ + BootNodes: testAddrStr, + DataStoreFile: &validPath, + }, + expLen: 2, + }, + { + opt: DHTConfig{ + BootNodes: testAddrStr, + DataStoreFile: &emptyFile, + }, + expErr: errors.New("not a directory"), + }, + } + for i, test := range tests { + opts, err := test.opt.getLibp2pRawOptions() + if assErr := assertError(err, test.expErr); assErr != nil { + t.Errorf("Test %v: %v", i, assErr) + } + if err != nil || test.expErr != nil { + continue + } + if len(opts) != test.expLen { + t.Errorf("Test %v: unexpected option size %v / %v", i, len(opts), test.expLen) + } + } +} + +func assertError(got, expect error) error { + if (got == nil) != (expect == nil) { + return fmt.Errorf("unexpected error [%v] / [%v]", got, expect) + } + if (got == nil) || (expect == nil) { + return nil + } + if !strings.Contains(got.Error(), expect.Error()) { + return fmt.Errorf("unexpected error [%v] / [%v]", got, expect) + } + return nil +} diff --git a/p2p/host.go b/p2p/host.go index a99e7e1be..ba8b1ad77 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -13,6 +13,8 @@ import ( "github.com/harmony-one/bls/ffi/go/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p/discovery" + sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/libp2p/go-libp2p" libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto" libp2p_host "github.com/libp2p/go-libp2p-core/host" @@ -29,13 +31,15 @@ import ( type Host interface { Start() error Close() error - GetSelfPeer() Peer AddPeer(*Peer) error GetID() libp2p_peer.ID GetP2PHost() libp2p_host.Host + GetDiscovery() discovery.Discovery GetPeerCount() int ConnectHostPeer(Peer) error + // AddStreamProtocol add the given protocol + AddStreamProtocol(protocols ...sttypes.Protocol) // SendMessageToGroups sends a message to one or more multicast groups. SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error PubSub() *libp2p_pubsub.PubSub @@ -66,15 +70,27 @@ const ( MaxMessageSize = 1 << 21 ) +// HostConfig is the config structure to create a new host +type HostConfig struct { + Self *Peer + BLSKey libp2p_crypto.PrivKey + BootNodes []string + DataStoreFile *string +} + // NewHost .. -func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) { +func NewHost(cfg HostConfig) (Host, error) { + var ( + self = cfg.Self + key = cfg.BLSKey + ) listenAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", self.IP, self.Port)) if err != nil { return nil, errors.Wrapf(err, "cannot create listen multiaddr from port %#v", self.Port) } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) p2pHost, err := libp2p.New(ctx, libp2p.ListenAddrs(listenAddr), libp2p.Identity(key), @@ -85,6 +101,14 @@ func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) { return nil, errors.Wrapf(err, "cannot initialize libp2p host") } + disc, err := discovery.NewDHTDiscovery(p2pHost, discovery.DHTConfig{ + BootNodes: cfg.BootNodes, + DataStoreFile: cfg.DataStoreFile, + }) + if err != nil { + return nil, errors.Wrap(err, "cannot create DHT discovery") + } + options := []libp2p_pubsub.Option{ // WithValidateQueueSize sets the buffer of validate queue. Defaults to 32. When queue is full, validation is throttled and new messages are dropped. libp2p_pubsub.WithValidateQueueSize(512), @@ -95,6 +119,7 @@ func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) { // WithValidateThrottle sets the upper bound on the number of active validation goroutines across all topics. The default is 8192. libp2p_pubsub.WithValidateThrottle(MaxMessageHandlers), libp2p_pubsub.WithMaxMessageSize(MaxMessageSize), + libp2p_pubsub.WithDiscovery(disc.GetRawDiscovery()), } traceFile := os.Getenv("P2P_TRACEFILE") @@ -120,7 +145,7 @@ func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) { pubsub, err := libp2p_pubsub.NewGossipSub(ctx, p2pHost, options...) if err != nil { - return nil, errors.Wrapf(err, "cannot initialize libp2p pubsub") + return nil, errors.Wrapf(err, "cannot initialize libp2p pub-sub") } self.PeerID = p2pHost.ID() @@ -128,16 +153,15 @@ func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) { // has to save the private key for host h := &HostV2{ - h: p2pHost, - pubsub: pubsub, - joined: map[string]*libp2p_pubsub.Topic{}, - self: *self, - priKey: key, - logger: &subLogger, - } - - if err != nil { - return nil, err + h: p2pHost, + pubsub: pubsub, + joined: map[string]*libp2p_pubsub.Topic{}, + self: *self, + priKey: key, + discovery: disc, + logger: &subLogger, + ctx: ctx, + cancel: cancel, } utils.Logger().Info(). @@ -150,14 +174,18 @@ func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) { // HostV2 is the version 2 p2p host type HostV2 struct { - h libp2p_host.Host - pubsub *libp2p_pubsub.PubSub - joined map[string]*libp2p_pubsub.Topic - self Peer - priKey libp2p_crypto.PrivKey - lock sync.Mutex - logger *zerolog.Logger - blocklist libp2p_pubsub.Blacklist + h libp2p_host.Host + pubsub *libp2p_pubsub.PubSub + joined map[string]*libp2p_pubsub.Topic + streamProtos []sttypes.Protocol + self Peer + priKey libp2p_crypto.PrivKey + lock sync.Mutex + discovery discovery.Discovery + logger *zerolog.Logger + blocklist libp2p_pubsub.Blacklist + ctx context.Context + cancel func() } // PubSub .. @@ -165,14 +193,23 @@ func (host *HostV2) PubSub() *libp2p_pubsub.PubSub { return host.pubsub } -// Start is the current placeholder +// Start start the HostV2 discovery process +// TODO: move PubSub start handling logic here func (host *HostV2) Start() error { - return nil + for _, proto := range host.streamProtos { + proto.Start() + } + return host.discovery.Start() } -// Close is the current placeholder +// Close close the HostV2 func (host *HostV2) Close() error { - return nil + for _, proto := range host.streamProtos { + proto.Close() + } + host.discovery.Close() + host.cancel() + return host.h.Close() } // C .. -> (total known peers, connected, not connected) @@ -190,6 +227,14 @@ func (host *HostV2) C() (int, int, int) { return len(peers), connected, not } +// AddStreamProtocol adds the stream protocols to the host to be started and closed +// when the host starts or close +func (host *HostV2) AddStreamProtocol(protocols ...sttypes.Protocol) { + for _, protocol := range protocols { + host.streamProtos = append(host.streamProtos, protocol) + } +} + // GetOrJoin .. func (host *HostV2) GetOrJoin(topic string) (*libp2p_pubsub.Topic, error) { host.lock.Lock() @@ -277,6 +322,11 @@ func (host *HostV2) GetP2PHost() libp2p_host.Host { return host.h } +// GetDiscovery returns the underlying discovery +func (host *HostV2) GetDiscovery() discovery.Discovery { + return host.discovery +} + // ListTopic returns the list of topic the node subscribed func (host *HostV2) ListTopic() []string { host.lock.Lock() @@ -344,48 +394,3 @@ func ConstructMessage(content []byte) []byte { copy(message[5:], content) return message } - -// AddrList is a list of multiaddress -type AddrList []ma.Multiaddr - -// String is a function to print a string representation of the AddrList -func (al *AddrList) String() string { - strs := make([]string, len(*al)) - for i, addr := range *al { - strs[i] = addr.String() - } - return strings.Join(strs, ",") -} - -// Set is a function to set the value of AddrList based on a string -func (al *AddrList) Set(value string) error { - if len(*al) > 0 { - return fmt.Errorf("AddrList is already set") - } - for _, a := range strings.Split(value, ",") { - addr, err := ma.NewMultiaddr(a) - if err != nil { - return err - } - *al = append(*al, addr) - } - return nil -} - -// StringsToAddrs convert a list of strings to a list of multiaddresses -func StringsToAddrs(addrStrings []string) (maddrs []ma.Multiaddr, err error) { - for _, addrString := range addrStrings { - addr, err := ma.NewMultiaddr(addrString) - if err != nil { - return maddrs, err - } - maddrs = append(maddrs, addr) - } - return -} - -// BootNodes is a list of boot nodes. -// It is populated either from default or from user CLI input. -// TODO: refactor p2p config into a config structure (now part of config is here, part is in -// nodeconfig) -var BootNodes AddrList diff --git a/p2p/stream/types/interface.go b/p2p/stream/types/interface.go new file mode 100644 index 000000000..6001c3dba --- /dev/null +++ b/p2p/stream/types/interface.go @@ -0,0 +1,33 @@ +package sttypes + +import ( + p2ptypes "github.com/harmony-one/harmony/p2p/types" + "github.com/hashicorp/go-version" + libp2p_network "github.com/libp2p/go-libp2p-core/network" +) + +// Protocol is the interface of protocol to be registered to libp2p. +type Protocol interface { + p2ptypes.LifeCycle + + Specifier() string + Version() *version.Version + ProtoID() ProtoID + Match(string) bool + HandleStream(st libp2p_network.Stream) +} + +// Request is the interface of a stream request used for common stream utils. +type Request interface { + ReqID() uint64 + SetReqID(rid uint64) + String() string + IsSupportedByProto(ProtoSpec) bool + Encode() ([]byte, error) +} + +// Response is the interface of a stream response used for common stream utils +type Response interface { + ReqID() uint64 + String() string +} diff --git a/p2p/stream/types/stream.go b/p2p/stream/types/stream.go new file mode 100644 index 000000000..985b64e00 --- /dev/null +++ b/p2p/stream/types/stream.go @@ -0,0 +1,78 @@ +package sttypes + +import ( + "io/ioutil" + "sync" + + libp2p_network "github.com/libp2p/go-libp2p-core/network" +) + +// Stream is the interface for streams implemented in each service. +// The stream interface is used for stream management as well as rate limiters +type Stream interface { + ID() StreamID + ProtoID() ProtoID + ProtoSpec() (ProtoSpec, error) + WriteBytes([]byte) error + ReadBytes() ([]byte, error) + Close() error // Make sure streams can handle multiple calls of Close +} + +// BaseStream is the wrapper around +type BaseStream struct { + raw libp2p_network.Stream + + // parse protocol spec fields + spec ProtoSpec + specErr error + specOnce sync.Once +} + +// NewBaseStream creates BaseStream as the wrapper of libp2p Stream +func NewBaseStream(st libp2p_network.Stream) *BaseStream { + return &BaseStream{ + raw: st, + } +} + +// StreamID is the unique identifier for the stream. It has the value of +// libp2p_network.Stream.ID() +type StreamID string + +// Meta return the StreamID of the stream +func (st *BaseStream) ID() StreamID { + return StreamID(st.raw.ID()) +} + +// ProtoID return the remote protocol ID of the stream +func (st *BaseStream) ProtoID() ProtoID { + return ProtoID(st.raw.Protocol()) +} + +// ProtoSpec get the parsed protocol Specifier of the stream +func (st *BaseStream) ProtoSpec() (ProtoSpec, error) { + st.specOnce.Do(func() { + st.spec, st.specErr = ProtoIDToProtoSpec(st.ProtoID()) + }) + return st.spec, st.specErr +} + +// Close close the stream on both sides. +func (st *BaseStream) Close() error { + return st.raw.Reset() +} + +// WriteBytes write the bytes to the stream +func (st *BaseStream) WriteBytes(b []byte) error { + _, err := st.raw.Write(b) + return err +} + +// ReadMsg read the bytes from the stream +func (st *BaseStream) ReadBytes() ([]byte, error) { + b, err := ioutil.ReadAll(st.raw) + if err != nil { + return nil, err + } + return b, nil +} diff --git a/p2p/stream/types/utils.go b/p2p/stream/types/utils.go new file mode 100644 index 000000000..3d92368ef --- /dev/null +++ b/p2p/stream/types/utils.go @@ -0,0 +1,91 @@ +package sttypes + +// TODO: test this file + +import ( + "crypto/rand" + "encoding/binary" + "fmt" + "strconv" + "strings" + + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/hashicorp/go-version" + libp2p_proto "github.com/libp2p/go-libp2p-core/protocol" + "github.com/pkg/errors" +) + +const ( + // ProtoIDCommonPrefix is the common prefix for stream protocol + ProtoIDCommonPrefix = "harmony" + + // ProtoIDFormat is the format of stream protocol ID + ProtoIDFormat = "%s/%s/%s/%d/%s" + + // protoIDNumElem is the number of elements of the ProtoID. See comments in ProtoID + protoIDNumElem = 5 +) + +// ProtoID is the protocol id for streaming, an alias of libp2p stream protocol ID。 +// The stream protocol ID is composed of following components: +// 1. Service - Currently, only sync service is supported. +// 2. NetworkType - mainnet, testnet, stn, e.t.c. +// 3. ShardID - shard ID of the current protocol. +// 4. Version - Stream protocol version for backward compatibility. +type ProtoID libp2p_proto.ID + +// ProtoSpec is the un-serialized stream proto id specification +// TODO: move this to service wise module since different protocol might have different +// protoID information +type ProtoSpec struct { + Service string + NetworkType nodeconfig.NetworkType + ShardID nodeconfig.ShardID + Version *version.Version +} + +// ToProtoID convert a ProtoSpec to ProtoID. +func (spec ProtoSpec) ToProtoID() ProtoID { + s := fmt.Sprintf(ProtoIDFormat, ProtoIDCommonPrefix, spec.Service, + spec.NetworkType, spec.ShardID, spec.Version.String()) + return ProtoID(s) +} + +// ProtoIDToProtoSpec converts a ProtoID to ProtoSpec +func ProtoIDToProtoSpec(id ProtoID) (ProtoSpec, error) { + comps := strings.Split(string(id), "/") + if len(comps) != protoIDNumElem { + return ProtoSpec{}, errors.New("unexpected protocol size") + } + var ( + prefix = comps[0] + service = comps[1] + networkType = comps[2] + shardIDStr = comps[3] + versionStr = comps[4] + ) + shardID, err := strconv.Atoi(shardIDStr) + if err != nil { + return ProtoSpec{}, errors.Wrap(err, "invalid shard ID") + } + if prefix != ProtoIDCommonPrefix { + return ProtoSpec{}, errors.New("unexpected prefix") + } + version, err := version.NewVersion(versionStr) + if err != nil { + return ProtoSpec{}, errors.Wrap(err, "unexpected version string") + } + return ProtoSpec{ + Service: service, + NetworkType: nodeconfig.NetworkType(networkType), + ShardID: nodeconfig.ShardID(uint32(shardID)), + Version: version, + }, nil +} + +// GenReqID generates a random ReqID +func GenReqID() uint64 { + var rnd [8]byte + rand.Read(rnd[:]) + return binary.BigEndian.Uint64(rnd[:]) +} diff --git a/p2p/stream/types/utils_test.go b/p2p/stream/types/utils_test.go new file mode 100644 index 000000000..8b0ff6d9e --- /dev/null +++ b/p2p/stream/types/utils_test.go @@ -0,0 +1,10 @@ +package sttypes + +import "testing" + +func BenchmarkProtoIDToProtoSpec(b *testing.B) { + stid := ProtoID("harmony/sync/unitest/0/1.0.1") + for i := 0; i != b.N; i++ { + ProtoIDToProtoSpec(stid) + } +} diff --git a/p2p/tests/address_test.go b/p2p/tests/address_test.go index c56036ddb..97ca3d50a 100644 --- a/p2p/tests/address_test.go +++ b/p2p/tests/address_test.go @@ -4,7 +4,7 @@ import ( "strings" "testing" - "github.com/harmony-one/harmony/p2p" + p2ptypes "github.com/harmony-one/harmony/p2p/types" "github.com/harmony-one/harmony/test/helpers" "github.com/stretchr/testify/assert" ) @@ -12,7 +12,7 @@ import ( func TestMultiAddressParsing(t *testing.T) { t.Parallel() - multiAddresses, err := p2p.StringsToAddrs(helpers.Bootnodes) + multiAddresses, err := p2ptypes.StringsToMultiAddrs(helpers.Bootnodes) assert.NoError(t, err) assert.Equal(t, len(helpers.Bootnodes), len(multiAddresses)) @@ -24,24 +24,24 @@ func TestMultiAddressParsing(t *testing.T) { func TestAddressListConversionToString(t *testing.T) { t.Parallel() - multiAddresses, err := p2p.StringsToAddrs(helpers.Bootnodes) + multiAddresses, err := p2ptypes.StringsToMultiAddrs(helpers.Bootnodes) assert.NoError(t, err) assert.Equal(t, len(helpers.Bootnodes), len(multiAddresses)) expected := strings.Join(helpers.Bootnodes[:], ",") - var addressList p2p.AddrList = multiAddresses + var addressList p2ptypes.AddrList = multiAddresses assert.Equal(t, expected, addressList.String()) } func TestAddressListConversionFromString(t *testing.T) { t.Parallel() - multiAddresses, err := p2p.StringsToAddrs(helpers.Bootnodes) + multiAddresses, err := p2ptypes.StringsToMultiAddrs(helpers.Bootnodes) assert.NoError(t, err) assert.Equal(t, len(helpers.Bootnodes), len(multiAddresses)) addressString := strings.Join(helpers.Bootnodes[:], ",") - var addressList p2p.AddrList = multiAddresses + var addressList p2ptypes.AddrList = multiAddresses addressList.Set(addressString) assert.Equal(t, len(addressList), len(multiAddresses)) assert.Equal(t, addressList[0], multiAddresses[0]) diff --git a/p2p/types/interface.go b/p2p/types/interface.go new file mode 100644 index 000000000..8fdb61485 --- /dev/null +++ b/p2p/types/interface.go @@ -0,0 +1,7 @@ +package p2ptypes + +// LifeCycle is the interface of module supports Start and Close +type LifeCycle interface { + Start() + Close() +} diff --git a/p2p/types/peerAddr.go b/p2p/types/peerAddr.go new file mode 100644 index 000000000..f3e0c7e94 --- /dev/null +++ b/p2p/types/peerAddr.go @@ -0,0 +1,97 @@ +package p2ptypes + +import ( + "context" + "fmt" + "strings" + "time" + + libp2p_peer "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" + madns "github.com/multiformats/go-multiaddr-dns" +) + +// AddrList is a list of multi address +type AddrList []ma.Multiaddr + +// String is a function to print a string representation of the AddrList +func (al *AddrList) String() string { + strs := make([]string, len(*al)) + for i, addr := range *al { + strs[i] = addr.String() + } + return strings.Join(strs, ",") +} + +// Set is a function to set the value of AddrList based on a string +func (al *AddrList) Set(value string) error { + if len(*al) > 0 { + return fmt.Errorf("AddrList is already set") + } + for _, a := range strings.Split(value, ",") { + addr, err := ma.NewMultiaddr(a) + if err != nil { + return err + } + *al = append(*al, addr) + } + return nil +} + +// StringsToMultiAddrs convert a list of strings to a list of multiaddresses +func StringsToMultiAddrs(addrStrings []string) (maddrs []ma.Multiaddr, err error) { + for _, addrString := range addrStrings { + addr, err := ma.NewMultiaddr(addrString) + if err != nil { + return maddrs, err + } + maddrs = append(maddrs, addr) + } + return +} + +// ResolveAndParseMultiAddrs resolve the DNS multi peer and parse to libp2p AddrInfo +func ResolveAndParseMultiAddrs(addrStrings []string) ([]libp2p_peer.AddrInfo, error) { + var res []libp2p_peer.AddrInfo + for _, addrStr := range addrStrings { + ais, err := resolveMultiAddrString(addrStr) + if err != nil { + return nil, err + } + res = append(res, ais...) + } + return res, nil +} + +func resolveMultiAddrString(addrStr string) ([]libp2p_peer.AddrInfo, error) { + var ais []libp2p_peer.AddrInfo + + mAddr, err := ma.NewMultiaddr(addrStr) + if err != nil { + return nil, err + } + mAddrs, err := resolveMultiAddr(mAddr) + if err != nil { + return nil, err + } + for _, mAddr := range mAddrs { + ai, err := libp2p_peer.AddrInfoFromP2pAddr(mAddr) + if err != nil { + return nil, err + } + ais = append(ais, *ai) + } + return ais, nil +} + +func resolveMultiAddr(raw ma.Multiaddr) ([]ma.Multiaddr, error) { + if madns.Matches(raw) { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + mas, err := madns.Resolve(ctx, raw) + if err != nil { + return nil, err + } + return mas, nil + } + return []ma.Multiaddr{raw}, nil +} diff --git a/p2p/types/peerAddr_test.go b/p2p/types/peerAddr_test.go new file mode 100644 index 000000000..8bb2ee387 --- /dev/null +++ b/p2p/types/peerAddr_test.go @@ -0,0 +1,45 @@ +package p2ptypes + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +var testAddrs = []string{ + "/ip4/54.86.126.90/tcp/9850/p2p/Qmdfjtk6hPoyrH1zVD9PEH4zfWLo38dP2mDvvKXfh3tnEv", + "/ip4/52.40.84.2/tcp/9850/p2p/QmbPVwrqWsTYXq1RxGWcxx9SWaTUCfoo1wA6wmdbduWe29", +} + +func TestMultiAddressParsing(t *testing.T) { + multiAddresses, err := StringsToMultiAddrs(testAddrs) + assert.NoError(t, err) + assert.Equal(t, len(testAddrs), len(multiAddresses)) + + for index, multiAddress := range multiAddresses { + assert.Equal(t, multiAddress.String(), testAddrs[index]) + } +} + +func TestAddressListConversionToString(t *testing.T) { + multiAddresses, err := StringsToMultiAddrs(testAddrs) + assert.NoError(t, err) + assert.Equal(t, len(testAddrs), len(multiAddresses)) + + expected := strings.Join(testAddrs[:], ",") + var addressList AddrList = multiAddresses + assert.Equal(t, expected, addressList.String()) +} + +func TestAddressListConversionFromString(t *testing.T) { + multiAddresses, err := StringsToMultiAddrs(testAddrs) + assert.NoError(t, err) + assert.Equal(t, len(testAddrs), len(multiAddresses)) + + addressString := strings.Join(testAddrs[:], ",") + var addressList AddrList = multiAddresses + addressList.Set(addressString) + assert.Equal(t, len(addressList), len(multiAddresses)) + assert.Equal(t, addressList[0], multiAddresses[0]) +} diff --git a/p2p/types/types.go b/p2p/types/types.go new file mode 100644 index 000000000..03a3c843e --- /dev/null +++ b/p2p/types/types.go @@ -0,0 +1,8 @@ +package p2ptypes + +import ( + libp2p_peer "github.com/libp2p/go-libp2p-core/peer" +) + +// PeerID is the alias for libp2p peer ID +type PeerID libp2p_peer.ID diff --git a/test/helpers/p2p.go b/test/helpers/p2p.go index 5957450ac..425c37724 100644 --- a/test/helpers/p2p.go +++ b/test/helpers/p2p.go @@ -55,8 +55,12 @@ func GenerateHost(address string, port string) (p2p.Host, *bls.PublicKey, error) if err != nil { return nil, nil, err } - - host, err := p2p.NewHost(&peer, nodePrivateKey) + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &peer, + BLSKey: nodePrivateKey, + BootNodes: nil, + DataStoreFile: nil, + }) if err != nil { return nil, nil, err }