Merge pull request #82 from harmony-one/lc4pr-pd

add ping/pong message handler
pull/85/head
Leo Chen 6 years ago committed by GitHub
commit 18c018dffc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      consensus/consensus.go
  2. 32
      node/node.go
  3. 28
      node/node_handler.go
  4. 115
      node/node_test.go
  5. 6
      proto/node/node.go
  6. 65
      proto/node/pingpong.go
  7. 71
      proto/node/pingpong_test.go

@ -35,6 +35,9 @@ type Consensus struct {
responses *map[uint16]kyber.Scalar
finalResponses *map[uint16]kyber.Scalar
// map of nodeID to validator Peer object
// FIXME: should use PubKey of p2p.Peer as the hashkey
// However, we have assumed uint16 in consensus/consensus_leader.go:136
// we won't change it now
validators map[uint16]p2p.Peer
// Leader
leader p2p.Peer
@ -214,3 +217,15 @@ func (consensus *Consensus) String() string {
return fmt.Sprintf("[duty:%s, priKey:%s, ShardID:%v, nodeID:%v, state:%s]",
duty, consensus.priKey.String(), consensus.ShardID, consensus.nodeID, consensus.state)
}
func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
count := 0
for _, peer := range peers {
_, ok := consensus.validators[utils.GetUniqueIdFromPeer(peer)]
if !ok {
consensus.validators[utils.GetUniqueIdFromPeer(peer)] = peer
count++
}
}
return count
}

@ -17,6 +17,8 @@ import (
"github.com/harmony-one/harmony/log"
"github.com/harmony-one/harmony/p2p"
proto_identity "github.com/harmony-one/harmony/proto/identity"
"github.com/jinzhu/copier"
)
type NetworkNode struct {
@ -45,9 +47,8 @@ type Node struct {
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
IDCPeer p2p.Peer
SyncNode bool // TODO(minhdoan): Remove it later.
// Account Model
chain *core.BlockChain
chain *core.BlockChain // Account Model
Neighbors map[string]*p2p.Peer // All the neighbor nodes, key is the sha256 of Peer IP/Port
}
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client
@ -85,7 +86,7 @@ func (node *Node) StartServer(port string) {
// Disable this temporarily.
// node.blockchain = syncing.StartBlockSyncing(node.Consensus.GetValidatorPeers())
}
fmt.Println("going to start server")
fmt.Println("going to start server on port:", port)
//node.log.Debug("Starting server", "node", node, "port", port)
node.listenOnPort(port)
}
@ -200,6 +201,29 @@ func New(consensus *consensus.Consensus, db *db.LDBDatabase) *Node {
}
// Logger
node.log = log.New()
node.Neighbors = make(map[string]*p2p.Peer)
return &node
}
// Add neighbors nodes
func (node *Node) AddPeers(peers []p2p.Peer) int {
count := 0
for _, p := range peers {
key := fmt.Sprintf("%v", p.PubKey)
_, ok := node.Neighbors[key]
if !ok {
np := new(p2p.Peer)
copier.Copy(np, &p)
node.Neighbors[key] = np
count++
}
}
node.log.Info("Added", "# of peers", count)
if count > 0 {
c := node.Consensus.AddPeers(peers)
node.log.Info("Added in Consensus", "# of peers", c)
}
return count
}

@ -178,6 +178,12 @@ func (node *Node) NodeHandler(conn net.Conn) {
os.Exit(0)
}
case proto_node.PING:
node.log.Info("NET: received message: PING")
node.pingMessageHandler(msgPayload)
case proto_node.PONG:
node.log.Info("NET: received message: PONG")
node.pongMessageHandler(msgPayload)
}
case proto.CLIENT:
actionType := client.ClientMessageType(msgType)
@ -188,6 +194,8 @@ func (node *Node) NodeHandler(conn net.Conn) {
node.Client.TransactionMessageHandler(msgPayload)
}
}
default:
node.log.Error("Unknown", "MsgCateory:", msgCategory)
}
}
@ -442,3 +450,23 @@ func (node *Node) UpdateUtxoAndState(newBlock *blockchain.Block) {
node.log.Info("LEADER LOCKED UTXO", "num", node.UtxoPool.CountNumOfLockedUtxos(), "ShardID", node.UtxoPool.ShardID)
}
}
func (node *Node) pingMessageHandler(msgPayload []byte) {
ping, err := proto_node.GetPingMessage(msgPayload)
if err != nil {
node.log.Error("Can't get Ping Message")
return
}
node.log.Info("Ping", "Msg", ping)
return
}
func (node *Node) pongMessageHandler(msgPayload []byte) {
pong, err := proto_node.GetPongMessage(msgPayload)
if err != nil {
node.log.Error("Can't get Pong Message")
return
}
node.log.Info("Pong", "Msg", pong)
return
}

@ -1,10 +1,18 @@
package node
import (
"fmt"
"os"
"testing"
"time"
"github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/p2p"
proto_node "github.com/harmony-one/harmony/proto/node"
)
func TestNewNewNode(test *testing.T) {
@ -45,3 +53,110 @@ func TestCountNumTransactionsInBlockchain(test *testing.T) {
test.Error("Count of transactions in the blockchain is incorrect")
}
}
func TestAddPeers(test *testing.T) {
priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333))
pubKey1 := pki.GetPublicKeyFromScalar(priKey1)
priKey2 := crypto.Ed25519Curve.Scalar().SetInt64(int64(999))
pubKey2 := pki.GetPublicKeyFromScalar(priKey2)
peers1 := []p2p.Peer{
{
Ip: "127.0.0.1",
Port: "8888",
PubKey: pubKey1,
Ready: true,
ValidatorID: 1,
},
{
Ip: "127.0.0.1",
Port: "9999",
PubKey: pubKey2,
Ready: false,
ValidatorID: 2,
},
}
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
node := New(consensus, nil)
r1 := node.AddPeers(peers1)
e1 := 2
if r1 != e1 {
test.Errorf("Add %v peers, expectd %v", r1, e1)
}
r2 := node.AddPeers(peers1)
e2 := 0
if r2 != e2 {
test.Errorf("Add %v peers, expectd %v", r2, e2)
}
}
func sendPingMessage(leader p2p.Peer) {
priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333))
pubKey1 := pki.GetPublicKeyFromScalar(priKey1)
p1 := p2p.Peer{
Ip: "127.0.0.1",
Port: "9999",
PubKey: pubKey1,
}
ping1 := proto_node.NewPingMessage(p1)
buf1 := ping1.ConstructPingMessage()
fmt.Println("waiting for 5 seconds ...")
time.Sleep(5 * time.Second)
p2p.SendMessage(leader, buf1)
fmt.Println("sent ping message ...")
}
func sendPongMessage(leader p2p.Peer) {
priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333))
pubKey1 := pki.GetPublicKeyFromScalar(priKey1)
p1 := p2p.Peer{
Ip: "127.0.0.1",
Port: "9998",
PubKey: pubKey1,
}
priKey2 := crypto.Ed25519Curve.Scalar().SetInt64(int64(999))
pubKey2 := pki.GetPublicKeyFromScalar(priKey2)
p2 := p2p.Peer{
Ip: "127.0.0.1",
Port: "9999",
PubKey: pubKey2,
}
pong1 := proto_node.NewPongMessage([]p2p.Peer{p1, p2})
buf1 := pong1.ConstructPongMessage()
fmt.Println("waiting for 10 seconds ...")
time.Sleep(10 * time.Second)
p2p.SendMessage(leader, buf1)
fmt.Println("sent pong message ...")
}
func exitServer() {
fmt.Println("wait 15 seconds to terminate the process ...")
time.Sleep(15 * time.Second)
os.Exit(0)
}
func TestPingPongHandler(test *testing.T) {
leader := p2p.Peer{Ip: "127.0.0.1", Port: "8881"}
validator := p2p.Peer{Ip: "127.0.0.1", Port: "9991"}
consensus := consensus.NewConsensus("127.0.0.1", "8881", "0", []p2p.Peer{leader, validator}, leader)
node := New(consensus, nil)
go sendPingMessage(leader)
go sendPongMessage(leader)
go exitServer()
node.StartServer("8881")
}

@ -13,6 +13,10 @@ import (
// NodeMessageType is to indicate the specific type of message under NODE category
type NodeMessageType byte
const (
PROTOCOL_VERSION = 1
)
const (
Transaction NodeMessageType = iota
BLOCK
@ -20,7 +24,7 @@ const (
CONTROL
BlockchainSync
PING // node send ip/pki to register with leader
PONG // leader
PONG // node broadcast pubK
// TODO: add more types
)

@ -4,11 +4,7 @@ Package proto/node implements the communication protocol among nodes.
pingpong.go adds support of ping/pong messages.
ping: from node to peers, sending IP/Port/PubKey info
TODO: add protocol version support
pong: peer responds to ping messages, sending all pubkeys known by peer
TODO:
* add the version of the protocol
*/
@ -18,25 +14,66 @@ import (
"bytes"
"encoding/gob"
"fmt"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/proto"
"log"
)
type PingMessageType struct {
type nodeInfo struct {
IP string
Port string
PubKey string
}
type PingMessageType struct {
Version uint16 // version of the protocol
Node nodeInfo
}
type PongMessageType struct {
PubKeys []string
Version uint16 // version of the protocol
Peers []nodeInfo
}
func (p PingMessageType) String() string {
return fmt.Sprintf("%v:%v/%v", p.IP, p.Port, p.PubKey)
return fmt.Sprintf("%v=>%v:%v/%v", p.Version, p.Node.IP, p.Node.Port, p.Node.PubKey)
}
func (p PongMessageType) String() string {
return fmt.Sprintf("# Keys: %v", len(p.PubKeys))
str := fmt.Sprintf("%v=># Peers: %v", p.Version, len(p.Peers))
for _, p := range p.Peers {
str = fmt.Sprintf("%v\n%v:%v/%v", str, p.IP, p.Port, p.PubKey)
}
return str
}
func NewPingMessage(peer p2p.Peer) *PingMessageType {
ping := new(PingMessageType)
ping.Version = PROTOCOL_VERSION
ping.Node.IP = peer.Ip
ping.Node.Port = peer.Port
ping.Node.PubKey = fmt.Sprintf("%v", peer.PubKey)
return ping
}
func NewPongMessage(peers []p2p.Peer) *PongMessageType {
pong := new(PongMessageType)
pong.Version = PROTOCOL_VERSION
pong.Peers = make([]nodeInfo, 0)
for _, p := range peers {
n := nodeInfo{}
n.IP = p.Ip
n.Port = p.Port
n.PubKey = fmt.Sprintf("%v", p.PubKey)
pong.Peers = append(pong.Peers, n)
}
return pong
}
// Deserialize Ping Message
@ -71,8 +108,10 @@ func GetPongMessage(payload []byte) (*PongMessageType, error) {
// ConstructPingMessage contructs ping message from node to leader
func (ping PingMessageType) ConstructPingMessage() []byte {
var byteBuffer bytes.Buffer
encoder := gob.NewEncoder(&byteBuffer)
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(PING))
encoder := gob.NewEncoder(byteBuffer)
err := encoder.Encode(ping)
if err != nil {
log.Panic("Can't serialize Ping message", "error:", err)
@ -83,8 +122,10 @@ func (ping PingMessageType) ConstructPingMessage() []byte {
// ConstructPongMessage contructs pong message from leader to node
func (pong PongMessageType) ConstructPongMessage() []byte {
var byteBuffer bytes.Buffer
encoder := gob.NewEncoder(&byteBuffer)
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(PONG))
encoder := gob.NewEncoder(byteBuffer)
err := encoder.Encode(pong)
if err != nil {
log.Panic("Can't serialize Pong message", "error:", err)

@ -4,58 +4,91 @@ import (
"fmt"
"strings"
"testing"
"github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/proto"
)
var (
p1 = PingMessageType{"127.0.0.1", "9999", "0x12345678901234567890"}
e1 = "127.0.0.1:9999/0x12345678901234567890"
priKey1 = crypto.Ed25519Curve.Scalar().SetInt64(int64(333))
pubKey1 = pki.GetPublicKeyFromScalar(priKey1)
p1 = p2p.Peer{
Ip: "127.0.0.1",
Port: "9999",
PubKey: pubKey1,
}
e1 = "1=>127.0.0.1:9999/5ad91c4440d3a0e83df49ff4a0243da1edf2ec2d9376ed58ea7ac6bc9d745ae4"
p2 = PongMessageType{
[]string{
"0x1111111111111",
"0x2222222222222",
"0x3333333333333",
priKey2 = crypto.Ed25519Curve.Scalar().SetInt64(int64(999))
pubKey2 = pki.GetPublicKeyFromScalar(priKey2)
p2 = []p2p.Peer{
{
Ip: "127.0.0.1",
Port: "8888",
PubKey: pubKey1,
Ready: true,
ValidatorID: 1,
},
{
Ip: "127.0.0.1",
Port: "9999",
PubKey: pubKey2,
Ready: false,
ValidatorID: 2,
},
}
e2 = "# Keys: 3"
e2 = "1=># Peers: 2"
buf1 []byte
buf2 []byte
)
func TestString(test *testing.T) {
r1 := fmt.Sprintf("%v", p1)
ping1 := NewPingMessage(p1)
r1 := fmt.Sprintf("%v", *ping1)
if strings.Compare(r1, e1) != 0 {
test.Errorf("expect: %v, got: %v", e1, r1)
} else {
fmt.Printf("Ping:%v\n", p1)
fmt.Printf("Ping:%v\n", r1)
}
r2 := fmt.Sprintf("%v", p2)
pong1 := NewPongMessage(p2)
r2 := fmt.Sprintf("%v", *pong1)
if strings.Compare(r2, e2) != 0 {
if !strings.HasPrefix(r2, e2) {
test.Errorf("expect: %v, got: %v", e2, r2)
} else {
fmt.Printf("Pong:%v\n", p2)
fmt.Printf("Pong:%v\n", r2)
}
}
func TestSerialize(test *testing.T) {
buf1 = p1.ConstructPingMessage()
fmt.Printf("buf: %v\n", buf1)
ping1 := NewPingMessage(p1)
buf1 = ping1.ConstructPingMessage()
fmt.Printf("buf ping: %v\n", buf1)
buf2 = p2.ConstructPongMessage()
fmt.Printf("buf: %v\n", buf2)
pong1 := NewPongMessage(p2)
buf2 = pong1.ConstructPongMessage()
fmt.Printf("buf pong: %v\n", buf2)
}
func TestDeserialize(test *testing.T) {
ping, err := GetPingMessage(buf1)
msg1, err := proto.GetMessagePayload(buf1)
if err != nil {
test.Error("GetMessagePayload Failed!")
}
ping, err := GetPingMessage(msg1)
if err != nil {
test.Error("Ping failed!")
}
fmt.Printf("Ping:%v\n", ping)
pong, err := GetPongMessage(buf2)
msg2, err := proto.GetMessagePayload(buf2)
pong, err := GetPongMessage(msg2)
if err != nil {
test.Error("Pong failed!")
}

Loading…
Cancel
Save