create channel to send new block to consensus and consensus to signal readiness

pull/3/head^2
Rongjian Lan 6 years ago
parent 4e0b706c0d
commit 79adb14279
  1. 14
      benchmark_main.go
  2. 11
      consensus/consensus.go
  3. 27
      consensus/consensus_leader.go
  4. 2
      local_iplist.txt
  5. 26
      node/node.go

@ -61,5 +61,19 @@ func main() {
log.Println("======================================")
node := node.NewNode(&consensus)
if consensus.IsLeader {
// Let consensus run
go func() {
log.Println("Waiting for block")
consensus.WaitForNewBlock(node.BlockChannel)
}()
// Node waiting for consensus readiness to create new block
go func() {
log.Println("Waiting for consensus ready")
node.WaitForConsensusReady(consensus.ReadySignal)
}()
}
node.StartServer(*port)
}

@ -35,6 +35,9 @@ type Consensus struct {
// BlockHeader to run consensus on
blockHeader []byte
// Signal channel for starting a new consensus process
ReadySignal chan int
//// Network related fields
msgCategory byte
actionType byte
@ -103,6 +106,14 @@ func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus
value, err := strconv.Atoi(socketId)
consensus.nodeId = uint16(value)
if consensus.IsLeader {
consensus.ReadySignal = make(chan int)
// send a signal to indicate it's ready to run consensus
go func() {
consensus.ReadySignal <- 1
}()
}
consensus.msgCategory = byte(message.COMMITTEE)
consensus.actionType = byte(message.CONSENSUS)
return consensus

@ -9,10 +9,23 @@ import (
"encoding/binary"
"errors"
"fmt"
"harmony-benchmark/blockchain"
)
var mutex = &sync.Mutex{}
func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
for { // keep waiting for new blocks
newBlock := <- blockChannel
log.Println("got block.....")
// TODO: think about potential race condition
if consensus.state == READY {
consensus.startConsensus(&newBlock)
}
}
}
// Leader's consensus message dispatcher
func (consensus *Consensus) ProcessMessageLeader(message []byte) {
msgType, err := GetConsensusMessageType(message)
@ -25,7 +38,6 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
log.Print(err)
}
msg := string(payload)
log.Printf("[Leader] Received and processing message: %s\n", msgType)
switch msgType {
case ANNOUNCE:
@ -37,18 +49,22 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
case RESPONSE:
consensus.processResponseMessage(payload)
case START_CONSENSUS:
consensus.processStartConsensusMessage(msg)
consensus.processStartConsensusMessage(payload)
default:
log.Println("Unexpected message type: %s", msgType)
}
}
// Handler for message which triggers consensus process
func (consensus *Consensus) processStartConsensusMessage(msg string) {
func (consensus *Consensus) processStartConsensusMessage(payload []byte) {
consensus.startConsensus(blockchain.NewGenesisBlock(blockchain.NewCoinbaseTX("x", "y")))
}
func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
// prepare message and broadcast to validators
// Construct new block
newBlock := constructNewBlock()
consensus.blockHash = getBlockHash(newBlock)
//newBlock := constructNewBlock()
consensus.blockHash = newBlock.Hash
msgToSend, err := consensus.constructAnnounceMessage()
if err != nil {
@ -273,6 +289,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
// TODO: do followups on the consensus
log.Printf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators))
consensus.ResetState()
consensus.ReadySignal <- 1
}
// TODO: composes new block and broadcast the new block to validators
}

@ -1,4 +1,3 @@
127.0.0.1 9000 leader
127.0.0.1 9001 validator
127.0.0.1 9002 validator
127.0.0.1 9003 validator
@ -99,3 +98,4 @@
127.0.0.1 9098 validator
127.0.0.1 9099 validator
127.0.0.1 9100 validator
127.0.0.1 9000 leader

@ -10,16 +10,18 @@ import (
"harmony-benchmark/blockchain"
"bytes"
"encoding/gob"
"time"
)
// A node represents a program (machine) participating in the network
type Node struct {
consensus *consensus.Consensus
BlockChannel chan blockchain.Block
pendingTransactions []blockchain.Transaction
}
// Start a server and process the request by a handler.
func (node Node) StartServer(port string) {
func (node *Node) StartServer(port string) {
listenOnPort(port, node.NodeHandler)
}
@ -112,7 +114,28 @@ func (node *Node) NodeHandler(conn net.Conn) {
}
node.pendingTransactions = append(node.pendingTransactions, *txList...)
log.Println(len(node.pendingTransactions))
}
}
}
func (node *Node) WaitForConsensusReady(readySignal chan int) {
for { // keep waiting for consensus ready
<- readySignal
log.Println("got ready signal.....")
// create a new block
newBlock := new(blockchain.Block)
for {
if len(node.pendingTransactions) >= 10 {
log.Println("creating new block")
// TODO: package actual transactions
newBlock = blockchain.NewGenesisBlock(blockchain.NewCoinbaseTX("x", "y"))
break
}
time.Sleep(1 * time.Second) // Periodically check whether we have enough transactions to package into block.
}
log.Println("sending new block to consensus")
node.BlockChannel <- *newBlock
}
}
@ -120,5 +143,6 @@ func (node *Node) NodeHandler(conn net.Conn) {
func NewNode(consensus *consensus.Consensus) Node {
node := Node{}
node.consensus = consensus
node.BlockChannel = make(chan blockchain.Block)
return node
}
Loading…
Cancel
Save