From cb983ce48460f832faf13928aa61b4dca5adc412 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Wed, 20 Jun 2018 11:25:43 -0700 Subject: [PATCH 1/4] Add transactions num and Ids list to block --- aws-code/transaction_generator.go | 2 +- blockchain/block.go | 25 +++++++++++++++++++------ blockchain/blockchain.go | 6 +++--- blockchain/transaction.go | 8 ++++---- blockchain/utxopool.go | 8 ++++---- node/node_handler.go | 18 ++++++------------ 6 files changed, 37 insertions(+), 30 deletions(-) diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index 1724acb6d..f6b297940 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -53,7 +53,7 @@ func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Trans // Spend the money of current UTXO to a random address in [1 - 1000] txin := blockchain.TXInput{txId, index, address} txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(1000))} - tx := blockchain.Transaction{nil, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}} + tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}} tx.SetID() if count >= numTxs { diff --git a/blockchain/block.go b/blockchain/block.go index a51059402..794b92ab7 100644 --- a/blockchain/block.go +++ b/blockchain/block.go @@ -9,12 +9,19 @@ import ( "time" ) -// Block keeps block headers. +// Block keeps block headers, transactions and signature. type Block struct { - Timestamp int64 + // Header + Timestamp int64 + PrevBlockHash [32]byte + Hash [32]byte + NumTransactions int32 + TransactionIds [][32]byte + + // Transactions Transactions []*Transaction - PrevBlockHash [32]byte - Hash [32]byte + + // Signature... } // Serialize serializes the block @@ -56,7 +63,7 @@ func (b *Block) HashTransactions() []byte { var txHash [32]byte for _, tx := range b.Transactions { - txHashes = append(txHashes, tx.ID) + txHashes = append(txHashes, tx.ID[:]) } txHash = sha256.Sum256(bytes.Join(txHashes, []byte{})) return txHash[:] @@ -64,7 +71,13 @@ func (b *Block) HashTransactions() []byte { // NewBlock creates and returns a neew block. func NewBlock(transactions []*Transaction, prevBlockHash [32]byte) *Block { - block := &Block{time.Now().Unix(), transactions, prevBlockHash, [32]byte{}} + numTxs := int32(len(transactions)) + var txIds [][32]byte + + for _, tx := range transactions { + txIds = append(txIds, tx.ID) + } + block := &Block{time.Now().Unix(), prevBlockHash, [32]byte{},numTxs, txIds,transactions} copy(block.Hash[:], block.HashTransactions()[:]) // TODO(Minh): the blockhash should be a hash of everything in the block return block diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index d3fb703cd..3a4a210cb 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -29,7 +29,7 @@ func (bc *Blockchain) FindUnspentTransactions(address string) []Transaction { block := bc.Blocks[index] for _, tx := range block.Transactions { - txID := hex.EncodeToString(tx.ID) + txID := hex.EncodeToString(tx.ID[:]) idx := -1 // TODO(minhdoan): Optimize this. @@ -85,7 +85,7 @@ func (bc *Blockchain) FindSpendableOutputs(address string, amount int) (int, map Work: for _, tx := range unspentTXs { - txID := hex.EncodeToString(tx.ID) + txID := hex.EncodeToString(tx.ID[:]) for outIdx, txOutput := range tx.TxOutput { if txOutput.Address == address && accumulated < amount { @@ -132,7 +132,7 @@ func (bc *Blockchain) NewUTXOTransaction(from, to string, amount int) *Transacti outputs = append(outputs, TXOutput{acc - amount, from}) // a change } - tx := Transaction{nil, inputs, outputs} + tx := Transaction{[32]byte{}, inputs, outputs} tx.SetID() return &tx diff --git a/blockchain/transaction.go b/blockchain/transaction.go index dab10cb2a..ee38c78fb 100644 --- a/blockchain/transaction.go +++ b/blockchain/transaction.go @@ -11,7 +11,7 @@ import ( // Transaction represents a Bitcoin transaction type Transaction struct { - ID []byte // 32 byte hash + ID [32]byte // 32 byte hash TxInput []TXInput TxOutput []TXOutput } @@ -43,7 +43,7 @@ func (tx *Transaction) SetID() { log.Panic(err) } hash = sha256.Sum256(encoded.Bytes()) - tx.ID = hash[:] + tx.ID = hash } // NewCoinbaseTX creates a new coinbase transaction @@ -54,7 +54,7 @@ func NewCoinbaseTX(to, data string) *Transaction { txin := TXInput{[]byte{}, -1, data} txout := TXOutput{DefaultCoinbaseValue, to} - tx := Transaction{nil, []TXInput{txin}, []TXOutput{txout}} + tx := Transaction{[32]byte{}, []TXInput{txin}, []TXOutput{txout}} tx.SetID() return &tx } @@ -76,7 +76,7 @@ func (txOutput *TXOutput) String() string { // Used for debuging. func (tx *Transaction) String() string { - res := fmt.Sprintf("ID: %v\n", hex.EncodeToString(tx.ID)) + res := fmt.Sprintf("ID: %v\n", hex.EncodeToString(tx.ID[:])) res += fmt.Sprintf("TxInput:\n") for id, value := range tx.TxInput { res += fmt.Sprintf("%v: %v\n", id, value.String()) diff --git a/blockchain/utxopool.go b/blockchain/utxopool.go index 6ab830546..3a2ab1173 100644 --- a/blockchain/utxopool.go +++ b/blockchain/utxopool.go @@ -79,7 +79,7 @@ func (utxoPool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool { // VerifyOneTransaction verifies if a list of transactions valid. func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction) bool { spentTXOs := make(map[string]map[string]map[int]bool) - txID := hex.EncodeToString(tx.ID) + txID := hex.EncodeToString(tx.ID[:]) inTotal := 0 // Calculate the sum of TxInput for _, in := range tx.TxInput { @@ -121,7 +121,7 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction) bool { // UpdateOneTransaction updates utxoPool in respect to the new Transaction. func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) { if utxoPool != nil { - txID := hex.EncodeToString(tx.ID) + txID := hex.EncodeToString(tx.ID[:]) // Remove for _, in := range tx.TxInput { @@ -166,7 +166,7 @@ func (utxoPool *UTXOPool) VerifyAndUpdate(transactions []*Transaction) bool { func (utxoPool *UTXOPool) Update(transactions []*Transaction) { if utxoPool != nil { for _, tx := range transactions { - curTxID := hex.EncodeToString(tx.ID) + curTxID := hex.EncodeToString(tx.ID[:]) // Remove for _, in := range tx.TxInput { @@ -192,7 +192,7 @@ func (utxoPool *UTXOPool) Update(transactions []*Transaction) { // CreateUTXOPoolFromTransaction a Utxo pool from a genesis transaction. func CreateUTXOPoolFromTransaction(tx *Transaction) *UTXOPool { var utxoPool UTXOPool - txID := hex.EncodeToString(tx.ID) + txID := hex.EncodeToString(tx.ID[:]) utxoPool.UtxoMap = make(map[string]map[string]map[int]int) for index, out := range tx.TxOutput { utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int) diff --git a/node/node_handler.go b/node/node_handler.go index fd50b34d3..c91eacc1c 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -85,19 +85,21 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) { case REQUEST: reader := bytes.NewBuffer(msgPayload[1:]) var txIds map[[32]byte]bool - txId := make([]byte, 32) // 32 byte hash Id + buf := make([]byte, 32) // 32 byte hash Id for { - _, err := reader.Read(txId) + _, err := reader.Read(buf) if err != nil { break } - txIds[getFixedByteTxId(txId)] = true + var txId [32]byte + copy(txId[:], buf) + txIds[txId] = true } var txToReturn []*blockchain.Transaction for _, tx := range node.pendingTransactions { - if txIds[getFixedByteTxId(tx.ID)] { + if txIds[tx.ID] { txToReturn = append(txToReturn, tx) } } @@ -106,14 +108,6 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) { } } -// Copy the txId byte slice over to 32 byte array so the map can key on it -func getFixedByteTxId(txId []byte) [32]byte { - var id [32]byte - for i := range id { - id[i] = txId[i] - } - return id -} func (node *Node) WaitForConsensusReady(readySignal chan int) { node.log.Debug("Waiting for consensus ready", "node", node) From de988ef76ab6339603cfbfca12dcafe9a7fff4fd Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Wed, 20 Jun 2018 12:27:58 -0700 Subject: [PATCH 2/4] Add block verification in validator node; link verification function from Node obj to Consensus obj --- benchmark_main.go | 2 ++ consensus/consensus.go | 2 ++ consensus/consensus_leader.go | 41 ++++++++------------- consensus/consensus_leader_test.go | 11 +++--- consensus/consensus_validator.go | 52 +++++++++++++++++++++++---- consensus/consensus_validator_test.go | 4 +-- node/node_handler.go | 5 +++ 7 files changed, 75 insertions(+), 42 deletions(-) diff --git a/benchmark_main.go b/benchmark_main.go index 6a39407f4..5b17a7b0f 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -76,6 +76,8 @@ func main() { consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader) node := node.NewNode(&consensus) + consensus.BlockVerifier = node.VerifyNewBlock // Assign block verifier to the consensus + // Temporary testing code, to be removed. node.AddMoreFakeTransactions() diff --git a/consensus/consensus.go b/consensus/consensus.go index 3148c0b3d..8b57b52f6 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -8,6 +8,7 @@ import ( "harmony-benchmark/p2p" "regexp" "strconv" + "harmony-benchmark/blockchain" ) // Consensus data containing all info related to one consensus process @@ -40,6 +41,7 @@ type Consensus struct { // Signal channel for starting a new consensus process ReadySignal chan int + BlockVerifier func(*blockchain.Block)bool //// Network related fields msgCategory byte diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 3aaa72d9b..ca5b75ba7 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -6,11 +6,10 @@ import ( "bytes" "crypto/sha256" "encoding/binary" - "errors" - "fmt" "harmony-benchmark/blockchain" "harmony-benchmark/p2p" "strings" + "encoding/gob" ) var mutex = &sync.Mutex{} @@ -63,21 +62,23 @@ func (consensus *Consensus) processStartConsensusMessage(payload []byte) { func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { // prepare message and broadcast to validators - // Construct new block - //newBlock := constructNewBlock() - copy(newBlock.Hash[:32], consensus.blockHash[:]) - msgToSend, err := consensus.constructAnnounceMessage() - if err != nil { - return - } + // Copy over block hash and block header data + copy(consensus.blockHash[:], newBlock.Hash[:]) + + byteBuffer := bytes.NewBuffer([]byte{}) + encoder := gob.NewEncoder(byteBuffer) + encoder.Encode(newBlock) + consensus.blockHeader = byteBuffer.Bytes() + + msgToSend := consensus.constructAnnounceMessage() // Set state to ANNOUNCE_DONE consensus.state = ANNOUNCE_DONE p2p.BroadcastMessage(consensus.validators, msgToSend) } // Construct the announce message to send to validators -func (consensus Consensus) constructAnnounceMessage() ([]byte, error) { +func (consensus Consensus) constructAnnounceMessage() []byte { buffer := bytes.NewBuffer([]byte{}) // 4 byte consensus id @@ -86,9 +87,6 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) { buffer.Write(fourBytes) // 32 byte block hash - if len(consensus.blockHash) != 32 { - return buffer.Bytes(), errors.New(fmt.Sprintf("Block Hash size is %d bytes", len(consensus.blockHash))) - } buffer.Write(consensus.blockHash[:]) // 2 byte leader id @@ -97,11 +95,10 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) { buffer.Write(twoBytes) // n byte of block header - blockHeader := getBlockHeader() - buffer.Write(blockHeader) + buffer.Write(consensus.blockHeader) // 4 byte of payload size - sizeOfPayload := uint32(len(blockHeader)) + sizeOfPayload := uint32(len(consensus.blockHeader)) binary.BigEndian.PutUint32(fourBytes, sizeOfPayload) buffer.Write(fourBytes) @@ -109,17 +106,7 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) { signature := signMessage(buffer.Bytes()) buffer.Write(signature) - return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes()), nil -} - -// Get the hash of a block's byte stream -func getBlockHash(block []byte) [32]byte { - return sha256.Sum256(block) -} - -// TODO: fill in this function -func getBlockHeader() []byte { - return make([]byte, 200) + return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes()) } func signMessage(message []byte) []byte { diff --git a/consensus/consensus_leader_test.go b/consensus/consensus_leader_test.go index f500c8d25..6e1e96397 100644 --- a/consensus/consensus_leader_test.go +++ b/consensus/consensus_leader_test.go @@ -6,16 +6,13 @@ import ( ) func TestConstructAnnounceMessage(test *testing.T) { - header := getBlockHeader() leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) - consensus.blockHash = getBlockHash(make([]byte, 10)) - msg, err := consensus.constructAnnounceMessage() + consensus.blockHash = [32]byte{} + header := consensus.blockHeader + msg := consensus.constructAnnounceMessage() - if err != nil { - test.Error("Annouce message is not constructed successfully") - } if len(msg) != 1+1+1+4+32+2+4+64+len(header) { test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) } @@ -25,7 +22,7 @@ func TestConstructChallengeMessage(test *testing.T) { leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) - consensus.blockHash = getBlockHash(make([]byte, 10)) + consensus.blockHash = [32]byte{} msg := consensus.constructChallengeMessage() if len(msg) != 1+1+1+4+32+2+33+33+32+64 { diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 7c02fdd13..861ee0ec7 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -4,6 +4,10 @@ import ( "bytes" "encoding/binary" "harmony-benchmark/p2p" + "strconv" + "regexp" + "encoding/gob" + "harmony-benchmark/blockchain" ) // Validator's consensus message dispatcher @@ -43,8 +47,8 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { blockHash := payload[offset : offset+32] offset += 32 - // 2 byte validator id - leaderId := string(payload[offset : offset+2]) + // 2 byte leader id + leaderId := binary.BigEndian.Uint16(payload[offset : offset+2]) offset += 2 // n byte of block header @@ -63,18 +67,54 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { // TODO: make use of the data. This is just to avoid the unused variable warning _ = consensusId - _ = blockHash + _ = leaderId _ = blockHeader _ = blockHeaderSize _ = signature - copy(blockHash[:32], consensus.blockHash[:]) - // verify block data + copy(consensus.blockHash[:], blockHash[:]) + + // Verify block data + // check consensus Id if consensusId != consensus.consensusId { - consensus.Log.Debug("Received message", "fromConsensus", consensus) + consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId) + return + } + + // check leader Id + leaderPrivKey := consensus.leader.Ip + consensus.leader.Port + reg, _ := regexp.Compile("[^0-9]+") + socketId := reg.ReplaceAllString(leaderPrivKey, "") + value, _ := strconv.Atoi(socketId) + if leaderId != uint16(value) { + consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId) + return + } + + // check block header is valid + txDecoder := gob.NewDecoder(bytes.NewReader(blockHeader)) // skip the SEND messge type + + var blockHeaderObj blockchain.Block // TODO: separate header from block + err := txDecoder.Decode(&blockHeaderObj) + if err != nil { + consensus.Log.Debug("[ERROR] Unparseable block header data") + return + } + + // check block hash + if bytes.Compare(blockHash[:], blockHeaderObj.HashTransactions()[:]) != 0 || bytes.Compare(blockHeaderObj.Hash[:], blockHeaderObj.HashTransactions()[:]) != 0 { + consensus.Log.Debug("[ERROR] Block hash doesn't match") return } + + // check block data (transactions + if !consensus.BlockVerifier(&blockHeaderObj) { + consensus.Log.Debug("[ERROR] Block content is not verified successfully") + return + } + + // sign block // TODO: return the signature(commit) to leader diff --git a/consensus/consensus_validator_test.go b/consensus/consensus_validator_test.go index a8e2b0e7e..0f9477e2b 100644 --- a/consensus/consensus_validator_test.go +++ b/consensus/consensus_validator_test.go @@ -9,7 +9,7 @@ func TestConstructCommitMessage(test *testing.T) { leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) - consensus.blockHash = getBlockHash(make([]byte, 10)) + consensus.blockHash = [32]byte{} msg := consensus.constructCommitMessage() if len(msg) != 1+1+1+4+32+2+33+64 { @@ -21,7 +21,7 @@ func TestConstructResponseMessage(test *testing.T) { leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) - consensus.blockHash = getBlockHash(make([]byte, 10)) + consensus.blockHash = [32]byte{} msg := consensus.constructResponseMessage() if len(msg) != 1+1+1+4+32+2+32+64 { diff --git a/node/node_handler.go b/node/node_handler.go index c91eacc1c..570c9044b 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -147,4 +147,9 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { // Send the new block to consensus so it can be confirmed. node.BlockChannel <- *newBlock } +} + +func (node *Node) VerifyNewBlock(block *blockchain.Block) bool { + // TODO: fill in this function + return true } \ No newline at end of file From 3f9bdde12ec5112a9223470f0445c7225efeff85 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Wed, 20 Jun 2018 12:49:18 -0700 Subject: [PATCH 3/4] Implement block verifier for verifying new block --- consensus/consensus_validator.go | 32 +++++++++++++++++++++++--------- node/node_handler.go | 3 +-- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 861ee0ec7..8063c9470 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -93,7 +93,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { } // check block header is valid - txDecoder := gob.NewDecoder(bytes.NewReader(blockHeader)) // skip the SEND messge type + txDecoder := gob.NewDecoder(bytes.NewReader(blockHeader)) var blockHeaderObj blockchain.Block // TODO: separate header from block err := txDecoder.Decode(&blockHeaderObj) @@ -114,9 +114,6 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { return } - - // sign block - // TODO: return the signature(commit) to leader // For now, simply return the private key of this node. msgToSend := consensus.constructCommitMessage() @@ -154,8 +151,8 @@ func (consensus Consensus) constructCommitMessage() []byte { return consensus.ConstructConsensusMessage(COMMIT, buffer.Bytes()) } -// TODO: fill in this function func getCommitMessage() []byte { + // TODO: use real cosi signature return make([]byte, 33) } @@ -171,7 +168,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { offset += 32 // 2 byte leader id - leaderId := string(payload[offset : offset+2]) + leaderId := binary.BigEndian.Uint16(payload[offset : offset+2]) offset += 2 // 33 byte of aggregated commit @@ -200,13 +197,30 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { _ = challenge _ = signature - // verify block data and the aggregated signatures + // erify block data and the aggregated signatures + // check consensus Id if consensusId != consensus.consensusId { - consensus.Log.Debug("Received message", "fromConsensus", consensusId) + consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId) + return + } + + // check leader Id + leaderPrivKey := consensus.leader.Ip + consensus.leader.Port + reg, _ := regexp.Compile("[^0-9]+") + socketId := reg.ReplaceAllString(leaderPrivKey, "") + value, _ := strconv.Atoi(socketId) + if leaderId != uint16(value) { + consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId) + return + } + + // check block hash + if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 { + consensus.Log.Debug("[ERROR] Block hash doesn't match") return } - // sign the message + // TODO: verify aggregated commits with real schnor cosign verification // TODO: return the signature(response) to leader // For now, simply return the private key of this node. diff --git a/node/node_handler.go b/node/node_handler.go index 570c9044b..e0be379d7 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -150,6 +150,5 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { } func (node *Node) VerifyNewBlock(block *blockchain.Block) bool { - // TODO: fill in this function - return true + return node.UtxoPool.VerifyTransactions(block.Transactions) } \ No newline at end of file From 3b7edd4710cad7f181464086fb7c2c3c60de0a1b Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Wed, 20 Jun 2018 14:23:54 -0700 Subject: [PATCH 4/4] Add post-consensus processing of blockchain (add new block to blockchain) --- benchmark_main.go | 9 ++++++++- consensus/consensus.go | 4 ++++ consensus/consensus_leader.go | 10 ++++++++++ consensus/consensus_validator.go | 18 ++++++++++++++++-- node/node_handler.go | 21 +++++++++++---------- 5 files changed, 49 insertions(+), 13 deletions(-) diff --git a/benchmark_main.go b/benchmark_main.go index 5b17a7b0f..8679a25f4 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -76,7 +76,9 @@ func main() { consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader) node := node.NewNode(&consensus) - consensus.BlockVerifier = node.VerifyNewBlock // Assign block verifier to the consensus + // Assign closure functions to the consensus object + consensus.BlockVerifier = node.VerifyNewBlock + consensus.OnConsensusDone = node.AddNewBlockToBlockchain // Temporary testing code, to be removed. node.AddMoreFakeTransactions() @@ -90,6 +92,11 @@ func main() { go func() { node.WaitForConsensusReady(consensus.ReadySignal) }() + } else { + // Node waiting to add new block to the blockchain + go func() { + node.WaitForConsensusReady(consensus.ReadySignal) + }() } node.StartServer(*port) diff --git a/consensus/consensus.go b/consensus/consensus.go index 8b57b52f6..6aad6d00d 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -41,7 +41,11 @@ type Consensus struct { // Signal channel for starting a new consensus process ReadySignal chan int + // The verifier func passed from Node object BlockVerifier func(*blockchain.Block)bool + // The post-consensus processing func passed from Node object + // Called when consensus on a new block is done + OnConsensusDone func(*blockchain.Block) //// Network related fields msgCategory byte diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index ca5b75ba7..b0c56175d 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -287,6 +287,16 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "numOfNodes", len(consensus.validators)) consensus.ResetState() + + // TODO: reconstruct the whole block from header and transactions + // For now, we used the stored whole block in consensus.blockHeader + txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader)) + var blockHeaderObj blockchain.Block + err := txDecoder.Decode(&blockHeaderObj) + if err != nil { + consensus.Log.Debug("failed to construct the new block after consensus") + } + consensus.OnConsensusDone(&blockHeaderObj) consensus.consensusId++ // Send signal to Node so the new block can be added and new round of consensus can be triggered diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 8063c9470..442bc601f 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -94,13 +94,13 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { // check block header is valid txDecoder := gob.NewDecoder(bytes.NewReader(blockHeader)) - - var blockHeaderObj blockchain.Block // TODO: separate header from block + var blockHeaderObj blockchain.Block // TODO: separate header from block. Right now, this blockHeader data is actually the whole block err := txDecoder.Decode(&blockHeaderObj) if err != nil { consensus.Log.Debug("[ERROR] Unparseable block header data") return } + consensus.blockHeader = blockHeader // check block hash if bytes.Compare(blockHash[:], blockHeaderObj.HashTransactions()[:]) != 0 || bytes.Compare(blockHeaderObj.Hash[:], blockHeaderObj.HashTransactions()[:]) != 0 { @@ -230,6 +230,20 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { // Set state to RESPONSE_DONE consensus.state = RESPONSE_DONE consensus.consensusId++ + + + // TODO: think about when validators know about the consensus is reached. + // For now, the blockchain is updated right here. + + // TODO: reconstruct the whole block from header and transactions + // For now, we used the stored whole block in consensus.blockHeader + txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader)) + var blockHeaderObj blockchain.Block + err := txDecoder.Decode(&blockHeaderObj) + if err != nil { + consensus.Log.Debug("failed to construct the new block after consensus") + } + consensus.OnConsensusDone(&blockHeaderObj) } // Construct the response message to send to leader (assumption the consensus data is already verified) diff --git a/node/node_handler.go b/node/node_handler.go index e0be379d7..6321eeda6 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -116,14 +116,6 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { for { // keep waiting for consensus ready <-readySignal //node.log.Debug("Adding new block", "currentChainSize", len(node.blockchain.Blocks), "numTxs", len(node.blockchain.GetLatestBlock().Transactions), "PrevHash", node.blockchain.GetLatestBlock().PrevBlockHash, "Hash", node.blockchain.GetLatestBlock().Hash) - if newBlock != nil { - // Consensus is done on the newBlock (in the previous round of consensus), add it to blockchain - node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock) - // Update UTXO pool - node.UtxoPool.Update(node.transactionInConsensus) - // Clear transaction-in-consensus list - node.transactionInConsensus = []*blockchain.Transaction{} - } for { // Once we have more than 10 transactions pending we will try creating a new block if len(node.pendingTransactions) >= 10 { @@ -149,6 +141,15 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { } } -func (node *Node) VerifyNewBlock(block *blockchain.Block) bool { - return node.UtxoPool.VerifyTransactions(block.Transactions) +func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool { + return node.UtxoPool.VerifyTransactions(newBlock.Transactions) +} + +func (node *Node) AddNewBlockToBlockchain(newBlock *blockchain.Block) { + // Add it to blockchain + node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock) + // Update UTXO pool + node.UtxoPool.Update(newBlock.Transactions) + // Clear transaction-in-consensus list + node.transactionInConsensus = []*blockchain.Transaction{} } \ No newline at end of file