updated log functions

pull/5/head
Richard Liu 7 years ago
parent 12cc01da3c
commit a3663106d3
  1. 31
      consensus/consensus.go
  2. 18
      consensus/consensus_leader.go
  3. 8
      consensus/consensus_validator.go
  4. 47
      node/node.go

@ -2,11 +2,12 @@
package consensus // consensus package consensus // consensus
import ( import (
"fmt"
"harmony-benchmark/message"
"harmony-benchmark/p2p" "harmony-benchmark/p2p"
"regexp"
"log" "log"
"regexp"
"strconv" "strconv"
"harmony-benchmark/message"
) )
// Consensus data containing all info related to one consensus process // Consensus data containing all info related to one consensus process
@ -40,7 +41,7 @@ type Consensus struct {
//// Network related fields //// Network related fields
msgCategory byte msgCategory byte
actionType byte actionType byte
} }
// Consensus state enum for both leader and validator // Consensus state enum for both leader and validator
@ -119,10 +120,30 @@ func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus
return consensus return consensus
} }
// Reset the state of the consensus // Reset the state of the consensus
func (consensus *Consensus) ResetState() { func (consensus *Consensus) ResetState() {
consensus.state = READY consensus.state = READY
consensus.commits = make(map[string]string) consensus.commits = make(map[string]string)
consensus.responses = make(map[string]string) consensus.responses = make(map[string]string)
} }
// Returns ID of this consensus
func (consensus *Consensus) GetIdentityString() string {
var duty string
if consensus.IsLeader {
duty = "LDR" // leader
} else {
duty = "SLV" // slave
}
return fmt.Sprintf("[%s, %s, %v]", duty, consensus.priKey, consensus.nodeId)
}
// Prints log with ID of this consensus
func (consensus *Consensus) Logln(v ...interface{}) {
log.Printf("%s %s", consensus.GetIdentityString(), fmt.Sprintln(v...))
}
// Prints formatted log with ID of this consensus
func (consensus *Consensus) Logf(format string, v ...interface{}) {
log.Printf("%s %s", consensus.GetIdentityString(), fmt.Sprintf(format, v...))
}

@ -36,20 +36,20 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
log.Print(err) log.Print(err)
} }
log.Printf("[Leader] Received and processing message: %s\n", msgType) consensus.Logf("Received and processing message: %s\n", msgType)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
log.Printf("Unexpected message type: %s", msgType) consensus.Logf("Unexpected message type: %s", msgType)
case COMMIT: case COMMIT:
consensus.processCommitMessage(payload) consensus.processCommitMessage(payload)
case CHALLENGE: case CHALLENGE:
log.Printf("Unexpected message type: %s", msgType) consensus.Logf("Unexpected message type: %s", msgType)
case RESPONSE: case RESPONSE:
consensus.processResponseMessage(payload) consensus.processResponseMessage(payload)
case START_CONSENSUS: case START_CONSENSUS:
consensus.processStartConsensusMessage(payload) consensus.processStartConsensusMessage(payload)
default: default:
log.Printf("Unexpected message type: %s", msgType) consensus.Logf("Unexpected message type: %s", msgType)
} }
} }
@ -166,7 +166,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
shouldProcess := !ok && consensus.state == ANNOUNCE_DONE shouldProcess := !ok && consensus.state == ANNOUNCE_DONE
if shouldProcess { if shouldProcess {
consensus.commits[validatorId] = validatorId consensus.commits[validatorId] = validatorId
log.Printf("Number of commits received: %d", len(consensus.commits)) consensus.Logf("Number of commits received: %d", len(consensus.commits))
} }
mutex.Unlock() mutex.Unlock()
@ -176,7 +176,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
mutex.Lock() mutex.Lock()
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 { if len(consensus.commits) >= (2*len(consensus.validators))/3+1 {
log.Printf("Enough commits received with %d signatures", len(consensus.commits)) consensus.Logf("Enough commits received with %d signatures", len(consensus.commits))
if consensus.state == ANNOUNCE_DONE { if consensus.state == ANNOUNCE_DONE {
// Set state to CHALLENGE_DONE // Set state to CHALLENGE_DONE
consensus.state = CHALLENGE_DONE consensus.state = CHALLENGE_DONE
@ -271,7 +271,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
shouldProcess := !ok && consensus.state == CHALLENGE_DONE shouldProcess := !ok && consensus.state == CHALLENGE_DONE
if shouldProcess { if shouldProcess {
consensus.responses[validatorId] = validatorId consensus.responses[validatorId] = validatorId
log.Printf("Number of responses received: %d", len(consensus.responses)) consensus.Logf("Number of responses received: %d", len(consensus.responses))
} }
mutex.Unlock() mutex.Unlock()
@ -281,12 +281,12 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
mutex.Lock() mutex.Lock()
if len(consensus.responses) >= (2*len(consensus.validators))/3+1 { if len(consensus.responses) >= (2*len(consensus.validators))/3+1 {
log.Printf("Consensus reached with %d signatures.", len(consensus.responses)) consensus.Logf("Consensus reached with %d signatures.", len(consensus.responses))
if consensus.state == CHALLENGE_DONE { if consensus.state == CHALLENGE_DONE {
// Set state to FINISHED // Set state to FINISHED
consensus.state = FINISHED consensus.state = FINISHED
// TODO: do followups on the consensus // TODO: do followups on the consensus
log.Printf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators)) consensus.Logf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators))
consensus.ResetState() consensus.ResetState()
consensus.ReadySignal <- 1 consensus.ReadySignal <- 1
} }

@ -19,18 +19,18 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
log.Print(err) log.Print(err)
} }
log.Printf("[Validator] Received and processing message: %s\n", msgType) consensus.Logf("Received and processing message: %s\n", msgType)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
consensus.processAnnounceMessage(payload) consensus.processAnnounceMessage(payload)
case COMMIT: case COMMIT:
log.Printf("Unexpected message type: %s", msgType) consensus.Logf("Unexpected message type: %s", msgType)
case CHALLENGE: case CHALLENGE:
consensus.processChallengeMessage(payload) consensus.processChallengeMessage(payload)
case RESPONSE: case RESPONSE:
log.Printf("Unexpected message type: %s", msgType) consensus.Logf("Unexpected message type: %s", msgType)
default: default:
log.Printf("Unexpected message type: %s", msgType) consensus.Logf("Unexpected message type: %s", msgType)
} }
} }

@ -52,41 +52,25 @@ func (node *Node) NodeHandler(conn net.Conn) {
consensus := node.consensus consensus := node.consensus
if err != nil { if err != nil {
if consensus.IsLeader { node.Logf("Read p2p data failed:%s", err)
log.Printf("[Leader] Read p2p data failed:%s", err)
} else {
log.Printf("[Slave] Read p2p data failed:%s", err)
}
return return
} }
msgCategory, err := message.GetMessageCategory(content) msgCategory, err := message.GetMessageCategory(content)
if err != nil { if err != nil {
if consensus.IsLeader { node.Logf("Read node type failed:%s", err)
log.Printf("[Leader] Read node type failed:%s", err)
} else {
log.Printf("[Slave] Read node type failed:%s", err)
}
return return
} }
msgType, err := message.GetMessageType(content) msgType, err := message.GetMessageType(content)
if err != nil { if err != nil {
if consensus.IsLeader { node.Logf("Read action type failed:%s", err)
log.Printf("[Leader] Read action type failed:%s", err)
} else {
log.Printf("[Slave] Read action type failed:%s", err)
}
return return
} }
msgPayload, err := message.GetMessagePayload(content) msgPayload, err := message.GetMessagePayload(content)
if err != nil { if err != nil {
if consensus.IsLeader { node.Logf("Read message payload failed:%s", err)
log.Printf("[Leader] Read message payload failed:%s", err)
} else {
log.Printf("[Slave] Read message payload failed:%s", err)
}
return return
} }
@ -110,14 +94,14 @@ func (node *Node) NodeHandler(conn net.Conn) {
txList := new([]blockchain.Transaction) txList := new([]blockchain.Transaction)
err := txDecoder.Decode(&txList) err := txDecoder.Decode(&txList)
if err != nil { if err != nil {
log.Println("Failed deserializing transaction list") node.Logln("Failed deserializing transaction list")
} }
node.pendingTransactions = append(node.pendingTransactions, *txList...) node.pendingTransactions = append(node.pendingTransactions, *txList...)
log.Println(len(node.pendingTransactions)) node.Logln(len(node.pendingTransactions))
case message.CONTROL: case message.CONTROL:
controlType := msgPayload[0] controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP { if ControlMessageType(controlType) == STOP {
log.Println("Stopping Node") node.Logln("Stopping Node")
os.Exit(0) os.Exit(0)
} }
@ -132,7 +116,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) {
newBlock := new(blockchain.Block) newBlock := new(blockchain.Block)
for { for {
if len(node.pendingTransactions) >= 10 { if len(node.pendingTransactions) >= 10 {
log.Println("Creating new block") node.Logln("Creating new block")
// TODO (Minh): package actual transactions // TODO (Minh): package actual transactions
// For now, just take out 10 transactions // For now, just take out 10 transactions
var txList []*blockchain.Transaction var txList []*blockchain.Transaction
@ -156,3 +140,18 @@ func NewNode(consensus *consensus.Consensus) Node {
node.BlockChannel = make(chan blockchain.Block) node.BlockChannel = make(chan blockchain.Block)
return node return node
} }
// Returns the identity string of this node
func (node *Node) GetIdentityString() string {
return node.consensus.GetIdentityString()
}
// Prints log with ID of this node
func (node *Node) Logln(v ...interface{}) {
node.consensus.Logln(v...)
}
// Prints formatted log with ID of this node
func (node *Node) Logf(format string, v ...interface{}) {
node.consensus.Logf(format, v...)
}

Loading…
Cancel
Save