Bring back the accidentally ignored txgen folder and change it to txgenerator to avoid the gitignore mistake
parent
c8627fa32a
commit
383a8406a5
@ -0,0 +1,355 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"bufio" |
||||
"encoding/hex" |
||||
"flag" |
||||
"fmt" |
||||
"harmony-benchmark/blockchain" |
||||
"harmony-benchmark/client" |
||||
"harmony-benchmark/consensus" |
||||
"harmony-benchmark/log" |
||||
"harmony-benchmark/node" |
||||
"harmony-benchmark/p2p" |
||||
"math/rand" |
||||
"os" |
||||
"strconv" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
) |
||||
|
||||
var utxoPoolMutex sync.Mutex |
||||
|
||||
// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards.
|
||||
// The transactions are generated by going through the existing utxos and
|
||||
// randomly select a subset of them as the input for each new transaction. The output
|
||||
// address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses.
|
||||
//
|
||||
// When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard.
|
||||
// Similarly, generate another utxo output in that second shard.
|
||||
//
|
||||
// NOTE: the genesis block should contain N coinbase transactions which add
|
||||
// token (1000) to each address in [0 - N). See node.AddTestingAddresses()
|
||||
//
|
||||
// Params:
|
||||
// shardId - the shardId for current shard
|
||||
// dataNodes - nodes containing utxopools of all shards
|
||||
// maxNumTxs - the max number of txs to generate
|
||||
// crossShard - whether to generate cross shard txs
|
||||
// Returns:
|
||||
// all single-shard txs
|
||||
// all cross-shard txs
|
||||
func generateSimulatedTransactions(shardId int, dataNodes []*node.Node, maxNumTxs int, crossShard bool) ([]*blockchain.Transaction, []*blockchain.Transaction) { |
||||
/* |
||||
UTXO map structure: |
||||
address - [ |
||||
txId1 - [ |
||||
outputIndex1 - value1 |
||||
outputIndex2 - value2 |
||||
] |
||||
txId2 - [ |
||||
outputIndex1 - value1 |
||||
outputIndex2 - value2 |
||||
] |
||||
] |
||||
*/ |
||||
var txs []*blockchain.Transaction |
||||
var crossTxs []*blockchain.Transaction |
||||
txsCount := 0 |
||||
|
||||
utxoPoolMutex.Lock() |
||||
|
||||
UTXOLOOP: |
||||
// Loop over all addresses
|
||||
for address, txMap := range dataNodes[shardId].UtxoPool.UtxoMap { |
||||
// Loop over all txIds for the address
|
||||
for txIdStr, utxoMap := range txMap { |
||||
// Parse TxId
|
||||
id, err := hex.DecodeString(txIdStr) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
txId := [32]byte{} |
||||
copy(txId[:], id[:]) |
||||
|
||||
// Loop over all utxos for the txId
|
||||
for index, value := range utxoMap { |
||||
if txsCount >= maxNumTxs { |
||||
break UTXOLOOP |
||||
} |
||||
randNum := rand.Intn(100) |
||||
|
||||
// 30% sample rate to select UTXO to use for new transactions
|
||||
if randNum < 30 { |
||||
if crossShard && randNum < 10 { // 30% cross shard transactions: add another txinput from another shard
|
||||
// shard with neighboring Id
|
||||
crossShardId := (int(dataNodes[shardId].Consensus.ShardID) + 1) % len(dataNodes) |
||||
|
||||
crossShardNode := dataNodes[crossShardId] |
||||
crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[address] |
||||
|
||||
// Get the cross shard utxo from another shard
|
||||
var crossTxin *blockchain.TXInput |
||||
crossUtxoValue := 0 |
||||
// Loop over utxos for the same address from the other shard and use the first utxo as the second cross tx input
|
||||
for crossTxIdStr, crossShardUtxos := range crossShardUtxosMap { |
||||
// Parse TxId
|
||||
id, err := hex.DecodeString(crossTxIdStr) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
crossTxId := [32]byte{} |
||||
copy(crossTxId[:], id[:]) |
||||
|
||||
for crossShardIndex, crossShardValue := range crossShardUtxos { |
||||
crossUtxoValue = crossShardValue |
||||
crossTxin = &blockchain.TXInput{crossTxId, crossShardIndex, address, uint32(crossShardId)} |
||||
break |
||||
} |
||||
if crossTxin != nil { |
||||
break |
||||
} |
||||
} |
||||
|
||||
// Add the utxo from current shard
|
||||
txin := blockchain.TXInput{txId, index, address, dataNodes[shardId].Consensus.ShardID} |
||||
txInputs := []blockchain.TXInput{txin} |
||||
|
||||
// Add the utxo from the other shard, if any
|
||||
if crossTxin != nil { |
||||
txInputs = append(txInputs, *crossTxin) |
||||
} |
||||
|
||||
// Spend the utxo from the current shard to a random address in [0 - N)
|
||||
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000)), dataNodes[shardId].Consensus.ShardID} |
||||
txOutputs := []blockchain.TXOutput{txout} |
||||
|
||||
// Spend the utxo from the other shard, if any, to a random address in [0 - N)
|
||||
if crossTxin != nil { |
||||
crossTxout := blockchain.TXOutput{crossUtxoValue, strconv.Itoa(rand.Intn(10000)), uint32(crossShardId)} |
||||
txOutputs = append(txOutputs, crossTxout) |
||||
} |
||||
|
||||
// Construct the new transaction
|
||||
tx := blockchain.Transaction{[32]byte{}, txInputs, txOutputs, nil} |
||||
tx.SetID() |
||||
|
||||
crossTxs = append(crossTxs, &tx) |
||||
txsCount++ |
||||
} else { |
||||
// Add the utxo as new tx input
|
||||
txin := blockchain.TXInput{txId, index, address, dataNodes[shardId].Consensus.ShardID} |
||||
|
||||
// Spend the utxo to a random address in [0 - N)
|
||||
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000)), dataNodes[shardId].Consensus.ShardID} |
||||
tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}, nil} |
||||
tx.SetID() |
||||
|
||||
txs = append(txs, &tx) |
||||
txsCount++ |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
utxoPoolMutex.Unlock() |
||||
|
||||
return txs, crossTxs |
||||
} |
||||
|
||||
// Gets all the validator peers
|
||||
func getValidators(config string) []p2p.Peer { |
||||
file, _ := os.Open(config) |
||||
fscanner := bufio.NewScanner(file) |
||||
var peerList []p2p.Peer |
||||
for fscanner.Scan() { |
||||
p := strings.Split(fscanner.Text(), " ") |
||||
ip, port, status := p[0], p[1], p[2] |
||||
if status == "leader" || status == "client" { |
||||
continue |
||||
} |
||||
peer := p2p.Peer{Port: port, Ip: ip} |
||||
peerList = append(peerList, peer) |
||||
} |
||||
return peerList |
||||
} |
||||
|
||||
// Gets all the leader peers and corresponding shard Ids
|
||||
func getLeadersAndShardIds(config *[][]string) ([]p2p.Peer, []uint32) { |
||||
var peerList []p2p.Peer |
||||
var shardIds []uint32 |
||||
for _, node := range *config { |
||||
ip, port, status, shardId := node[0], node[1], node[2], node[3] |
||||
if status == "leader" { |
||||
peerList = append(peerList, p2p.Peer{Ip: ip, Port: port}) |
||||
val, err := strconv.Atoi(shardId) |
||||
if err == nil { |
||||
shardIds = append(shardIds, uint32(val)) |
||||
} else { |
||||
log.Error("[Generator] Error parsing the shard Id ", shardId) |
||||
} |
||||
} |
||||
} |
||||
return peerList, shardIds |
||||
} |
||||
|
||||
// Parse the config file and return a 2d array containing the file data
|
||||
func readConfigFile(configFile string) [][]string { |
||||
file, _ := os.Open(configFile) |
||||
fscanner := bufio.NewScanner(file) |
||||
|
||||
result := [][]string{} |
||||
for fscanner.Scan() { |
||||
p := strings.Split(fscanner.Text(), " ") |
||||
result = append(result, p) |
||||
} |
||||
return result |
||||
} |
||||
|
||||
// Gets the port of the client node in the config
|
||||
func getClientPort(config *[][]string) string { |
||||
for _, node := range *config { |
||||
_, port, status, _ := node[0], node[1], node[2], node[3] |
||||
if status == "client" { |
||||
return port |
||||
} |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
// A utility func that counts the total number of utxos in a pool.
|
||||
func countNumOfUtxos(utxoPool *blockchain.UTXOPool) int { |
||||
countAll := 0 |
||||
for _, utxoMap := range utxoPool.UtxoMap { |
||||
for txIdStr, val := range utxoMap { |
||||
_ = val |
||||
id, err := hex.DecodeString(txIdStr) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
|
||||
txId := [32]byte{} |
||||
copy(txId[:], id[:]) |
||||
for _, utxo := range val { |
||||
_ = utxo |
||||
countAll++ |
||||
} |
||||
} |
||||
} |
||||
return countAll |
||||
} |
||||
|
||||
func main() { |
||||
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") |
||||
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 10000, "number of transactions to send per message") |
||||
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") |
||||
flag.Parse() |
||||
|
||||
// Read the configs
|
||||
config := readConfigFile(*configFile) |
||||
leaders, shardIds := getLeadersAndShardIds(&config) |
||||
|
||||
// Do cross shard tx if there are more than one shard
|
||||
crossShard := len(shardIds) > 1 |
||||
|
||||
// TODO(Richard): refactor this chuck to a single method
|
||||
// Setup a logger to stdout and log file.
|
||||
logFileName := fmt.Sprintf("./%v/tx-generator.log", *logFolder) |
||||
h := log.MultiHandler( |
||||
log.Must.FileHandler(logFileName, log.LogfmtFormat()), |
||||
log.StdoutHandler) |
||||
// In cases where you just want a stdout logger, use the following one instead.
|
||||
// h := log.CallerFileHandler(log.StdoutHandler)
|
||||
log.Root().SetHandler(h) |
||||
|
||||
// Nodes containing utxopools to mirror the shards' data in the network
|
||||
nodes := []*node.Node{} |
||||
for _, shardId := range shardIds { |
||||
node := node.NewNode(&consensus.Consensus{ShardID: shardId}) |
||||
// Assign many fake addresses so we have enough address to place with at first
|
||||
node.AddTestingAddresses(10000) |
||||
nodes = append(nodes, &node) |
||||
} |
||||
|
||||
// Client/txgenerator server node setup
|
||||
clientPort := getClientPort(&config) |
||||
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) |
||||
clientNode := node.NewNode(&consensusObj) |
||||
|
||||
if clientPort != "" { |
||||
clientNode.Client = client.NewClient(&leaders) |
||||
|
||||
// This func is used to update the client's utxopool when new blocks are received from the leaders
|
||||
updateBlocksFunc := func(blocks []*blockchain.Block) { |
||||
log.Debug("Received new block from leader", "len", len(blocks)) |
||||
for _, block := range blocks { |
||||
for _, node := range nodes { |
||||
if node.Consensus.ShardID == block.ShardId { |
||||
log.Debug("Adding block from leader", "shardId", block.ShardId) |
||||
// Add it to blockchain
|
||||
utxoPoolMutex.Lock() |
||||
node.AddNewBlock(block) |
||||
utxoPoolMutex.Unlock() |
||||
} else { |
||||
continue |
||||
} |
||||
} |
||||
} |
||||
} |
||||
clientNode.Client.UpdateBlocks = updateBlocksFunc |
||||
|
||||
// Start the client server to listen to leader's message
|
||||
go func() { |
||||
clientNode.StartServer(clientPort) |
||||
}() |
||||
|
||||
} |
||||
|
||||
// Transaction generation process
|
||||
time.Sleep(10 * time.Second) // wait for nodes to be ready
|
||||
start := time.Now() |
||||
totalTime := 60.0 |
||||
|
||||
for true { |
||||
t := time.Now() |
||||
if t.Sub(start).Seconds() >= totalTime { |
||||
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) |
||||
break |
||||
} |
||||
|
||||
allCrossTxs := []*blockchain.Transaction{} |
||||
// Generate simulated transactions
|
||||
for i, leader := range leaders { |
||||
txs, crossTxs := generateSimulatedTransactions(i, nodes, *maxNumTxsPerBatch, crossShard) |
||||
allCrossTxs = append(allCrossTxs, crossTxs...) |
||||
|
||||
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs)) |
||||
msg := node.ConstructTransactionListMessage(txs) |
||||
p2p.SendMessage(leader, msg) |
||||
// Note cross shard txs are later sent in batch
|
||||
} |
||||
|
||||
if len(allCrossTxs) > 0 { |
||||
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs)) |
||||
msg := node.ConstructTransactionListMessage(allCrossTxs) |
||||
p2p.BroadcastMessage(leaders, msg) |
||||
|
||||
// Put cross shard tx into a pending list waiting for proofs from leaders
|
||||
if clientPort != "" { |
||||
clientNode.Client.PendingCrossTxsMutex.Lock() |
||||
for _, tx := range allCrossTxs { |
||||
clientNode.Client.PendingCrossTxs[tx.ID] = tx |
||||
} |
||||
clientNode.Client.PendingCrossTxsMutex.Unlock() |
||||
} |
||||
} |
||||
|
||||
time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically
|
||||
} |
||||
|
||||
// Send a stop message to stop the nodes at the end
|
||||
msg := node.ConstructStopMessage() |
||||
peers := append(getValidators(*configFile), leaders...) |
||||
p2p.BroadcastMessage(peers, msg) |
||||
} |
Loading…
Reference in new issue