package main

import (
	"bufio"
	"encoding/hex"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"time"

	"harmony-benchmark/blockchain"
	"harmony-benchmark/client"
	"harmony-benchmark/consensus"
	"harmony-benchmark/log"
	"harmony-benchmark/node"
	"harmony-benchmark/p2p"
)

// getNewFakeTransactions builds up to numTxs fake transactions from the UTXO
// pool of (*dataNodes)[shardId].
//
// Every UTXO in the pool is visited; each one is sampled with 30% probability
// and, when sampled, is spent in full to an output address drawn from
// strconv.Itoa(rand.Intn(10000)). When crossShard is true, one third of the
// sampled UTXOs (randNum < 10 out of randNum < 30) become cross-shard
// transactions: they additionally try to spend one UTXO owned by the same
// address in the neighbouring shard, (ShardID + 1) % numShards.
//
// The first return value holds the within-shard transactions, the second the
// cross-shard ones. Their combined length never exceeds numTxs.
//
// NOTE(review): this presumably relies on the mirrored pools having been
// pre-populated (main uses node.AddMoreFakeTransactions) - confirm the address
// range used there matches the output addresses generated here.
func getNewFakeTransactions(shardId int, dataNodes *[]node.Node, numTxs int, numShards int, crossShard bool) ([]*blockchain.Transaction, []*blockchain.Transaction) {
	/*
	  UTXO map structure:
	     address - [
	                txId1 - [
	                        outputIndex1 - value1
	                        outputIndex2 - value2
	                       ]
	                txId2 - [
	                        outputIndex1 - value1
	                        outputIndex2 - value2
	                       ]
	               ]
	*/
	var txs []*blockchain.Transaction
	var crossTxs []*blockchain.Transaction

	// Work on the slice element in place instead of copying the node.
	shardNode := &(*dataNodes)[shardId]

	count := 0    // transactions generated so far (both kinds)
	countAll := 0 // UTXOs visited; reported as the pool size

	for address, txMap := range shardNode.UtxoPool.UtxoMap {
		for txIdStr, utxoMap := range txMap {
			id, err := hex.DecodeString(txIdStr)
			if err != nil {
				continue
			}
			txId := [32]byte{}
			copy(txId[:], id)

			for index, value := range utxoMap {
				countAll++
				if count >= numTxs {
					// Quota reached: keep walking so countAll still reflects
					// the whole pool, but do not build (and hash) transactions
					// that would be thrown away.
					continue
				}

				randNum := rand.Intn(100)
				if randNum >= 30 {
					// 30% sample rate to select UTXO to use for new transactions.
					continue
				}

				localShardID := shardNode.Consensus.ShardID
				// Input spending the current UTXO of this shard.
				txin := blockchain.TXInput{txId, index, address, localShardID}

				if randNum < 10 && crossShard {
					// One third of the sampled UTXOs: add another input owned by
					// the same address in the shard with the neighbouring ID.
					crossShardId := (int(localShardID) + 1) % numShards
					crossShardNode := &(*dataNodes)[crossShardId]

					var crossTxin *blockchain.TXInput
					crossValue := 0
				search:
					for crossTxIdStr, crossShardUtxos := range crossShardNode.UtxoPool.UtxoMap[address] {
						crossId, err := hex.DecodeString(crossTxIdStr)
						if err != nil {
							continue
						}
						crossTxId := [32]byte{}
						copy(crossTxId[:], crossId)

						// Take the first UTXO found; one extra input is enough.
						for crossShardIndex, crossShardValue := range crossShardUtxos {
							crossValue = crossShardValue
							crossTxin = &blockchain.TXInput{crossTxId, crossShardIndex, address, uint32(crossShardId)}
							break search
						}
					}

					txInputs := []blockchain.TXInput{txin}
					if crossTxin != nil {
						txInputs = append(txInputs, *crossTxin)
					}

					// Spend the combined value to a random address.
					txout := blockchain.TXOutput{value + crossValue, strconv.Itoa(rand.Intn(10000)), localShardID}
					tx := blockchain.Transaction{[32]byte{}, txInputs, []blockchain.TXOutput{txout}, nil}
					tx.SetID()

					crossTxs = append(crossTxs, &tx)
					count++
					continue
				}

				// Within-shard transaction: spend the UTXO to a random address.
				txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000)), localShardID}
				tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}, nil}
				tx.SetID()

				txs = append(txs, &tx)
				count++
			}
		}
	}

	log.Debug("UTXO", "poolSize", countAll, "numTxsToSend", numTxs)
	return txs, crossTxs
}

// getValidators reads the config file at path config and returns one peer for
// every entry whose status is neither "leader" nor "client".
//
// Each line is expected to hold at least "ip port status"; shorter lines are
// skipped. If the file cannot be opened the error is logged and nil is
// returned.
func getValidators(config string) []p2p.Peer {
	file, err := os.Open(config)
	if err != nil {
		log.Error("[Generator] Failed to open the config file", "file", config, "err", err)
		return nil
	}
	defer file.Close()

	var peerList []p2p.Peer
	fscanner := bufio.NewScanner(file)
	for fscanner.Scan() {
		p := strings.Fields(fscanner.Text())
		if len(p) < 3 {
			// Blank or malformed line: nothing usable to extract.
			continue
		}
		ip, port, status := p[0], p[1], p[2]
		if status == "leader" || status == "client" {
			continue
		}
		peerList = append(peerList, p2p.Peer{Port: port, Ip: ip})
	}
	if err := fscanner.Err(); err != nil {
		log.Error("[Generator] Failed to read the config file", "file", config, "err", err)
	}
	return peerList
}

// getLeadersAndShardIds extracts the leader entries from the parsed config.
//
// It returns the leaders' peers and their shard IDs as two parallel slices:
// element i of both slices always describes the same leader. An entry with
// fewer than four fields, or with a shard ID that is not a valid unsigned
// 32-bit integer, is skipped entirely so the slices cannot get out of step.
func getLeadersAndShardIds(config *[][]string) ([]p2p.Peer, []uint32) {
	var peerList []p2p.Peer
	var shardIds []uint32
	for _, entry := range *config {
		if len(entry) < 4 {
			continue
		}
		ip, port, status, shardId := entry[0], entry[1], entry[2], entry[3]
		if status != "leader" {
			continue
		}
		val, err := strconv.ParseUint(shardId, 10, 32)
		if err != nil {
			log.Error("[Generator] Error parsing the shard Id", "shardId", shardId, "err", err)
			continue
		}
		peerList = append(peerList, p2p.Peer{Ip: ip, Port: port})
		shardIds = append(shardIds, uint32(val))
	}
	return peerList, shardIds
}

// readConfigFile reads configFile and returns its non-blank lines, each split
// into whitespace-separated fields.
//
// If the file cannot be opened the error is logged and an empty (non-nil)
// result is returned.
func readConfigFile(configFile string) [][]string {
	result := [][]string{}

	file, err := os.Open(configFile)
	if err != nil {
		log.Error("[Generator] Failed to open the config file", "file", configFile, "err", err)
		return result
	}
	defer file.Close()

	fscanner := bufio.NewScanner(file)
	for fscanner.Scan() {
		p := strings.Fields(fscanner.Text())
		if len(p) == 0 {
			continue
		}
		result = append(result, p)
	}
	if err := fscanner.Err(); err != nil {
		log.Error("[Generator] Failed to read the config file", "file", configFile, "err", err)
	}
	return result
}

// getClientPort returns the port of the first config entry whose status is
// "client", or "" when there is none.
func getClientPort(config *[][]string) string {
	for _, entry := range *config {
		if len(entry) < 3 {
			continue
		}
		if port, status := entry[1], entry[2]; status == "client" {
			return port
		}
	}
	return ""
}

// main runs the transaction generator: it mirrors each shard's UTXO pool
// locally, then for 60 seconds sends a batch of fake transactions to every
// shard leader every 500ms, and finally broadcasts a stop message to all
// validators and leaders.
func main() {
	configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
	numTxsPerBatch := flag.Int("num_txs_per_batch", 10000, "number of transactions to send per message")
	logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
	crossShard := flag.Bool("cross_shard", true, "whether to send cross shard txs")
	flag.Parse()

	// Setup a logger to stdout and log file. This happens before the config is
	// read so that config errors go through the configured handler.
	logFileName := fmt.Sprintf("./%v/tx-generator.log", *logFolder)
	h := log.MultiHandler(
		log.Must.FileHandler(logFileName, log.LogfmtFormat()),
		log.StdoutHandler)
	// In cases where you just want a stdout logger, use the following one instead.
	// h := log.CallerFileHandler(log.StdoutHandler)
	log.Root().SetHandler(h)

	config := readConfigFile(*configFile)
	leaders, shardIds := getLeadersAndShardIds(&config)

	// Client server setup
	clientPort := getClientPort(&config)
	hasClient := clientPort != ""
	consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
	clientNode := node.NewNode(&consensusObj)
	if hasClient {
		clientNode.Client = client.NewClient(&leaders)
		go clientNode.StartServer(clientPort)
	}

	// Testing node to mirror the node data in consensus
	nodes := []node.Node{}
	for _, shardId := range shardIds {
		shardNode := node.NewNode(&consensus.Consensus{ShardID: shardId})
		shardNode.AddMoreFakeTransactions(10000)
		nodes = append(nodes, shardNode)
	}

	time.Sleep(10 * time.Second) // wait for nodes to be ready

	start := time.Now()
	totalTime := 60.0 // seconds

	for {
		t := time.Now()
		if t.Sub(start).Seconds() >= totalTime {
			log.Debug("TIME OUT", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
			break
		}

		allCrossTxs := []*blockchain.Transaction{}
		for i, leader := range leaders {
			// Time each leader's batch separately rather than cumulatively.
			genStart := time.Now()
			txs, crossTxs := getNewFakeTransactions(i, &nodes, *numTxsPerBatch, len(shardIds), *crossShard)
			allCrossTxs = append(allCrossTxs, crossTxs...)

			msg := node.ConstructTransactionListMessage(txs)
			fmt.Printf("[Generator] Creating fake txs for leader took %s\n", time.Since(genStart))
			log.Debug("[Generator] Sending txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
			p2p.SendMessage(leader, msg)

			// Update local utxo pool to mirror the utxo pool of a real node (within-shard tx)
			nodes[i].UtxoPool.Update(txs)
		}

		// Cross-shard transactions go to every leader.
		msg := node.ConstructTransactionListMessage(allCrossTxs)
		p2p.BroadcastMessage(leaders, msg)

		for i := range nodes {
			// Update local utxo pool to mirror the utxo pool of a real node (cross-shard tx)
			nodes[i].UtxoPool.Update(allCrossTxs)
		}

		// clientNode.Client is only assigned above when the config names a
		// client, so the pending-transaction bookkeeping is guarded the same
		// way. NOTE(review): assumes node.NewNode leaves Client unset - confirm.
		if hasClient {
			clientNode.Client.PendingCrossTxsMutex.Lock()
			for _, tx := range allCrossTxs {
				//fmt.Printf("CROSS SHARD TX: %s", tx.String())
				clientNode.Client.PendingCrossTxs[tx.ID] = tx
			}
			clientNode.Client.PendingCrossTxsMutex.Unlock()
		}

		time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically
	}

	// Send a stop message to stop the nodes at the end
	msg := node.ConstructStopMessage()
	peers := append(getValidators(*configFile), leaders...)
	p2p.BroadcastMessage(peers, msg)
}