Merge pull request #87 from harmony-one/lc4pr-pd

[HAR-5]: handle ping/pong messages properly
pull/88/head
Leo Chen 6 years ago committed by GitHub
commit f34ae3e537
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      benchmark.go
  2. 27
      node/node.go
  3. 48
      node/node_handler.go
  4. 2
      node/node_test.go
  5. 18
      proto/node/pingpong.go
  6. 2
      proto/node/pingpong_test.go
  7. 16
      test_before_submit.sh

@ -203,6 +203,8 @@ func main() {
// Temporary testing code, to be removed. // Temporary testing code, to be removed.
currentNode.AddTestingAddresses(10000) currentNode.AddTestingAddresses(10000)
currentNode.State = node.WAIT
if consensus.IsLeader { if consensus.IsLeader {
if *accountModel { if *accountModel {
// Let consensus run // Let consensus run
@ -223,6 +225,10 @@ func main() {
currentNode.WaitForConsensusReady(consensus.ReadySignal) currentNode.WaitForConsensusReady(consensus.ReadySignal)
}() }()
} }
} else {
go func() {
currentNode.JoinShard(leader)
}()
} }
currentNode.StartServer(*port) currentNode.StartServer(*port)

@ -24,10 +24,20 @@ import (
"github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/log"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
proto_identity "github.com/harmony-one/harmony/proto/identity" proto_identity "github.com/harmony-one/harmony/proto/identity"
proto_node "github.com/harmony-one/harmony/proto/node"
"github.com/jinzhu/copier" "github.com/jinzhu/copier"
) )
type NodeState byte
const (
INIT NodeState = iota // Node just started, before contacting BeaconChain
WAIT // Node contacted BeaconChain, wait to join Shard
JOIN // Node joined Shard, ready for consensus
OFFLINE // Node is offline
)
type NetworkNode struct { type NetworkNode struct {
SelfPeer p2p.Peer SelfPeer p2p.Peer
IDCPeer p2p.Peer IDCPeer p2p.Peer
@ -57,6 +67,7 @@ type Node struct {
SyncNode bool // TODO(minhdoan): Remove it later. SyncNode bool // TODO(minhdoan): Remove it later.
chain *core.BlockChain // Account Model chain *core.BlockChain // Account Model
Neighbors map[string]*p2p.Peer // All the neighbor nodes, key is the sha256 of Peer IP/Port Neighbors map[string]*p2p.Peer // All the neighbor nodes, key is the sha256 of Peer IP/Port
State NodeState // State of the Node
// Account Model // Account Model
Chain *core.BlockChain Chain *core.BlockChain
@ -235,6 +246,7 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
// Logger // Logger
node.log = log.New() node.log = log.New()
node.Neighbors = make(map[string]*p2p.Peer) node.Neighbors = make(map[string]*p2p.Peer)
node.State = INIT
return &node return &node
} }
@ -260,3 +272,18 @@ func (node *Node) AddPeers(peers []p2p.Peer) int {
} }
return count return count
} }
func (node *Node) JoinShard(leader p2p.Peer) {
// try to join the shard, with 10 minutes time-out
backoff := p2p.NewExpBackoff(500*time.Millisecond, 10*time.Minute, 2)
for node.State == WAIT {
backoff.Sleep()
ping := proto_node.NewPingMessage(node.SelfPeer)
buffer := ping.ConstructPingMessage()
p2p.SendMessage(leader, buffer)
node.log.Debug("Sent ping message")
}
}

@ -15,6 +15,7 @@ import (
"time" "time"
"github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/blockchain"
hmy_crypto "github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/proto" "github.com/harmony-one/harmony/proto"
"github.com/harmony-one/harmony/proto/client" "github.com/harmony-one/harmony/proto/client"
@ -510,6 +511,33 @@ func (node *Node) pingMessageHandler(msgPayload []byte) {
return return
} }
node.log.Info("Ping", "Msg", ping) node.log.Info("Ping", "Msg", ping)
peer := new(p2p.Peer)
peer.Ip = ping.Node.IP
peer.Port = ping.Node.Port
peer.PubKey = hmy_crypto.Ed25519Curve.Point()
err = peer.PubKey.UnmarshalBinary(ping.Node.PubKey[:])
if err != nil {
node.log.Error("UnmarshalBinary Failed", "error", err)
return
}
node.AddPeers([]p2p.Peer{*peer})
// TODO: add public key to consensus.pubkeys
// Send a Pong message back
peers := make([]p2p.Peer, 0)
for _, v := range node.Neighbors {
peers = append(peers, *v)
}
pong := proto_node.NewPongMessage(peers)
buffer := pong.ConstructPongMessage()
p2p.SendMessage(*peer, buffer)
// TODO: broadcast pong messages to all neighbors
return return
} }
@ -520,5 +548,25 @@ func (node *Node) pongMessageHandler(msgPayload []byte) {
return return
} }
node.log.Info("Pong", "Msg", pong) node.log.Info("Pong", "Msg", pong)
peers := make([]p2p.Peer, 0)
for _, p := range pong.Peers {
peer := new(p2p.Peer)
peer.Ip = p.IP
peer.Port = p.Port
peer.PubKey = hmy_crypto.Ed25519Curve.Point()
err = peer.PubKey.UnmarshalBinary(p.PubKey[:])
if err != nil {
node.log.Error("UnmarshalBinary Failed", "error", err)
continue
}
peers = append(peers, *peer)
}
node.AddPeers(peers)
// TODO: add public key to consensus.pubkeys
return return
} }

@ -154,7 +154,7 @@ func TestPingPongHandler(test *testing.T) {
node := New(consensus, nil) node := New(consensus, nil)
go sendPingMessage(leader) // go sendPingMessage(leader)
go sendPongMessage(leader) go sendPongMessage(leader)
go exitServer() go exitServer()

@ -22,7 +22,7 @@ import (
type nodeInfo struct { type nodeInfo struct {
IP string IP string
Port string Port string
PubKey string PubKey []byte
} }
type PingMessageType struct { type PingMessageType struct {
@ -50,10 +50,16 @@ func (p PongMessageType) String() string {
func NewPingMessage(peer p2p.Peer) *PingMessageType { func NewPingMessage(peer p2p.Peer) *PingMessageType {
ping := new(PingMessageType) ping := new(PingMessageType)
var err error
ping.Version = PROTOCOL_VERSION ping.Version = PROTOCOL_VERSION
ping.Node.IP = peer.Ip ping.Node.IP = peer.Ip
ping.Node.Port = peer.Port ping.Node.Port = peer.Port
ping.Node.PubKey = fmt.Sprintf("%v", peer.PubKey) ping.Node.PubKey, err = peer.PubKey.MarshalBinary()
if err != nil {
fmt.Printf("Error Marshall PubKey: %v", err)
return nil
}
return ping return ping
} }
@ -64,12 +70,16 @@ func NewPongMessage(peers []p2p.Peer) *PongMessageType {
pong.Version = PROTOCOL_VERSION pong.Version = PROTOCOL_VERSION
pong.Peers = make([]nodeInfo, 0) pong.Peers = make([]nodeInfo, 0)
var err error
for _, p := range peers { for _, p := range peers {
n := nodeInfo{} n := nodeInfo{}
n.IP = p.Ip n.IP = p.Ip
n.Port = p.Port n.Port = p.Port
n.PubKey = fmt.Sprintf("%v", p.PubKey) n.PubKey, err = p.PubKey.MarshalBinary()
if err != nil {
fmt.Printf("Error Marshall PubKey: %v", err)
continue
}
pong.Peers = append(pong.Peers, n) pong.Peers = append(pong.Peers, n)
} }

@ -19,7 +19,7 @@ var (
Port: "9999", Port: "9999",
PubKey: pubKey1, PubKey: pubKey1,
} }
e1 = "1=>127.0.0.1:9999/5ad91c4440d3a0e83df49ff4a0243da1edf2ec2d9376ed58ea7ac6bc9d745ae4" e1 = "1=>127.0.0.1:9999/[90 217 28 68 64 211 160 232 61 244 159 244 160 36 61 161 237 242 236 45 147 118 237 88 234 122 198 188 157 116 90 228]"
priKey2 = crypto.Ed25519Curve.Scalar().SetInt64(int64(999)) priKey2 = crypto.Ed25519Curve.Scalar().SetInt64(int64(999))
pubKey2 = pki.GetPublicKeyFromScalar(priKey2) pubKey2 = pki.GetPublicKeyFromScalar(priKey2)

@ -1,6 +1,20 @@
#!/bin/bash #!/bin/bash
DIRROOT=$(dirname $0) DIRROOT=$(dirname $0)
OS=$(uname -s)
go test ./... go test ./...
$DIRROOT/.travis.gofmt.sh
pushd $DIRROOT
./.travis.gofmt.sh
case $OS in
Darwin)
./go_executable_build.sh -o darwin
;;
Linux)
./go_executable_build.sh
;;
esac
popd

Loading…
Cancel
Save