diff --git a/aws-code/transaction_generator.go b/aws-code/transaction_generator.go index 7d0ed59eb..b019f485c 100644 --- a/aws-code/transaction_generator.go +++ b/aws-code/transaction_generator.go @@ -12,15 +12,63 @@ import ( "os" "strings" "time" + "harmony-benchmark/consensus" + "encoding/hex" + "strconv" ) -func newRandTransaction() blockchain.Transaction { - txin := blockchain.TXInput{[]byte{}, rand.Intn(100), string(rand.Uint64())} - txout := blockchain.TXOutput{rand.Intn(100), string(rand.Uint64())} - tx := blockchain.Transaction{nil, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}} - tx.SetID() +// Get numTxs number of Fake transactions based on the existing UtxoPool. +// The transactions are generated by going through the existing utxos and +// randomly select a subset of them as input to new transactions. The output +// address of the new transaction are randomly selected from 1 - 1000. +// NOTE: the genesis block should contain 1000 coinbase transactions adding +// value to each address in [1 - 1000]. See node.AddMoreFakeTransactions() +func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Transaction { - return tx + /* + address - [ + txId1 - [ + outputIndex1 - value1 + outputIndex2 - value2 + ] + txId2 - [ + outputIndex1 - value1 + outputIndex2 - value2 + ] + ] + */ + + var outputs []*blockchain.Transaction + count := 0 + countAll := 0 + + for address, txMap := range dataNode.UtxoPool.UtxoMap { + for txIdStr, utxoMap := range txMap { + txId, err := hex.DecodeString(txIdStr) + if err != nil { + continue + } + for index, value := range utxoMap { + countAll++ + if rand.Intn(100) <= 20 { // 20% sample rate to select UTXO to use for new transactions + // Spend the money of current UTXO to a random address in [1 - 1000] + txin := blockchain.TXInput{txId, index, address} + txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(1000))} + tx := blockchain.Transaction{nil, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}} + tx.SetID() + + if count >= numTxs { + continue + } + outputs = append(outputs, &tx) + count++ + } + } + } + } + + log.Debug("UTXO", "poolSize", countAll, "numTxsToSend", numTxs) + return outputs } func getValidators(config string) []p2p.Peer { @@ -63,28 +111,35 @@ func readConfigFile(configFile string) [][]string { } func main() { + // Setup a stdout logger + h := log.CallerFileHandler(log.StdoutHandler) + log.Root().SetHandler(h) + configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") - numTxsPerBatch := flag.Int("num_txs_per_batch", 100, "number of transactions to send per message") + numTxsPerBatch := flag.Int("num_txs_per_batch", 1000, "number of transactions to send per message") flag.Parse() config := readConfigFile(*configFile) + // Testing node + dataNode := node.NewNode(&consensus.Consensus{}) + dataNode.AddMoreFakeTransactions() + start := time.Now() totalTime := 60.0 - txs := make([]blockchain.Transaction, *numTxsPerBatch) leaders := getLeaders(&config) + time.Sleep(5 * time.Second) // wait for nodes to run for true { t := time.Now() if t.Sub(start).Seconds() >= totalTime { fmt.Println(int(t.Sub(start)), start, totalTime) break } - for i := range txs { - txs[i] = newRandTransaction() - - } - msg := node.ConstructTransactionListMessage(txs) - log.Debug("[Generator] Sending txs to leader[s]", "numOfLeader", len(leaders)) + txsToSend := getNewFakeTransactions(&dataNode, *numTxsPerBatch) + msg := node.ConstructTransactionListMessage(txsToSend) + log.Debug("[Generator] Sending txs...", "numTxs", len(txsToSend)) p2p.BroadcastMessage(leaders, msg) + + dataNode.UtxoPool.Update(txsToSend) time.Sleep(1 * time.Second) // 10 transactions per second } msg := node.ConstructStopMessage() diff --git a/benchmark_main.go b/benchmark_main.go index 6183f0a6f..faa275210 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -76,6 +76,7 @@ func main() { consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader) node := node.NewNode(&consensus) + node.AddMoreFakeTransactions() if consensus.IsLeader { // Let consensus run diff --git a/blockchain/utxopool.go b/blockchain/utxopool.go index 7960745fd..ce3456628 100644 --- a/blockchain/utxopool.go +++ b/blockchain/utxopool.go @@ -14,7 +14,20 @@ const ( type UTXOPool struct { // Mapping from address to a map of transaction id to a map of the index of output // array in that transaction to that balance. - utxo map[string]map[string]map[int]int + /* + The 3-d map's structure: + address - [ + txId1 - [ + outputIndex1 - value1 + outputIndex2 - value2 + ] + txId2 - [ + outputIndex1 - value1 + outputIndex2 - value2 + ] + ] + */ + UtxoMap map[string]map[string]map[int]int } // VerifyTransactions verifies if a list of transactions valid. @@ -43,7 +56,7 @@ func (utxoPool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool { spentTXOs[in.Address][inTxID][index] = true // Sum the balance up to the inTotal. - if val, ok := utxoPool.utxo[in.Address][inTxID][index]; ok { + if val, ok := utxoPool.UtxoMap[in.Address][inTxID][index]; ok { inTotal += val } else { return false @@ -73,7 +86,7 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction) bool { inTxID := hex.EncodeToString(in.TxID) index := in.TxOutputIndex // Check if the transaction with the addres is spent or not. - if val, ok := utxoPool.utxo[in.Address][inTxID][index]; ok { + if val, ok := utxoPool.UtxoMap[in.Address][inTxID][index]; ok { inTotal += val } else { return false @@ -113,19 +126,19 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) { // Remove for _, in := range tx.TxInput { inTxID := hex.EncodeToString(in.TxID) - delete(utxoPool.utxo[in.Address][inTxID], in.TxOutputIndex) + delete(utxoPool.UtxoMap[in.Address][inTxID], in.TxOutputIndex) } // Update for index, out := range tx.TxOutput { - if _, ok := utxoPool.utxo[out.Address]; !ok { - utxoPool.utxo[out.Address] = make(map[string]map[int]int) - utxoPool.utxo[out.Address][txID] = make(map[int]int) + if _, ok := utxoPool.UtxoMap[out.Address]; !ok { + utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int) + utxoPool.UtxoMap[out.Address][txID] = make(map[int]int) } - if _, ok := utxoPool.utxo[out.Address][txID]; !ok { - utxoPool.utxo[out.Address][txID] = make(map[int]int) + if _, ok := utxoPool.UtxoMap[out.Address][txID]; !ok { + utxoPool.UtxoMap[out.Address][txID] = make(map[int]int) } - utxoPool.utxo[out.Address][txID][index] = out.Value + utxoPool.UtxoMap[out.Address][txID][index] = out.Value } } } @@ -149,7 +162,7 @@ func (utxoPool *UTXOPool) VerifyAndUpdate(transactions []*Transaction) bool { return false } -// Update utxo balances with a list of new transactions. +// Update Utxo balances with a list of new transactions. func (utxoPool *UTXOPool) Update(transactions []*Transaction) { if utxoPool != nil { for _, tx := range transactions { @@ -158,38 +171,38 @@ func (utxoPool *UTXOPool) Update(transactions []*Transaction) { // Remove for _, in := range tx.TxInput { inTxID := hex.EncodeToString(in.TxID) - delete(utxoPool.utxo[in.Address][inTxID], in.TxOutputIndex) + delete(utxoPool.UtxoMap[in.Address][inTxID], in.TxOutputIndex) } // Update for index, out := range tx.TxOutput { - if _, ok := utxoPool.utxo[out.Address]; !ok { - utxoPool.utxo[out.Address] = make(map[string]map[int]int) - utxoPool.utxo[out.Address][curTxID] = make(map[int]int) + if _, ok := utxoPool.UtxoMap[out.Address]; !ok { + utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int) + utxoPool.UtxoMap[out.Address][curTxID] = make(map[int]int) } - if _, ok := utxoPool.utxo[out.Address][curTxID]; !ok { - utxoPool.utxo[out.Address][curTxID] = make(map[int]int) + if _, ok := utxoPool.UtxoMap[out.Address][curTxID]; !ok { + utxoPool.UtxoMap[out.Address][curTxID] = make(map[int]int) } - utxoPool.utxo[out.Address][curTxID][index] = out.Value + utxoPool.UtxoMap[out.Address][curTxID][index] = out.Value } } } } -// CreateUTXOPoolFromTransaction a utxo pool from a genesis transaction. +// CreateUTXOPoolFromTransaction a Utxo pool from a genesis transaction. func CreateUTXOPoolFromTransaction(tx *Transaction) *UTXOPool { var utxoPool UTXOPool txID := hex.EncodeToString(tx.ID) - utxoPool.utxo = make(map[string]map[string]map[int]int) + utxoPool.UtxoMap = make(map[string]map[string]map[int]int) for index, out := range tx.TxOutput { - utxoPool.utxo[out.Address] = make(map[string]map[int]int) - utxoPool.utxo[out.Address][txID] = make(map[int]int) - utxoPool.utxo[out.Address][txID][index] = out.Value + utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int) + utxoPool.UtxoMap[out.Address][txID] = make(map[int]int) + utxoPool.UtxoMap[out.Address][txID][index] = out.Value } return &utxoPool } -// CreateUTXOPoolFromGenesisBlockChain a utxo pool from a genesis blockchain. +// CreateUTXOPoolFromGenesisBlockChain a Utxo pool from a genesis blockchain. func CreateUTXOPoolFromGenesisBlockChain(bc *Blockchain) *UTXOPool { tx := bc.Blocks[0].Transactions[0] return CreateUTXOPoolFromTransaction(tx) @@ -211,7 +224,7 @@ func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transact // Used for debugging. func (utxoPool *UTXOPool) String() string { res := "" - for address, v1 := range utxoPool.utxo { + for address, v1 := range utxoPool.UtxoMap { for txid, v2 := range v1 { for index, value := range v2 { res += fmt.Sprintf("address: %v, tx id: %v, index: %v, value: %v\n", address, txid, index, value) diff --git a/node/message.go b/node/message.go index 3910eb85c..f92d38157 100644 --- a/node/message.go +++ b/node/message.go @@ -21,12 +21,17 @@ const ( ) //ConstructTransactionListMessage constructs serialized transactions -func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte { +func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) byteBuffer.WriteByte(byte(common.TRANSACTION)) byteBuffer.WriteByte(byte(SEND)) encoder := gob.NewEncoder(byteBuffer) - encoder.Encode(transactions) + // Copy over the tx data + txs := make([]blockchain.Transaction, len(transactions)) + for i := range txs { + txs[i] = *transactions[i] + } + encoder.Encode(txs) return byteBuffer.Bytes() } diff --git a/node/node.go b/node/node.go index 88668acdb..7c9306f65 100644 --- a/node/node.go +++ b/node/node.go @@ -7,6 +7,7 @@ import ( "net" "os" "sync" + "strconv" ) var pendingTxMutex = &sync.Mutex{} @@ -18,7 +19,7 @@ type Node struct { pendingTransactions []*blockchain.Transaction transactionInConsensus []*blockchain.Transaction blockchain *blockchain.Blockchain - utxoPool *blockchain.UTXOPool + UtxoPool *blockchain.UTXOPool log log.Logger } @@ -30,7 +31,7 @@ func (node *Node) addPendingTransactions(newTxs []*blockchain.Transaction) { func (node *Node) getTransactionsForNewBlock() []*blockchain.Transaction { pendingTxMutex.Lock() - selected, unselected := node.utxoPool.SelectTransactionsForNewBlock(node.pendingTransactions) + selected, unselected := node.UtxoPool.SelectTransactionsForNewBlock(node.pendingTransactions) node.pendingTransactions = unselected pendingTxMutex.Unlock() return selected @@ -38,6 +39,7 @@ func (node *Node) getTransactionsForNewBlock() []*blockchain.Transaction { // Start a server and process the request by a handler. func (node *Node) StartServer(port string) { + node.log.Debug("Starting server", "node", node) node.listenOnPort(port) } @@ -62,6 +64,18 @@ func (node *Node) String() string { return node.consensus.String() } + +// Testing code. Should be deleted for production +// Create in genesis block 1000 transactions assigning 1000 token to each address in [1 - 1000] +func (node *Node) AddMoreFakeTransactions() { + txs := make([]*blockchain.Transaction, 1000) + for i := range txs { + txs[i] = blockchain.NewCoinbaseTX(strconv.Itoa(i), "") + } + node.blockchain.Blocks[0].Transactions = append(node.blockchain.Blocks[0].Transactions, txs...) + node.UtxoPool.Update(txs) +} + // Create a new Node func NewNode(consensus *consensus.Consensus) Node { node := Node{} @@ -78,10 +92,9 @@ func NewNode(consensus *consensus.Consensus) Node { node.blockchain = genesisBlock // UTXO pool from Genesis block - node.utxoPool = blockchain.CreateUTXOPoolFromGenesisBlockChain(node.blockchain) + node.UtxoPool = blockchain.CreateUTXOPoolFromGenesisBlockChain(node.blockchain) // Logger node.log = node.consensus.Log - node.log.Debug("New node", "node", node) return node } diff --git a/node/node_handler.go b/node/node_handler.go index b7dd47364..31e68a291 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -123,14 +123,14 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { // create a new block newBlock := new(blockchain.Block) for { - if len(node.pendingTransactions) >= 100 { + if len(node.pendingTransactions) >= 10 { selectedTxs := node.getTransactionsForNewBlock() if len(selectedTxs) == 0 { node.log.Debug("No transactions is selected for consensus", "pendingTx", len(node.pendingTransactions)) } else { - node.log.Debug("Creating new block", "node", node) + node.log.Debug("Creating new block", "numTxs", len(selectedTxs), "pendingTxs", len(node.pendingTransactions)) + newBlock = blockchain.NewBlock(selectedTxs, []byte{}) - node.log.Debug("Num of Pending Tx", "count", len(node.pendingTransactions)) break } } diff --git a/node/node_test.go b/node/node_test.go index bacced83d..c5e1fbd59 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -28,7 +28,7 @@ func TestNewNewNode(test *testing.T) { test.Error("Coinbase TX is not initialized for the node") } - if node.utxoPool == nil { + if node.UtxoPool == nil { test.Error("Utxo pool is not initialized for the node") } }