diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..429f04cdf --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +# IdeaIDE +.idea + +# Executables +*.exe +*.out +*.app +*.i*86 +*.x86_64 +*.hex + +# Mac +.DS_Store \ No newline at end of file diff --git a/AppSpec.yml b/AppSpec.yml new file mode 100644 index 000000000..5be3d729a --- /dev/null +++ b/AppSpec.yml @@ -0,0 +1,22 @@ +version: 0.0 +os: linux +files: + - source: ./ + destination: /home/ec2-user/projects/ +hooks: + BeforeInstall: + - location: aws-scripts/say_hello.sh + timeout: 300 + runas: root + AfterInstall: + - location: aws-scripts/say_hello.sh #maybe start the server + timeout: 300 + runas: root + ApplicationStart: + - location: aws-scripts/say_hello.sh #maybe start the server. + timeout: 300 + runas: root + ApplicationStop: + - location: aws-scripts/say_hello.sh + timeout: 300 + runas: root \ No newline at end of file diff --git a/README.md b/README.md index 2b5820f84..95c1dbda6 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,10 @@ # Harmony Benchmark +## Usage + +./deploy.sh ipList.txt +./send_txn.sh + ## References https://github.com/lightningnetwork/lnd diff --git a/aws-code/dial.go b/aws-code/dial.go new file mode 100644 index 000000000..aaca72471 --- /dev/null +++ b/aws-code/dial.go @@ -0,0 +1,19 @@ +package main + +import ( + "fmt" + "io/ioutil" + "net" +) + +func main() { + conn, err := net.Dial("tcp", ":9000") + if err != nil { + panic(err) + } + defer conn.Close() + + bs, _ := ioutil.ReadAll(conn) + fmt.Println(string(bs)) + +} diff --git a/aws-code/listen.go b/aws-code/listen.go new file mode 100644 index 000000000..69d786aaf --- /dev/null +++ b/aws-code/listen.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "io" + "net" + "time" +) + +func main() { + ln, err := net.Listen("tcp", "localhost:9000") + if err != nil { + panic(err) + } + defer ln.Close() + + for { + conn, err := ln.Accept() + if err != nil { + panic(err) + } + + io.WriteString(conn, fmt.Sprint("Hello World\n", time.Now(), "\n")) +conn.Close() +} +} diff --git a/aws-scripts/say_hello.sh b/aws-scripts/say_hello.sh new file mode 100644 index 000000000..0a9c27341 --- /dev/null +++ b/aws-scripts/say_hello.sh @@ -0,0 +1 @@ +echo "Hi, I am in aws-scripts and its seems to have worked." \ No newline at end of file diff --git a/aws-setup/AppSpec.yml b/aws-setup/AppSpec.yml new file mode 100644 index 000000000..b9c76b877 --- /dev/null +++ b/aws-setup/AppSpec.yml @@ -0,0 +1,22 @@ +version: 0.0 +os: linux +files: + - source: ../ + destination: /projects/ +hooks: + BeforeInstall: + - location: aws-scripts/say_hello.sh + timeout: 300 + runas: root + AfterInstall: + - location: aws-scripts/say_hello.sh #maybe start the server + timeout: 300 + runas: root + ApplicationStart: + - location: aws-scripts/say_hello.sh #maybe start the server. + timeout: 300 + runas: root + ApplicationStop: + - location: aws-scripts/say_hello.sh + timeout: 300 + runas: root \ No newline at end of file diff --git a/aws-setup/copy_code.sh b/aws-setup/copy_code.sh new file mode 100644 index 000000000..2b9fc1c3d --- /dev/null +++ b/aws-setup/copy_code.sh @@ -0,0 +1 @@ +scp -i main.pem -r ~/Documents/goworkspace/src/github.com/harmony-benchmark/ ec2-user@ec2-34-219-218-239.us-west-2.compute.amazonaws.com:/home/ec2-user/projects/ diff --git a/aws-setup/parse_json.py b/aws-setup/parse_json.py new file mode 100644 index 000000000..8e8eb643b --- /dev/null +++ b/aws-setup/parse_json.py @@ -0,0 +1,34 @@ +import json + +def get_public_ip(all_reservations): + all_public_ip_addresses = [] + for individual_instances in all_reservations: + instance_information = individual_instances['Instances'][0] + if "running" != instance_information["State"]["Name"]: + continue + all_public_ip_addresses.append(instance_information['PublicIpAddress']) + return all_public_ip_addresses + +def make_peers_list(all_reservations,port="9001",filename="ipList.txt"): + p = get_public_ip(all_reservations) + f = open(filename,"w") + for i in range(len(p)): + if i == 0: + f.write(p[i] + " " + port + " " + "leader"+"\n") + else: + f.write(p[i] + " " + port + " " + "validator"+"\n") + f.close() + +def is_it_running(f): + pass + +if __name__ == "__main__": + json_data=open("aws.json").read() + f = json.loads(json_data) + all_reservations = f['Reservations'] + + make_peers_list(all_reservations) + + + + \ No newline at end of file diff --git a/aws-setup/setup_instances.sh b/aws-setup/setup_instances.sh new file mode 100755 index 000000000..56e8d6534 --- /dev/null +++ b/aws-setup/setup_instances.sh @@ -0,0 +1 @@ +aws ec2 run-instances --image-id ami-e251209a --count 1 --instance-type t2.nano --key-name main --security-group-ids sg-066a8b0ec187c7247 diff --git a/aws-setup/user-data.sh b/aws-setup/user-data.sh new file mode 100644 index 000000000..b84bfd5fb --- /dev/null +++ b/aws-setup/user-data.sh @@ -0,0 +1,9 @@ +#!/bin/bash -x +REGION=$(curl 169.254.169.254/latest/meta-data/placement/availability-zone/ | sed 's/[a-z]$//') +yum update -y +yum install ruby wget -y +cd /home/ec2-user +wget https://aws-codedeploy-$REGION.s3.amazonaws.com/latest/install +chmod +x ./install +./install auto +mkdir projects \ No newline at end of file diff --git a/benchmark_node.go b/benchmark_node.go index 75ff7eefa..0db0d28f8 100644 --- a/benchmark_node.go +++ b/benchmark_node.go @@ -1,10 +1,11 @@ package main import ( + "./consensus" + "./p2p" "bufio" "flag" "fmt" - "io" "log" "net" "os" @@ -19,57 +20,28 @@ const ( ) // Start a server and process the request by a handler. -func startServer(port int, handler func(net.Conn)) { - listen, err := net.Listen("tcp4", ":"+strconv.Itoa(port)) +func startServer(port string, handler func(net.Conn, *consensus.Consensus), consensus *consensus.Consensus) { + listen, err := net.Listen("tcp4", ":"+port) defer listen.Close() if err != nil { - log.Fatalf("Socket listen port %d failed,%s", port, err) + log.Fatalf("Socket listen port %s failed,%s", port, err) os.Exit(1) } - log.Printf("Begin listen port: %d", port) + log.Printf("Begin listen port: %s", port) for { conn, err := listen.Accept() if err != nil { + log.Printf("Error listening on port: %s. Exiting.", port) log.Fatalln(err) continue } - go handler(conn) + go handler(conn, consensus) } } -// Handler of the leader node. -func leaderHandler(conn net.Conn) { - defer conn.Close() - var ( - buf = make([]byte, 1024) - r = bufio.NewReader(conn) - w = bufio.NewWriter(conn) - ) - - receivedMessage := "" -ILOOP: - for { - n, err := r.Read(buf) - data := string(buf[:n]) - - receivedMessage += data - - switch err { - case io.EOF: - break ILOOP - case nil: - log.Println("Receive:", data) - if isTransportOver(data) { - break ILOOP - } - - default: - log.Fatalf("Receive data failed:%s", err) - return - } - } - receivedMessage = strings.TrimSpace(receivedMessage) - ports := convertIntoInts(receivedMessage) +func relayToPorts(msg string, conn net.Conn) { + w := bufio.NewWriter(conn) + ports := convertIntoInts(msg) ch := make(chan int) for i, port := range ports { go Send(port, strconv.Itoa(i), ch) @@ -82,6 +54,7 @@ ILOOP: w.Write([]byte(Message)) w.Flush() log.Printf("Send: %s", Message) + } // Helper library to convert '1,2,3,4' into []int{1,2,3,4}. @@ -125,6 +98,7 @@ func SocketClient(ip, message string, port int) (res string) { return } +//https://gist.github.com/kenshinx/5796276 // Send a message to another node with given port. func Send(port int, message string, ch chan int) (returnMessage string) { ip := "127.0.0.1" @@ -134,53 +108,90 @@ func Send(port int, message string, ch chan int) (returnMessage string) { return } -// Handler for slave node. -func slaveHandler(conn net.Conn) { +// Handler of the leader node. +func NodeHandler(conn net.Conn, consensus *consensus.Consensus) { defer conn.Close() - var ( - buf = make([]byte, 1024) - r = bufio.NewReader(conn) - w = bufio.NewWriter(conn) - ) -ILOOP: - for { - n, err := r.Read(buf) - data := string(buf[:n]) - - switch err { - case io.EOF: - break ILOOP - case nil: - log.Println("Receive:", data) - if isTransportOver(data) { - break ILOOP - } - default: - log.Fatalf("Receive data failed:%s", err) - return + payload, err := p2p.ReadMessagePayload(conn) + if err != nil { + if consensus.IsLeader { + log.Fatalf("[Leader] Receive data failed:%s", err) + } else { + log.Fatalf("[Slave] Receive data failed:%s", err) } } - w.Write([]byte(Message)) - w.Flush() - log.Printf("Send: %s", Message) + if consensus.IsLeader { + consensus.ProcessMessageLeader(payload) + } else { + consensus.ProcessMessageValidator(payload) + } + //relayToPorts(receivedMessage, conn) } -func isTransportOver(data string) (over bool) { - over = strings.HasSuffix(data, "\r\n\r\n") - return +func initConsensus(ip, port, ipfile string) consensus.Consensus { + // The first Ip, port passed will be leader. + consensus := consensus.Consensus{} + peer := p2p.Peer{Port: port, Ip: ip} + Peers := getPeers(ip, port, ipfile) + leaderPeer := getLeader(ipfile) + if leaderPeer == peer { + consensus.IsLeader = true + } else { + consensus.IsLeader = false + } + consensus.Leader = leaderPeer + consensus.Validators = Peers + + consensus.PriKey = ip + ":" + port // use ip:port as unique key for now + return consensus +} + +func getLeader(iplist string) p2p.Peer { + file, _ := os.Open(iplist) + fscanner := bufio.NewScanner(file) + var leaderPeer p2p.Peer + for fscanner.Scan() { + p := strings.Split(fscanner.Text(), " ") + ip, port, status := p[0], p[1], p[2] + if status == "leader" { + leaderPeer.Ip = ip + leaderPeer.Port = port + } + } + return leaderPeer +} + +func getPeers(Ip, Port, iplist string) []p2p.Peer { + file, _ := os.Open(iplist) + fscanner := bufio.NewScanner(file) + var peerList []p2p.Peer + for fscanner.Scan() { + p := strings.Split(fscanner.Text(), " ") + ip, port, status := p[0], p[1], p[2] + if status == "leader" || ip == Ip && port == Port { + continue + } + peer := p2p.Peer{Port: port, Ip: ip} + peerList = append(peerList, peer) + } + return peerList } func main() { - port := flag.Int("port", 3333, "port of the node.") - mode := flag.String("mode", "leader", "should be slave or leader") + ip := flag.String("ip", "127.0.0.1", "IP of the node") + port := flag.String("port", "9000", "port of the node.") + ipfile := flag.String("ipfile", "iplist.txt", "file containing all ip addresses") flag.Parse() - - if strings.ToLower(*mode) == "leader" { - // Start leader node. - startServer(*port, leaderHandler) - } else if strings.ToLower(*mode) == "slave" { - // Start slave node. - startServer(*port, slaveHandler) + fmt.Println() + consensus := initConsensus(*ip, *port, *ipfile) + var nodeStatus string + if consensus.IsLeader { + nodeStatus = "leader" + } else { + nodeStatus = "validator" } + fmt.Println(consensus) + fmt.Printf("This node is a %s node with ip: %s and port: %s\n", nodeStatus, *ip, *port) + fmt.Println() + startServer(*port, NodeHandler, &consensus) } diff --git a/consensus/consensus.go b/consensus/consensus.go new file mode 100644 index 000000000..f9898e28a --- /dev/null +++ b/consensus/consensus.go @@ -0,0 +1,55 @@ +// Consensus package implements the Cosi PBFT consensus +package consensus // consensus + +import ( + "../p2p" +) + +// Consensus data containing all info related to one consensus process +type Consensus struct { + State ConsensusState + // Signatures collected from validators + Signatures []string + // Actual block data to reach consensus on + Data string + // List of validators + Validators []p2p.Peer + // Leader + Leader p2p.Peer + // private key of current node + PriKey string + // Whether I am leader. False means I am validator + IsLeader bool +} + +// Consensus state enum for both leader and validator +// States for leader: +// READY, ANNOUNCE_DONE, CHALLENGE_DONE, FINISHED +// States for validator: +// READY, COMMIT_DONE, RESPONSE_DONE, FINISHED +type ConsensusState int + +const ( + READY ConsensusState = iota + ANNOUNCE_DONE + COMMIT_DONE + CHALLENGE_DONE + RESPONSE_DONE + FINISHED +) + +// Returns string name for the ConsensusState enum +func (state ConsensusState) String() string { + names := [...]string{ + "READY", + "ANNOUNCE_DONE", + "COMMIT_DONE", + "CHALLENGE_DONE", + "RESPONSE_DONE", + "FINISHED"} + + if state < READY || state > RESPONSE_DONE { + return "Unknown" + } + return names[state] +} diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go new file mode 100644 index 000000000..52b482a8d --- /dev/null +++ b/consensus/consensus_leader.go @@ -0,0 +1,79 @@ +package consensus + +import ( + "log" + "fmt" + "../p2p" + "sync" +) + +var mutex = &sync.Mutex{} + +// Leader's consensus message dispatcher +func (consensus *Consensus) ProcessMessageLeader(message []byte) { + msgType, err := GetConsensusMessageType(message) + if err != nil { + log.Print(err) + } + + payload, err := GetConsensusMessagePayload(message) + if err != nil { + log.Print(err) + } + + msg := string(payload) + fmt.Printf("[Leader] Received and processing message: %s, %s\n", msgType, msg) + switch msgType { + case ANNOUNCE: + fmt.Println("Unexpected message type: %s", msgType) + case COMMIT: + consensus.processCommitMessage(msg) + case CHALLENGE: + fmt.Println("Unexpected message type: %s", msgType) + case RESPONSE: + consensus.processResponseMessage(msg) + case START_CONSENSUS: + consensus.processStartConsensusMessage(msg) + default: + fmt.Println("Unexpected message type: %s", msgType) + } +} + +// Handler for message which triggers consensus process +func (consensus *Consensus) processStartConsensusMessage(msg string) { + consensus.startConsensus(msg) +} + +func (consensus *Consensus) startConsensus(msg string) { + // prepare message and broadcast to validators + + msgToSend := ConstructConsensusMessage(ANNOUNCE, []byte("block")) + p2p.BroadcastMessage(consensus.Validators, msgToSend) + // Set state to ANNOUNCE_DONE + consensus.State = ANNOUNCE_DONE +} + +func (consensus *Consensus) processCommitMessage(msg string) { + // verify and aggregate all the signatures + mutex.Lock() + consensus.Signatures = append(consensus.Signatures, msg) + + // Broadcast challenge + // Set state to CHALLENGE_DONE + consensus.State = CHALLENGE_DONE + mutex.Unlock() + + log.Printf("Number of signatures received: %d", len(consensus.Signatures)) + if len(consensus.Signatures) >= (2 * len(consensus.Validators)) / 3 + 1 { + log.Printf("Consensus reached with %d signatures: %s", len(consensus.Signatures), consensus.Signatures) + } + +} + +func (consensus *Consensus) processResponseMessage(msg string) { + // verify and aggregate all signatures + + // Set state to FINISHED + consensus.State = FINISHED + +} \ No newline at end of file diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go new file mode 100644 index 000000000..6f095e3f5 --- /dev/null +++ b/consensus/consensus_validator.go @@ -0,0 +1,62 @@ +package consensus + +import ( + "log" + "fmt" + "../p2p" +) + +// Validator's consensus message dispatcher +func (consensus *Consensus) ProcessMessageValidator(message []byte) { + msgType, err := GetConsensusMessageType(message) + if err != nil { + log.Print(err) + } + + payload, err := GetConsensusMessagePayload(message) + if err != nil { + log.Print(err) + } + + msg := string(payload) + fmt.Printf("[Validator] Received and processing message: %s, %s\n", msgType, msg) + switch msgType { + case ANNOUNCE: + consensus.processAnnounceMessage(msg) + case COMMIT: + fmt.Println("Unexpected message type: %s", msgType) + case CHALLENGE: + consensus.processChallengeMessage(msg) + case RESPONSE: + fmt.Println("Unexpected message type: %s", msgType) + default: + fmt.Println("Unexpected message type: %s", msgType) + } +} + +func (consensus *Consensus) processAnnounceMessage(msg string) { + // verify block data + + // sign block + + // TODO: return the signature(commit) to leader + // For now, simply return the private key of this node. + msgToSend := ConstructConsensusMessage(COMMIT, []byte(consensus.PriKey)) + p2p.SendMessage(consensus.Leader, msgToSend) + + // Set state to COMMIT_DONE + consensus.State = COMMIT_DONE + +} + +func (consensus *Consensus) processChallengeMessage(msg string) { + // verify block data and the aggregated signatures + + // sign the message + + // return the signature(response) to leader + + // Set state to RESPONSE_DONE + consensus.State = RESPONSE_DONE +} + diff --git a/consensus/message.go b/consensus/message.go new file mode 100644 index 000000000..8eade7afd --- /dev/null +++ b/consensus/message.go @@ -0,0 +1,74 @@ +package consensus + +import ( + "bytes" + "errors" +) + +/* +Consensus message is the payload of p2p message. +Consensus message data structure: + + +---- message start ----- +1 byte - consensus.MessageType + 0x00 - ANNOUNCE + 0x01 - COMMIT + ... +payload (n bytes) - consensus message payload (the data to run consensus with) +---- message end ----- + +*/ + +const MESSAGE_TYPE_BYTES = 1 + +// Consensus communication message type. +// Leader and validator dispatch messages based on incoming message type +type MessageType int + +const ( + ANNOUNCE MessageType = iota + COMMIT + CHALLENGE + RESPONSE + START_CONSENSUS + ERROR = -1 +) + +// Returns string name for the MessageType enum +func (msgType MessageType) String() string { + names := [...]string{ + "ANNOUNCE", + "COMMIT", + "CHALLENGE", + "RESPONSE", + "START_CONSENSUS"} + + if msgType < ANNOUNCE || msgType > START_CONSENSUS { + return "Unknown" + } + return names[msgType] +} + +// Get the consensus message type from the p2p message payload +func GetConsensusMessageType(message []byte) (MessageType, error) { + if len(message) < 1 { + return ERROR, errors.New("Failed to get consensus message type: no data available.") + } + return MessageType(message[0]), nil +} + +// Get the consensus message payload from the p2p message payload +func GetConsensusMessagePayload(message []byte) ([]byte, error) { + if len(message) < 2 { + return []byte{}, errors.New("Failed to get consensus message payload: no data available.") + } + return message[MESSAGE_TYPE_BYTES:], nil +} + +// Concatenate msgType as one byte with payload, and return the whole byte array +func ConstructConsensusMessage(msgType MessageType, payload []byte) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(msgType)}) + byteBuffer.Write(payload) + return byteBuffer.Bytes() +} diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 000000000..3fec9f11a --- /dev/null +++ b/deploy.sh @@ -0,0 +1,6 @@ +./kill_node.sh +ipfile=$1 +while read ip port mode; do + echo $ip $port $mode $ipfile + go run ./benchmark_node.go -ip $ip -port $port -ipfile $ipfile& +done < $ipfile \ No newline at end of file diff --git a/deploy_node.sh b/deploy_node.sh deleted file mode 100755 index 07e6d7955..000000000 --- a/deploy_node.sh +++ /dev/null @@ -1,8 +0,0 @@ -LEADER_PORT=3333 -SLAVE_START_PORT=3000 -SLAVE_END_PORT=3009 -for port in `seq $SLAVE_START_PORT $SLAVE_END_PORT`; -do - go run ./benchmark_node.go -mode slave -port $port & -done -go run ./benchmark_node.go -mode leader -port $LEADER_PORT & \ No newline at end of file diff --git a/ipList.txt b/ipList.txt new file mode 100644 index 000000000..dba63afb2 --- /dev/null +++ b/ipList.txt @@ -0,0 +1,12 @@ +127.0.0.1 9000 leader +127.0.0.1 9001 validator +127.0.0.1 9002 validator +127.0.0.1 9003 validator +127.0.0.1 9004 validator +127.0.0.1 9005 validator +127.0.0.1 9006 validator +127.0.0.1 9007 validator +127.0.0.1 9008 validator +127.0.0.1 9009 validator +127.0.0.1 9010 validator +127.0.0.1 9011 validator diff --git a/kill_node.sh b/kill_node.sh index ef742cee6..a3b72209f 100755 --- a/kill_node.sh +++ b/kill_node.sh @@ -1,4 +1,5 @@ -for pid in `/bin/ps -fu $USER| grep "slave.go\|slave -port\|leader" | grep -v "grep" | awk '{print $2}'`; +for pid in `/bin/ps -fu $USER| grep "slave.go\|slave -port\|leader\|benchmark_node" | grep -v "grep" | awk '{print $2}'`; do + echo $pid kill -9 $pid done diff --git a/node.go b/node.go index ee3d1555d..7665ba0da 100644 --- a/node.go +++ b/node.go @@ -85,7 +85,8 @@ func TxnGenerator(numOfTxns int, lenOfRandomString int) <-chan string { go func() { for i := 0; i < numOfTxns; i++ { out <- randomString(lenOfRandomString) - time.Sleep(2 * time.Second) + fmt.Printf("Transaction Number %d\n", i) + //time.Sleep(2 * time.Second) } close(out) }() @@ -96,7 +97,7 @@ func main() { var ( //isLeader Node numOfTxns = 1000 - numOfNodes = 10 + numOfNodes = 10 N = make([]Node, 10) lenOfRandomString = 10 node_ips = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} @@ -112,7 +113,7 @@ func main() { Txns := BufferedTxnQueueWithFanOut(txnqueue, numOfNodes) for num := range Txns { txn := Txns[num] - go consume(txn,num) - } - time.Sleep(60 * time.Second) + go consume(txn, num) + } + time.Sleep(60 * time.Second) } diff --git a/p2p/message.go b/p2p/message.go new file mode 100644 index 000000000..b2156c117 --- /dev/null +++ b/p2p/message.go @@ -0,0 +1,84 @@ +package p2p + +import ( + "bufio" + "bytes" + "encoding/binary" + "io" + "log" + "net" +) + +/* +P2p Message data structure: + +---- message start ----- +1 byte - message type + 0x00 - normal message (no need to forward) + 0x11 - p2p message (may need to forward to other neighbors) +4 bytes - message size n in number of bytes +payload (n bytes) - actual message payload +---- message end ----- + +*/ + +// Read the message type and payload size, and return the actual payload. +func ReadMessagePayload(conn net.Conn) ([]byte, error) { + var ( + payloadBuf = bytes.NewBuffer([]byte{}) + r = bufio.NewReader(conn) + ) + + //// Read 1 byte for messge type + msgType, err := r.ReadByte() + if err != nil { + log.Fatalf("Error reading the p2p message type field") + return payloadBuf.Bytes(), err + } + log.Printf("Received p2p message with type: %x", msgType) + // TODO: check on msgType and take actions accordingly + + //// Read 4 bytes for message size + fourBytes := make([]byte, 4) + n, err := r.Read(fourBytes) + if err != nil { + log.Fatalf("Error reading the p2p message size field") + return payloadBuf.Bytes(), err + } else if n < len(fourBytes) { + log.Fatalf("Failed reading the p2p message size field: only read %d bytes", n) + return payloadBuf.Bytes(), err + } + + log.Print(fourBytes) + // Number of bytes for the message payload + bytesToRead := binary.BigEndian.Uint32(fourBytes) + log.Printf("The payload size is %d bytes.", bytesToRead) + + //// Read the payload in chunk of 1024 bytes + tmpBuf := make([]byte, 1024) +ILOOP: + for { + if bytesToRead < 1024 { + // Read the last number of bytes less than 1024 + tmpBuf = make([]byte, bytesToRead) + } + n, err := r.Read(tmpBuf) + payloadBuf.Write(tmpBuf[:n]) + + switch err { + case io.EOF: + // TODO: should we return error here, or just ignore it? + log.Printf("EOF reached while reading p2p message") + break ILOOP + case nil: + bytesToRead -= uint32(n) // TODO: think about avoid the casting in every loop + if bytesToRead <= 0 { + break ILOOP + } + default: + log.Printf("Error reading p2p message") + return []byte{}, err + } + } + return payloadBuf.Bytes(), nil +} diff --git a/p2p/peer.go b/p2p/peer.go new file mode 100644 index 000000000..16a721bcb --- /dev/null +++ b/p2p/peer.go @@ -0,0 +1,79 @@ +package p2p + +import ( + "bytes" + "encoding/binary" + "fmt" + "log" + "net" + "strings" +) + +// Object for a p2p peer (node) +type Peer struct { + // Ip address of the peer + Ip string + // Port number of the peer + Port string + // Public key of the peer + PubKey string +} + +// Send the message to the peer +func SendMessage(peer Peer, msg []byte) { + // Construct normal p2p message + payload := ConstructP2pMessage(byte(0), msg) + + send(peer.Ip, peer.Port, payload) +} + +// Send the message to a list of peers +func BroadcastMessage(peers []Peer, msg []byte) { + // Construct broadcast p2p message + payload := ConstructP2pMessage(byte(17), msg) + + for _, peer := range peers { + send(peer.Ip, peer.Port, payload) + } +} + +// Construct the p2p message as [messageType, payloadSize, payload] +func ConstructP2pMessage(msgType byte, payload []byte) []byte { + + firstByte := byte(17) // messageType + sizeBytes := make([]byte, 4) // payloadSize + + binary.BigEndian.PutUint32(sizeBytes, uint32(len(payload))) + + byteBuffer := bytes.NewBuffer([]byte{}) + byteBuffer.WriteByte(firstByte) + byteBuffer.Write(sizeBytes) + byteBuffer.Write(payload) + return byteBuffer.Bytes() +} + +// SocketClient is to connect a socket given a port and send the given message. +func sendWithSocketClient(ip, port string, message []byte) (res string) { + log.Printf("Sending message to ip %s and port %s\n", ip, port) + addr := strings.Join([]string{ip, port}, ":") + conn, err := net.Dial("tcp", addr) + + if err != nil { + log.Fatalln(err) + } + defer conn.Close() + + conn.Write(message) + log.Printf("Sent to ip %s and port %s: %s\n", ip, port, message) + + // No ack (reply) message from the receiver for now. + // TODO: think about + return +} + +// Send a message to another node with given port. +func send(ip, port string, message []byte) (returnMessage string) { + returnMessage = sendWithSocketClient(ip, port, message) + fmt.Println(returnMessage) + return +} diff --git a/send_tnx.sh b/send_tnx.sh index 0b1854e5f..150891154 100755 --- a/send_tnx.sh +++ b/send_tnx.sh @@ -1,4 +1,4 @@ -for port in `seq 0 9`; -do - echo "3000 3001 3002 3003 3004 3005 3006 3007 3008 3009" | nc 127.0.0.1 3333 -done \ No newline at end of file +# the hex is basically 0, 0032, 4, "9001 9002 9003 9004 9005 9006 9007 9008 9009 9010" +# explanation: type, size(50 bytes), [START_CONSENSUS, payload] +# START_CONSENSUS + payload = 50 bytes +echo -e '\x00\x00\x00\x00\x32\x04\x39\x30\x30\x31\x20\x39\x30\x30\x32\x20\x39\x30\x30\x33\x20\x39\x30\x30\x34\x20\x39\x30\x30\x35\x20\x39\x30\x30\x36\x20\x39\x30\x30\x37\x20\x39\x30\x30\x38\x20\x39\x30\x30\x39\x20\x39\x30\x31\x30' | nc 127.0.0.1 9000