From 83bfe9caca20b9299b0d093f5a4fa21d0c3b7409 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 4 Sep 2018 21:36:43 -0700 Subject: [PATCH] Change txgen to send txs in subset of utxos, and refactor the tx sending code --- client/txgen/main.go | 66 +++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/client/txgen/main.go b/client/txgen/main.go index 823574fb5..679b20df6 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -64,7 +64,7 @@ type TxInfo struct { // Returns: // all single-shard txs // all cross-shard txs -func generateSimulatedTransactions(batchCounter, batchNum int, shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) { +func generateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) { /* UTXO map structure: address - [ @@ -88,7 +88,7 @@ func generateSimulatedTransactions(batchCounter, batchNum int, shardID int, data UTXOLOOP: // Loop over all addresses for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap { - if int(binary.BigEndian.Uint32(address[:]))%batchNum == batchCounter%batchNum { // Work on one subset of utxo at a time + if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time txInfo.address = address // Loop over all txIds for the address for txIdStr, utxoMap := range txMap { @@ -242,6 +242,7 @@ func main() { configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 100000, "number of transactions to send per message") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") + numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately") flag.Parse() // Read the configs @@ -304,7 +305,6 @@ func main() { go func() { clientNode.StartServer(clientPort) }() - } // Transaction generation process @@ -319,36 +319,11 @@ func main() { log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) break } - - allCrossTxs := []*blockchain.Transaction{} - // Generate simulated transactions - for i, leader := range leaders { - txs, crossTxs := generateSimulatedTransactions(batchCounter, 3, i, nodes) - allCrossTxs = append(allCrossTxs, crossTxs...) - - log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs)) - msg := proto_node.ConstructTransactionListMessage(txs) - p2p.SendMessage(leader, msg) - // Note cross shard txs are later sent in batch - } - - if len(allCrossTxs) > 0 { - log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs)) - msg := proto_node.ConstructTransactionListMessage(allCrossTxs) - p2p.BroadcastMessage(leaders, msg) - - // Put cross shard tx into a pending list waiting for proofs from leaders - if clientPort != "" { - clientNode.Client.PendingCrossTxsMutex.Lock() - for _, tx := range allCrossTxs { - clientNode.Client.PendingCrossTxs[tx.ID] = tx - } - clientNode.Client.PendingCrossTxsMutex.Unlock() - } + for shardId := 0; shardId < len(nodes); shardId++ { + constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort) } - - time.Sleep(5000 * time.Millisecond) // Send a batch of transactions periodically batchCounter++ + time.Sleep(1000 * time.Millisecond) } // Send a stop message to stop the nodes at the end @@ -356,3 +331,32 @@ func main() { peers := append(config.GetValidators(), leaders...) p2p.BroadcastMessage(peers, msg) } + +func constructAndSendTransaction(subsetId, numSubset, shardId int, leaders []p2p.Peer, nodes []*node.Node, clientNode *node.Node, clientPort string) { + allCrossTxs := []*blockchain.Transaction{} + // Generate simulated transactions + leader := leaders[shardId] + + txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes) + allCrossTxs = append(allCrossTxs, crossTxs...) + + log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs)) + msg := proto_node.ConstructTransactionListMessage(txs) + p2p.SendMessage(leader, msg) + // Note cross shard txs are later sent in batch + + if len(allCrossTxs) > 0 { + log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs)) + msg := proto_node.ConstructTransactionListMessage(allCrossTxs) + p2p.BroadcastMessage(leaders, msg) + + // Put cross shard tx into a pending list waiting for proofs from leaders + if clientPort != "" { + clientNode.Client.PendingCrossTxsMutex.Lock() + for _, tx := range allCrossTxs { + clientNode.Client.PendingCrossTxs[tx.ID] = tx + } + clientNode.Client.PendingCrossTxsMutex.Unlock() + } + } +}