fix merge from master

pull/2/head
Minh Doan 7 years ago
commit 5ad2c3e3f8
  1. 40
      benchmark_node.go
  2. 34
      consensus/consensus.go
  3. 71
      consensus/consensus_leader.go
  4. 22
      consensus/consensus_validator.go
  5. 10
      node.go
  6. 3
      p2p/peer.go

@ -3,7 +3,6 @@ package main
import ( import (
"bufio" "bufio"
"flag" "flag"
"fmt"
"log" "log"
"net" "net"
"os" "os"
@ -49,7 +48,7 @@ func relayToPorts(msg string, conn net.Conn) {
} }
count := 0 count := 0
for count < len(ports) { for count < len(ports) {
fmt.Println(<-ch) log.Println(<-ch)
count++ count++
} }
w.Write([]byte(Message)) w.Write([]byte(Message))
@ -73,7 +72,7 @@ func convertIntoInts(data string) []int {
// Do check error. // Do check error.
func checkError(err error) { func checkError(err error) {
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error()) log.Fatalln(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1) os.Exit(1)
} }
} }
@ -105,7 +104,7 @@ func Send(port int, message string, ch chan int) (returnMessage string) {
ip := "127.0.0.1" ip := "127.0.0.1"
returnMessage = SocketClient(ip, message, port) returnMessage = SocketClient(ip, message, port)
ch <- port ch <- port
fmt.Println(returnMessage) log.Println(returnMessage)
return return
} }
@ -129,24 +128,6 @@ func NodeHandler(conn net.Conn, consensus *consensus.Consensus) {
//relayToPorts(receivedMessage, conn) //relayToPorts(receivedMessage, conn)
} }
func initConsensus(ip, port, ipfile string) consensus.Consensus {
// The first Ip, port passed will be leader.
consensus := consensus.Consensus{}
peer := p2p.Peer{Port: port, Ip: ip}
Peers := getPeers(ip, port, ipfile)
leaderPeer := getLeader(ipfile)
if leaderPeer == peer {
consensus.IsLeader = true
} else {
consensus.IsLeader = false
}
consensus.Leader = leaderPeer
consensus.Validators = Peers
consensus.PriKey = ip + ":" + port // use ip:port as unique key for now
return consensus
}
func getLeader(iplist string) p2p.Peer { func getLeader(iplist string) p2p.Peer {
file, _ := os.Open(iplist) file, _ := os.Open(iplist)
fscanner := bufio.NewScanner(file) fscanner := bufio.NewScanner(file)
@ -183,16 +164,17 @@ func main() {
port := flag.String("port", "9000", "port of the node.") port := flag.String("port", "9000", "port of the node.")
ipfile := flag.String("ipfile", "iplist.txt", "file containing all ip addresses") ipfile := flag.String("ipfile", "iplist.txt", "file containing all ip addresses")
flag.Parse() flag.Parse()
fmt.Println() log.Println()
consensus := initConsensus(*ip, *port, *ipfile)
consensusObj := consensus.InitConsensus(*ip, *port, getPeers(*ip, *port, *ipfile), getLeader(*ipfile))
var nodeStatus string var nodeStatus string
if consensus.IsLeader { if consensusObj.IsLeader {
nodeStatus = "leader" nodeStatus = "leader"
} else { } else {
nodeStatus = "validator" nodeStatus = "validator"
} }
fmt.Println(consensus) log.Println(consensusObj)
fmt.Printf("This node is a %s node with ip: %s and port: %s\n", nodeStatus, *ip, *port) log.Printf("This node is a %s node with ip: %s and port: %s\n", nodeStatus, *ip, *port)
fmt.Println() log.Println()
startServer(*port, NodeHandler, &consensus) startServer(*port, NodeHandler, &consensusObj)
} }

@ -7,17 +7,19 @@ import (
// Consensus data containing all info related to one consensus process // Consensus data containing all info related to one consensus process
type Consensus struct { type Consensus struct {
State ConsensusState state ConsensusState
// Signatures collected from validators // Signatures collected from validators
Signatures []string commits map[string]string
// Signatures collected from validators
responses map[string]string
// Actual block data to reach consensus on // Actual block data to reach consensus on
Data string data string
// List of validators // List of validators
Validators []p2p.Peer validators []p2p.Peer
// Leader // Leader
Leader p2p.Peer leader p2p.Peer
// private key of current node // private key of current node
PriKey string priKey string
// Whether I am leader. False means I am validator // Whether I am leader. False means I am validator
IsLeader bool IsLeader bool
} }
@ -53,3 +55,23 @@ func (state ConsensusState) String() string {
} }
return names[state] return names[state]
} }
func InitConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus {
// The first Ip, port passed will be leader.
consensus := Consensus{}
peer := p2p.Peer{Port: port, Ip: ip}
Peers := peers
leaderPeer := leader
if leaderPeer == peer {
consensus.IsLeader = true
} else {
consensus.IsLeader = false
}
consensus.commits = make(map[string]string)
consensus.responses = make(map[string]string)
consensus.leader = leaderPeer
consensus.validators = Peers
consensus.priKey = ip + ":" + port // use ip:port as unique key for now
return consensus
}

@ -2,7 +2,6 @@ package consensus
import ( import (
"log" "log"
"fmt"
"../p2p" "../p2p"
"sync" "sync"
) )
@ -22,20 +21,20 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
} }
msg := string(payload) msg := string(payload)
fmt.Printf("[Leader] Received and processing message: %s, %s\n", msgType, msg) log.Printf("[Leader] Received and processing message: %s, %s\n", msgType, msg)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
case COMMIT: case COMMIT:
consensus.processCommitMessage(msg) consensus.processCommitMessage(msg)
case CHALLENGE: case CHALLENGE:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
case RESPONSE: case RESPONSE:
consensus.processResponseMessage(msg) consensus.processResponseMessage(msg)
case START_CONSENSUS: case START_CONSENSUS:
consensus.processStartConsensusMessage(msg) consensus.processStartConsensusMessage(msg)
default: default:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
} }
} }
@ -48,32 +47,60 @@ func (consensus *Consensus) startConsensus(msg string) {
// prepare message and broadcast to validators // prepare message and broadcast to validators
msgToSend := ConstructConsensusMessage(ANNOUNCE, []byte("block")) msgToSend := ConstructConsensusMessage(ANNOUNCE, []byte("block"))
p2p.BroadcastMessage(consensus.Validators, msgToSend) p2p.BroadcastMessage(consensus.validators, msgToSend)
// Set state to ANNOUNCE_DONE // Set state to ANNOUNCE_DONE
consensus.State = ANNOUNCE_DONE consensus.state = ANNOUNCE_DONE
} }
func (consensus *Consensus) processCommitMessage(msg string) { func (consensus *Consensus) processCommitMessage(msg string) {
// verify and aggregate all the signatures // proceed only when the message is not received before and this consensus phase is not done.
mutex.Lock() if _, ok := consensus.commits[msg]; !ok && consensus.state != CHALLENGE_DONE {
consensus.Signatures = append(consensus.Signatures, msg) mutex.Lock()
consensus.commits[msg] = msg
// Broadcast challenge log.Printf("Number of commits received: %d", len(consensus.commits))
// Set state to CHALLENGE_DONE mutex.Unlock()
consensus.State = CHALLENGE_DONE } else {
mutex.Unlock() return
log.Printf("Number of signatures received: %d", len(consensus.Signatures))
if len(consensus.Signatures) >= (2 * len(consensus.Validators)) / 3 + 1 {
log.Printf("Consensus reached with %d signatures: %s", len(consensus.Signatures), consensus.Signatures)
} }
if consensus.state != CHALLENGE_DONE && len(consensus.commits) >= (2 * len(consensus.validators)) / 3 + 1 {
mutex.Lock()
if consensus.state == ANNOUNCE_DONE {
// Set state to CHALLENGE_DONE
consensus.state = CHALLENGE_DONE
}
mutex.Unlock()
// Broadcast challenge
msgToSend := ConstructConsensusMessage(CHALLENGE, []byte("challenge"))
p2p.BroadcastMessage(consensus.validators, msgToSend)
log.Printf("Enough commits received with %d signatures: %s", len(consensus.commits), consensus.commits)
}
} }
func (consensus *Consensus) processResponseMessage(msg string) { func (consensus *Consensus) processResponseMessage(msg string) {
// verify and aggregate all signatures // proceed only when the message is not received before and this consensus phase is not done.
if _, ok := consensus.responses[msg]; !ok && consensus.state != FINISHED {
mutex.Lock()
consensus.responses[msg] = msg
log.Printf("Number of responses received: %d", len(consensus.responses))
mutex.Unlock()
} else {
return
}
// Set state to FINISHED if consensus.state != FINISHED && len(consensus.responses) >= (2 * len(consensus.validators)) / 3 + 1 {
consensus.State = FINISHED mutex.Lock()
if consensus.state == CHALLENGE_DONE {
// Set state to FINISHED
consensus.state = FINISHED
log.Println("Hooray! Consensus reached!!!!!!!!!!!!!")
}
mutex.Unlock()
// TODO: composes new block and broadcast the new block to validators
log.Printf("Consensus reached with %d signatures: %s", len(consensus.responses), consensus.responses)
}
} }

@ -2,7 +2,6 @@ package consensus
import ( import (
"log" "log"
"fmt"
"../p2p" "../p2p"
) )
@ -19,18 +18,18 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
} }
msg := string(payload) msg := string(payload)
fmt.Printf("[Validator] Received and processing message: %s, %s\n", msgType, msg) log.Printf("[Validator] Received and processing message: %s, %s\n", msgType, msg)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
consensus.processAnnounceMessage(msg) consensus.processAnnounceMessage(msg)
case COMMIT: case COMMIT:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
case CHALLENGE: case CHALLENGE:
consensus.processChallengeMessage(msg) consensus.processChallengeMessage(msg)
case RESPONSE: case RESPONSE:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
default: default:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
} }
} }
@ -41,11 +40,11 @@ func (consensus *Consensus) processAnnounceMessage(msg string) {
// TODO: return the signature(commit) to leader // TODO: return the signature(commit) to leader
// For now, simply return the private key of this node. // For now, simply return the private key of this node.
msgToSend := ConstructConsensusMessage(COMMIT, []byte(consensus.PriKey)) msgToSend := ConstructConsensusMessage(COMMIT, []byte(consensus.priKey))
p2p.SendMessage(consensus.Leader, msgToSend) p2p.SendMessage(consensus.leader, msgToSend)
// Set state to COMMIT_DONE // Set state to COMMIT_DONE
consensus.State = COMMIT_DONE consensus.state = COMMIT_DONE
} }
@ -54,9 +53,12 @@ func (consensus *Consensus) processChallengeMessage(msg string) {
// sign the message // sign the message
// return the signature(response) to leader // TODO: return the signature(response) to leader
// For now, simply return the private key of this node.
msgToSend := ConstructConsensusMessage(RESPONSE, []byte(consensus.priKey))
p2p.SendMessage(consensus.leader, msgToSend)
// Set state to RESPONSE_DONE // Set state to RESPONSE_DONE
consensus.State = RESPONSE_DONE consensus.state = RESPONSE_DONE
} }

@ -1,7 +1,7 @@
package main package main
import ( import (
"fmt" "log"
"math/rand" "math/rand"
"time" "time"
) )
@ -32,18 +32,18 @@ func randomString(len int) string {
func (n Node) send(cin <-chan string, id int) { func (n Node) send(cin <-chan string, id int) {
for msg := range cin { for msg := range cin {
fmt.Printf("Leader has sent message %s to %d\n", msg, id) log.Printf("Leader has sent message %s to %d\n", msg, id)
} }
} }
func consume(cin <-chan string, id int) { func consume(cin <-chan string, id int) {
for msg := range cin { for msg := range cin {
fmt.Printf("Leader has sent message %s to %d\n", msg, id) log.Printf("Leader has sent message %s to %d\n", msg, id)
} }
} }
func (n Node) receive() { func (n Node) receive() {
fmt.Printf("Node: %d received message\n", n.ip) log.Printf("Node: %d received message\n", n.ip)
} }
func createNode(ip int, isLeader bool) Node { func createNode(ip int, isLeader bool) Node {
@ -85,7 +85,7 @@ func TxnGenerator(numOfTxns int, lenOfRandomString int) <-chan string {
go func() { go func() {
for i := 0; i < numOfTxns; i++ { for i := 0; i < numOfTxns; i++ {
out <- randomString(lenOfRandomString) out <- randomString(lenOfRandomString)
fmt.Printf("Transaction Number %d\n", i) log.Printf("Transaction Number %d\n", i)
//time.Sleep(2 * time.Second) //time.Sleep(2 * time.Second)
} }
close(out) close(out)

@ -3,7 +3,6 @@ package p2p
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
"log" "log"
"net" "net"
"strings" "strings"
@ -74,6 +73,6 @@ func sendWithSocketClient(ip, port string, message []byte) (res string) {
// Send a message to another node with given port. // Send a message to another node with given port.
func send(ip, port string, message []byte) (returnMessage string) { func send(ip, port string, message []byte) (returnMessage string) {
returnMessage = sendWithSocketClient(ip, port, message) returnMessage = sendWithSocketClient(ip, port, message)
fmt.Println(returnMessage) log.Println(returnMessage)
return return
} }

Loading…
Cancel
Save