Two-round consensus finished; change logging to use log package everywhere; limit the visibility of Consensus struct fields

pull/2/head
Rongjian Lan 7 years ago
parent 561185a065
commit 403508f2d1
  1. 15
      benchmark_node.go
  2. 23
      consensus/consensus.go
  3. 59
      consensus/consensus_leader.go
  4. 22
      consensus/consensus_validator.go
  5. 10
      node.go
  6. 3
      p2p/peer.go

@ -5,7 +5,6 @@ import (
"./p2p" "./p2p"
"bufio" "bufio"
"flag" "flag"
"fmt"
"log" "log"
"net" "net"
"os" "os"
@ -48,7 +47,7 @@ func relayToPorts(msg string, conn net.Conn) {
} }
count := 0 count := 0
for count < len(ports) { for count < len(ports) {
fmt.Println(<-ch) log.Println(<-ch)
count++ count++
} }
w.Write([]byte(Message)) w.Write([]byte(Message))
@ -72,7 +71,7 @@ func convertIntoInts(data string) []int {
// Do check error. // Do check error.
func checkError(err error) { func checkError(err error) {
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error()) log.Fatalln(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1) os.Exit(1)
} }
} }
@ -104,7 +103,7 @@ func Send(port int, message string, ch chan int) (returnMessage string) {
ip := "127.0.0.1" ip := "127.0.0.1"
returnMessage = SocketClient(ip, message, port) returnMessage = SocketClient(ip, message, port)
ch <- port ch <- port
fmt.Println(returnMessage) log.Println(returnMessage)
return return
} }
@ -164,7 +163,7 @@ func main() {
port := flag.String("port", "9000", "port of the node.") port := flag.String("port", "9000", "port of the node.")
ipfile := flag.String("ipfile", "iplist.txt", "file containing all ip addresses") ipfile := flag.String("ipfile", "iplist.txt", "file containing all ip addresses")
flag.Parse() flag.Parse()
fmt.Println() log.Println()
consensusObj := consensus.InitConsensus(*ip, *port, getPeers(*ip, *port, *ipfile), getLeader(*ipfile)) consensusObj := consensus.InitConsensus(*ip, *port, getPeers(*ip, *port, *ipfile), getLeader(*ipfile))
var nodeStatus string var nodeStatus string
@ -173,8 +172,8 @@ func main() {
} else { } else {
nodeStatus = "validator" nodeStatus = "validator"
} }
fmt.Println(consensusObj) log.Println(consensusObj)
fmt.Printf("This node is a %s node with ip: %s and port: %s\n", nodeStatus, *ip, *port) log.Printf("This node is a %s node with ip: %s and port: %s\n", nodeStatus, *ip, *port)
fmt.Println() log.Println()
startServer(*port, NodeHandler, &consensusObj) startServer(*port, NodeHandler, &consensusObj)
} }

@ -7,17 +7,19 @@ import (
// Consensus data containing all info related to one consensus process // Consensus data containing all info related to one consensus process
type Consensus struct { type Consensus struct {
State ConsensusState state ConsensusState
// Signatures collected from validators // Signatures collected from validators
Signatures map[string]string commits map[string]string
// Signatures collected from validators
responses map[string]string
// Actual block data to reach consensus on // Actual block data to reach consensus on
Data string data string
// List of validators // List of validators
Validators []p2p.Peer validators []p2p.Peer
// Leader // Leader
Leader p2p.Peer leader p2p.Peer
// private key of current node // private key of current node
PriKey string priKey string
// Whether I am leader. False means I am validator // Whether I am leader. False means I am validator
IsLeader bool IsLeader bool
} }
@ -65,10 +67,11 @@ func InitConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus
} else { } else {
consensus.IsLeader = false consensus.IsLeader = false
} }
consensus.Signatures = make(map[string]string) consensus.commits = make(map[string]string)
consensus.Leader = leaderPeer consensus.responses = make(map[string]string)
consensus.Validators = Peers consensus.leader = leaderPeer
consensus.validators = Peers
consensus.PriKey = ip + ":" + port // use ip:port as unique key for now consensus.priKey = ip + ":" + port // use ip:port as unique key for now
return consensus return consensus
} }

@ -2,7 +2,6 @@ package consensus
import ( import (
"log" "log"
"fmt"
"../p2p" "../p2p"
"sync" "sync"
) )
@ -22,20 +21,20 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
} }
msg := string(payload) msg := string(payload)
fmt.Printf("[Leader] Received and processing message: %s, %s\n", msgType, msg) log.Printf("[Leader] Received and processing message: %s, %s\n", msgType, msg)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
case COMMIT: case COMMIT:
consensus.processCommitMessage(msg) consensus.processCommitMessage(msg)
case CHALLENGE: case CHALLENGE:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
case RESPONSE: case RESPONSE:
consensus.processResponseMessage(msg) consensus.processResponseMessage(msg)
case START_CONSENSUS: case START_CONSENSUS:
consensus.processStartConsensusMessage(msg) consensus.processStartConsensusMessage(msg)
default: default:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
} }
} }
@ -48,42 +47,60 @@ func (consensus *Consensus) startConsensus(msg string) {
// prepare message and broadcast to validators // prepare message and broadcast to validators
msgToSend := ConstructConsensusMessage(ANNOUNCE, []byte("block")) msgToSend := ConstructConsensusMessage(ANNOUNCE, []byte("block"))
p2p.BroadcastMessage(consensus.Validators, msgToSend) p2p.BroadcastMessage(consensus.validators, msgToSend)
// Set state to ANNOUNCE_DONE // Set state to ANNOUNCE_DONE
consensus.State = ANNOUNCE_DONE consensus.state = ANNOUNCE_DONE
} }
func (consensus *Consensus) processCommitMessage(msg string) { func (consensus *Consensus) processCommitMessage(msg string) {
// verify and aggregate all the signatures // proceed only when the message is not received before and this consensus phase is not done.
if _, ok := consensus.Signatures[msg]; !ok { if _, ok := consensus.commits[msg]; !ok && consensus.state != CHALLENGE_DONE {
mutex.Lock() mutex.Lock()
consensus.Signatures[msg] = msg consensus.commits[msg] = msg
log.Printf("Number of commits received: %d", len(consensus.commits))
mutex.Unlock() mutex.Unlock()
} else {
return
} }
log.Printf("Number of signatures received: %d", len(consensus.Signatures)) if consensus.state != CHALLENGE_DONE && len(consensus.commits) >= (2 * len(consensus.validators)) / 3 + 1 {
if consensus.State != CHALLENGE_DONE && len(consensus.Signatures) >= (2 * len(consensus.Validators)) / 3 + 1 {
mutex.Lock() mutex.Lock()
if consensus.State == ANNOUNCE_DONE { if consensus.state == ANNOUNCE_DONE {
// Set state to CHALLENGE_DONE // Set state to CHALLENGE_DONE
consensus.State = CHALLENGE_DONE consensus.state = CHALLENGE_DONE
} }
mutex.Unlock() mutex.Unlock()
// Broadcast challenge // Broadcast challenge
msgToSend := ConstructConsensusMessage(CHALLENGE, []byte("challenge")) msgToSend := ConstructConsensusMessage(CHALLENGE, []byte("challenge"))
p2p.BroadcastMessage(consensus.Validators, msgToSend) p2p.BroadcastMessage(consensus.validators, msgToSend)
log.Printf("Consensus reached with %d signatures: %s", len(consensus.Signatures), consensus.Signatures) log.Printf("Enough commits received with %d signatures: %s", len(consensus.commits), consensus.commits)
} }
} }
func (consensus *Consensus) processResponseMessage(msg string) { func (consensus *Consensus) processResponseMessage(msg string) {
// verify and aggregate all signatures // proceed only when the message is not received before and this consensus phase is not done.
if _, ok := consensus.responses[msg]; !ok && consensus.state != FINISHED {
mutex.Lock()
consensus.responses[msg] = msg
log.Printf("Number of responses received: %d", len(consensus.responses))
mutex.Unlock()
} else {
return
}
// Set state to FINISHED
consensus.State = FINISHED
if consensus.state != FINISHED && len(consensus.responses) >= (2 * len(consensus.validators)) / 3 + 1 {
mutex.Lock()
if consensus.state == CHALLENGE_DONE {
// Set state to FINISHED
consensus.state = FINISHED
log.Println("Hooray! Consensus reached!!!!!!!!!!!!!")
}
mutex.Unlock()
// TODO: composes new block and broadcast the new block to validators
log.Printf("Consensus reached with %d signatures: %s", len(consensus.responses), consensus.responses)
}
} }

@ -2,7 +2,6 @@ package consensus
import ( import (
"log" "log"
"fmt"
"../p2p" "../p2p"
) )
@ -19,18 +18,18 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
} }
msg := string(payload) msg := string(payload)
fmt.Printf("[Validator] Received and processing message: %s, %s\n", msgType, msg) log.Printf("[Validator] Received and processing message: %s, %s\n", msgType, msg)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
consensus.processAnnounceMessage(msg) consensus.processAnnounceMessage(msg)
case COMMIT: case COMMIT:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
case CHALLENGE: case CHALLENGE:
consensus.processChallengeMessage(msg) consensus.processChallengeMessage(msg)
case RESPONSE: case RESPONSE:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
default: default:
fmt.Println("Unexpected message type: %s", msgType) log.Println("Unexpected message type: %s", msgType)
} }
} }
@ -41,11 +40,11 @@ func (consensus *Consensus) processAnnounceMessage(msg string) {
// TODO: return the signature(commit) to leader // TODO: return the signature(commit) to leader
// For now, simply return the private key of this node. // For now, simply return the private key of this node.
msgToSend := ConstructConsensusMessage(COMMIT, []byte(consensus.PriKey)) msgToSend := ConstructConsensusMessage(COMMIT, []byte(consensus.priKey))
p2p.SendMessage(consensus.Leader, msgToSend) p2p.SendMessage(consensus.leader, msgToSend)
// Set state to COMMIT_DONE // Set state to COMMIT_DONE
consensus.State = COMMIT_DONE consensus.state = COMMIT_DONE
} }
@ -54,9 +53,12 @@ func (consensus *Consensus) processChallengeMessage(msg string) {
// sign the message // sign the message
// return the signature(response) to leader // TODO: return the signature(response) to leader
// For now, simply return the private key of this node.
msgToSend := ConstructConsensusMessage(RESPONSE, []byte(consensus.priKey))
p2p.SendMessage(consensus.leader, msgToSend)
// Set state to RESPONSE_DONE // Set state to RESPONSE_DONE
consensus.State = RESPONSE_DONE consensus.state = RESPONSE_DONE
} }

@ -1,7 +1,7 @@
package main package main
import ( import (
"fmt" "log"
"math/rand" "math/rand"
"time" "time"
) )
@ -32,18 +32,18 @@ func randomString(len int) string {
func (n Node) send(cin <-chan string, id int) { func (n Node) send(cin <-chan string, id int) {
for msg := range cin { for msg := range cin {
fmt.Printf("Leader has sent message %s to %d\n", msg, id) log.Printf("Leader has sent message %s to %d\n", msg, id)
} }
} }
func consume(cin <-chan string, id int) { func consume(cin <-chan string, id int) {
for msg := range cin { for msg := range cin {
fmt.Printf("Leader has sent message %s to %d\n", msg, id) log.Printf("Leader has sent message %s to %d\n", msg, id)
} }
} }
func (n Node) receive() { func (n Node) receive() {
fmt.Printf("Node: %d received message\n", n.ip) log.Printf("Node: %d received message\n", n.ip)
} }
func createNode(ip int, isLeader bool) Node { func createNode(ip int, isLeader bool) Node {
@ -85,7 +85,7 @@ func TxnGenerator(numOfTxns int, lenOfRandomString int) <-chan string {
go func() { go func() {
for i := 0; i < numOfTxns; i++ { for i := 0; i < numOfTxns; i++ {
out <- randomString(lenOfRandomString) out <- randomString(lenOfRandomString)
fmt.Printf("Transaction Number %d\n", i) log.Printf("Transaction Number %d\n", i)
//time.Sleep(2 * time.Second) //time.Sleep(2 * time.Second)
} }
close(out) close(out)

@ -3,7 +3,6 @@ package p2p
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
"log" "log"
"net" "net"
"strings" "strings"
@ -74,6 +73,6 @@ func sendWithSocketClient(ip, port string, message []byte) (res string) {
// Send a message to another node with given port. // Send a message to another node with given port.
func send(ip, port string, message []byte) (returnMessage string) { func send(ip, port string, message []byte) (returnMessage string) {
returnMessage = sendWithSocketClient(ip, port, message) returnMessage = sendWithSocketClient(ip, port, message)
fmt.Println(returnMessage) log.Println(returnMessage)
return return
} }

Loading…
Cancel
Save