Change txgen to send txs in subset of utxos, and refactor the tx sending code

pull/69/head
Rongjian Lan 6 years ago
parent f80196e97d
commit 83bfe9caca
  1. 66
      client/txgen/main.go

@ -64,7 +64,7 @@ type TxInfo struct {
// Returns: // Returns:
// all single-shard txs // all single-shard txs
// all cross-shard txs // all cross-shard txs
func generateSimulatedTransactions(batchCounter, batchNum int, shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) { func generateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/* /*
UTXO map structure: UTXO map structure:
address - [ address - [
@ -88,7 +88,7 @@ func generateSimulatedTransactions(batchCounter, batchNum int, shardID int, data
UTXOLOOP: UTXOLOOP:
// Loop over all addresses // Loop over all addresses
for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap { for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap {
if int(binary.BigEndian.Uint32(address[:]))%batchNum == batchCounter%batchNum { // Work on one subset of utxo at a time if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time
txInfo.address = address txInfo.address = address
// Loop over all txIds for the address // Loop over all txIds for the address
for txIdStr, utxoMap := range txMap { for txIdStr, utxoMap := range txMap {
@ -242,6 +242,7 @@ func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 100000, "number of transactions to send per message") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 100000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately")
flag.Parse() flag.Parse()
// Read the configs // Read the configs
@ -304,7 +305,6 @@ func main() {
go func() { go func() {
clientNode.StartServer(clientPort) clientNode.StartServer(clientPort)
}() }()
} }
// Transaction generation process // Transaction generation process
@ -319,36 +319,11 @@ func main() {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break break
} }
for shardId := 0; shardId < len(nodes); shardId++ {
allCrossTxs := []*blockchain.Transaction{} constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort)
// Generate simulated transactions
for i, leader := range leaders {
txs, crossTxs := generateSimulatedTransactions(batchCounter, 3, i, nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch
}
if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
} }
time.Sleep(5000 * time.Millisecond) // Send a batch of transactions periodically
batchCounter++ batchCounter++
time.Sleep(1000 * time.Millisecond)
} }
// Send a stop message to stop the nodes at the end // Send a stop message to stop the nodes at the end
@ -356,3 +331,32 @@ func main() {
peers := append(config.GetValidators(), leaders...) peers := append(config.GetValidators(), leaders...)
p2p.BroadcastMessage(peers, msg) p2p.BroadcastMessage(peers, msg)
} }
func constructAndSendTransaction(subsetId, numSubset, shardId int, leaders []p2p.Peer, nodes []*node.Node, clientNode *node.Node, clientPort string) {
allCrossTxs := []*blockchain.Transaction{}
// Generate simulated transactions
leader := leaders[shardId]
txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch
if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
}
}

Loading…
Cancel
Save