Merge pull request #51 from simple-rules/ricl-txgen

Ricl txgen
pull/55/head
Richard Liu 6 years ago committed by GitHub
commit 5715fbcdc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 275
      client/btctxgen/btctxgen.go
  2. 38
      client/btctxiter/btctxiter.go
  3. 1
      client/txgen/main.go

@ -0,0 +1,275 @@
package main
import (
"encoding/hex"
"flag"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/client"
"harmony-benchmark/client/btctxiter"
"harmony-benchmark/configr"
"harmony-benchmark/consensus"
"harmony-benchmark/log"
"harmony-benchmark/node"
"harmony-benchmark/p2p"
"math/rand"
"sync"
"time"
)
type txGenSettings struct {
numOfAddress int
crossShard bool
maxNumTxsPerBatch int
}
var (
utxoPoolMutex sync.Mutex
setting txGenSettings
btcTXIter btctxiter.BTCTXIterator
)
type TxInfo struct {
// Global Input
shardID int
dataNodes []*node.Node
// Temp Input
id [32]byte
index int
value int
address string
// Output
txs []*blockchain.Transaction
crossTxs []*blockchain.Transaction
txCount int
}
// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards.
// The transactions are generated by going through the existing utxos and
// randomly select a subset of them as the input for each new transaction. The output
// address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses.
//
// When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard.
// Similarly, generate another utxo output in that second shard.
//
// NOTE: the genesis block should contain N coinbase transactions which add
// token (1000) to each address in [0 - N). See node.AddTestingAddresses()
//
// Params:
// shardID - the shardID for current shard
// dataNodes - nodes containing utxopools of all shards
// Returns:
// all single-shard txs
// all cross-shard txs
func generateSimulatedTransactions(shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/*
UTXO map structure:
address - [
txId1 - [
outputIndex1 - value1
outputIndex2 - value2
]
txId2 - [
outputIndex1 - value1
outputIndex2 - value2
]
]
*/
utxoPoolMutex.Lock()
txInfo := TxInfo{}
txInfo.shardID = shardID
txInfo.dataNodes = dataNodes
txInfo.txCount = 0
UTXOLOOP:
// Loop over all addresses
for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap {
txInfo.address = address
// Loop over all txIds for the address
for txIdStr, utxoMap := range txMap {
// Parse TxId
id, err := hex.DecodeString(txIdStr)
if err != nil {
continue
}
copy(txInfo.id[:], id[:])
// Loop over all utxos for the txId
for index, value := range utxoMap {
txInfo.index = index
txInfo.value = value
randNum := rand.Intn(100)
// 30% sample rate to select UTXO to use for new transactions
if randNum >= 30 {
continue
}
generateSingleShardTx(&txInfo)
if txInfo.txCount >= setting.maxNumTxsPerBatch {
break UTXOLOOP
}
}
}
}
utxoPoolMutex.Unlock()
log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs))
return txInfo.txs, txInfo.crossTxs
}
func generateSingleShardTx(txInfo *TxInfo) {
// nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID
// blk := btcTXIter.IterateBTCTX()
// Add the utxo as new tx input
// txin := blockchain.TXInput{txInfo.id, txInfo.index, txInfo.address, nodeShardID}
// // Spend the utxo to a random address in [0 - N)
// txout := blockchain.TXOutput{txInfo.value, strconv.Itoa(rand.Intn(setting.numOfAddress)), nodeShardID}
// tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}, nil}
// tx.SetID()
// txInfo.txs = append(txInfo.txs, &tx)
txInfo.txCount++
}
// A utility func that counts the total number of utxos in a pool.
func countNumOfUtxos(utxoPool *blockchain.UTXOPool) int {
countAll := 0
for _, utxoMap := range utxoPool.UtxoMap {
for txIdStr, val := range utxoMap {
_ = val
id, err := hex.DecodeString(txIdStr)
if err != nil {
continue
}
txId := [32]byte{}
copy(txId[:], id[:])
for _, utxo := range val {
_ = utxo
countAll++
}
}
}
return countAll
}
func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 100000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
flag.Parse()
// Read the configs
config, _ := configr.ReadConfigFile(*configFile)
leaders, shardIds := configr.GetLeadersAndShardIds(&config)
setting.numOfAddress = 10000
// Do cross shard tx if there are more than one shard
setting.crossShard = len(shardIds) > 1
setting.maxNumTxsPerBatch = *maxNumTxsPerBatch
// TODO(Richard): refactor this chuck to a single method
// Setup a logger to stdout and log file.
logFileName := fmt.Sprintf("./%v/txgen.log", *logFolder)
h := log.MultiHandler(
log.StdoutHandler,
log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file
// log.Must.NetHandler("tcp", ":3000", log.JSONFormat()) // Log to remote
)
log.Root().SetHandler(h)
btcTXIter.Init()
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
for _, shardId := range shardIds {
node := node.New(&consensus.Consensus{ShardID: shardId})
// // Assign many fake addresses so we have enough address to play with at first
// node.AddTestingAddresses(setting.numOfAddress)
nodes = append(nodes, node)
}
// Client/txgenerator server node setup
clientPort := configr.GetClientPort(&config)
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
clientNode := node.New(consensusObj)
if clientPort != "" {
clientNode.Client = client.NewClient(&leaders)
// This func is used to update the client's utxopool when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*blockchain.Block) {
log.Debug("Received new block from leader", "len", len(blocks))
for _, block := range blocks {
for _, node := range nodes {
if node.Consensus.ShardID == block.ShardId {
log.Debug("Adding block from leader", "shardId", block.ShardId)
// Add it to blockchain
utxoPoolMutex.Lock()
node.AddNewBlock(block)
utxoPoolMutex.Unlock()
} else {
continue
}
}
}
}
clientNode.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func() {
clientNode.StartServer(clientPort)
}()
}
// Transaction generation process
time.Sleep(10 * time.Second) // wait for nodes to be ready
start := time.Now()
totalTime := 300.0 //run for 5 minutes
for true {
t := time.Now()
if t.Sub(start).Seconds() >= totalTime {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break
}
allCrossTxs := []*blockchain.Transaction{}
// Generate simulated transactions
for i, leader := range leaders {
txs, crossTxs := generateSimulatedTransactions(i, nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
msg := node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch
}
if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
msg := node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
}
time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically
}
// Send a stop message to stop the nodes at the end
msg := node.ConstructStopMessage()
peers := append(configr.GetValidators(*configFile), leaders...)
p2p.BroadcastMessage(peers, msg)
}

@ -0,0 +1,38 @@
package btctxiter
import (
"log"
"github.com/piotrnar/gocoin/lib/btc"
"github.com/piotrnar/gocoin/lib/others/blockdb"
)
type BTCTXIterator struct {
index int
blockDatabase *blockdb.BlockDB
}
func (iter *BTCTXIterator) Init() {
// Set real Bitcoin network
Magic := [4]byte{0xF9, 0xBE, 0xB4, 0xD9}
// Specify blocks directory
iter.blockDatabase = blockdb.NewBlockDB("/Users/ricl/Library/Application Support/Bitcoin/blocks", Magic)
iter.index = -1
}
func (iter *BTCTXIterator) IterateBTCTX() *btc.Block {
iter.index++
dat, err := iter.blockDatabase.FetchNextBlock()
if dat == nil || err != nil {
log.Println("END of DB file")
}
blk, err := btc.NewBlock(dat[:])
if err != nil {
println("Block inconsistent:", err.Error())
}
blk.BuildTxList()
return blk
}

@ -118,6 +118,7 @@ UTXOLOOP:
} }
utxoPoolMutex.Unlock() utxoPoolMutex.Unlock()
log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs))
return txInfo.txs, txInfo.crossTxs return txInfo.txs, txInfo.crossTxs
} }

Loading…
Cancel
Save