[HAR-5] Merge pull request #95 from harmony-one/lc4pr

broadcast pong messages to all neighbors
pull/102/head
Leo Chen 6 years ago committed by GitHub
commit 466e3e35ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      consensus/consensus.go
  2. 9
      deploy.sh
  3. 6
      node/node.go
  4. 47
      node/node_handler.go
  5. 2
      p2p/peer.go
  6. 8
      proto/node/pingpong_test.go

@ -239,7 +239,9 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
for _, peer := range peers { for _, peer := range peers {
_, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer)) _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer))
if !ok { if !ok {
if peer.ValidatorID == -1 {
peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId()) peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId())
}
consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer)
consensus.publicKeys = append(consensus.publicKeys, peer.PubKey) consensus.publicKeys = append(consensus.publicKeys, peer.PubKey)
count++ count++
@ -267,3 +269,29 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
// TODO (lc) we need to have a corresponding RemovePeers function // TODO (lc) we need to have a corresponding RemovePeers function
return 0 return 0
} }
// DebugPrintPublicKeys print all the publicKeys in string format in Consensus
func (consensus *Consensus) DebugPrintPublicKeys() {
for _, k := range consensus.publicKeys {
str := fmt.Sprintf("%s", k)
consensus.Log.Debug("pk:", "string", str)
}
consensus.Log.Debug("PublicKeys:", "#", len(consensus.publicKeys))
}
// DebugPrintValidators print all validator ip/port/key in string format in Consensus
func (consensus *Consensus) DebugPrintValidators() {
count := 0
consensus.validators.Range(func(k, v interface{}) bool {
if p, ok := v.(p2p.Peer); ok {
str2 := fmt.Sprintf("%s", p.PubKey)
consensus.Log.Debug("validator:", "IP", p.Ip, "Port", p.Port, "VID", p.ValidatorID, "Key", str2)
count++
return true
} else {
return false
}
})
consensus.Log.Debug("Validators", "#", count)
}

@ -14,6 +14,7 @@ USAGE: $ME [OPTIONS] config_file_name
-d enable db support (default: $DB) -d enable db support (default: $DB)
-t toggle txgen (default: $TXGEN) -t toggle txgen (default: $TXGEN)
-D duration txgen run duration (default: $DURATION) -D duration txgen run duration (default: $DURATION)
-m min_peers minimal number of peers to start consensus (default: $MIN)
This script will build all the binaries and start benchmark and txgen based on the configuration file. This script will build all the binaries and start benchmark and txgen based on the configuration file.
@ -29,15 +30,17 @@ EOU
PEER= PEER=
DB= DB=
TXGEN=true TXGEN=true
DURATION=60 DURATION=90
MIN=5
while getopts "hpdtD:" option; do while getopts "hpdtD:m:" option; do
case $option in case $option in
h) usage ;; h) usage ;;
p) PEER='-peer_discovery' ;; p) PEER='-peer_discovery' ;;
d) DB='-db_supported' ;; d) DB='-db_supported' ;;
t) TXGEN=false ;; t) TXGEN=false ;;
D) DURATION=$OPTARG ;; D) DURATION=$OPTARG ;;
m) MIN=$OPTARG ;;
esac esac
done done
@ -70,7 +73,7 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
IFS=' ' read ip port mode shardID <<< $line IFS=' ' read ip port mode shardID <<< $line
#echo $ip $port $mode #echo $ip $port $mode
if [ "$mode" != "client" ]; then if [ "$mode" != "client" ]; then
./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder $DB $PEER & ./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder $DB $PEER -min_peers $MIN &
fi fi
done < $config done < $config

@ -27,8 +27,6 @@ import (
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
proto_identity "github.com/harmony-one/harmony/proto/identity" proto_identity "github.com/harmony-one/harmony/proto/identity"
proto_node "github.com/harmony-one/harmony/proto/node" proto_node "github.com/harmony-one/harmony/proto/node"
"github.com/jinzhu/copier"
) )
type NodeState byte type NodeState byte
@ -262,9 +260,7 @@ func (node *Node) AddPeers(peers []p2p.Peer) int {
key := fmt.Sprintf("%v", p.PubKey) key := fmt.Sprintf("%v", p.PubKey)
_, ok := node.Neighbors.Load(key) _, ok := node.Neighbors.Load(key)
if !ok { if !ok {
np := new(p2p.Peer) node.Neighbors.Store(key, p)
copier.Copy(np, &p)
node.Neighbors.Store(key, *np)
count++ count++
} }
} }

@ -186,10 +186,8 @@ func (node *Node) NodeHandler(conn net.Conn) {
os.Exit(0) os.Exit(0)
} }
case proto_node.PING: case proto_node.PING:
node.log.Info("NET: received message: PING")
node.pingMessageHandler(msgPayload) node.pingMessageHandler(msgPayload)
case proto_node.PONG: case proto_node.PONG:
node.log.Info("NET: received message: PONG")
node.pongMessageHandler(msgPayload) node.pongMessageHandler(msgPayload)
} }
case proto.Client: case proto.Client:
@ -317,7 +315,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
select { select {
case <-readySignal: case <-readySignal:
time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up. time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up.
case <-time.After(100 * time.Second): case <-time.After(200 * time.Second):
retry = true retry = true
node.Consensus.ResetState() node.Consensus.ResetState()
timeoutCount++ timeoutCount++
@ -373,7 +371,7 @@ func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) {
select { select {
case <-readySignal: case <-readySignal:
time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up. time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up.
case <-time.After(100 * time.Second): case <-time.After(200 * time.Second):
retry = true retry = true
node.Consensus.ResetState() node.Consensus.ResetState()
timeoutCount++ timeoutCount++
@ -518,55 +516,46 @@ func (node *Node) UpdateUtxoAndState(newBlock *blockchain.Block) {
} }
} }
func (node *Node) pingMessageHandler(msgPayload []byte) { func (node *Node) pingMessageHandler(msgPayload []byte) int {
ping, err := proto_node.GetPingMessage(msgPayload) ping, err := proto_node.GetPingMessage(msgPayload)
if err != nil { if err != nil {
node.log.Error("Can't get Ping Message") node.log.Error("Can't get Ping Message")
return return -1
} }
node.log.Info("Ping", "Msg", ping) // node.log.Info("Ping", "Msg", ping)
peer := new(p2p.Peer) peer := new(p2p.Peer)
peer.Ip = ping.Node.IP peer.Ip = ping.Node.IP
peer.Port = ping.Node.Port peer.Port = ping.Node.Port
peer.ValidatorID = ping.Node.ValidatorID
peer.PubKey = hmy_crypto.Ed25519Curve.Point() peer.PubKey = hmy_crypto.Ed25519Curve.Point()
err = peer.PubKey.UnmarshalBinary(ping.Node.PubKey[:]) err = peer.PubKey.UnmarshalBinary(ping.Node.PubKey[:])
if err != nil { if err != nil {
node.log.Error("UnmarshalBinary Failed", "error", err) node.log.Error("UnmarshalBinary Failed", "error", err)
return return -1
} }
// Add to Node's peer list // Add to Node's peer list
count := node.AddPeers([]p2p.Peer{*peer}) node.AddPeers([]p2p.Peer{*peer})
// Send a Pong message back // Send a Pong message back
peers := make([]p2p.Peer, 0) peers := node.Consensus.GetValidatorPeers()
count = 0
node.Neighbors.Range(func(k, v interface{}) bool {
if p, ok := v.(p2p.Peer); ok {
peers = append(peers, p)
count++
return true
} else {
return false
}
})
pong := proto_node.NewPongMessage(peers) pong := proto_node.NewPongMessage(peers)
buffer := pong.ConstructPongMessage() buffer := pong.ConstructPongMessage()
p2p.SendMessage(*peer, buffer) for _, p := range peers {
p2p.SendMessage(p, buffer)
// TODO: broadcast pong messages to all neighbors }
return return len(peers)
} }
func (node *Node) pongMessageHandler(msgPayload []byte) { func (node *Node) pongMessageHandler(msgPayload []byte) int {
pong, err := proto_node.GetPongMessage(msgPayload) pong, err := proto_node.GetPongMessage(msgPayload)
if err != nil { if err != nil {
node.log.Error("Can't get Pong Message") node.log.Error("Can't get Pong Message")
return return -1
} }
// node.log.Info("Pong", "Msg", pong) // node.log.Info("Pong", "Msg", pong)
node.State = NodeJoinedShard node.State = NodeJoinedShard
@ -577,6 +566,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) {
peer := new(p2p.Peer) peer := new(p2p.Peer)
peer.Ip = p.IP peer.Ip = p.IP
peer.Port = p.Port peer.Port = p.Port
peer.ValidatorID = p.ValidatorID
peer.PubKey = hmy_crypto.Ed25519Curve.Point() peer.PubKey = hmy_crypto.Ed25519Curve.Point()
err = peer.PubKey.UnmarshalBinary(p.PubKey[:]) err = peer.PubKey.UnmarshalBinary(p.PubKey[:])
@ -587,8 +577,5 @@ func (node *Node) pongMessageHandler(msgPayload []byte) {
peers = append(peers, *peer) peers = append(peers, *peer)
} }
node.AddPeers(peers) return node.AddPeers(peers)
// TODO: add public key to consensus.pubkeys
return
} }

@ -19,7 +19,7 @@ type Peer struct {
Port string // Port number of the peer Port string // Port number of the peer
PubKey kyber.Point // Public key of the peer PubKey kyber.Point // Public key of the peer
Ready bool // Ready is true if the peer is ready to join consensus. Ready bool // Ready is true if the peer is ready to join consensus.
ValidatorID int ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard
// TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available. // TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available.
} }

@ -17,10 +17,10 @@ var (
p1 = p2p.Peer{ p1 = p2p.Peer{
Ip: "127.0.0.1", Ip: "127.0.0.1",
Port: "9999", Port: "9999",
ValidatorID: 8888, ValidatorID: -1,
PubKey: pubKey1, PubKey: pubKey1,
} }
e1 = "ping:1=>127.0.0.1:9999:8888/[90 217 28 68 64 211 160 232 61 244 159 244 160 36 61 161 237 242 236 45 147 118 237 88 234 122 198 188 157 116 90 228]" e1 = "ping:1=>127.0.0.1:9999:-1/[90 217 28 68 64 211 160 232 61 244 159 244 160 36 61 161 237 242 236 45 147 118 237 88 234 122 198 188 157 116 90 228]"
priKey2 = crypto.Ed25519Curve.Scalar().SetInt64(int64(999)) priKey2 = crypto.Ed25519Curve.Scalar().SetInt64(int64(999))
pubKey2 = pki.GetPublicKeyFromScalar(priKey2) pubKey2 = pki.GetPublicKeyFromScalar(priKey2)
@ -31,14 +31,14 @@ var (
Port: "8888", Port: "8888",
PubKey: pubKey1, PubKey: pubKey1,
Ready: true, Ready: true,
ValidatorID: 1, ValidatorID: -1,
}, },
{ {
Ip: "127.0.0.1", Ip: "127.0.0.1",
Port: "9999", Port: "9999",
PubKey: pubKey2, PubKey: pubKey2,
Ready: false, Ready: false,
ValidatorID: 2, ValidatorID: -2,
}, },
} }
e2 = "pong:1=>length:2" e2 = "pong:1=>length:2"

Loading…
Cancel
Save