From 412923292881238aef03bdb466fca132c8bf8cba Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Thu, 14 Jun 2018 22:51:23 -0700 Subject: [PATCH 1/4] add VerifyNewBlock and its test --- blockchain/block.go | 2 +- blockchain/blockchain.go | 19 ++++++++++++++++++- blockchain/blockchain_test.go | 16 ++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/blockchain/block.go b/blockchain/block.go index d3bfca4e5..2eb64fc31 100644 --- a/blockchain/block.go +++ b/blockchain/block.go @@ -62,7 +62,7 @@ func (b *Block) HashTransactions() []byte { return txHash[:] } -// NewBlock creates and returns Block. +// NewBlock creates and returns a neew block. func NewBlock(transactions []*Transaction, prevBlockHash []byte) *Block { block := &Block{time.Now().Unix(), transactions, prevBlockHash, []byte{}} block.Hash = block.HashTransactions() diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index cec050860..edcab3389 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -1,7 +1,9 @@ package blockchain import ( + "bytes" "encoding/hex" + "fmt" ) // Blockchain keeps a sequence of Blocks @@ -134,13 +136,28 @@ func (bc *Blockchain) NewUTXOTransaction(from, to string, amount int) *Transacti func (bc *Blockchain) AddNewTransferAmount(from, to string, amount int) *Blockchain { tx := bc.NewUTXOTransaction(from, to, amount) if tx != nil { - newBlock := NewBlock([]*Transaction{tx}, bc.blocks[len(bc.blocks)-1].PrevBlockHash) + newBlock := NewBlock([]*Transaction{tx}, bc.blocks[len(bc.blocks)-1].Hash) bc.blocks = append(bc.blocks, newBlock) return bc } return nil } +// VerifyNewBlock verifies if the new coming block is valid for the current blockchain. +func (bc *Blockchain) VerifyNewBlock(block *Block) bool { + length := len(bc.blocks) + if bytes.Compare(block.PrevBlockHash, bc.blocks[length-1].Hash) != 0 { + fmt.Println("MINh1") + return false + } + if block.Timestamp < bc.blocks[length-1].Timestamp { + fmt.Println("MINh2") + return false + } + // TODO(minhdoan): Check Transactions parts + return true +} + // CreateBlockchain creates a new blockchain DB func CreateBlockchain(address string) *Blockchain { // TODO: We assume we have not created any blockchain before. diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index 5db10a02e..0fb2003c9 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -61,3 +61,19 @@ func TestAddNewTransferAmount(t *testing.T) { t.Error("minh should not have enough fun to make the transfer") } } + +func TestVerifyNewBlock(t *testing.T) { + bc := CreateBlockchain("minh") + bc = bc.AddNewTransferAmount("minh", "alok", 3) + bc = bc.AddNewTransferAmount("minh", "rj", 100) + + tx := bc.NewUTXOTransaction("minh", "mark", 10) + if tx == nil { + t.Error("failed to create a new transaction.") + } + newBlock := NewBlock([]*Transaction{tx}, bc.blocks[len(bc.blocks)-1].Hash) + + if !bc.VerifyNewBlock(newBlock) { + t.Error("failed to add a new valid block.") + } +} From 4e0b706c0da8ee9e0a492d3725bcdb553ff725b3 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Fri, 15 Jun 2018 12:30:38 -0700 Subject: [PATCH 2/4] Correct some comments and variable names --- message/message.go | 13 +++++++------ node/node.go | 8 ++++---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/message/message.go b/message/message.go index db4696263..78af3854c 100644 --- a/message/message.go +++ b/message/message.go @@ -10,13 +10,14 @@ Node will process the content of the p2p message ---- content start ----- 1 byte - message category - 0x00: consensus - 0x01: normal... -1 byte - action type - - consensus node + 0x00: COMMITTEE + 0x01: NODE... +1 byte - message type + - for COMMITTEE category 0x00: consensus - - normal node - 0x00: transaction + 0x01: sharding ... + - for NODE category + 0x00: transaction ... n - 2 bytes - actual message payload ---- content end ----- diff --git a/node/node.go b/node/node.go index d3d96cc28..ff7d60c8c 100644 --- a/node/node.go +++ b/node/node.go @@ -46,7 +46,7 @@ func (node *Node) NodeHandler(conn net.Conn) { defer conn.Close() // Read p2p message payload - payload, err := p2p.ReadMessageContent(conn) + content, err := p2p.ReadMessageContent(conn) consensus := node.consensus if err != nil { @@ -58,7 +58,7 @@ func (node *Node) NodeHandler(conn net.Conn) { return } - msgCategory, err := message.GetMessageCategory(payload) + msgCategory, err := message.GetMessageCategory(content) if err != nil { if consensus.IsLeader { log.Printf("[Leader] Read node type failed:%s", err) @@ -68,7 +68,7 @@ func (node *Node) NodeHandler(conn net.Conn) { return } - msgType, err := message.GetMessageType(payload) + msgType, err := message.GetMessageType(content) if err != nil { if consensus.IsLeader { log.Printf("[Leader] Read action type failed:%s", err) @@ -78,7 +78,7 @@ func (node *Node) NodeHandler(conn net.Conn) { return } - msgPayload, err := message.GetMessagePayload(payload) + msgPayload, err := message.GetMessagePayload(content) if err != nil { if consensus.IsLeader { log.Printf("[Leader] Read message payload failed:%s", err) From 79adb142799c9e4839c203bb92b889ab636d9cfe Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Fri, 15 Jun 2018 16:53:48 -0700 Subject: [PATCH 3/4] create channel to send new block to consensus and consensus to signal readiness --- benchmark_main.go | 14 ++++++++++++++ consensus/consensus.go | 11 +++++++++++ consensus/consensus_leader.go | 27 ++++++++++++++++++++++----- local_iplist.txt | 2 +- node/node.go | 26 +++++++++++++++++++++++++- 5 files changed, 73 insertions(+), 7 deletions(-) diff --git a/benchmark_main.go b/benchmark_main.go index a1cfa9474..ba30661f0 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -61,5 +61,19 @@ func main() { log.Println("======================================") node := node.NewNode(&consensus) + + if consensus.IsLeader { + // Let consensus run + go func() { + log.Println("Waiting for block") + consensus.WaitForNewBlock(node.BlockChannel) + }() + // Node waiting for consensus readiness to create new block + go func() { + log.Println("Waiting for consensus ready") + node.WaitForConsensusReady(consensus.ReadySignal) + }() + } + node.StartServer(*port) } diff --git a/consensus/consensus.go b/consensus/consensus.go index 3e508269b..a3ee43197 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -35,6 +35,9 @@ type Consensus struct { // BlockHeader to run consensus on blockHeader []byte + // Signal channel for starting a new consensus process + ReadySignal chan int + //// Network related fields msgCategory byte actionType byte @@ -103,6 +106,14 @@ func NewConsensus(ip, port string, peers []p2p.Peer, leader p2p.Peer) Consensus value, err := strconv.Atoi(socketId) consensus.nodeId = uint16(value) + if consensus.IsLeader { + consensus.ReadySignal = make(chan int) + // send a signal to indicate it's ready to run consensus + go func() { + consensus.ReadySignal <- 1 + }() + } + consensus.msgCategory = byte(message.COMMITTEE) consensus.actionType = byte(message.CONSENSUS) return consensus diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index b87763b8d..0dff34d8f 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -9,10 +9,23 @@ import ( "encoding/binary" "errors" "fmt" + "harmony-benchmark/blockchain" ) var mutex = &sync.Mutex{} + +func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) { + for { // keep waiting for new blocks + newBlock := <- blockChannel + log.Println("got block.....") + // TODO: think about potential race condition + if consensus.state == READY { + consensus.startConsensus(&newBlock) + } + } +} + // Leader's consensus message dispatcher func (consensus *Consensus) ProcessMessageLeader(message []byte) { msgType, err := GetConsensusMessageType(message) @@ -25,7 +38,6 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) { log.Print(err) } - msg := string(payload) log.Printf("[Leader] Received and processing message: %s\n", msgType) switch msgType { case ANNOUNCE: @@ -37,18 +49,22 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) { case RESPONSE: consensus.processResponseMessage(payload) case START_CONSENSUS: - consensus.processStartConsensusMessage(msg) + consensus.processStartConsensusMessage(payload) default: log.Println("Unexpected message type: %s", msgType) } } // Handler for message which triggers consensus process -func (consensus *Consensus) processStartConsensusMessage(msg string) { +func (consensus *Consensus) processStartConsensusMessage(payload []byte) { + consensus.startConsensus(blockchain.NewGenesisBlock(blockchain.NewCoinbaseTX("x", "y"))) +} + +func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { // prepare message and broadcast to validators // Construct new block - newBlock := constructNewBlock() - consensus.blockHash = getBlockHash(newBlock) + //newBlock := constructNewBlock() + consensus.blockHash = newBlock.Hash msgToSend, err := consensus.constructAnnounceMessage() if err != nil { @@ -273,6 +289,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { // TODO: do followups on the consensus log.Printf("HOORAY!!! CONSENSUS REACHED AMONG %d NODES!!!\n", len(consensus.validators)) consensus.ResetState() + consensus.ReadySignal <- 1 } // TODO: composes new block and broadcast the new block to validators } diff --git a/local_iplist.txt b/local_iplist.txt index 47de6810a..9f5d026fe 100644 --- a/local_iplist.txt +++ b/local_iplist.txt @@ -1,4 +1,3 @@ -127.0.0.1 9000 leader 127.0.0.1 9001 validator 127.0.0.1 9002 validator 127.0.0.1 9003 validator @@ -99,3 +98,4 @@ 127.0.0.1 9098 validator 127.0.0.1 9099 validator 127.0.0.1 9100 validator +127.0.0.1 9000 leader diff --git a/node/node.go b/node/node.go index ff7d60c8c..1679d3491 100644 --- a/node/node.go +++ b/node/node.go @@ -10,16 +10,18 @@ import ( "harmony-benchmark/blockchain" "bytes" "encoding/gob" + "time" ) // A node represents a program (machine) participating in the network type Node struct { consensus *consensus.Consensus + BlockChannel chan blockchain.Block pendingTransactions []blockchain.Transaction } // Start a server and process the request by a handler. -func (node Node) StartServer(port string) { +func (node *Node) StartServer(port string) { listenOnPort(port, node.NodeHandler) } @@ -112,7 +114,28 @@ func (node *Node) NodeHandler(conn net.Conn) { } node.pendingTransactions = append(node.pendingTransactions, *txList...) log.Println(len(node.pendingTransactions)) + + } + } +} + +func (node *Node) WaitForConsensusReady(readySignal chan int) { + for { // keep waiting for consensus ready + <- readySignal + log.Println("got ready signal.....") + // create a new block + newBlock := new(blockchain.Block) + for { + if len(node.pendingTransactions) >= 10 { + log.Println("creating new block") + // TODO: package actual transactions + newBlock = blockchain.NewGenesisBlock(blockchain.NewCoinbaseTX("x", "y")) + break + } + time.Sleep(1 * time.Second) // Periodically check whether we have enough transactions to package into block. } + log.Println("sending new block to consensus") + node.BlockChannel <- *newBlock } } @@ -120,5 +143,6 @@ func (node *Node) NodeHandler(conn net.Conn) { func NewNode(consensus *consensus.Consensus) Node { node := Node{} node.consensus = consensus + node.BlockChannel = make(chan blockchain.Block) return node } \ No newline at end of file From d2927374b1d5802533ffc2f8be3b4e7cad716be8 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Fri, 15 Jun 2018 17:13:27 -0700 Subject: [PATCH 4/4] Take out transactions from pending Pool for new block --- aws-code/transaction_generator.go | 6 ++++++ consensus/consensus_leader.go | 1 - node/node.go | 14 +++++++++----- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index 3877439a7..e437f2ce5 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -22,15 +22,21 @@ func main() { ip := flag.String("ip", "127.0.0.1", "IP of the leader") port := flag.String("port", "9000", "port of the leader.") + txToSend := flag.Int("tx_count", 100, "number of transaction") txs := make([]blockchain.Transaction, 10) + txCount := 0 for true { + if txCount >= *txToSend { + break + } for i := range txs { txs[i] = newRandTransaction() } msg := node.ConstructTransactionListMessage(txs) p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg) + txCount += len(txs) time.Sleep(1 * time.Second) // 10 transactions per second } } diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 0dff34d8f..b8ecf2f77 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -18,7 +18,6 @@ var mutex = &sync.Mutex{} func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) { for { // keep waiting for new blocks newBlock := <- blockChannel - log.Println("got block.....") // TODO: think about potential race condition if consensus.state == READY { consensus.startConsensus(&newBlock) diff --git a/node/node.go b/node/node.go index 1679d3491..441f9a9a3 100644 --- a/node/node.go +++ b/node/node.go @@ -122,19 +122,23 @@ func (node *Node) NodeHandler(conn net.Conn) { func (node *Node) WaitForConsensusReady(readySignal chan int) { for { // keep waiting for consensus ready <- readySignal - log.Println("got ready signal.....") // create a new block newBlock := new(blockchain.Block) for { if len(node.pendingTransactions) >= 10 { - log.Println("creating new block") - // TODO: package actual transactions - newBlock = blockchain.NewGenesisBlock(blockchain.NewCoinbaseTX("x", "y")) + log.Println("Creating new block") + // TODO (Minh): package actual transactions + // For now, just take out 10 transactions + var txList []*blockchain.Transaction + for _, tx := range node.pendingTransactions[0:10] { + txList = append(txList, &tx) + } + node.pendingTransactions = node.pendingTransactions[10:] + newBlock = blockchain.NewBlock(txList, []byte{}) break } time.Sleep(1 * time.Second) // Periodically check whether we have enough transactions to package into block. } - log.Println("sending new block to consensus") node.BlockChannel <- *newBlock } }