Merge pull request #1428 from LeoHChen/remove_pong_messages

[pingpong] remove pong messages from discovery service
pull/1432/head
Leo Chen 5 years ago committed by GitHub
commit 4e1cb743b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 87
      api/proto/discovery/pingpong.go
  2. 25
      api/proto/discovery/pingpong_test.go
  3. 10
      api/proto/discovery/readme.md
  4. 1
      api/proto/node/node.go
  5. 1
      api/service/discovery/errors.go
  6. 32
      api/service/discovery/service.go
  7. 2
      internal/utils/singleton.go
  8. 15
      node/node.go
  9. 162
      node/node_handler.go
  10. 44
      node/node_test.go
  11. 4
      node/service_setup.go
  12. 9
      node/staking_test.go
  13. 1
      p2p/host/hostv2/hostv2.go
  14. 1
      p2p/p2pimpl/p2pimpl.go

@ -1,11 +1,9 @@
/*
Package proto/discovery implements the discovery ping/pong protocol among nodes.
Package proto/discovery implements the discovery ping protocol among nodes.
pingpong.go adds support of ping/pong messages.
pingpong.go adds support of ping messages.
ping: from node to peers, sending IP/Port/PubKey info
pong: peer responds to ping messages, sending all pubkeys known by peer
*/
package discovery
@ -15,7 +13,6 @@ import (
"encoding/gob"
"fmt"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto"
"github.com/harmony-one/harmony/api/proto/node"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -30,24 +27,10 @@ type PingMessageType struct {
Node node.Info
}
// PongMessageType defines the data structure of the Pong message.
// Values are created by NewPongMessage and gob-encoded by ConstructPongMessage.
type PongMessageType struct {
ShardID uint32 // shard ID passed to NewPongMessage by the sender
Version uint16 // version of the protocol
Peers []node.Info // peer info (IP, Port, PeerID, serialized public key) for each known peer
PubKeys [][]byte // list of serialized public keys, has to be identical among all validators/leaders
LeaderPubKey []byte // public key of shard leader
}
// String renders the ping message as
// "ping:<role>/<version>=><ip>:<port>/<pubkey>".
func (p PingMessageType) String() string {
	sender := p.Node
	return fmt.Sprintf(
		"ping:%v/%v=>%v:%v/%v",
		sender.Role, p.Version, sender.IP, sender.Port, sender.PubKey,
	)
}
// String summarizes the pong message: protocol version, number of peers,
// number of public keys and the byte length of the leader public key.
// The returned string ends with a newline.
func (p PongMessageType) String() string {
	return fmt.Sprintf(
		"pong:%v=>length:%v, keys:%v, leader:%v\n",
		p.Version, len(p.Peers), len(p.PubKeys), len(p.LeaderPubKey),
	)
}
// NewPingMessage creates a new Ping message based on the p2p.Peer input
func NewPingMessage(peer p2p.Peer, isClient bool) *PingMessageType {
ping := new(PingMessageType)
@ -68,40 +51,6 @@ func NewPingMessage(peer p2p.Peer, isClient bool) *PingMessageType {
return ping
}
// NewPongMessage creates a new Pong message based on a list of p2p.Peer and a list of publicKeys.
//
// For every peer, the IP, Port, PeerID and serialized consensus public key are
// copied into Peers. Every key in pubKeys, and leaderKey, is stored in its
// serialized form, preserving input order. Version is set to
// proto.ProtocolVersion and ShardID to shardID.
func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey, leaderKey *bls.PublicKey, shardID uint32) *PongMessageType {
	pong := new(PongMessageType)
	pong.ShardID = shardID
	pong.Version = proto.ProtocolVersion
	// Keep the slices non-nil even for empty input, and size them up front
	// because the final lengths are known.
	pong.Peers = make([]node.Info, 0, len(peers))
	pong.PubKeys = make([][]byte, 0, len(pubKeys))

	// Serialize only returns the encoded bytes, so there is no per-peer error
	// to check here; the previous check tested an error that was never set.
	for _, p := range peers {
		pong.Peers = append(pong.Peers, node.Info{
			IP:     p.IP,
			Port:   p.Port,
			PeerID: p.PeerID,
			PubKey: p.ConsensusPubKey.Serialize(),
		})
	}

	for _, key := range pubKeys {
		pong.PubKeys = append(pong.PubKeys, key.Serialize())
	}

	pong.LeaderPubKey = leaderKey.Serialize()

	return pong
}
// GetPingMessage deserializes the Ping Message from a byte slice
func GetPingMessage(payload []byte) (*PingMessageType, error) {
ping := new(PingMessageType)
@ -118,24 +67,6 @@ func GetPingMessage(payload []byte) (*PingMessageType, error) {
return ping, nil
}
// GetPongMessage deserializes the Pong Message from a byte slice.
// The payload is gob-decoded into a fresh PongMessageType. When decoding
// fails, the cause is logged and a generic error is returned with a nil message.
func GetPongMessage(payload []byte) (*PongMessageType, error) {
	// Start from empty, non-nil slices before decoding.
	msg := &PongMessageType{
		Peers:   make([]node.Info, 0),
		PubKeys: make([][]byte, 0),
	}

	if err := gob.NewDecoder(bytes.NewBuffer(payload)).Decode(msg); err != nil {
		utils.Logger().Error().Err(err).Msg("[GetPongMessage] Decode")
		return nil, fmt.Errorf("Decode Pong Error")
	}

	return msg, nil
}
// ConstructPingMessage constructs ping message from node to leader
func (p PingMessageType) ConstructPingMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
@ -149,17 +80,3 @@ func (p PingMessageType) ConstructPingMessage() []byte {
}
return byteBuffer.Bytes()
}
// ConstructPongMessage constructs pong message from leader to node.
// The output is the proto.Node category byte, the node.PONG type byte, and
// then the gob encoding of p. If encoding fails, the error is logged and nil
// is returned.
func (p PongMessageType) ConstructPongMessage() []byte {
	var out bytes.Buffer
	out.WriteByte(byte(proto.Node))
	out.WriteByte(byte(node.PONG))

	if err := gob.NewEncoder(&out).Encode(p); err != nil {
		utils.Logger().Error().Err(err).Msg("[ConstructPongMessage] Encode")
		return nil
	}
	return out.Bytes()
}

@ -37,8 +37,6 @@ var (
ConsensusPubKey: pubKey2,
},
}
e2 = "pong:1=>length:2"
leaderPubKey = pki.GetBLSPrivateKeyFromInt(888).GetPublicKey()
pubKeys = []*bls.PublicKey{pubKey1, pubKey2}
@ -61,13 +59,6 @@ func TestString(test *testing.T) {
if strings.Compare(r3, e3) != 0 {
test.Errorf("expect: %v, got: %v", e3, r3)
}
pong1 := NewPongMessage(p2, pubKeys, leaderPubKey, 0)
r2 := fmt.Sprintf("%v", *pong1)
if !strings.HasPrefix(r2, e2) {
test.Errorf("expect: %v, got: %v", e2, r2)
}
}
func TestSerialize(test *testing.T) {
@ -84,20 +75,4 @@ func TestSerialize(test *testing.T) {
if !reflect.DeepEqual(ping, ping1) {
test.Error("Serialize/Deserialze Ping Message Failed")
}
pong1 := NewPongMessage(p2, pubKeys, leaderPubKey, 0)
buf2 = pong1.ConstructPongMessage()
msg2, err := proto.GetMessagePayload(buf2)
if err != nil {
test.Error("GetMessagePayload Failed!")
}
pong, err := GetPongMessage(msg2)
if err != nil {
test.Error("Pong failed!")
}
if !reflect.DeepEqual(pong, pong1) {
test.Error("Serialize/Deserialze Pong Message Failed")
}
}

@ -9,19 +9,9 @@ but it will be removed once the full integration of libp2p is finished as the IP
It also contains a Role field to indicate if the node is a client node or regular node, as client node
won't join the consensus.
## Pong
The Pong message is sent by the leader to all validators once the leader has enough validators.
It contains a list of peers and the corresponding BLS public keys.
Note that the list of peers may not be needed once we have libp2p fully integrated.
The order of the peers and keys is important to the consensus.
At bootstrap, the Pong message is sent out and then the consensus should start.
## TODO
The following two TODO items should be worked on once we have full libp2p integration.
For network security reasons, we should in general not expose the IP/Port of the node.
- [ ] remove peer info in Ping message, only keep peerID, which should be sufficient for p2p communication.
- [ ] remove peer list from Pong message.

@ -25,7 +25,6 @@ const (
Client
_ // used to be Control
PING // node send ip/pki to register with leader
PONG // node broadcast pubK
ShardState
)

@ -7,6 +7,5 @@ var (
ErrGetPeers = errors.New("[DISCOVERY]: get peer list failed")
ErrConnectionFull = errors.New("[DISCOVERY]: node's incoming connection full")
ErrPing = errors.New("[DISCOVERY]: ping peer failed")
ErrPong = errors.New("[DISCOVERY]: pong peer failed")
ErrDHTBootstrap = errors.New("[DISCOVERY]: DHT bootstrap failed")
)

@ -74,17 +74,15 @@ func (s *Service) Run() {
}
func (s *Service) contactP2pPeers() {
pingInterval := 5
nodeConfig := nodeconfig.GetShardConfig(s.config.ShardID)
// Don't send ping message for Explorer Node
if nodeConfig.Role() == nodeconfig.ExplorerNode {
return
}
tick := time.NewTicker(5 * time.Second)
pingMsg := proto_discovery.NewPingMessage(s.host.GetSelfPeer(), s.config.IsClient)
utils.Logger().Info().Interface("myPing", pingMsg).Msg("Constructing Ping Message")
msgBuf := host.ConstructP2pMessage(byte(0), pingMsg.ConstructPingMessage())
s.sentPingMessage(s.config.ShardGroupID, msgBuf)
@ -102,29 +100,19 @@ func (s *Service) contactP2pPeers() {
s.addBeaconPeerFunc(&peer)
}
}
// Add to outgoing peer list
// s.host.AddOutgoingPeer(peer)
// utils.Logger().Debug().Interface("add outgoing peer", peer).Msg("[DISCOVERY]")
case <-s.stopChan:
utils.Logger().Debug().Msg("[DISCOVERY] stop pinging ...")
return
case action := <-s.actionChan:
s.config.Actions[action.Name] = action.Action
case <-tick.C:
for g, a := range s.config.Actions {
if a == p2p.ActionPause {
// Received Pause Message, so reduce the frequency of ping messages to once every 5 minutes
// TODO (leo) use different timer tick for different group, mainly differentiate beacon and regular shards
// beacon ping could be less frequent than regular shard
tick.Stop()
tick = time.NewTicker(5 * time.Minute)
}
if a == p2p.ActionStart || a == p2p.ActionResume || a == p2p.ActionPause {
s.sentPingMessage(g, msgBuf)
}
}
s.sentPingMessage(s.config.ShardGroupID, msgBuf)
// the longest sleep is 3600 seconds
if pingInterval >= 3600 {
pingInterval = 3600
} else {
pingInterval *= 2
}
time.Sleep(time.Duration(pingInterval) * time.Second)
}
}

@ -9,6 +9,7 @@ import (
"path"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/natefinch/lumberjack"
@ -139,6 +140,7 @@ func setZeroLoggerFileOutput(filepath string, maxSize int) error {
// Logger returns a zerolog.Logger singleton
func Logger() *zerolog.Logger {
if zeroLogger == nil {
zerolog.TimeFieldFormat = time.RFC3339Nano
logger := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}).
Level(zeroLoggerLevel).
With().

@ -304,13 +304,6 @@ func (node *Node) getTransactionsForNewBlock(coinbase common.Address) types.Tran
return selected
}
// MaybeKeepSendingPongMessage keeps sending pong message if the current node is a leader.
// When the node has a consensus object that reports it is the leader,
// SendPongMessage is started in its own goroutine; otherwise nothing happens.
func (node *Node) MaybeKeepSendingPongMessage() {
	if node.Consensus == nil || !node.Consensus.IsLeader() {
		return
	}
	go node.SendPongMessage()
}
// StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() {
select {}
@ -436,6 +429,8 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
node.startConsensus = make(chan struct{})
go node.bootstrapConsensus()
return &node
}
@ -494,12 +489,6 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int {
}
}
// Only leader needs to add the peer info into consensus
// Validators will receive the updated peer info from Leader via pong message
// TODO: remove this after fully migrating to beacon chain-based committee membership
// // TODO: make peers into a context object shared by consensus and drand
// node.DRand.AddPeers(peers)
//}
return count
}

@ -25,7 +25,6 @@ import (
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
"github.com/harmony-one/harmony/api/proto/message"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/contracts/structs"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
@ -201,8 +200,6 @@ func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) {
}
case proto_node.PING:
node.pingMessageHandler(msgPayload, sender)
case proto_node.PONG:
node.pongMessageHandler(msgPayload)
case proto_node.ShardState:
if err := node.epochShardStateMessageHandler(msgPayload); err != nil {
ctxerror.Log15(utils.GetLogger().Warn, err)
@ -713,15 +710,6 @@ func getGenesisNodeByConsensusKey(key types.BlsPublicKey) *genesisNode {
}
func (node *Node) pingMessageHandler(msgPayload []byte, sender libp2p_peer.ID) int {
senderStr := string(sender)
if senderStr != "" {
_, ok := node.duplicatedPing.LoadOrStore(senderStr, true)
if ok {
// duplicated ping message return
return 0
}
}
ping, err := proto_discovery.GetPingMessage(msgPayload)
if err != nil {
utils.Logger().Error().
@ -746,15 +734,22 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender libp2p_peer.ID) i
}
}
var k types.BlsPublicKey
if err := k.FromLibBLSPublicKey(peer.ConsensusPubKey); err != nil {
err = ctxerror.New("cannot convert BLS public key").WithCause(err)
ctxerror.Log15(utils.GetLogger().Warn, err)
}
utils.Logger().Info().
Str("Peer Version", ping.NodeVer).
Interface("PeerID", peer).
Msg("received ping message")
Str("Version", ping.NodeVer).
Str("BlsKey", peer.ConsensusPubKey.SerializeToHexStr()).
Str("IP", peer.IP).
Str("Port", peer.Port).
Interface("PeerID", peer.PeerID).
Msg("[PING] PeerInfo")
senderStr := string(sender)
if senderStr != "" {
_, ok := node.duplicatedPing.LoadOrStore(senderStr, true)
if ok {
// duplicated ping message return
return 0
}
}
// add to incoming peer list
//node.host.AddIncomingPeer(*peer)
@ -776,140 +771,27 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender libp2p_peer.ID) i
return 1
}
// SendPongMessage is a goroutine that periodically sends pong messages to all peers
func (node *Node) SendPongMessage() {
tick := time.NewTicker(2 * time.Second)
tick2 := time.NewTicker(120 * time.Second)
numPeers := node.numPeers
sentMessage := false
firstTime := true
// Send Pong Message only when there is change on the number of peers
// bootstrapConsensus is a goroutine that checks the number of peers and starts the consensus
func (node *Node) bootstrapConsensus() {
tick := time.NewTicker(5 * time.Second)
for {
select {
case <-tick.C:
peers := node.Consensus.GetValidatorPeers()
numPeersNow := node.numPeers
// no peers, wait for another tick
if numPeersNow == 0 {
utils.Logger().Info().
Int("numPeers", numPeers).
Int("numPeersNow", numPeersNow).
Msg("[PONG] No peers, continue")
continue
}
// new peers added
if numPeersNow != numPeers {
utils.Logger().Info().
Int("numPeers", numPeers).
Int("numPeersNow", numPeersNow).
Msg("[PONG] Different number of peers")
sentMessage = false
} else {
// stable number of peers, sent the pong message
// also make sure number of peers is greater than the minimal required number
if !sentMessage && numPeersNow >= node.Consensus.MinPeers {
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey(), node.Consensus.ShardID)
buffer := pong.ConstructPongMessage()
err := node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetShardGroupID()}, host.ConstructP2pMessage(byte(0), buffer))
if err != nil {
utils.Logger().Error().
Str("group", string(node.NodeConfig.GetShardGroupID())).
Msg("[PONG] Failed to send pong message")
Msg("No peers, continue")
continue
} else {
utils.Logger().Info().
Str("group", string(node.NodeConfig.GetShardGroupID())).
Int("# nodes", numPeersNow).
Msg("[PONG] Sent pong message to")
}
sentMessage = true
// only need to notify consensus leader once to start the consensus
if firstTime {
// Leader stops sending ping message
node.serviceManager.TakeAction(&service.Action{Action: service.Stop, ServiceType: service.PeerDiscovery})
utils.Logger().Info().Msg("[PONG] StartConsensus")
if numPeersNow >= node.Consensus.MinPeers {
utils.Logger().Info().Msg("[bootstrap] StartConsensus")
node.startConsensus <- struct{}{}
firstTime = false
}
}
}
numPeers = numPeersNow
case <-tick2.C:
// send pong message regularly to make sure new node received all the public keys
// also nodes offline/online will receive the public keys
peers := node.Consensus.GetValidatorPeers()
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey(), node.Consensus.ShardID)
buffer := pong.ConstructPongMessage()
err := node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetShardGroupID()}, host.ConstructP2pMessage(byte(0), buffer))
if err != nil {
utils.Logger().Error().
Str("group", string(node.NodeConfig.GetShardGroupID())).
Msg("[PONG] Failed to send regular pong message")
continue
} else {
utils.Logger().Info().
Str("group", string(node.NodeConfig.GetShardGroupID())).
Int("# nodes", len(peers)).
Msg("[PONG] Sent regular pong message to")
}
}
}
}
func (node *Node) pongMessageHandler(msgPayload []byte) int {
pong, err := proto_discovery.GetPongMessage(msgPayload)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Can't get Pong Message")
return -1
}
if pong.ShardID != node.Consensus.ShardID {
utils.Logger().Error().
Uint32("receivedShardID", pong.ShardID).
Uint32("expectedShardID", node.Consensus.ShardID).
Msg("Received Pong message for the wrong shard")
return 0
}
peers := make([]*p2p.Peer, 0)
for _, p := range pong.Peers {
peer := new(p2p.Peer)
peer.IP = p.IP
peer.Port = p.Port
peer.PeerID = p.PeerID
peer.ConsensusPubKey = &bls.PublicKey{}
if len(p.PubKey) != 0 { // TODO: add the check in bls library
err = peer.ConsensusPubKey.Deserialize(p.PubKey[:])
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Deserialize ConsensusPubKey Failed")
continue
}
return
}
peers = append(peers, peer)
}
if len(peers) > 0 {
node.AddPeers(peers)
}
// Stop discovery service after received pong message
data := make(map[string]interface{})
data["peer"] = p2p.GroupAction{Name: node.NodeConfig.GetShardGroupID(), Action: p2p.ActionPause}
node.serviceManager.TakeAction(&service.Action{Action: service.Notify, ServiceType: service.PeerDiscovery, Params: data})
// TODO: remove this after fully migrating to beacon chain-based committee membership
return 0
}
func (node *Node) epochShardStateMessageHandler(msgPayload []byte) error {

@ -13,8 +13,6 @@ import (
bls2 "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/drand"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
@ -284,51 +282,9 @@ func sendPingMessage(node *Node, leader p2p.Peer) {
_ = ping2.ConstructPingMessage()
}
// sendPongMessage builds a pong message for two fixed localhost peers
// (ports 9998 and 9999) with deterministic BLS keys and serializes it.
// The serialized bytes are discarded; node and leader are not used.
func sendPongMessage(node *Node, leader p2p.Peer) {
	keyA := pki.GetBLSPrivateKeyFromInt(333).GetPublicKey()
	keyB := pki.GetBLSPrivateKeyFromInt(444).GetPublicKey()

	peers := []p2p.Peer{
		{IP: "127.0.0.1", Port: "9998", ConsensusPubKey: keyA},
		{IP: "127.0.0.1", Port: "9999", ConsensusPubKey: keyB},
	}
	keys := []*bls.PublicKey{keyA, keyB}
	leaderKey := pki.GetBLSPrivateKeyFromInt(888).GetPublicKey()

	msg := proto_discovery.NewPongMessage(peers, keys, leaderKey, 0)
	_ = msg.ConstructPongMessage()
}
// exitServer prints a notice, waits five seconds and then terminates the
// whole process with exit status 0.
func exitServer() {
	const grace = 5 * time.Second

	fmt.Println("wait 5 seconds to terminate the process ...")
	time.Sleep(grace)
	os.Exit(0)
}
// TestPingPongHandler creates a host, a consensus object and a node for a
// single local leader, then runs sendPongMessage and exitServer in
// goroutines before calling node.StartServer.
func TestPingPongHandler(t *testing.T) {
	blsKey := bls2.RandPrivateKey()
	pubKey := blsKey.GetPublicKey()
	leader := p2p.Peer{IP: "127.0.0.1", Port: "8881", ConsensusPubKey: pubKey}
	// validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"}
	priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
	host, err := p2pimpl.NewHost(&leader, priKey)
	if err != nil {
		t.Fatalf("newhost failure: %v", err)
	}
	// Named cons so the local does not shadow the consensus package.
	cons, err := consensus.New(host, 0, leader, blsKey)
	if err != nil {
		t.Fatalf("Cannot create consensus: %v", err)
	}
	node := New(host, cons, testDBFactory, false)
	//go sendPingMessage(leader)
	go sendPongMessage(node, leader)
	// exitServer is started so the process is terminated after a delay
	// instead of waiting on StartServer.
	go exitServer()
	node.StartServer()
}

@ -88,10 +88,6 @@ func (node *Node) setupForExplorerNode() {
// ServiceManagerSetup setups service store.
func (node *Node) ServiceManagerSetup() {
// Run pingpong message protocol for all type of nodes.
// TODO(investigation): This is supposed to move to discovery service but it did not work when trying to move there.
node.MaybeKeepSendingPongMessage()
node.serviceManager = &service.Manager{}
node.serviceMessageChan = make(map[service.Type]chan *msg_pb.Message)
switch node.NodeConfig.Role() {

@ -3,6 +3,7 @@ package node
import (
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/consensus"
@ -24,18 +25,20 @@ var (
)
func TestUpdateStakingList(t *testing.T) {
pubKey := bls.RandPrivateKey().GetPublicKey()
blsKey := bls.RandPrivateKey()
pubKey := blsKey.GetPublicKey()
leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", ConsensusPubKey: pubKey}
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
host, err := p2pimpl.NewHost(&leader, priKey)
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus, err := consensus.New(host, 0, leader, nil)
consensus, err := consensus.New(host, 0, leader, blsKey)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
node := New(host, consensus, testDBFactory, false)
node.BlockPeriod = 8 * time.Second
for i := 0; i < 5; i++ {
selectedTxs := node.getTransactionsForNewBlock(common.Address{})
@ -60,7 +63,9 @@ func TestUpdateStakingList(t *testing.T) {
node.UpdateStakingList(stakeInfo)
/*
if node.CurrentStakes[testAddress].Amount.Cmp(amount) != 0 {
t.Error("Stake Info is not updated correctly")
}
*/
}

@ -189,6 +189,7 @@ func New(self *p2p.Peer, priKey libp2p_crypto.PrivKey) *HostV2 {
Str("port", self.Port).
Str("id", p2pHost.ID().Pretty()).
Str("addr", listenAddr.String()).
Str("PubKey", self.ConsensusPubKey.SerializeToHexStr()).
Msg("HostV2 is up!")
return h

@ -19,6 +19,7 @@ func NewHost(self *p2p.Peer, key libp2p_crypto.PrivKey) (p2p.Host, error) {
utils.Logger().Info().
Str("self", net.JoinHostPort(self.IP, self.Port)).
Interface("PeerID", self.PeerID).
Str("PubKey", self.ConsensusPubKey.SerializeToHexStr()).
Msg("NewHost")
return h, nil

Loading…
Cancel
Save