You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
251 lines
7.6 KiB
251 lines
7.6 KiB
6 years ago
|
package main
|
||
|
|
||
|
import (
|
||
|
"encoding/hex"
|
||
|
"flag"
|
||
|
"fmt"
|
||
|
"harmony-benchmark/blockchain"
|
||
|
"harmony-benchmark/client"
|
||
|
"harmony-benchmark/client/btctxiter"
|
||
|
"harmony-benchmark/configr"
|
||
|
"harmony-benchmark/consensus"
|
||
|
"harmony-benchmark/log"
|
||
|
"harmony-benchmark/node"
|
||
|
"harmony-benchmark/p2p"
|
||
6 years ago
|
proto_node "harmony-benchmark/proto/node"
|
||
6 years ago
|
"sync"
|
||
|
"time"
|
||
6 years ago
|
|
||
|
"github.com/piotrnar/gocoin/lib/btc"
|
||
6 years ago
|
)
|
||
|
|
||
|
type txGenSettings struct {
|
||
|
crossShard bool
|
||
|
maxNumTxsPerBatch int
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
utxoPoolMutex sync.Mutex
|
||
|
setting txGenSettings
|
||
|
btcTXIter btctxiter.BTCTXIterator
|
||
|
)
|
||
|
|
||
|
// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards.
|
||
|
// The transactions are generated by going through the existing utxos and
|
||
|
// randomly select a subset of them as the input for each new transaction. The output
|
||
|
// address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses.
|
||
|
//
|
||
|
// When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard.
|
||
|
// Similarly, generate another utxo output in that second shard.
|
||
|
//
|
||
|
// NOTE: the genesis block should contain N coinbase transactions which add
|
||
|
// token (1000) to each address in [0 - N). See node.AddTestingAddresses()
|
||
|
//
|
||
|
// Params:
|
||
|
// shardID - the shardID for current shard
|
||
|
// dataNodes - nodes containing utxopools of all shards
|
||
|
// Returns:
|
||
|
// all single-shard txs
|
||
|
// all cross-shard txs
|
||
|
func generateSimulatedTransactions(shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) {
|
||
|
/*
|
||
6 years ago
|
UTXO map structure:
|
||
|
{
|
||
|
address: {
|
||
|
txID: {
|
||
|
outputIndex: value
|
||
|
}
|
||
|
}
|
||
|
}
|
||
6 years ago
|
*/
|
||
|
|
||
|
utxoPoolMutex.Lock()
|
||
6 years ago
|
txs := []*blockchain.Transaction{}
|
||
|
crossTxs := []*blockchain.Transaction{}
|
||
|
|
||
|
nodeShardID := dataNodes[shardID].Consensus.ShardID
|
||
|
cnt := 0
|
||
|
|
||
|
LOOP:
|
||
|
for true {
|
||
|
blk := btcTXIter.IterateBTCTX()
|
||
|
blk.BuildTxList()
|
||
|
for _, btcTx := range blk.Txs {
|
||
|
tx := blockchain.Transaction{}
|
||
|
// tx.ID = tx.Hash.String()
|
||
|
if btcTx.IsCoinBase() {
|
||
|
// TxIn coinbase, newly generated coins
|
||
|
prevTxID := [32]byte{}
|
||
|
// TODO: merge txID with txIndex
|
||
|
tx.TxInput = []blockchain.TXInput{blockchain.TXInput{prevTxID, -1, "", nodeShardID}}
|
||
|
} else {
|
||
|
for _, txi := range btcTx.TxIn {
|
||
|
tx.TxInput = append(tx.TxInput, blockchain.TXInput{txi.Input.Hash, int(txi.Input.Vout), "", nodeShardID})
|
||
|
}
|
||
6 years ago
|
}
|
||
|
|
||
6 years ago
|
for _, txo := range btcTx.TxOut {
|
||
|
txoAddr := btc.NewAddrFromPkScript(txo.Pk_script, false)
|
||
|
if txoAddr == nil {
|
||
|
log.Warn("TxOut: can't decode address")
|
||
6 years ago
|
}
|
||
6 years ago
|
txout := blockchain.TXOutput{int(txo.Value), txoAddr.String(), nodeShardID}
|
||
|
tx.TxOutput = append(tx.TxOutput, txout)
|
||
|
}
|
||
|
tx.SetID()
|
||
|
txs = append(txs, &tx)
|
||
|
log.Debug("[Generator] transformed btc tx", "block height", btcTXIter.GetIndex(), "txi", len(tx.TxInput), "txo", len(tx.TxOutput), "txCount", cnt)
|
||
|
cnt++
|
||
|
if cnt >= setting.maxNumTxsPerBatch {
|
||
|
break LOOP
|
||
6 years ago
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
6 years ago
|
utxoPoolMutex.Unlock()
|
||
6 years ago
|
|
||
6 years ago
|
log.Debug("[Generator] generated transations", "single-shard", len(txs), "cross-shard", len(crossTxs))
|
||
|
return txs, crossTxs
|
||
6 years ago
|
}
|
||
|
|
||
|
// A utility func that counts the total number of utxos in a pool.
|
||
|
func countNumOfUtxos(utxoPool *blockchain.UTXOPool) int {
|
||
|
countAll := 0
|
||
|
for _, utxoMap := range utxoPool.UtxoMap {
|
||
6 years ago
|
for txIDStr, val := range utxoMap {
|
||
6 years ago
|
_ = val
|
||
6 years ago
|
id, err := hex.DecodeString(txIDStr)
|
||
6 years ago
|
if err != nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
6 years ago
|
txID := [32]byte{}
|
||
|
copy(txID[:], id[:])
|
||
6 years ago
|
for _, utxo := range val {
|
||
|
_ = utxo
|
||
|
countAll++
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return countAll
|
||
|
}
|
||
|
|
||
6 years ago
|
func initClient(clientNode *node.Node, clientPort string, leaders *[]p2p.Peer, nodes *[]*node.Node) {
|
||
|
if clientPort == "" {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
clientNode.Client = client.NewClient(leaders)
|
||
|
|
||
|
// This func is used to update the client's utxopool when new blocks are received from the leaders
|
||
|
updateBlocksFunc := func(blocks []*blockchain.Block) {
|
||
|
log.Debug("Received new block from leader", "len", len(blocks))
|
||
|
for _, block := range blocks {
|
||
|
for _, node := range *nodes {
|
||
|
if node.Consensus.ShardID == block.ShardId {
|
||
|
log.Debug("Adding block from leader", "shardId", block.ShardId)
|
||
|
// Add it to blockchain
|
||
|
utxoPoolMutex.Lock()
|
||
|
node.AddNewBlock(block)
|
||
|
utxoPoolMutex.Unlock()
|
||
|
} else {
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
clientNode.Client.UpdateBlocks = updateBlocksFunc
|
||
|
|
||
|
// Start the client server to listen to leader's message
|
||
|
go func() {
|
||
|
clientNode.StartServer(clientPort)
|
||
|
}()
|
||
|
}
|
||
|
|
||
6 years ago
|
func main() {
|
||
|
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
|
||
6 years ago
|
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 100, "number of transactions to send per message")
|
||
6 years ago
|
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
|
||
|
flag.Parse()
|
||
|
|
||
|
// Read the configs
|
||
|
config, _ := configr.ReadConfigFile(*configFile)
|
||
6 years ago
|
leaders, shardIDs := configr.GetLeadersAndShardIds(&config)
|
||
6 years ago
|
|
||
|
// Do cross shard tx if there are more than one shard
|
||
6 years ago
|
setting.crossShard = len(shardIDs) > 1
|
||
6 years ago
|
setting.maxNumTxsPerBatch = *maxNumTxsPerBatch
|
||
|
|
||
|
// TODO(Richard): refactor this chuck to a single method
|
||
|
// Setup a logger to stdout and log file.
|
||
|
logFileName := fmt.Sprintf("./%v/txgen.log", *logFolder)
|
||
|
h := log.MultiHandler(
|
||
|
log.StdoutHandler,
|
||
|
log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file
|
||
|
// log.Must.NetHandler("tcp", ":3000", log.JSONFormat()) // Log to remote
|
||
|
)
|
||
|
log.Root().SetHandler(h)
|
||
|
|
||
|
btcTXIter.Init()
|
||
|
|
||
|
// Nodes containing utxopools to mirror the shards' data in the network
|
||
|
nodes := []*node.Node{}
|
||
6 years ago
|
for _, shardID := range shardIDs {
|
||
|
nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID}))
|
||
6 years ago
|
}
|
||
|
|
||
|
// Client/txgenerator server node setup
|
||
|
clientPort := configr.GetClientPort(&config)
|
||
|
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
|
||
|
clientNode := node.New(consensusObj)
|
||
|
|
||
6 years ago
|
initClient(clientNode, clientPort, &leaders, &nodes)
|
||
6 years ago
|
|
||
|
// Transaction generation process
|
||
|
time.Sleep(10 * time.Second) // wait for nodes to be ready
|
||
|
start := time.Now()
|
||
|
totalTime := 300.0 //run for 5 minutes
|
||
|
|
||
|
for true {
|
||
|
t := time.Now()
|
||
|
if t.Sub(start).Seconds() >= totalTime {
|
||
|
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
|
||
|
break
|
||
|
}
|
||
|
|
||
|
allCrossTxs := []*blockchain.Transaction{}
|
||
|
// Generate simulated transactions
|
||
|
for i, leader := range leaders {
|
||
|
txs, crossTxs := generateSimulatedTransactions(i, nodes)
|
||
|
allCrossTxs = append(allCrossTxs, crossTxs...)
|
||
|
|
||
|
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
|
||
6 years ago
|
msg := proto_node.ConstructTransactionListMessage(txs)
|
||
6 years ago
|
p2p.SendMessage(leader, msg)
|
||
|
// Note cross shard txs are later sent in batch
|
||
|
}
|
||
|
|
||
|
if len(allCrossTxs) > 0 {
|
||
|
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
|
||
6 years ago
|
msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
|
||
6 years ago
|
p2p.BroadcastMessage(leaders, msg)
|
||
|
|
||
|
// Put cross shard tx into a pending list waiting for proofs from leaders
|
||
|
if clientPort != "" {
|
||
|
clientNode.Client.PendingCrossTxsMutex.Lock()
|
||
|
for _, tx := range allCrossTxs {
|
||
|
clientNode.Client.PendingCrossTxs[tx.ID] = tx
|
||
|
}
|
||
|
clientNode.Client.PendingCrossTxsMutex.Unlock()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically
|
||
|
}
|
||
|
|
||
|
// Send a stop message to stop the nodes at the end
|
||
6 years ago
|
msg := proto_node.ConstructStopMessage()
|
||
6 years ago
|
peers := append(configr.GetValidators(*configFile), leaders...)
|
||
|
p2p.BroadcastMessage(peers, msg)
|
||
|
}
|