update tx generator to send shard-specific txs

pull/14/head
Rongjian Lan 7 years ago
parent f97ba2b51a
commit cfb6c85c87
  1. 33
      aws-code/transaction_generator.go

@ -86,15 +86,22 @@ func getValidators(config string) []p2p.Peer {
return peerList return peerList
} }
func getLeaders(config *[][]string) []p2p.Peer { func getLeadersAndShardIds(config *[][]string) ([]p2p.Peer, []uint32) {
var peerList []p2p.Peer var peerList []p2p.Peer
var shardIds []uint32
for _, node := range *config { for _, node := range *config {
ip, port, status := node[0], node[1], node[2] ip, port, status, shardId := node[0], node[1], node[2], node[3]
if status == "leader" { if status == "leader" {
peerList = append(peerList, p2p.Peer{Ip: ip, Port: port}) peerList = append(peerList, p2p.Peer{Ip: ip, Port: port})
val, err := strconv.Atoi(shardId)
if err == nil {
shardIds = append(shardIds, uint32(val))
} else {
log.Error("[Generator] Error parsing the shard Id ", shardId)
} }
} }
return peerList }
return peerList, shardIds
} }
func readConfigFile(configFile string) [][]string { func readConfigFile(configFile string) [][]string {
@ -115,7 +122,7 @@ func main() {
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
flag.Parse() flag.Parse()
config := readConfigFile(*configFile) config := readConfigFile(*configFile)
leaders := getLeaders(&config) leaders, shardIds := getLeadersAndShardIds(&config)
// Setup a logger to stdout and log file. // Setup a logger to stdout and log file.
logFileName := fmt.Sprintf("./%v/tx-generator.log", *logFolder) logFileName := fmt.Sprintf("./%v/tx-generator.log", *logFolder)
@ -127,8 +134,12 @@ func main() {
log.Root().SetHandler(h) log.Root().SetHandler(h)
// Testing node to mirror the node data in consensus // Testing node to mirror the node data in consensus
dataNode := node.NewNode(&consensus.Consensus{}) nodes := []node.Node{}
dataNode.AddMoreFakeTransactions(10000) for _, shardId := range shardIds {
node := node.NewNode(&consensus.Consensus{ShardID: shardId})
node.AddMoreFakeTransactions(10000)
nodes = append(nodes, node)
}
time.Sleep(10 * time.Second) // wait for nodes to be ready time.Sleep(10 * time.Second) // wait for nodes to be ready
@ -142,15 +153,17 @@ func main() {
} }
t = time.Now() t = time.Now()
txsToSend := getNewFakeTransactions(&dataNode, *numTxsPerBatch) for i, leader := range leaders {
txsToSend := getNewFakeTransactions(&nodes[i], *numTxsPerBatch)
msg := node.ConstructTransactionListMessage(txsToSend) msg := node.ConstructTransactionListMessage(txsToSend)
fmt.Printf("[Generator] Creating fake txs for leader took %s", time.Since(t)) fmt.Printf("[Generator] Creating fake txs for leader took %s", time.Since(t))
log.Debug("[Generator] Sending txs...", "numTxs", len(txsToSend)) log.Debug("[Generator] Sending txs ...", "leader", leader, "numTxs", len(txsToSend))
p2p.BroadcastMessage(leaders, msg) p2p.SendMessage(leader, msg)
// Update local utxo pool to mirror the utxo pool of a real node // Update local utxo pool to mirror the utxo pool of a real node
dataNode.UtxoPool.Update(txsToSend) nodes[i].UtxoPool.Update(txsToSend)
}
time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically
} }

Loading…
Cancel
Save