diff --git a/.gitignore b/.gitignore index 0b5b3c5aa..136d787a0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ # IdeaIDE .idea + +#VsIDE .vscode # Executables @@ -11,4 +13,4 @@ *.hex # Mac -.DS_Store +.DS_Store diff --git a/benchmark_node.go b/benchmark_node.go index e8a28b635..d4e32f045 100644 --- a/benchmark_node.go +++ b/benchmark_node.go @@ -27,7 +27,6 @@ func startServer(port string, handler func(net.Conn, *consensus.Consensus), cons log.Fatalf("Socket listen port %s failed,%s", port, err) os.Exit(1) } - log.Printf("Begin listen port: %s", port) for { conn, err := listen.Accept() if err != nil { @@ -115,10 +114,11 @@ func NodeHandler(conn net.Conn, consensus *consensus.Consensus) { payload, err := p2p.ReadMessagePayload(conn) if err != nil { if consensus.IsLeader { - log.Fatalf("[Leader] Receive data failed:%s", err) + log.Printf("[Leader] Receive data failed:%s", err) } else { - log.Fatalf("[Slave] Receive data failed:%s", err) + log.Printf("[Slave] Receive data failed:%s", err) } + return } if consensus.IsLeader { consensus.ProcessMessageLeader(payload) @@ -164,7 +164,6 @@ func main() { port := flag.String("port", "9000", "port of the node.") ipfile := flag.String("ipfile", "iplist.txt", "file containing all ip addresses") flag.Parse() - log.Println() consensusObj := consensus.InitConsensus(*ip, *port, getPeers(*ip, *port, *ipfile), getLeader(*ipfile)) var nodeStatus string @@ -173,8 +172,9 @@ func main() { } else { nodeStatus = "validator" } - log.Println(consensusObj) - log.Printf("This node is a %s node with ip: %s and port: %s\n", nodeStatus, *ip, *port) - log.Println() + + log.Println("======================================") + log.Printf("This node is a %s node listening on ip: %s and port: %s\n", nodeStatus, *ip, *port) + log.Println("======================================") startServer(*port, NodeHandler, &consensusObj) } diff --git a/consensus/consensus.go b/consensus/consensus.go index 210649378..0a4624a65 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -75,3 +75,11 @@ func InitConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus consensus.priKey = ip + ":" + port // use ip:port as unique key for now return consensus } + + +// Reset the state of the consensus +func (consensus *Consensus) ResetState() { + consensus.state = READY + consensus.commits = make(map[string]string) + consensus.responses = make(map[string]string) +} \ No newline at end of file diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 481f51297..effa64ab5 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -48,58 +48,66 @@ func (consensus *Consensus) startConsensus(msg string) { // prepare message and broadcast to validators msgToSend := ConstructConsensusMessage(ANNOUNCE, []byte("block")) - p2p.BroadcastMessage(consensus.validators, msgToSend) // Set state to ANNOUNCE_DONE consensus.state = ANNOUNCE_DONE + p2p.BroadcastMessage(consensus.validators, msgToSend) } func (consensus *Consensus) processCommitMessage(msg string) { // proceed only when the message is not received before and this consensus phase is not done. - if _, ok := consensus.commits[msg]; !ok && consensus.state != CHALLENGE_DONE { - mutex.Lock() + mutex.Lock() + _, ok := consensus.commits[msg] + shouldProcess := !ok && consensus.state == ANNOUNCE_DONE + if shouldProcess { consensus.commits[msg] = msg log.Printf("Number of commits received: %d", len(consensus.commits)) - mutex.Unlock() - } else { + } + mutex.Unlock() + + if !shouldProcess { return } - if consensus.state != CHALLENGE_DONE && len(consensus.commits) >= (2*len(consensus.validators))/3+1 { - mutex.Lock() + mutex.Lock() + if len(consensus.commits) >= (2*len(consensus.validators))/3+1 { + log.Printf("Enough commits received with %d signatures: %s", len(consensus.commits), consensus.commits) if consensus.state == ANNOUNCE_DONE { // Set state to CHALLENGE_DONE consensus.state = CHALLENGE_DONE } - mutex.Unlock() // Broadcast challenge msgToSend := ConstructConsensusMessage(CHALLENGE, []byte("challenge")) p2p.BroadcastMessage(consensus.validators, msgToSend) - - log.Printf("Enough commits received with %d signatures: %s", len(consensus.commits), consensus.commits) } + mutex.Unlock() } func (consensus *Consensus) processResponseMessage(msg string) { // proceed only when the message is not received before and this consensus phase is not done. - if _, ok := consensus.responses[msg]; !ok && consensus.state != FINISHED { - mutex.Lock() + mutex.Lock() + _, ok := consensus.responses[msg] + shouldProcess := !ok && consensus.state == CHALLENGE_DONE + if shouldProcess { consensus.responses[msg] = msg log.Printf("Number of responses received: %d", len(consensus.responses)) - mutex.Unlock() - } else { + } + mutex.Unlock() + + if !shouldProcess { return } - if consensus.state != FINISHED && len(consensus.responses) >= (2*len(consensus.validators))/3+1 { - mutex.Lock() + mutex.Lock() + if len(consensus.responses) >= (2*len(consensus.validators))/3+1 { + log.Printf("Consensus reached with %d signatures: %s", len(consensus.responses), consensus.responses) if consensus.state == CHALLENGE_DONE { // Set state to FINISHED consensus.state = FINISHED - log.Println("Hooray! Consensus reached!!!!!!!!!!!!!") + // TODO: do followups on the consensus + log.Printf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators)) + consensus.ResetState() } - mutex.Unlock() // TODO: composes new block and broadcast the new block to validators - - log.Printf("Consensus reached with %d signatures: %s", len(consensus.responses), consensus.responses) } + mutex.Unlock() } diff --git a/deploy.sh b/deploy.sh index 3fec9f11a..9ceeb47b3 100755 --- a/deploy.sh +++ b/deploy.sh @@ -1,6 +1,26 @@ +# System kernel setup (Mac) +# Reference: http://www.macfreek.nl/memory/Kernel_Configuration +sudo sysctl -w kern.ipc.somaxconn=1024 # Limit of number of new connections +#sudo sysctl -w kern.ipc.maxsockets=1024 # Initial number of sockets in memory +sudo sysctl -w net.inet.tcp.msl=1000 # TIME_WAIT +sudo sysctl -w net.inet.tcp.rfc1323=1 # Enable TCP window scaling +sudo sysctl -w kern.ipc.maxsockbuf=4194304 # Maximum TCP Window size +sudo sysctl -w net.inet.tcp.sendspace=131072 # Default send buffer +sudo sysctl -w net.inet.tcp.recvspace=358400 # Default receive buffer + +# The commented suffix is for linux +# Reference: https://github.com/Zilliqa/Zilliqa/blob/master/tests/Node/test_node_simple.sh +#sudo sysctl net.core.somaxconn=1024 +#sudo sysctl net.core.netdev_max_backlog=65536; +#sudo sysctl net.ipv4.tcp_tw_reuse=1; +#sudo sysctl -w net.ipv4.tcp_rmem='65536 873800 1534217728'; +#sudo sysctl -w net.ipv4.tcp_wmem='65536 873800 1534217728'; +#sudo sysctl -w net.ipv4.tcp_mem='65536 873800 1534217728'; + + ./kill_node.sh ipfile=$1 while read ip port mode; do - echo $ip $port $mode $ipfile + #echo $ip $port $mode $ipfile go run ./benchmark_node.go -ip $ip -port $port -ipfile $ipfile& done < $ipfile \ No newline at end of file diff --git a/kill_node.sh b/kill_node.sh index a3b72209f..08b90f223 100755 --- a/kill_node.sh +++ b/kill_node.sh @@ -1,5 +1,5 @@ for pid in `/bin/ps -fu $USER| grep "slave.go\|slave -port\|leader\|benchmark_node" | grep -v "grep" | awk '{print $2}'`; do - echo $pid + echo 'Killed process: '$pid kill -9 $pid done diff --git a/local_iplist.txt b/local_iplist.txt index dba63afb2..47de6810a 100644 --- a/local_iplist.txt +++ b/local_iplist.txt @@ -10,3 +10,92 @@ 127.0.0.1 9009 validator 127.0.0.1 9010 validator 127.0.0.1 9011 validator +127.0.0.1 9012 validator +127.0.0.1 9013 validator +127.0.0.1 9014 validator +127.0.0.1 9015 validator +127.0.0.1 9016 validator +127.0.0.1 9017 validator +127.0.0.1 9018 validator +127.0.0.1 9019 validator +127.0.0.1 9020 validator +127.0.0.1 9021 validator +127.0.0.1 9022 validator +127.0.0.1 9023 validator +127.0.0.1 9024 validator +127.0.0.1 9025 validator +127.0.0.1 9026 validator +127.0.0.1 9027 validator +127.0.0.1 9028 validator +127.0.0.1 9029 validator +127.0.0.1 9030 validator +127.0.0.1 9031 validator +127.0.0.1 9032 validator +127.0.0.1 9033 validator +127.0.0.1 9034 validator +127.0.0.1 9035 validator +127.0.0.1 9036 validator +127.0.0.1 9037 validator +127.0.0.1 9038 validator +127.0.0.1 9039 validator +127.0.0.1 9040 validator +127.0.0.1 9041 validator +127.0.0.1 9042 validator +127.0.0.1 9043 validator +127.0.0.1 9044 validator +127.0.0.1 9045 validator +127.0.0.1 9046 validator +127.0.0.1 9047 validator +127.0.0.1 9048 validator +127.0.0.1 9049 validator +127.0.0.1 9050 validator +127.0.0.1 9051 validator +127.0.0.1 9052 validator +127.0.0.1 9053 validator +127.0.0.1 9054 validator +127.0.0.1 9055 validator +127.0.0.1 9056 validator +127.0.0.1 9057 validator +127.0.0.1 9058 validator +127.0.0.1 9059 validator +127.0.0.1 9060 validator +127.0.0.1 9061 validator +127.0.0.1 9062 validator +127.0.0.1 9063 validator +127.0.0.1 9064 validator +127.0.0.1 9065 validator +127.0.0.1 9066 validator +127.0.0.1 9067 validator +127.0.0.1 9068 validator +127.0.0.1 9069 validator +127.0.0.1 9070 validator +127.0.0.1 9071 validator +127.0.0.1 9072 validator +127.0.0.1 9073 validator +127.0.0.1 9074 validator +127.0.0.1 9075 validator +127.0.0.1 9076 validator +127.0.0.1 9077 validator +127.0.0.1 9078 validator +127.0.0.1 9079 validator +127.0.0.1 9080 validator +127.0.0.1 9081 validator +127.0.0.1 9082 validator +127.0.0.1 9083 validator +127.0.0.1 9084 validator +127.0.0.1 9085 validator +127.0.0.1 9086 validator +127.0.0.1 9087 validator +127.0.0.1 9088 validator +127.0.0.1 9089 validator +127.0.0.1 9090 validator +127.0.0.1 9091 validator +127.0.0.1 9092 validator +127.0.0.1 9093 validator +127.0.0.1 9094 validator +127.0.0.1 9095 validator +127.0.0.1 9096 validator +127.0.0.1 9097 validator +127.0.0.1 9098 validator +127.0.0.1 9099 validator +127.0.0.1 9100 validator diff --git a/p2p/message.go b/p2p/message.go index eab1005c1..b7940ed31 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -7,6 +7,7 @@ import ( "io" "log" "net" + "time" ) /* @@ -29,35 +30,46 @@ func ReadMessagePayload(conn net.Conn) ([]byte, error) { r = bufio.NewReader(conn) ) + timeoutDuration := 1 * time.Second + conn.SetReadDeadline(time.Now().Add(timeoutDuration)) + //// Read 1 byte for messge type - msgType, err := r.ReadByte() - if err != nil { - log.Fatalf("Error reading the p2p message type field") + _, err := r.ReadByte() + switch err { + case io.EOF: + log.Printf("Error reading the p2p message type field: %s", err) + return payloadBuf.Bytes(), err + case nil: + + //log.Printf("Received p2p message type: %x\n", msgType) + default: + log.Printf("Error reading the p2p message type field: %s", err) return payloadBuf.Bytes(), err } - log.Printf("Received p2p message with type: %x", msgType) // TODO: check on msgType and take actions accordingly //// Read 4 bytes for message size fourBytes := make([]byte, 4) n, err := r.Read(fourBytes) if err != nil { - log.Fatalf("Error reading the p2p message size field") + log.Printf("Error reading the p2p message size field") return payloadBuf.Bytes(), err } else if n < len(fourBytes) { - log.Fatalf("Failed reading the p2p message size field: only read %d bytes", n) + log.Printf("Failed reading the p2p message size field: only read %d bytes", n) return payloadBuf.Bytes(), err } - log.Print(fourBytes) + //log.Print(fourBytes) // Number of bytes for the message payload bytesToRead := binary.BigEndian.Uint32(fourBytes) - log.Printf("The payload size is %d bytes.", bytesToRead) + //log.Printf("The payload size is %d bytes.", bytesToRead) //// Read the payload in chunk of 1024 bytes tmpBuf := make([]byte, 1024) ILOOP: for { + timeoutDuration := 1 * time.Second + conn.SetReadDeadline(time.Now().Add(timeoutDuration)) if bytesToRead < 1024 { // Read the last number of bytes less than 1024 tmpBuf = make([]byte, bytesToRead) diff --git a/p2p/peer.go b/p2p/peer.go index 60b5d3ed6..773296fbc 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -53,26 +53,25 @@ func ConstructP2pMessage(msgType byte, payload []byte) []byte { // SocketClient is to connect a socket given a port and send the given message. func sendWithSocketClient(ip, port string, message []byte) (res string) { - log.Printf("Sending message to ip %s and port %s\n", ip, port) + //log.Printf("Sending message to ip %s and port %s\n", ip, port) addr := strings.Join([]string{ip, port}, ":") conn, err := net.Dial("tcp", addr) if err != nil { - log.Fatalln(err) + log.Println(err) + return } defer conn.Close() conn.Write(message) - log.Printf("Sent to ip %s and port %s: %s\n", ip, port, message) + //log.Printf("Sent to ip %s and port %s: %s\n", ip, port, message) // No ack (reply) message from the receiver for now. - // TODO: think about return } // Send a message to another node with given port. func send(ip, port string, message []byte) (returnMessage string) { - returnMessage = sendWithSocketClient(ip, port, message) - log.Println(returnMessage) + sendWithSocketClient(ip, port, message) return }