package main import ( "bufio" "encoding/hex" "flag" "fmt" "harmony-benchmark/blockchain" "harmony-benchmark/client" "harmony-benchmark/consensus" "harmony-benchmark/log" "harmony-benchmark/node" "harmony-benchmark/p2p" "math/rand" "os" "strconv" "strings" "sync" "time" ) var utxoPoolMutex sync.Mutex // Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards. // The transactions are generated by going through the existing utxos and // randomly select a subset of them as the input for each new transaction. The output // address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses. // // When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard. // Similarly, generate another utxo output in that second shard. // // NOTE: the genesis block should contain N coinbase transactions which add // token (1000) to each address in [0 - N). See node.AddTestingAddresses() // // Params: // shardId - the shardId for current shard // dataNodes - nodes containing utxopools of all shards // maxNumTxs - the max number of txs to generate // crossShard - whether to generate cross shard txs // Returns: // all single-shard txs // all cross-shard txs func generateSimulatedTransactions(shardId int, dataNodes []*node.Node, maxNumTxs int, crossShard bool) ([]*blockchain.Transaction, []*blockchain.Transaction) { /* UTXO map structure: address - [ txId1 - [ outputIndex1 - value1 outputIndex2 - value2 ] txId2 - [ outputIndex1 - value1 outputIndex2 - value2 ] ] */ var txs []*blockchain.Transaction var crossTxs []*blockchain.Transaction txsCount := 0 utxoPoolMutex.Lock() UTXOLOOP: // Loop over all addresses for address, txMap := range dataNodes[shardId].UtxoPool.UtxoMap { // Loop over all txIds for the address for txIdStr, utxoMap := range txMap { // Parse TxId id, err := hex.DecodeString(txIdStr) if err != nil { continue } txId := [32]byte{} copy(txId[:], id[:]) // Loop over all utxos for the txId for index, value := range utxoMap { if txsCount >= maxNumTxs { break UTXOLOOP } randNum := rand.Intn(100) // 30% sample rate to select UTXO to use for new transactions if randNum < 30 { if crossShard && randNum < 10 { // 30% cross shard transactions: add another txinput from another shard // shard with neighboring Id crossShardId := (int(dataNodes[shardId].Consensus.ShardID) + 1) % len(dataNodes) crossShardNode := dataNodes[crossShardId] crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[address] // Get the cross shard utxo from another shard var crossTxin *blockchain.TXInput crossUtxoValue := 0 // Loop over utxos for the same address from the other shard and use the first utxo as the second cross tx input for crossTxIdStr, crossShardUtxos := range crossShardUtxosMap { // Parse TxId id, err := hex.DecodeString(crossTxIdStr) if err != nil { continue } crossTxId := [32]byte{} copy(crossTxId[:], id[:]) for crossShardIndex, crossShardValue := range crossShardUtxos { crossUtxoValue = crossShardValue crossTxin = &blockchain.TXInput{crossTxId, crossShardIndex, address, uint32(crossShardId)} break } if crossTxin != nil { break } } // Add the utxo from current shard txin := blockchain.TXInput{txId, index, address, dataNodes[shardId].Consensus.ShardID} txInputs := []blockchain.TXInput{txin} // Add the utxo from the other shard, if any if crossTxin != nil { txInputs = append(txInputs, *crossTxin) } // Spend the utxo from the current shard to a random address in [0 - N) txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000)), dataNodes[shardId].Consensus.ShardID} txOutputs := []blockchain.TXOutput{txout} // Spend the utxo from the other shard, if any, to a random address in [0 - N) if crossTxin != nil { crossTxout := blockchain.TXOutput{crossUtxoValue, strconv.Itoa(rand.Intn(10000)), uint32(crossShardId)} txOutputs = append(txOutputs, crossTxout) } // Construct the new transaction tx := blockchain.Transaction{[32]byte{}, txInputs, txOutputs, nil} tx.SetID() crossTxs = append(crossTxs, &tx) txsCount++ } else { // Add the utxo as new tx input txin := blockchain.TXInput{txId, index, address, dataNodes[shardId].Consensus.ShardID} // Spend the utxo to a random address in [0 - N) txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000)), dataNodes[shardId].Consensus.ShardID} tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}, nil} tx.SetID() txs = append(txs, &tx) txsCount++ } } } } } utxoPoolMutex.Unlock() return txs, crossTxs } // Gets all the validator peers func getValidators(config string) []p2p.Peer { file, _ := os.Open(config) fscanner := bufio.NewScanner(file) var peerList []p2p.Peer for fscanner.Scan() { p := strings.Split(fscanner.Text(), " ") ip, port, status := p[0], p[1], p[2] if status == "leader" || status == "client" { continue } peer := p2p.Peer{Port: port, Ip: ip} peerList = append(peerList, peer) } return peerList } // Gets all the leader peers and corresponding shard Ids func getLeadersAndShardIds(config *[][]string) ([]p2p.Peer, []uint32) { var peerList []p2p.Peer var shardIds []uint32 for _, node := range *config { ip, port, status, shardId := node[0], node[1], node[2], node[3] if status == "leader" { peerList = append(peerList, p2p.Peer{Ip: ip, Port: port}) val, err := strconv.Atoi(shardId) if err == nil { shardIds = append(shardIds, uint32(val)) } else { log.Error("[Generator] Error parsing the shard Id ", shardId) } } } return peerList, shardIds } // Parse the config file and return a 2d array containing the file data func readConfigFile(configFile string) [][]string { file, _ := os.Open(configFile) fscanner := bufio.NewScanner(file) result := [][]string{} for fscanner.Scan() { p := strings.Split(fscanner.Text(), " ") result = append(result, p) } return result } // Gets the port of the client node in the config func getClientPort(config *[][]string) string { for _, node := range *config { _, port, status, _ := node[0], node[1], node[2], node[3] if status == "client" { return port } } return "" } // A utility func that counts the total number of utxos in a pool. func countNumOfUtxos(utxoPool *blockchain.UTXOPool) int { countAll := 0 for _, utxoMap := range utxoPool.UtxoMap { for txIdStr, val := range utxoMap { _ = val id, err := hex.DecodeString(txIdStr) if err != nil { continue } txId := [32]byte{} copy(txId[:], id[:]) for _, utxo := range val { _ = utxo countAll++ } } } return countAll } func main() { configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 10000, "number of transactions to send per message") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") flag.Parse() // Read the configs config := readConfigFile(*configFile) leaders, shardIds := getLeadersAndShardIds(&config) // Do cross shard tx if there are more than one shard crossShard := len(shardIds) > 1 // TODO(Richard): refactor this chuck to a single method // Setup a logger to stdout and log file. logFileName := fmt.Sprintf("./%v/tx-generator.log", *logFolder) h := log.MultiHandler( log.StdoutHandler, log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file // log.Must.NetHandler("tcp", ":3000", log.JSONFormat()) // Log to remote ) log.Root().SetHandler(h) // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} for _, shardId := range shardIds { node := node.NewNode(&consensus.Consensus{ShardID: shardId}) // Assign many fake addresses so we have enough address to place with at first node.AddTestingAddresses(10000) nodes = append(nodes, &node) } // Client/txgenerator server node setup clientPort := getClientPort(&config) consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) clientNode := node.NewNode(&consensusObj) if clientPort != "" { clientNode.Client = client.NewClient(&leaders) // This func is used to update the client's utxopool when new blocks are received from the leaders updateBlocksFunc := func(blocks []*blockchain.Block) { log.Debug("Received new block from leader", "len", len(blocks)) for _, block := range blocks { for _, node := range nodes { if node.Consensus.ShardID == block.ShardId { log.Debug("Adding block from leader", "shardId", block.ShardId) // Add it to blockchain utxoPoolMutex.Lock() node.AddNewBlock(block) utxoPoolMutex.Unlock() } else { continue } } } } clientNode.Client.UpdateBlocks = updateBlocksFunc // Start the client server to listen to leader's message go func() { clientNode.StartServer(clientPort) }() } // Transaction generation process time.Sleep(10 * time.Second) // wait for nodes to be ready start := time.Now() totalTime := 60.0 for true { t := time.Now() if t.Sub(start).Seconds() >= totalTime { log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) break } allCrossTxs := []*blockchain.Transaction{} // Generate simulated transactions for i, leader := range leaders { txs, crossTxs := generateSimulatedTransactions(i, nodes, *maxNumTxsPerBatch, crossShard) allCrossTxs = append(allCrossTxs, crossTxs...) log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs)) msg := node.ConstructTransactionListMessage(txs) p2p.SendMessage(leader, msg) // Note cross shard txs are later sent in batch } if len(allCrossTxs) > 0 { log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs)) msg := node.ConstructTransactionListMessage(allCrossTxs) p2p.BroadcastMessage(leaders, msg) // Put cross shard tx into a pending list waiting for proofs from leaders if clientPort != "" { clientNode.Client.PendingCrossTxsMutex.Lock() for _, tx := range allCrossTxs { clientNode.Client.PendingCrossTxs[tx.ID] = tx } clientNode.Client.PendingCrossTxsMutex.Unlock() } } time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically } // Send a stop message to stop the nodes at the end msg := node.ConstructStopMessage() peers := append(getValidators(*configFile), leaders...) p2p.BroadcastMessage(peers, msg) }