add a goroutine to send pong message periodically

only send pong message when there is a stable number of peers

Signed-off-by: Leo Chen <leo@harmony.one>
pull/428/head
Leo Chen 6 years ago
parent a410f8d588
commit 175e43fed7
  1. 3
      cmd/harmony.go
  2. 59
      node/node_handler.go

@ -284,6 +284,9 @@ func main() {
go currentNode.JoinShard(leader) go currentNode.JoinShard(leader)
} }
} else { } else {
if consensus.IsLeader {
go currentNode.SendPongMessage()
}
currentNode.UseLibP2P = true currentNode.UseLibP2P = true
} }

@ -324,7 +324,8 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int {
// Add to Node's peer list anyway // Add to Node's peer list anyway
node.AddPeers([]*p2p.Peer{peer}) node.AddPeers([]*p2p.Peer{peer})
if node.Consensus.IsLeader { // This is the old way of broadcasting pong message
if node.Consensus.IsLeader && !node.UseLibP2P {
peers := node.Consensus.GetValidatorPeers() peers := node.Consensus.GetValidatorPeers()
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys) pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys)
buffer := pong.ConstructPongMessage() buffer := pong.ConstructPongMessage()
@ -342,21 +343,55 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int {
// Broadcast the message to all validators, as publicKeys is updated // Broadcast the message to all validators, as publicKeys is updated
// FIXME: HAR-89 use a separate nodefind/neighbor message // FIXME: HAR-89 use a separate nodefind/neighbor message
if node.UseLibP2P { host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers)
content := host.ConstructP2pMessage(byte(0), buffer) utils.GetLogInstance().Info("PingMsgHandler send pong message")
err := node.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content) }
if err != nil {
utils.GetLogInstance().Error("[PONG] failed to send pong message", "group", p2p.GroupIDBeacon) return 1
}
// SendPongMessage is the a goroutine to periodcally send pong message to all peers
func (node *Node) SendPongMessage() {
tick := time.NewTicker(10 * time.Second)
numPeers := len(node.Consensus.GetValidatorPeers())
numPubKeys := len(node.Consensus.PublicKeys)
sentMessage := false
// Send Pong Message only when there is change on the number of peers
for {
select {
case <-tick.C:
peers := node.Consensus.GetValidatorPeers()
numPeersNow := len(peers)
numPubKeysNow := len(node.Consensus.PublicKeys)
// no peers, wait for another tick
if numPeersNow == 0 || numPubKeysNow == 0 {
continue
}
// new peers added
if numPubKeysNow != numPubKeys || numPeersNow != numPeers {
sentMessage = false
} else { } else {
utils.GetLogInstance().Debug("[PONG] sent Pong Message via group send", "group", p2p.GroupIDBeacon) // stable number of peers/pubkeys, sent the pong message
if !sentMessage {
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys)
buffer := pong.ConstructPongMessage()
content := host.ConstructP2pMessage(byte(0), buffer)
err := node.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content)
if err != nil {
utils.GetLogInstance().Error("[PONG] failed to send pong message", "group", p2p.GroupIDBeacon)
continue
} else {
utils.GetLogInstance().Info("[PONG] sent pong message to", "group", p2p.GroupIDBeacon)
}
sentMessage = true
}
} }
} else { numPeers = numPeersNow
host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers) numPubKeys = numPubKeysNow
utils.GetLogInstance().Info("PingMsgHandler send pong message")
} }
} }
return 1
} }
func (node *Node) pongMessageHandler(msgPayload []byte) int { func (node *Node) pongMessageHandler(msgPayload []byte) int {

Loading…
Cancel
Save