diff --git a/consensus/consensus.go b/consensus/consensus.go index 75104ade4..8acfa034f 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -35,6 +35,9 @@ type Consensus struct { responses *map[uint16]kyber.Scalar finalResponses *map[uint16]kyber.Scalar // map of nodeID to validator Peer object + // FIXME: should use PubKey of p2p.Peer as the hashkey + // However, we have assumed uint16 in consensus/consensus_leader.go:136 + // we won't change it now validators map[uint16]p2p.Peer // Leader leader p2p.Peer @@ -214,3 +217,15 @@ func (consensus *Consensus) String() string { return fmt.Sprintf("[duty:%s, priKey:%s, ShardID:%v, nodeID:%v, state:%s]", duty, consensus.priKey.String(), consensus.ShardID, consensus.nodeID, consensus.state) } + +func (consensus *Consensus) AddPeers(peers []p2p.Peer) int { + count := 0 + for _, peer := range peers { + _, ok := consensus.validators[utils.GetUniqueIdFromPeer(peer)] + if !ok { + consensus.validators[utils.GetUniqueIdFromPeer(peer)] = peer + count++ + } + } + return count +} diff --git a/node/node.go b/node/node.go index 986a1d18c..adc693acc 100644 --- a/node/node.go +++ b/node/node.go @@ -17,6 +17,8 @@ import ( "github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/p2p" proto_identity "github.com/harmony-one/harmony/proto/identity" + + "github.com/jinzhu/copier" ) type NetworkNode struct { @@ -44,10 +46,9 @@ type Node struct { IsWaiting bool SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. IDCPeer p2p.Peer - SyncNode bool // TODO(minhdoan): Remove it later. - - // Account Model - chain *core.BlockChain + SyncNode bool // TODO(minhdoan): Remove it later. + chain *core.BlockChain // Account Model + Neighbors map[string]*p2p.Peer // All the neighbor nodes, key is the sha256 of Peer IP/Port } // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client @@ -85,7 +86,7 @@ func (node *Node) StartServer(port string) { // Disable this temporarily. // node.blockchain = syncing.StartBlockSyncing(node.Consensus.GetValidatorPeers()) } - fmt.Println("going to start server") + fmt.Println("going to start server on port:", port) //node.log.Debug("Starting server", "node", node, "port", port) node.listenOnPort(port) } @@ -200,6 +201,29 @@ func New(consensus *consensus.Consensus, db *db.LDBDatabase) *Node { } // Logger node.log = log.New() + node.Neighbors = make(map[string]*p2p.Peer) return &node } + +// Add neighbors nodes +func (node *Node) AddPeers(peers []p2p.Peer) int { + count := 0 + for _, p := range peers { + key := fmt.Sprintf("%v", p.PubKey) + _, ok := node.Neighbors[key] + if !ok { + np := new(p2p.Peer) + copier.Copy(np, &p) + node.Neighbors[key] = np + count++ + } + } + node.log.Info("Added", "# of peers", count) + + if count > 0 { + c := node.Consensus.AddPeers(peers) + node.log.Info("Added in Consensus", "# of peers", c) + } + return count +} diff --git a/node/node_handler.go b/node/node_handler.go index a83e2524d..bd7ab6e84 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -178,6 +178,12 @@ func (node *Node) NodeHandler(conn net.Conn) { os.Exit(0) } + case proto_node.PING: + node.log.Info("NET: received message: PING") + node.pingMessageHandler(msgPayload) + case proto_node.PONG: + node.log.Info("NET: received message: PONG") + node.pongMessageHandler(msgPayload) } case proto.CLIENT: actionType := client.ClientMessageType(msgType) @@ -188,6 +194,8 @@ func (node *Node) NodeHandler(conn net.Conn) { node.Client.TransactionMessageHandler(msgPayload) } } + default: + node.log.Error("Unknown", "MsgCateory:", msgCategory) } } @@ -442,3 +450,23 @@ func (node *Node) UpdateUtxoAndState(newBlock *blockchain.Block) { node.log.Info("LEADER LOCKED UTXO", "num", node.UtxoPool.CountNumOfLockedUtxos(), "ShardID", node.UtxoPool.ShardID) } } + +func (node *Node) pingMessageHandler(msgPayload []byte) { + ping, err := proto_node.GetPingMessage(msgPayload) + if err != nil { + node.log.Error("Can't get Ping Message") + return + } + node.log.Info("Ping", "Msg", ping) + return +} + +func (node *Node) pongMessageHandler(msgPayload []byte) { + pong, err := proto_node.GetPongMessage(msgPayload) + if err != nil { + node.log.Error("Can't get Pong Message") + return + } + node.log.Info("Pong", "Msg", pong) + return +} diff --git a/node/node_test.go b/node/node_test.go index 7dad8db4c..e637131b5 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -1,10 +1,18 @@ package node import ( + "fmt" + "os" "testing" + "time" + + "github.com/harmony-one/harmony/crypto" + "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/p2p" + + proto_node "github.com/harmony-one/harmony/proto/node" ) func TestNewNewNode(test *testing.T) { @@ -45,3 +53,110 @@ func TestCountNumTransactionsInBlockchain(test *testing.T) { test.Error("Count of transactions in the blockchain is incorrect") } } + +func TestAddPeers(test *testing.T) { + priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333)) + pubKey1 := pki.GetPublicKeyFromScalar(priKey1) + + priKey2 := crypto.Ed25519Curve.Scalar().SetInt64(int64(999)) + pubKey2 := pki.GetPublicKeyFromScalar(priKey2) + + peers1 := []p2p.Peer{ + { + Ip: "127.0.0.1", + Port: "8888", + PubKey: pubKey1, + Ready: true, + ValidatorID: 1, + }, + { + Ip: "127.0.0.1", + Port: "9999", + PubKey: pubKey2, + Ready: false, + ValidatorID: 2, + }, + } + leader := p2p.Peer{Ip: "1", Port: "2"} + validator := p2p.Peer{Ip: "3", Port: "5"} + consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + + node := New(consensus, nil) + r1 := node.AddPeers(peers1) + e1 := 2 + if r1 != e1 { + test.Errorf("Add %v peers, expectd %v", r1, e1) + } + r2 := node.AddPeers(peers1) + e2 := 0 + if r2 != e2 { + test.Errorf("Add %v peers, expectd %v", r2, e2) + } +} + +func sendPingMessage(leader p2p.Peer) { + priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333)) + pubKey1 := pki.GetPublicKeyFromScalar(priKey1) + + p1 := p2p.Peer{ + Ip: "127.0.0.1", + Port: "9999", + PubKey: pubKey1, + } + + ping1 := proto_node.NewPingMessage(p1) + buf1 := ping1.ConstructPingMessage() + + fmt.Println("waiting for 5 seconds ...") + time.Sleep(5 * time.Second) + + p2p.SendMessage(leader, buf1) + fmt.Println("sent ping message ...") +} + +func sendPongMessage(leader p2p.Peer) { + priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333)) + pubKey1 := pki.GetPublicKeyFromScalar(priKey1) + p1 := p2p.Peer{ + Ip: "127.0.0.1", + Port: "9998", + PubKey: pubKey1, + } + priKey2 := crypto.Ed25519Curve.Scalar().SetInt64(int64(999)) + pubKey2 := pki.GetPublicKeyFromScalar(priKey2) + p2 := p2p.Peer{ + Ip: "127.0.0.1", + Port: "9999", + PubKey: pubKey2, + } + + pong1 := proto_node.NewPongMessage([]p2p.Peer{p1, p2}) + buf1 := pong1.ConstructPongMessage() + + fmt.Println("waiting for 10 seconds ...") + time.Sleep(10 * time.Second) + + p2p.SendMessage(leader, buf1) + fmt.Println("sent pong message ...") +} + +func exitServer() { + fmt.Println("wait 15 seconds to terminate the process ...") + time.Sleep(15 * time.Second) + + os.Exit(0) +} + +func TestPingPongHandler(test *testing.T) { + leader := p2p.Peer{Ip: "127.0.0.1", Port: "8881"} + validator := p2p.Peer{Ip: "127.0.0.1", Port: "9991"} + consensus := consensus.NewConsensus("127.0.0.1", "8881", "0", []p2p.Peer{leader, validator}, leader) + + node := New(consensus, nil) + + go sendPingMessage(leader) + go sendPongMessage(leader) + go exitServer() + + node.StartServer("8881") +} diff --git a/proto/node/node.go b/proto/node/node.go index 4928134d7..6ce5f462d 100644 --- a/proto/node/node.go +++ b/proto/node/node.go @@ -13,6 +13,10 @@ import ( // NodeMessageType is to indicate the specific type of message under NODE category type NodeMessageType byte +const ( + PROTOCOL_VERSION = 1 +) + const ( Transaction NodeMessageType = iota BLOCK @@ -20,7 +24,7 @@ const ( CONTROL BlockchainSync PING // node send ip/pki to register with leader - PONG // leader + PONG // node broadcast pubK // TODO: add more types ) diff --git a/proto/node/pingpong.go b/proto/node/pingpong.go index 29c05a72d..af0dbddff 100644 --- a/proto/node/pingpong.go +++ b/proto/node/pingpong.go @@ -4,11 +4,7 @@ Package proto/node implements the communication protocol among nodes. pingpong.go adds support of ping/pong messages. ping: from node to peers, sending IP/Port/PubKey info -TODO: add protocol version support - pong: peer responds to ping messages, sending all pubkeys known by peer -TODO: -* add the version of the protocol */ @@ -18,25 +14,66 @@ import ( "bytes" "encoding/gob" "fmt" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/proto" "log" ) -type PingMessageType struct { +type nodeInfo struct { IP string Port string PubKey string } +type PingMessageType struct { + Version uint16 // version of the protocol + Node nodeInfo +} + type PongMessageType struct { - PubKeys []string + Version uint16 // version of the protocol + Peers []nodeInfo } func (p PingMessageType) String() string { - return fmt.Sprintf("%v:%v/%v", p.IP, p.Port, p.PubKey) + return fmt.Sprintf("%v=>%v:%v/%v", p.Version, p.Node.IP, p.Node.Port, p.Node.PubKey) } func (p PongMessageType) String() string { - return fmt.Sprintf("# Keys: %v", len(p.PubKeys)) + str := fmt.Sprintf("%v=># Peers: %v", p.Version, len(p.Peers)) + for _, p := range p.Peers { + str = fmt.Sprintf("%v\n%v:%v/%v", str, p.IP, p.Port, p.PubKey) + } + return str +} + +func NewPingMessage(peer p2p.Peer) *PingMessageType { + ping := new(PingMessageType) + + ping.Version = PROTOCOL_VERSION + ping.Node.IP = peer.Ip + ping.Node.Port = peer.Port + ping.Node.PubKey = fmt.Sprintf("%v", peer.PubKey) + + return ping +} + +func NewPongMessage(peers []p2p.Peer) *PongMessageType { + pong := new(PongMessageType) + + pong.Version = PROTOCOL_VERSION + pong.Peers = make([]nodeInfo, 0) + + for _, p := range peers { + n := nodeInfo{} + n.IP = p.Ip + n.Port = p.Port + n.PubKey = fmt.Sprintf("%v", p.PubKey) + + pong.Peers = append(pong.Peers, n) + } + + return pong } // Deserialize Ping Message @@ -71,8 +108,10 @@ func GetPongMessage(payload []byte) (*PongMessageType, error) { // ConstructPingMessage contructs ping message from node to leader func (ping PingMessageType) ConstructPingMessage() []byte { - var byteBuffer bytes.Buffer - encoder := gob.NewEncoder(&byteBuffer) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer.WriteByte(byte(PING)) + + encoder := gob.NewEncoder(byteBuffer) err := encoder.Encode(ping) if err != nil { log.Panic("Can't serialize Ping message", "error:", err) @@ -83,8 +122,10 @@ func (ping PingMessageType) ConstructPingMessage() []byte { // ConstructPongMessage contructs pong message from leader to node func (pong PongMessageType) ConstructPongMessage() []byte { - var byteBuffer bytes.Buffer - encoder := gob.NewEncoder(&byteBuffer) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer.WriteByte(byte(PONG)) + + encoder := gob.NewEncoder(byteBuffer) err := encoder.Encode(pong) if err != nil { log.Panic("Can't serialize Pong message", "error:", err) diff --git a/proto/node/pingpong_test.go b/proto/node/pingpong_test.go index 78d8a700c..1d6e71294 100644 --- a/proto/node/pingpong_test.go +++ b/proto/node/pingpong_test.go @@ -4,58 +4,91 @@ import ( "fmt" "strings" "testing" + + "github.com/harmony-one/harmony/crypto" + "github.com/harmony-one/harmony/crypto/pki" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/proto" ) var ( - p1 = PingMessageType{"127.0.0.1", "9999", "0x12345678901234567890"} - e1 = "127.0.0.1:9999/0x12345678901234567890" + priKey1 = crypto.Ed25519Curve.Scalar().SetInt64(int64(333)) + pubKey1 = pki.GetPublicKeyFromScalar(priKey1) + p1 = p2p.Peer{ + Ip: "127.0.0.1", + Port: "9999", + PubKey: pubKey1, + } + e1 = "1=>127.0.0.1:9999/5ad91c4440d3a0e83df49ff4a0243da1edf2ec2d9376ed58ea7ac6bc9d745ae4" - p2 = PongMessageType{ - []string{ - "0x1111111111111", - "0x2222222222222", - "0x3333333333333", + priKey2 = crypto.Ed25519Curve.Scalar().SetInt64(int64(999)) + pubKey2 = pki.GetPublicKeyFromScalar(priKey2) + + p2 = []p2p.Peer{ + { + Ip: "127.0.0.1", + Port: "8888", + PubKey: pubKey1, + Ready: true, + ValidatorID: 1, + }, + { + Ip: "127.0.0.1", + Port: "9999", + PubKey: pubKey2, + Ready: false, + ValidatorID: 2, }, } - e2 = "# Keys: 3" + e2 = "1=># Peers: 2" buf1 []byte buf2 []byte ) func TestString(test *testing.T) { - r1 := fmt.Sprintf("%v", p1) + ping1 := NewPingMessage(p1) + + r1 := fmt.Sprintf("%v", *ping1) if strings.Compare(r1, e1) != 0 { test.Errorf("expect: %v, got: %v", e1, r1) } else { - fmt.Printf("Ping:%v\n", p1) + fmt.Printf("Ping:%v\n", r1) } - r2 := fmt.Sprintf("%v", p2) + pong1 := NewPongMessage(p2) + r2 := fmt.Sprintf("%v", *pong1) - if strings.Compare(r2, e2) != 0 { + if !strings.HasPrefix(r2, e2) { test.Errorf("expect: %v, got: %v", e2, r2) } else { - fmt.Printf("Pong:%v\n", p2) + fmt.Printf("Pong:%v\n", r2) } } func TestSerialize(test *testing.T) { - buf1 = p1.ConstructPingMessage() - fmt.Printf("buf: %v\n", buf1) + ping1 := NewPingMessage(p1) + buf1 = ping1.ConstructPingMessage() + fmt.Printf("buf ping: %v\n", buf1) - buf2 = p2.ConstructPongMessage() - fmt.Printf("buf: %v\n", buf2) + pong1 := NewPongMessage(p2) + buf2 = pong1.ConstructPongMessage() + fmt.Printf("buf pong: %v\n", buf2) } func TestDeserialize(test *testing.T) { - ping, err := GetPingMessage(buf1) + msg1, err := proto.GetMessagePayload(buf1) + if err != nil { + test.Error("GetMessagePayload Failed!") + } + ping, err := GetPingMessage(msg1) if err != nil { test.Error("Ping failed!") } fmt.Printf("Ping:%v\n", ping) - pong, err := GetPongMessage(buf2) + msg2, err := proto.GetMessagePayload(buf2) + pong, err := GetPongMessage(msg2) if err != nil { test.Error("Pong failed!") }