diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 738270cb1..a8b493e24 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -28,7 +28,6 @@ type Service struct { peerChan chan p2p.Peer peerInfo <-chan peerstore.PeerInfo discovery *libp2pdis.RoutingDiscovery - lock sync.Mutex } // New returns role conversion service. @@ -112,13 +111,11 @@ func (s *Service) DoService() { case peer := <-s.peerInfo: if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) - s.lock.Lock() if err := s.Host.GetP2PHost().Connect(s.ctx, peer); err != nil { utils.GetLogInstance().Warn("can't connect to peer node", "error", err) } else { utils.GetLogInstance().Info("connected to peer node", "peer", peer) } - s.lock.Unlock() // figure out the public ip/port ip := "127.0.0.1" var port string diff --git a/consensus/consensus.go b/consensus/consensus.go index b3ef03d1d..72a433bd4 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -379,7 +379,7 @@ func (consensus *Consensus) AddPeers(peers []*p2p.Peer) int { consensus.pubKeyLock.Lock() consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey) consensus.pubKeyLock.Unlock() - utils.GetLogInstance().Debug("[SYNC]", "new peer added", peer) + // utils.GetLogInstance().Debug("[SYNC]", "new peer added", peer) } count++ } diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 48afe4a01..50bffa0f7 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -109,7 +109,8 @@ func (consensus *Consensus) startConsensus(newBlock *types.Block) { consensus.prepareSigs[consensus.nodeID] = consensus.priKey.SignHash(consensus.blockHash[:]) if utils.UseLibP2P { - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + // Construct broadcast p2p message + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } else { host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) } @@ -171,7 +172,7 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag consensus.aggregatedPrepareSig = aggSig if utils.UseLibP2P { - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } else { host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) } @@ -242,7 +243,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message consensus.aggregatedCommitSig = aggSig if utils.UseLibP2P { - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } else { host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 381bb8df1..3a89b3a2a 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -4,6 +4,7 @@ import ( "github.com/harmony-one/bls/ffi/go/bls" bls_cosi "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/host" "github.com/ethereum/go-ethereum/rlp" protobuf "github.com/golang/protobuf/proto" @@ -105,7 +106,7 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa // Construct and send prepare message msgToSend := consensus.constructPrepareMessage() if utils.UseLibP2P { - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } else { consensus.SendMessage(consensus.leader, msgToSend) } @@ -169,7 +170,7 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa multiSigAndBitmap := append(multiSig, bitmap...) msgToSend := consensus.constructCommitMessage(multiSigAndBitmap) if utils.UseLibP2P { - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, msgToSend) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } else { consensus.SendMessage(consensus.leader, msgToSend) } diff --git a/node/node_handler.go b/node/node_handler.go index 0c251a9b1..70addde56 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -71,7 +71,7 @@ func (node *Node) ReceiveGroupMessage() { // messageHandler parses the message and dispatch the actions func (node *Node) messageHandler(content []byte, sender string) { - node.MaybeBroadcastAsValidator(content) + // node.MaybeBroadcastAsValidator(content) consensusObj := node.Consensus @@ -314,11 +314,13 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int { return -1 } - utils.GetLogInstance().Debug("[pingMessageHandler]", "incoming peer", peer) + // utils.GetLogInstance().Debug("[pingMessageHandler]", "incoming peer", peer) // add to incoming peer list node.host.AddIncomingPeer(*peer) - node.host.ConnectHostPeer(*peer) + if utils.UseLibP2P { + node.host.ConnectHostPeer(*peer) + } if ping.Node.Role == proto_node.ClientRole { utils.GetLogInstance().Info("Add Client Peer to Node", "Node", node.Consensus.GetNodeID(), "Client", peer) @@ -349,7 +351,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int { // FIXME: HAR-89 use a separate nodefind/neighbor message host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers) - utils.GetLogInstance().Info("PingMsgHandler send pong message") + // utils.GetLogInstance().Info("PingMsgHandler send pong message") } return 1 @@ -391,6 +393,8 @@ func (node *Node) SendPongMessage() { utils.GetLogInstance().Info("[PONG] sent pong message to", "group", p2p.GroupIDBeacon) } sentMessage = true + // stop sending ping message + node.serviceManager.TakeAction(&service.Action{Action: service.Stop, ServiceType: service.PeerDiscovery}) } } numPeers = numPeersNow @@ -424,8 +428,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { peers = append(peers, peer) } - utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg #peers", len(peers)) - if len(peers) > 0 { node.AddPeers(peers) } @@ -446,7 +448,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { publicKeys = append(publicKeys, &key) } - utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg #keys", len(publicKeys)) + utils.GetLogInstance().Debug("[pongMessageHandler]", "#keys", len(publicKeys), "#peers", len(peers)) if node.State == NodeWaitToJoin { node.State = NodeReadyForConsensus diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index 6eb097039..87caa2cec 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -246,8 +246,6 @@ func (host *HostV2) ConnectHostPeer(peer p2p.Peer) { utils.GetLogInstance().Error("ConnectHostPeer", "new peerinfo error", err, "peer", peer) return } - host.lock.Lock() - defer host.lock.Unlock() if err := host.h.Connect(ctx, *peerInfo); err != nil { utils.GetLogInstance().Warn("can't connect to peer", "error", err, "peer", peer) } else { diff --git a/test/configs/oneshard1.txt b/test/configs/oneshard1.txt new file mode 100644 index 000000000..995c9ec97 --- /dev/null +++ b/test/configs/oneshard1.txt @@ -0,0 +1,5 @@ +127.0.0.1 9000 leader 0 +127.0.0.1 9001 validator 0 +127.0.0.1 9002 validator 0 +127.0.0.1 9003 validator 0 +127.0.0.1 9004 validator 0 diff --git a/test/deploy.sh b/test/deploy.sh index 3e3f67e9a..f2de4d8c3 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -73,7 +73,7 @@ EOU DB= TXGEN=true DURATION=90 -MIN=5 +MIN=2 SHARDS=2 KILLPORT=9004 SYNC=false