diff --git a/node/node.go b/node/node.go index 358c99764..cfce00b99 100644 --- a/node/node.go +++ b/node/node.go @@ -1,10 +1,11 @@ package node import ( - "github.com/simple-rules/harmony-benchmark/crypto/pki" "net" "sync" + "github.com/simple-rules/harmony-benchmark/crypto/pki" + "github.com/simple-rules/harmony-benchmark/blockchain" "github.com/simple-rules/harmony-benchmark/client" "github.com/simple-rules/harmony-benchmark/consensus" @@ -67,11 +68,11 @@ func (node *Node) StartServer(port string) { func (node *Node) listenOnPort(port string) { listen, err := net.Listen("tcp4", ":"+port) - defer func() { + defer func(listen net.Listener) { if listen != nil { listen.Close() } - }() + }(listen) if err != nil { node.log.Error("Socket listen port failed", "port", port, "err", err) return diff --git a/node/node_handler.go b/node/node_handler.go index 7b42a1833..2b7df39a4 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -3,7 +3,6 @@ package node import ( "bytes" "encoding/gob" - "log" "net" "os" "strconv" @@ -82,6 +81,8 @@ func (node *Node) NodeHandler(conn net.Conn) { node.Client.UpdateBlocks(*blocks) } } + case proto_node.BLOCKCHAIN_SYNC: + node.transactionMessageHandler(msgPayload) case proto_node.CLIENT: clientMsgType := proto_node.ClientMessageType(msgPayload[0]) switch clientMsgType { @@ -155,7 +156,6 @@ func (node *Node) NodeHandler(conn net.Conn) { func (node *Node) transactionMessageHandler(msgPayload []byte) { txMessageType := proto_node.TransactionMessageType(msgPayload[0]) - log.Println(txMessageType) switch txMessageType { case proto_node.SEND: txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type diff --git a/p2p/message_reader.go b/p2p/helper.go similarity index 84% rename from p2p/message_reader.go rename to p2p/helper.go index e04e4876b..493f4831d 100644 --- a/p2p/message_reader.go +++ b/p2p/helper.go @@ -94,3 +94,23 @@ ILOOP: } return contentBuf.Bytes(), nil } + +func CreateMessage(msgType byte, data []byte) []byte { + buffer := bytes.NewBuffer([]byte{}) + + buffer.WriteByte(msgType) + + fourBytes := make([]byte, 4) + binary.BigEndian.PutUint32(fourBytes, uint32(len(data))) + buffer.Write(fourBytes) + + buffer.Write(data) + return buffer.Bytes() +} + +func SendMessageContent(conn net.Conn, data []byte) { + msgToSend := CreateMessage(byte(1), data) + w := bufio.NewWriter(conn) + w.Write(msgToSend) + w.Flush() +} diff --git a/p2p/helper_test.go b/p2p/helper_test.go new file mode 100644 index 000000000..da0062b5f --- /dev/null +++ b/p2p/helper_test.go @@ -0,0 +1,53 @@ +package p2p + +import ( + "bufio" + "net" + "testing" +) + +func setUpTestServer(times int, t *testing.T, conCreated chan bool) { + t.Parallel() + ln, _ := net.Listen("tcp", ":8081") + conCreated <- true + conn, _ := ln.Accept() + defer conn.Close() + + var ( + w = bufio.NewWriter(conn) + ) + for times > 0 { + times-- + data, err := ReadMessageContent(conn) + if err != nil { + t.Fatalf("error when ReadMessageContent %v", err) + } + data = CreateMessage(byte(1), data) + w.Write(data) + w.Flush() + } +} +func TestNewNewNode(t *testing.T) { + times := 100 + conCreated := make(chan bool) + go setUpTestServer(times, t, conCreated) + <-conCreated + + conn, _ := net.Dial("tcp", "127.0.0.1:8081") + + for times > 0 { + times-- + + myMsg := "minhdoan" + SendMessageContent(conn, []byte(myMsg)) + + data, err := ReadMessageContent(conn) + if err != nil { + t.Error("got an error when trying to receive an expected message from server.") + } + if string(data) != myMsg { + t.Error("did not receive expected message") + } + } + conn.Close() +} diff --git a/proto/node/node.go b/proto/node/node.go index c4a9e5954..ba2481461 100644 --- a/proto/node/node.go +++ b/proto/node/node.go @@ -3,6 +3,7 @@ package node import ( "bytes" "encoding/gob" + "github.com/simple-rules/harmony-benchmark/blockchain" "github.com/simple-rules/harmony-benchmark/p2p" "github.com/simple-rules/harmony-benchmark/proto" @@ -16,6 +17,7 @@ const ( BLOCK CLIENT CONTROL + BLOCKCHAIN_SYNC // TODO: add more types )