From cb549d9693d82912859e558af6cb1e9cce1669fe Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 19 Jun 2018 21:18:02 -0700 Subject: [PATCH] Add more comments to new code; make prevHash 32 byte array --- aws-code/transaction_generator.go | 14 ++++++++------ benchmark_main.go | 1 + blockchain/block.go | 6 +++--- blockchain/block_test.go | 2 +- blockchain/blockchain.go | 5 +++-- blockchain/blockchain_test.go | 2 +- consensus/consensus_leader.go | 2 ++ node/node.go | 16 +++++++++++++--- node/node_handler.go | 15 ++++++++++----- 9 files changed, 42 insertions(+), 21 deletions(-) diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index b019f485c..1724acb6d 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -24,8 +24,8 @@ import ( // NOTE: the genesis block should contain 1000 coinbase transactions adding // value to each address in [1 - 1000]. See node.AddMoreFakeTransactions() func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Transaction { - /* + UTXO map structure: address - [ txId1 - [ outputIndex1 - value1 @@ -37,7 +37,6 @@ func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Trans ] ] */ - var outputs []*blockchain.Transaction count := 0 countAll := 0 @@ -119,15 +118,15 @@ func main() { numTxsPerBatch := flag.Int("num_txs_per_batch", 1000, "number of transactions to send per message") flag.Parse() config := readConfigFile(*configFile) + leaders := getLeaders(&config) - // Testing node + // Testing node to mirror the node data in consensus dataNode := node.NewNode(&consensus.Consensus{}) dataNode.AddMoreFakeTransactions() start := time.Now() totalTime := 60.0 - leaders := getLeaders(&config) - time.Sleep(5 * time.Second) // wait for nodes to run + time.Sleep(3 * time.Second) // wait for nodes to be ready for true { t := time.Now() if t.Sub(start).Seconds() >= totalTime { @@ -139,9 +138,12 @@ func main() { log.Debug("[Generator] Sending txs...", "numTxs", len(txsToSend)) p2p.BroadcastMessage(leaders, msg) + // Update local utxo pool to mirror the utxo pool of a real node dataNode.UtxoPool.Update(txsToSend) - time.Sleep(1 * time.Second) // 10 transactions per second + time.Sleep(1 * time.Second) // Send a batch of transactions every second } + + // Send a stop message to stop the nodes at the end msg := node.ConstructStopMessage() peers := append(getValidators(*configFile), leaders...) p2p.BroadcastMessage(peers, msg) diff --git a/benchmark_main.go b/benchmark_main.go index faa275210..6a39407f4 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -76,6 +76,7 @@ func main() { consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader) node := node.NewNode(&consensus) + // Temporary testing code, to be removed. node.AddMoreFakeTransactions() if consensus.IsLeader { diff --git a/blockchain/block.go b/blockchain/block.go index c10ed69b5..a51059402 100644 --- a/blockchain/block.go +++ b/blockchain/block.go @@ -13,7 +13,7 @@ import ( type Block struct { Timestamp int64 Transactions []*Transaction - PrevBlockHash []byte + PrevBlockHash [32]byte Hash [32]byte } @@ -63,7 +63,7 @@ func (b *Block) HashTransactions() []byte { } // NewBlock creates and returns a neew block. -func NewBlock(transactions []*Transaction, prevBlockHash []byte) *Block { +func NewBlock(transactions []*Transaction, prevBlockHash [32]byte) *Block { block := &Block{time.Now().Unix(), transactions, prevBlockHash, [32]byte{}} copy(block.Hash[:], block.HashTransactions()[:]) // TODO(Minh): the blockhash should be a hash of everything in the block @@ -72,5 +72,5 @@ func NewBlock(transactions []*Transaction, prevBlockHash []byte) *Block { // NewGenesisBlock creates and returns genesis Block. func NewGenesisBlock(coinbase *Transaction) *Block { - return NewBlock([]*Transaction{coinbase}, []byte{}) + return NewBlock([]*Transaction{coinbase}, [32]byte{}) } diff --git a/blockchain/block_test.go b/blockchain/block_test.go index 72e32fffe..2566cfcdd 100644 --- a/blockchain/block_test.go +++ b/blockchain/block_test.go @@ -19,7 +19,7 @@ func TestBlockSerialize(t *testing.T) { t.Errorf("Serialize or Deserialize incorrect at TimeStamp.") } - if bytes.Compare(block.PrevBlockHash, deserializedBlock.PrevBlockHash) != 0 { + if bytes.Compare(block.PrevBlockHash[:], deserializedBlock.PrevBlockHash[:]) != 0 { t.Errorf("Serialize or Deserialize incorrect at PrevBlockHash.") } diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 2dc0cbc94..d3fb703cd 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -12,6 +12,7 @@ type Blockchain struct { const genesisCoinbaseData = "The Times 03/Jan/2009 Chancellor on brink of second bailout for banks" +// Get the latest block at the end of the chain func (bc *Blockchain) GetLatestBlock() *Block{ if len(bc.Blocks) == 0 { return nil @@ -142,7 +143,7 @@ func (bc *Blockchain) NewUTXOTransaction(from, to string, amount int) *Transacti func (bc *Blockchain) AddNewUserTransfer(utxoPool *UTXOPool, from, to string, amount int) bool { tx := bc.NewUTXOTransaction(from, to, amount) if tx != nil { - newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash[:]) + newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash) if bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) { return true } @@ -153,7 +154,7 @@ func (bc *Blockchain) AddNewUserTransfer(utxoPool *UTXOPool, from, to string, am // VerifyNewBlockAndUpdate verifies if the new coming block is valid for the current blockchain. func (bc *Blockchain) VerifyNewBlockAndUpdate(utxopool *UTXOPool, block *Block) bool { length := len(bc.Blocks) - if bytes.Compare(block.PrevBlockHash, bc.Blocks[length-1].Hash[:]) != 0 { + if bytes.Compare(block.PrevBlockHash[:], bc.Blocks[length-1].Hash[:]) != 0 { return false } if block.Timestamp < bc.Blocks[length-1].Timestamp { diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index d6d21b64b..ed7c73652 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -68,7 +68,7 @@ func TestVerifyNewBlock(t *testing.T) { if tx == nil { t.Error("failed to create a new transaction.") } - newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash[:]) + newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash) if !bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) { t.Error("failed to add a new valid block.") diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 04dd9347d..3aaa72d9b 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -301,6 +301,8 @@ func (consensus *Consensus) processResponseMessage(payload []byte) { consensus.ResetState() consensus.consensusId++ + + // Send signal to Node so the new block can be added and new round of consensus can be triggered consensus.ReadySignal <- 1 } // TODO: composes new block and broadcast the new block to validators diff --git a/node/node.go b/node/node.go index 7c9306f65..881c5cd99 100644 --- a/node/node.go +++ b/node/node.go @@ -14,21 +14,32 @@ var pendingTxMutex = &sync.Mutex{} // A node represents a program (machine) participating in the network type Node struct { + // Consensus object containing all consensus related data (e.g. committee members, signatures, commits) consensus *consensus.Consensus + // The channel to receive new blocks from Node BlockChannel chan blockchain.Block + // All the transactions received but not yet processed for consensus pendingTransactions []*blockchain.Transaction + // The transactions selected into the new block and under consensus process transactionInConsensus []*blockchain.Transaction + // The blockchain for the shard where this node belongs blockchain *blockchain.Blockchain + // The corresponding UTXO pool of the current blockchain UtxoPool *blockchain.UTXOPool + + // Log utility log log.Logger } +// Add new transactions to the pending transaction list func (node *Node) addPendingTransactions(newTxs []*blockchain.Transaction) { pendingTxMutex.Lock() node.pendingTransactions = append(node.pendingTransactions, newTxs...) pendingTxMutex.Unlock() } +// Take out a subset of valid transactions from the pending transaction list +// Note the pending transaction list will then contain the rest of the txs func (node *Node) getTransactionsForNewBlock() []*blockchain.Transaction { pendingTxMutex.Lock() selected, unselected := node.UtxoPool.SelectTransactionsForNewBlock(node.pendingTransactions) @@ -64,9 +75,8 @@ func (node *Node) String() string { return node.consensus.String() } - -// Testing code. Should be deleted for production -// Create in genesis block 1000 transactions assigning 1000 token to each address in [1 - 1000] +// [Testing code] Should be deleted for production +// Create in genesis block 1000 transactions which assign 1000 token to each address in [1 - 1000] func (node *Node) AddMoreFakeTransactions() { txs := make([]*blockchain.Transaction, 1000) for i := range txs { diff --git a/node/node_handler.go b/node/node_handler.go index 51f383844..fd50b34d3 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -123,29 +123,34 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { <-readySignal //node.log.Debug("Adding new block", "currentChainSize", len(node.blockchain.Blocks), "numTxs", len(node.blockchain.GetLatestBlock().Transactions), "PrevHash", node.blockchain.GetLatestBlock().PrevBlockHash, "Hash", node.blockchain.GetLatestBlock().Hash) if newBlock != nil { - // Consensus is done on the previous newBlock, add it to blockchain + // Consensus is done on the newBlock (in the previous round of consensus), add it to blockchain node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock) // Update UTXO pool node.UtxoPool.Update(node.transactionInConsensus) - // Clear transaction in consensus slice + // Clear transaction-in-consensus list node.transactionInConsensus = []*blockchain.Transaction{} } for { + // Once we have more than 10 transactions pending we will try creating a new block if len(node.pendingTransactions) >= 10 { selectedTxs := node.getTransactionsForNewBlock() if len(selectedTxs) == 0 { - node.log.Debug("No transactions is selected for consensus", "pendingTx", len(node.pendingTransactions)) + node.log.Debug("No valid transactions exist", "pendingTx", len(node.pendingTransactions)) } else { node.log.Debug("Creating new block", "numTxs", len(selectedTxs), "pendingTxs", len(node.pendingTransactions), "currentChainSize", len(node.blockchain.Blocks)) node.transactionInConsensus = selectedTxs - newBlock = blockchain.NewBlock(selectedTxs, node.blockchain.GetLatestBlock().Hash[:]) + newBlock = blockchain.NewBlock(selectedTxs, node.blockchain.GetLatestBlock().Hash) break } } - time.Sleep(1 * time.Second) // Periodically check whether we have enough transactions to package into block. + // If not enough transactions to run consensus, + // periodically check whether we have enough transactions to package into block. + time.Sleep(1 * time.Second) } + + // Send the new block to consensus so it can be confirmed. node.BlockChannel <- *newBlock } } \ No newline at end of file