The core protocol of WoopChain
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
woop/aws-code/transaction_generator.go

262 lines
8.1 KiB

package main
import (
"bufio"
"encoding/hex"
"flag"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/client"
"harmony-benchmark/consensus"
"harmony-benchmark/log"
"harmony-benchmark/node"
"harmony-benchmark/p2p"
"math/rand"
"os"
"strconv"
"strings"
"time"
)
// Get numTxs number of Fake transactions based on the existing UtxoPool.
// The transactions are generated by going through the existing utxos and
// randomly select a subset of them as input to new transactions. The output
// address of the new transaction are randomly selected from 1 - 1000.
// NOTE: the genesis block should contain 1000 coinbase transactions adding
// value to each address in [1 - 1000]. See node.AddMoreFakeTransactions()
func getNewFakeTransactions(shardId int, dataNodes *[]node.Node, numTxs int, numShards int, crossShard bool) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/*
UTXO map structure:
address - [
txId1 - [
outputIndex1 - value1
outputIndex2 - value2
]
txId2 - [
outputIndex1 - value1
outputIndex2 - value2
]
]
*/
var txs []*blockchain.Transaction
var crossTxs []*blockchain.Transaction
count := 0
countAll := 0
for address, txMap := range (*dataNodes)[shardId].UtxoPool.UtxoMap {
for txIdStr, utxoMap := range txMap {
id, err := hex.DecodeString(txIdStr)
if err != nil {
continue
}
txId := [32]byte{}
copy(txId[:], id[:])
for index, value := range utxoMap {
countAll++
randNum := rand.Intn(100)
if randNum < 30 { // 30% sample rate to select UTXO to use for new transactions
// Spend the money of current UTXO to a random address in [1 - 1000]
if randNum < 10 && crossShard { // 30% cross shard transactions: add another txinput from another shard
// Get the cross shard utxo from another shard
crossShardId := (int((*dataNodes)[shardId].Consensus.ShardID) + 1) % numShards // shard with neighboring Id
crossShardNode := (*dataNodes)[crossShardId]
crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[address]
crossTxin := new(blockchain.TXInput)
crossValue := 0
for crossTxIdStr, crossShardUtxos := range crossShardUtxosMap {
id, err := hex.DecodeString(crossTxIdStr)
if err != nil {
continue
}
crossTxId := [32]byte{}
copy(crossTxId[:], id[:])
for crossShardIndex, crossShardValue := range crossShardUtxos {
crossValue = crossShardValue
crossTxin = &blockchain.TXInput{crossTxId, crossShardIndex, address, uint32(crossShardId)}
}
}
// Fill in the utxo from current shard
txin := blockchain.TXInput{txId, index, address, (*dataNodes)[shardId].Consensus.ShardID}
txInputs := []blockchain.TXInput{txin}
if crossTxin != nil {
txInputs = append(txInputs, *crossTxin)
}
txout := blockchain.TXOutput{value + crossValue, strconv.Itoa(rand.Intn(10000))}
tx := blockchain.Transaction{[32]byte{}, txInputs, []blockchain.TXOutput{txout}}
tx.SetID()
if count >= numTxs {
continue
}
crossTxs = append(crossTxs, &tx)
count++
} else {
txin := blockchain.TXInput{txId, index, address, (*dataNodes)[shardId].Consensus.ShardID}
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000))}
tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}}
tx.SetID()
if count >= numTxs {
continue
}
txs = append(txs, &tx)
count++
}
}
}
}
}
log.Debug("UTXO", "poolSize", countAll, "numTxsToSend", numTxs)
return txs, crossTxs
}
func getValidators(config string) []p2p.Peer {
file, _ := os.Open(config)
fscanner := bufio.NewScanner(file)
var peerList []p2p.Peer
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
ip, port, status := p[0], p[1], p[2]
if status == "leader" || status == "client" {
continue
}
peer := p2p.Peer{Port: port, Ip: ip}
peerList = append(peerList, peer)
}
return peerList
}
func getLeadersAndShardIds(config *[][]string) ([]p2p.Peer, []uint32) {
var peerList []p2p.Peer
var shardIds []uint32
for _, node := range *config {
ip, port, status, shardId := node[0], node[1], node[2], node[3]
if status == "leader" {
peerList = append(peerList, p2p.Peer{Ip: ip, Port: port})
val, err := strconv.Atoi(shardId)
if err == nil {
shardIds = append(shardIds, uint32(val))
} else {
log.Error("[Generator] Error parsing the shard Id ", shardId)
}
}
}
return peerList, shardIds
}
func readConfigFile(configFile string) [][]string {
file, _ := os.Open(configFile)
fscanner := bufio.NewScanner(file)
result := [][]string{}
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
result = append(result, p)
}
return result
}
func getClientPort(config *[][]string) string {
for _, node := range *config {
_, port, status, _ := node[0], node[1], node[2], node[3]
if status == "client" {
return port
}
}
return ""
}
func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
numTxsPerBatch := flag.Int("num_txs_per_batch", 10000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
crossShard := flag.Bool("cross_shard", false, "whether to send cross shard txs")
flag.Parse()
config := readConfigFile(*configFile)
leaders, shardIds := getLeadersAndShardIds(&config)
// Setup a logger to stdout and log file.
logFileName := fmt.Sprintf("./%v/tx-generator.log", *logFolder)
h := log.MultiHandler(
log.Must.FileHandler(logFileName, log.LogfmtFormat()),
log.StdoutHandler)
// In cases where you just want a stdout logger, use the following one instead.
// h := log.CallerFileHandler(log.StdoutHandler)
log.Root().SetHandler(h)
// Client server setup
clientPort := getClientPort(&config)
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
clientNode := node.NewNode(&consensusObj)
if clientPort != "" {
clientNode.Client = client.NewClient()
go func() {
clientNode.StartServer(clientPort)
}()
}
// Testing node to mirror the node data in consensus
nodes := []node.Node{}
for _, shardId := range shardIds {
node := node.NewNode(&consensus.Consensus{ShardID: shardId})
node.AddMoreFakeTransactions(10000)
nodes = append(nodes, node)
}
time.Sleep(10 * time.Second) // wait for nodes to be ready
start := time.Now()
totalTime := 60.0
for true {
t := time.Now()
if t.Sub(start).Seconds() >= totalTime {
fmt.Println(int(t.Sub(start)), start, totalTime)
break
}
t = time.Now()
allCrossTxs := []*blockchain.Transaction{}
for i, leader := range leaders {
txs, crossTxs := getNewFakeTransactions(i, &nodes, *numTxsPerBatch, len(shardIds), *crossShard)
allCrossTxs = append(allCrossTxs, crossTxs...)
msg := node.ConstructTransactionListMessage(txs)
fmt.Printf("[Generator] Creating fake txs for leader took %s", time.Since(t))
log.Debug("[Generator] Sending txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
p2p.SendMessage(leader, msg)
// Update local utxo pool to mirror the utxo pool of a real node (within-shard tx)
nodes[i].UtxoPool.Update(txs)
}
msg := node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
for _, node := range nodes {
// Update local utxo pool to mirror the utxo pool of a real node (cross-shard tx)
node.UtxoPool.Update(allCrossTxs)
}
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = &client.CrossShardTxAndProofs{Transaction: *tx}
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically
}
// Send a stop message to stop the nodes at the end
msg := node.ConstructStopMessage()
peers := append(getValidators(*configFile), leaders...)
p2p.BroadcastMessage(peers, msg)
}