diff --git a/benchmark_main.go b/benchmark_main.go index 5b17a7b0f..8679a25f4 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -76,7 +76,9 @@ func main() { consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader) node := node.NewNode(&consensus) - consensus.BlockVerifier = node.VerifyNewBlock // Assign block verifier to the consensus + // Assign closure functions to the consensus object + consensus.BlockVerifier = node.VerifyNewBlock + consensus.OnConsensusDone = node.AddNewBlockToBlockchain // Temporary testing code, to be removed. node.AddMoreFakeTransactions() @@ -90,6 +92,11 @@ func main() { go func() { node.WaitForConsensusReady(consensus.ReadySignal) }() + } else { + // Node waiting to add new block to the blockchain + go func() { + node.WaitForConsensusReady(consensus.ReadySignal) + }() } node.StartServer(*port) diff --git a/consensus/consensus.go b/consensus/consensus.go index 8b57b52f6..6aad6d00d 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -41,7 +41,11 @@ type Consensus struct { // Signal channel for starting a new consensus process ReadySignal chan int + // The verifier func passed from Node object BlockVerifier func(*blockchain.Block)bool + // The post-consensus processing func passed from Node object + // Called when consensus on a new block is done + OnConsensusDone func(*blockchain.Block) //// Network related fields msgCategory byte diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index ca5b75ba7..b0c56175d 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -287,6 +287,16 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "numOfNodes", len(consensus.validators)) consensus.ResetState() + + // TODO: reconstruct the whole block from header and transactions + // For now, we used the stored whole block in consensus.blockHeader + txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader)) + var blockHeaderObj blockchain.Block + err := txDecoder.Decode(&blockHeaderObj) + if err != nil { + consensus.Log.Debug("failed to construct the new block after consensus") + } + consensus.OnConsensusDone(&blockHeaderObj) consensus.consensusId++ // Send signal to Node so the new block can be added and new round of consensus can be triggered diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 8063c9470..442bc601f 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -94,13 +94,13 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { // check block header is valid txDecoder := gob.NewDecoder(bytes.NewReader(blockHeader)) - - var blockHeaderObj blockchain.Block // TODO: separate header from block + var blockHeaderObj blockchain.Block // TODO: separate header from block. Right now, this blockHeader data is actually the whole block err := txDecoder.Decode(&blockHeaderObj) if err != nil { consensus.Log.Debug("[ERROR] Unparseable block header data") return } + consensus.blockHeader = blockHeader // check block hash if bytes.Compare(blockHash[:], blockHeaderObj.HashTransactions()[:]) != 0 || bytes.Compare(blockHeaderObj.Hash[:], blockHeaderObj.HashTransactions()[:]) != 0 { @@ -230,6 +230,20 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) { // Set state to RESPONSE_DONE consensus.state = RESPONSE_DONE consensus.consensusId++ + + + // TODO: think about when validators know about the consensus is reached. + // For now, the blockchain is updated right here. + + // TODO: reconstruct the whole block from header and transactions + // For now, we used the stored whole block in consensus.blockHeader + txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader)) + var blockHeaderObj blockchain.Block + err := txDecoder.Decode(&blockHeaderObj) + if err != nil { + consensus.Log.Debug("failed to construct the new block after consensus") + } + consensus.OnConsensusDone(&blockHeaderObj) } // Construct the response message to send to leader (assumption the consensus data is already verified) diff --git a/node/node_handler.go b/node/node_handler.go index e0be379d7..6321eeda6 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -116,14 +116,6 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { for { // keep waiting for consensus ready <-readySignal //node.log.Debug("Adding new block", "currentChainSize", len(node.blockchain.Blocks), "numTxs", len(node.blockchain.GetLatestBlock().Transactions), "PrevHash", node.blockchain.GetLatestBlock().PrevBlockHash, "Hash", node.blockchain.GetLatestBlock().Hash) - if newBlock != nil { - // Consensus is done on the newBlock (in the previous round of consensus), add it to blockchain - node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock) - // Update UTXO pool - node.UtxoPool.Update(node.transactionInConsensus) - // Clear transaction-in-consensus list - node.transactionInConsensus = []*blockchain.Transaction{} - } for { // Once we have more than 10 transactions pending we will try creating a new block if len(node.pendingTransactions) >= 10 { @@ -149,6 +141,15 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { } } -func (node *Node) VerifyNewBlock(block *blockchain.Block) bool { - return node.UtxoPool.VerifyTransactions(block.Transactions) +func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool { + return node.UtxoPool.VerifyTransactions(newBlock.Transactions) +} + +func (node *Node) AddNewBlockToBlockchain(newBlock *blockchain.Block) { + // Add it to blockchain + node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock) + // Update UTXO pool + node.UtxoPool.Update(newBlock.Transactions) + // Clear transaction-in-consensus list + node.transactionInConsensus = []*blockchain.Transaction{} } \ No newline at end of file