diff --git a/.gitignore b/.gitignore index e35cab024..22ac1e3ef 100644 --- a/.gitignore +++ b/.gitignore @@ -51,3 +51,12 @@ db/ # bootnode keystore .bnkey + +# harmony node keystore +.hmykey + +# vendor directory +vendor + +# node_modules +node_modules/ diff --git a/.gitmodules b/.gitmodules index 118ee2656..b07d555e7 100644 --- a/.gitmodules +++ b/.gitmodules @@ -5,3 +5,6 @@ [submodule "vendor/github.com/golang/protobuf"] path = vendor/github.com/golang/protobuf url = https://github.com/golang/protobuf +[submodule "vendor/github.com/libp2p/go-libp2p-kad-dht"] + path = vendor/github.com/libp2p/go-libp2p-kad-dht + url = https://github.com/libp2p/go-libp2p-kad-dht diff --git a/api/service/discovery/discovery_test.go b/api/service/discovery/discovery_test.go new file mode 100644 index 000000000..ef6b24be2 --- /dev/null +++ b/api/service/discovery/discovery_test.go @@ -0,0 +1,31 @@ +package discovery + +import ( + "testing" + + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/p2pimpl" +) + +var ( + ip = "127.0.0.1" + port = "7099" + service *Service +) + +func TestDiscoveryService(t *testing.T) { + selfPeer := p2p.Peer{IP: ip, Port: port} + priKey, _, err := utils.GenKeyP2P(ip, port) + + host, err := p2pimpl.NewHost(&selfPeer, priKey) + if err != nil { + t.Fatalf("unable to new host in harmony: %v", err) + } + + service = New(host, "rendezvous") + + if service == nil { + t.Fatalf("unable to create new discovery service") + } +} diff --git a/api/service/discovery/errors.go b/api/service/discovery/errors.go new file mode 100644 index 000000000..f2d614f54 --- /dev/null +++ b/api/service/discovery/errors.go @@ -0,0 +1,12 @@ +package discovery + +import "errors" + +// Errors of peer discovery +var ( + ErrGetPeers = errors.New("[DISCOVERY]: get peer list failed") + ErrConnectionFull = errors.New("[DISCOVERY]: node's incoming connection full") + ErrPing = errors.New("[DISCOVERY]: ping peer failed") + ErrPong = errors.New("[DISCOVERY]: pong peer failed") + ErrDHTBootstrap = errors.New("[DISCOVERY]: DHT bootstrap failed") +) diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go new file mode 100644 index 000000000..87da7d308 --- /dev/null +++ b/api/service/discovery/service.go @@ -0,0 +1,131 @@ +package discovery + +import ( + "context" + "sync" + + "github.com/ethereum/go-ethereum/log" + proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" + + peerstore "github.com/libp2p/go-libp2p-peerstore" + + libp2pdis "github.com/libp2p/go-libp2p-discovery" + libp2pdht "github.com/libp2p/go-libp2p-kad-dht" +) + +// Constants for discovery service. +const ( + numIncoming = 128 + numOutgoing = 16 +) + +// Service is the struct for discovery service. +type Service struct { + Host p2p.Host + DHT *libp2pdht.IpfsDHT + Rendezvous string + ctx context.Context + peerChan <-chan peerstore.PeerInfo +} + +// New returns discovery service. +// h is the p2p host +// r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network) +func New(h p2p.Host, r string) *Service { + ctx := context.Background() + dht, err := libp2pdht.New(ctx, h.GetP2PHost()) + if err != nil { + panic(err) + } + + return &Service{ + Host: h, + DHT: dht, + Rendezvous: r, + ctx: ctx, + peerChan: make(<-chan peerstore.PeerInfo), + } +} + +// StartService starts discovery service. +func (s *Service) StartService() { + log.Info("Starting discovery service.") + err := s.Init() + if err != nil { + log.Error("StartService Aborted", "Error", err) + return + } + + // We use a rendezvous point "shardID" to announce our location. + log.Info("Announcing ourselves...") + routingDiscovery := libp2pdis.NewRoutingDiscovery(s.DHT) + libp2pdis.Advertise(s.ctx, routingDiscovery, s.Rendezvous) + log.Debug("Successfully announced!") + + log.Debug("Searching for other peers...") + s.peerChan, err = routingDiscovery.FindPeers(s.ctx, s.Rendezvous) + if err != nil { + log.Error("FindPeers", "error", err) + } +} + +// StopService shutdowns discovery service. +func (s *Service) StopService() { + log.Info("Shutting down discovery service.") +} + +func (s *Service) foundPeers() { + select { + case peer := <-s.peerChan: + if peer.ID != s.Host.GetP2PHost().ID() { + log.Debug("Found Peer", "peer", peer.ID, "addr", peer.Addrs) + if len(peer.ID) > 0 { + p := p2p.Peer{PeerID: peer.ID, Addrs: peer.Addrs} + s.Host.AddPeer(&p) + // TODO: stop ping if pinged before + s.pingPeer(p) + } + } + } +} + +// Init is to initialize for discoveryService. +func (s *Service) Init() error { + log.Info("Init discovery service") + + // Bootstrap the DHT. In the default configuration, this spawns a Background + // thread that will refresh the peer table every five minutes. + log.Debug("Bootstrapping the DHT") + if err := s.DHT.Bootstrap(s.ctx); err != nil { + return ErrDHTBootstrap + } + + var wg sync.WaitGroup + for _, peerAddr := range utils.BootNodes { + peerinfo, _ := peerstore.InfoFromP2pAddr(peerAddr) + wg.Add(1) + go func() { + defer wg.Done() + if err := s.Host.GetP2PHost().Connect(s.ctx, *peerinfo); err != nil { + log.Warn("can't connect to bootnode", "error", err) + } else { + log.Info("connected to bootnode", "node", *peerinfo) + } + }() + } + wg.Wait() + + go s.foundPeers() + + return nil +} + +func (s *Service) pingPeer(peer p2p.Peer) { + ping := proto_discovery.NewPingMessage(s.Host.GetSelfPeer()) + buffer := ping.ConstructPingMessage() + log.Debug("Sending Ping Message to", "peer", peer) + s.Host.SendMessage(peer, buffer) + log.Debug("Sent Ping Message to", "peer", peer) +} diff --git a/api/service/manager.go b/api/service/manager.go index 7c07318e7..6e9f81fa4 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -22,18 +22,19 @@ type Type byte // Constants for Type. const ( SupportSyncing Type = iota - SupportClient + ClientSupport SupportExplorer Consensus BlockProposal + PeerDiscovery Test Done ) func (t Type) String() string { switch t { - case SupportClient: - return "SupportClient" + case ClientSupport: + return "ClientSupport" case SupportSyncing: return "SyncingSupport" case SupportExplorer: @@ -42,6 +43,8 @@ func (t Type) String() string { return "Consensus" case BlockProposal: return "BlockProposal" + case PeerDiscovery: + return "PeerDiscovery" case Test: return "Test" case Done: diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index 9bcc6e8af..e24ab9189 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -87,6 +87,8 @@ type StateSync struct { // AddLastMileBlock add the lastest a few block into queue for syncing func (ss *StateSync) AddLastMileBlock(block *types.Block) { + ss.syncMux.Lock() + defer ss.syncMux.Unlock() ss.lastMileBlocks = append(ss.lastMileBlocks, block) } diff --git a/cmd/harmony.go b/cmd/harmony.go index 228933bff..02a0c0ff6 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -216,9 +216,6 @@ func main() { } go currentNode.SupportSyncing() - if consensus.IsLeader { - go currentNode.SupportClient() - } currentNode.ServiceManagerSetup() currentNode.RunServices() currentNode.StartServer() diff --git a/consensus/consensus.go b/consensus/consensus.go index 0777af59e..54790190a 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -93,7 +93,7 @@ type Consensus struct { OnConsensusDone func(*types.Block) // current consensus block to check if out of sync - ConsensusBlock chan *types.Block + ConsensusBlock chan *BFTBlockInfo // verified block to state sync broadcast VerifiedNewBlock chan *types.Block @@ -113,6 +113,13 @@ type Consensus struct { Active_Nodes []proto_node.Info } +// BFTBlockInfo send the latest block that was in BFT consensus process as well as its consensusID to state syncing +// consensusID is necessary to make sure the out of sync node can enter the correct view +type BFTBlockInfo struct { + Block *types.Block + ConsensusID uint32 +} + // BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far // This is mainly used in the case that this node is lagging behind and needs to catch up. // For example, the consensus moved to round N and this node received message(N). @@ -124,6 +131,16 @@ type BlockConsensusStatus struct { state State // the latest state of the consensus } +// UpdateConsensusID is used to update latest consensusID for nodes that out of sync +func (consensus *Consensus) UpdateConsensusID(consensusID uint32) { + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + if consensus.consensusID < consensusID { + utils.GetLogInstance().Debug("update consensusID", "myConsensusID", consensus.consensusID, "newConsensusID", consensusID) + consensus.consensusID = consensusID + } +} + // New creates a new Consensus object func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus { consensus := Consensus{} @@ -202,7 +219,8 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons } // Checks the basic meta of a consensus message. -func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Message, publicKey *bls.PublicKey) bool { +// +func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Message, publicKey *bls.PublicKey) error { consensusID := message.ConsensusId blockHash := message.BlockHash @@ -210,20 +228,20 @@ func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Messag err := verifyMessageSig(publicKey, message) if err != nil { utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) - return false + return ErrInvalidConsensusMessage } // check consensus Id if consensusID != consensus.consensusID { utils.GetLogInstance().Warn("Wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus) - return false + return ErrConsensusIDNotMatch } if !bytes.Equal(blockHash, consensus.blockHash[:]) { utils.GetLogInstance().Warn("Wrong blockHash", "consensus", consensus) - return false + return ErrInvalidConsensusMessage } - return true + return nil } // Gets the validator peer based on validator ID. diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index b3b918cef..e1fcb241b 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -123,7 +123,7 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag validatorPeer := consensus.getValidatorPeerByID(validatorID) - if !consensus.checkConsensusMessage(message, validatorPeer.PubKey) { + if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID) return } @@ -185,7 +185,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message validatorPeer := consensus.getValidatorPeerByID(validatorID) - if !consensus.checkConsensusMessage(message, validatorPeer.PubKey) { + if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID) return } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 8374d6efa..783095173 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -12,6 +12,30 @@ import ( "github.com/harmony-one/harmony/internal/utils" ) +// sendBFTBlockToStateSyncing will send the latest BFT consensus block to state syncing checkingjjkkkkkkkkkkkkkkkjnjk +func (consensus *Consensus) sendBFTBlockToStateSyncing(consensusID uint32) { + // validator send consensus block to state syncing + if val, ok := consensus.blocksReceived[consensusID]; ok { + consensus.mutex.Lock() + delete(consensus.blocksReceived, consensusID) + consensus.mutex.Unlock() + + var blockObj types.Block + err := rlp.DecodeBytes(val.block, &blockObj) + if err != nil { + utils.GetLogInstance().Debug("failed to construct the cached block") + return + } + blockInfo := &BFTBlockInfo{Block: &blockObj, ConsensusID: consensusID} + select { + case consensus.ConsensusBlock <- blockInfo: + default: + utils.GetLogInstance().Warn("consensus block unable to sent to state sync", "height", blockObj.NumberU64(), "blockHash", blockObj.Hash().Hex()) + } + } + return +} + // ProcessMessageValidator dispatches validator's consensus message. func (consensus *Consensus) ProcessMessageValidator(payload []byte) { message := consensus_proto.Message{} @@ -40,11 +64,20 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa blockHash := message.BlockHash block := message.Payload + // Add block to received block cache + consensus.mutex.Lock() + consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state} + consensus.mutex.Unlock() + copy(consensus.blockHash[:], blockHash[:]) consensus.block = block - if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) { + if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the leader message") + if err == ErrConsensusIDNotMatch { + utils.GetLogInstance().Debug("sending bft block to state syncing") + consensus.sendBFTBlockToStateSyncing(consensusID) + } return } @@ -56,11 +89,6 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa return } - // Add block to received block cache - consensus.mutex.Lock() - consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state} - consensus.mutex.Unlock() - // Add attack model of IncorrectResponse if attack.GetInstance().IncorrectResponse() { utils.GetLogInstance().Warn("IncorrectResponse attacked") @@ -102,8 +130,8 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa // Update readyByConsensus for attack. attack.GetInstance().UpdateConsensusReady(consensusID) - if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) { - utils.GetLogInstance().Debug("Failed to check the leader message") + if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { + utils.GetLogInstance().Debug("processPreparedMessage error", "error", err) return } @@ -161,8 +189,8 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess // Update readyByConsensus for attack. attack.GetInstance().UpdateConsensusReady(consensusID) - if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) { - utils.GetLogInstance().Debug("Failed to check the leader message") + if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { + utils.GetLogInstance().Debug("processCommittedMessage error", "error", err) return } @@ -195,6 +223,7 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess consensus.state = CommittedDone // TODO: the block catch up logic is a temporary workaround for full failure node catchup. Implement the full node catchup logic // The logic is to roll up to the latest blocks one by one to try catching up with the leader. + // but because of checkConsensusMessage, the catchup logic will never be used here for { val, ok := consensus.blocksReceived[consensus.consensusID] if ok { @@ -205,10 +234,6 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess var blockObj types.Block err := rlp.DecodeBytes(val.block, &blockObj) - if err != nil { - utils.GetLogInstance().Warn("Unparseable block header data", "error", err) - return - } if err != nil { utils.GetLogInstance().Debug("failed to construct the new block after consensus") } diff --git a/consensus/errors.go b/consensus/errors.go index a005c5f63..6156c4aed 100644 --- a/consensus/errors.go +++ b/consensus/errors.go @@ -34,4 +34,10 @@ var ( // ErrInvalidNumber is returned if a block's number doesn't equal it's parent's // plus one. ErrInvalidNumber = errors.New("invalid block number") + + // ErrConsensusIDNotMatch is returned if the current consensusID is not equal message's consensusID + ErrConsensusIDNotMatch = errors.New("consensusID not match") + + // ErrInvalidConsensusMessage is returned is the consensus message received is invalid + ErrInvalidConsensusMessage = errors.New("invalid consensus message") ) diff --git a/internal/utils/flags.go b/internal/utils/flags.go index 5f73c0376..8c206e18c 100644 --- a/internal/utils/flags.go +++ b/internal/utils/flags.go @@ -46,7 +46,10 @@ func StringsToAddrs(addrStrings []string) (maddrs []ma.Multiaddr, err error) { return } -// DefaultBootNodeAddrStrings is a list of Harmony bootnodes. Used to find other peers in the network. +// DefaultBootNodeAddrStrings is a list of Harmony bootnodes address. Used to find other peers in the network. var DefaultBootNodeAddrStrings = []string{ "/ip4/127.0.0.1/tcp/9876/p2p/QmayB8NwxmfGE4Usb4H61M8uwbfc7LRbmXb3ChseJgbVuf", } + +// BootNodes is a list of boot nodes. It is populated either from default or from user CLI input. +var BootNodes AddrList diff --git a/node/node.go b/node/node.go index a6058f302..ba19b53c1 100644 --- a/node/node.go +++ b/node/node.go @@ -26,6 +26,7 @@ import ( proto_node "github.com/harmony-one/harmony/api/proto/node" service_manager "github.com/harmony-one/harmony/api/service" blockproposal "github.com/harmony-one/harmony/api/service/blockproposal" + "github.com/harmony-one/harmony/api/service/clientsupport" consensus_service "github.com/harmony-one/harmony/api/service/consensus" "github.com/harmony-one/harmony/api/service/explorer" "github.com/harmony-one/harmony/api/service/syncing" @@ -91,7 +92,7 @@ func (state State) String() string { // Constants related to doing syncing. const ( lastMileThreshold = 4 - inSyncThreshold = 2 + inSyncThreshold = 1 ) const ( @@ -278,12 +279,11 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { node.BlockChannel = make(chan *types.Block) node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain) node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey), node.Consensus.ShardID) - node.AddFaucetContractToPendingTransactions() if node.Role == BeaconLeader { node.AddDepositContractToPendingTransactions() } - node.Consensus.ConsensusBlock = make(chan *types.Block) + node.Consensus.ConsensusBlock = make(chan *bft.BFTBlockInfo) node.Consensus.VerifiedNewBlock = make(chan *types.Block) } @@ -304,11 +304,16 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { } // IsOutOfSync checks whether the node is out of sync by comparing latest block with consensus block -func (node *Node) IsOutOfSync(consensusBlock *types.Block) bool { +func (node *Node) IsOutOfSync(consensusBlockInfo *bft.BFTBlockInfo) bool { + consensusBlock := consensusBlockInfo.Block + consensusID := consensusBlockInfo.ConsensusID + myHeight := node.blockchain.CurrentBlock().NumberU64() newHeight := consensusBlock.NumberU64() utils.GetLogInstance().Debug("[SYNC]", "myHeight", myHeight, "newHeight", newHeight) if newHeight-myHeight <= inSyncThreshold { + node.stateSync.AddLastMileBlock(consensusBlock) + node.Consensus.UpdateConsensusID(consensusID + 1) return false } // cache latest blocks for last mile catch up @@ -324,21 +329,24 @@ func (node *Node) DoSyncing() { select { // in current implementation logic, timeout means in sync case <-time.After(5 * time.Second): + //myHeight := node.blockchain.CurrentBlock().NumberU64() + //utils.GetLogInstance().Debug("[SYNC]", "currentHeight", myHeight) node.stateMutex.Lock() node.State = NodeReadyForConsensus node.stateMutex.Unlock() continue - case consensusBlock := <-node.Consensus.ConsensusBlock: - // never reached from chao - if !node.IsOutOfSync(consensusBlock) { + case consensusBlockInfo := <-node.Consensus.ConsensusBlock: + if !node.IsOutOfSync(consensusBlockInfo) { if node.State == NodeNotInSync { utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") - node.stateSync.CloseConnections() - node.stateSync = nil } node.stateMutex.Lock() node.State = NodeReadyForConsensus node.stateMutex.Unlock() + // wait for last mile block finish; think a better way + time.Sleep(200 * time.Millisecond) + node.stateSync.CloseConnections() + node.stateSync = nil continue } else { utils.GetLogInstance().Debug("[SYNC] node is out of sync") @@ -662,6 +670,8 @@ func (node *Node) setupForShardLeader() { node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.NewService(node.BlockChannel, node.Consensus)) // Register new block service. node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.NewService(node.Consensus.ReadySignal, node.WaitForConsensusReady)) + // Register client support service. + node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.NewService(node.blockchain.State, node.CallFaucetContract, node.SelfPeer.IP, node.SelfPeer.Port)) } func (node *Node) setupForShardValidator() { diff --git a/node/node_handler.go b/node/node_handler.go index 674b19b2e..881935775 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -200,17 +200,8 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) bool { err := node.blockchain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) if err != nil { utils.GetLogInstance().Debug("Failed verifying new block", "Error", err, "tx", newBlock.Transactions()[0]) - - // send consensus block to state syncing - select { - case node.Consensus.ConsensusBlock <- newBlock: - default: - utils.GetLogInstance().Warn("consensus block unable to sent to state sync", "height", newBlock.NumberU64(), "blockHash", newBlock.Hash().Hex()) - } - return false } - return true } diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index c906725eb..a7dadfb42 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -13,6 +13,7 @@ import ( net "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" peerstore "github.com/libp2p/go-libp2p-peerstore" + protocol "github.com/libp2p/go-libp2p-protocol" p2p_config "github.com/libp2p/go-libp2p/config" ma "github.com/multiformats/go-multiaddr" ) @@ -103,7 +104,7 @@ func (host *HostV2) GetSelfPeer() p2p.Peer { // BindHandlerAndServe bind a streamHandler to the harmony protocol. func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) { - host.h.SetStreamHandler(ProtocolID, func(s net.Stream) { + host.h.SetStreamHandler(protocol.ID(ProtocolID), func(s net.Stream) { handler(s) }) // Hang forever @@ -113,7 +114,8 @@ func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) { // SendMessage a p2p message sending function with signature compatible to p2pv1. func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { logger := log.New("from", host.self, "to", p, "PeerID", p.PeerID) - s, err := host.h.NewStream(context.Background(), p.PeerID, ProtocolID) + host.Peerstore().AddProtocols(p.PeerID, ProtocolID) + s, err := host.h.NewStream(context.Background(), p.PeerID, protocol.ID(ProtocolID)) if err != nil { logger.Error("NewStream() failed", "peerID", p.PeerID, "protocolID", ProtocolID, "error", err) diff --git a/vendor/github.com/libp2p/go-libp2p-kad-dht b/vendor/github.com/libp2p/go-libp2p-kad-dht new file mode 160000 index 000000000..838d43da0 --- /dev/null +++ b/vendor/github.com/libp2p/go-libp2p-kad-dht @@ -0,0 +1 @@ +Subproject commit 838d43da02fc33899794e1c63fe4bd4d0bfd749a