Merge branch 'master' of github.com:simple-rules/harmony-benchmark

pull/15/head
alok 7 years ago
commit fa42bc3db2
  1. 134
      aws-code/transaction_generator.go
  2. 8
      blockchain/blockchain.go
  3. 6
      blockchain/transaction.go
  4. 4
      blockchain/utxopool.go
  5. 47
      client/client.go
  6. 6
      client/message.go

@ -24,7 +24,7 @@ import (
// address of the new transaction are randomly selected from 1 - 1000.
// NOTE: the genesis block should contain 1000 coinbase transactions adding
// value to each address in [1 - 1000]. See node.AddMoreFakeTransactions()
func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Transaction {
func getNewFakeTransactions(shardId int, dataNodes *[]node.Node, numTxs int, numShards int, crossShard bool) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/*
UTXO map structure:
address - [
@ -38,63 +38,83 @@ func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Trans
]
]
*/
var outputs []*blockchain.Transaction
var txs []*blockchain.Transaction
var crossTxs []*blockchain.Transaction
count := 0
countAll := 0
for address, txMap := range dataNode.UtxoPool.UtxoMap {
for address, txMap := range (*dataNodes)[shardId].UtxoPool.UtxoMap {
for txIdStr, utxoMap := range txMap {
txId, err := hex.DecodeString(txIdStr)
id, err := hex.DecodeString(txIdStr)
if err != nil {
continue
}
txId := [32]byte{}
copy(txId[:], id[:])
for index, value := range utxoMap {
countAll++
if rand.Intn(100) < 30 { // 30% sample rate to select UTXO to use for new transactions
// Sharding related test code. To be deleted soon
//if dataNode.Consensus.ShardID == 0 {
// txin := blockchain.TXInput{txId, index, address, dataNode.Consensus.ShardID}
// txin2 := blockchain.TXInput{txId, index, address, dataNode.Consensus.ShardID+1}
// txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000))}
// tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin, txin2}, []blockchain.TXOutput{txout}}
// tx.SetID()
//
// if count >= numTxs {
// continue
// }
// outputs = append(outputs, &tx)
// count++
//} else {
// txin := blockchain.TXInput{txId, index, address, dataNode.Consensus.ShardID}
// txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000))}
// tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}}
// tx.SetID()
//
// if count >= numTxs {
// continue
// }
// outputs = append(outputs, &tx)
// count++
//}
randNum := rand.Intn(100)
if randNum < 30 { // 30% sample rate to select UTXO to use for new transactions
// Spend the money of current UTXO to a random address in [1 - 1000]
txin := blockchain.TXInput{txId, index, address, dataNode.Consensus.ShardID}
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000))}
tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}}
tx.SetID()
if randNum < 10 && crossShard { // 30% cross shard transactions: add another txinput from another shard
// Get the cross shard utxo from another shard
crossShardId := (int((*dataNodes)[shardId].Consensus.ShardID) + 1) % numShards // shard with neighboring Id
crossShardNode := (*dataNodes)[crossShardId]
crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[address]
crossTxin := new(blockchain.TXInput)
crossValue := 0
for crossTxIdStr, crossShardUtxos := range crossShardUtxosMap {
id, err := hex.DecodeString(crossTxIdStr)
if err != nil {
continue
}
crossTxId := [32]byte{}
copy(crossTxId[:], id[:])
for crossShardIndex, crossShardValue := range crossShardUtxos {
crossValue = crossShardValue
crossTxin = &blockchain.TXInput{crossTxId, crossShardIndex, address, uint32(crossShardId)}
}
}
// Fill in the utxo from current shard
txin := blockchain.TXInput{txId, index, address, (*dataNodes)[shardId].Consensus.ShardID}
txInputs := []blockchain.TXInput{txin}
if crossTxin != nil {
txInputs = append(txInputs, *crossTxin)
}
txout := blockchain.TXOutput{value + crossValue, strconv.Itoa(rand.Intn(10000))}
tx := blockchain.Transaction{[32]byte{}, txInputs, []blockchain.TXOutput{txout}}
tx.SetID()
if count >= numTxs {
continue
}
crossTxs = append(crossTxs, &tx)
count++
} else {
txin := blockchain.TXInput{txId, index, address, (*dataNodes)[shardId].Consensus.ShardID}
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(10000))}
tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}}
tx.SetID()
if count >= numTxs {
continue
if count >= numTxs {
continue
}
txs = append(txs, &tx)
count++
}
outputs = append(outputs, &tx)
count++
}
}
}
}
log.Debug("UTXO", "poolSize", countAll, "numTxsToSend", numTxs)
return outputs
return txs, crossTxs
}
func getValidators(config string) []p2p.Peer {
@ -157,6 +177,7 @@ func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
numTxsPerBatch := flag.Int("num_txs_per_batch", 10000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
crossShard := flag.Bool("cross_shard", false, "whether to send cross shard txs")
flag.Parse()
config := readConfigFile(*configFile)
leaders, shardIds := getLeadersAndShardIds(&config)
@ -172,10 +193,10 @@ func main() {
// Client server setup
clientPort := getClientPort(&config)
if clientPort != "" {
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
clientNode := node.NewNode(&consensusObj)
clientNode := node.NewNode(&consensusObj)
if clientPort != "" {
clientNode.Client = client.NewClient()
go func() {
clientNode.StartServer(clientPort)
@ -202,17 +223,34 @@ func main() {
}
t = time.Now()
allCrossTxs := []*blockchain.Transaction{}
for i, leader := range leaders {
txsToSend := getNewFakeTransactions(&nodes[i], *numTxsPerBatch)
msg := node.ConstructTransactionListMessage(txsToSend)
txs, crossTxs := getNewFakeTransactions(i, &nodes, *numTxsPerBatch, len(shardIds), *crossShard)
allCrossTxs = append(allCrossTxs, crossTxs...)
msg := node.ConstructTransactionListMessage(txs)
fmt.Printf("[Generator] Creating fake txs for leader took %s", time.Since(t))
log.Debug("[Generator] Sending txs ...", "leader", leader, "numTxs", len(txsToSend))
log.Debug("[Generator] Sending txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
p2p.SendMessage(leader, msg)
// Update local utxo pool to mirror the utxo pool of a real node
nodes[i].UtxoPool.Update(txsToSend)
// Update local utxo pool to mirror the utxo pool of a real node (within-shard tx)
nodes[i].UtxoPool.Update(txs)
}
msg := node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
for _, node := range nodes {
// Update local utxo pool to mirror the utxo pool of a real node (cross-shard tx)
node.UtxoPool.Update(allCrossTxs)
}
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = &client.CrossShardTxAndProofs{Transaction: *tx}
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically
}

@ -51,7 +51,7 @@ func (bc *Blockchain) FindUnspentTransactions(address string) []Transaction {
for _, txInput := range tx.TxInput {
if address == txInput.Address {
ID := hex.EncodeToString(txInput.TxID)
ID := hex.EncodeToString(txInput.TxID[:])
spentTXOs[ID] = append(spentTXOs[ID], txInput.TxOutputIndex)
}
}
@ -115,13 +115,15 @@ func (bc *Blockchain) NewUTXOTransaction(from, to string, amount int, shardId ui
// Build a list of inputs
for txid, outs := range validOutputs {
txID, err := hex.DecodeString(txid)
id, err := hex.DecodeString(txid)
if err != nil {
return nil
}
txId := [32]byte{}
copy(txId[:], id[:])
for _, out := range outs {
input := TXInput{txID, out, from, shardId}
input := TXInput{txId, out, from, shardId}
inputs = append(inputs, input)
}
}

@ -24,7 +24,7 @@ type TXOutput struct {
// TXInput is the struct of transaction input (a UTXO) in a transaction.
type TXInput struct {
TxID []byte
TxID [32]byte
TxOutputIndex int
Address string
ShardId uint32 // The Id of the shard where this UTXO belongs
@ -67,7 +67,7 @@ func NewCoinbaseTX(to, data string, shardId uint32) *Transaction {
data = fmt.Sprintf("Reward to '%s'", to)
}
txin := TXInput{[]byte{}, -1, data, shardId}
txin := TXInput{[32]byte{}, -1, data, shardId}
txout := TXOutput{DefaultCoinbaseValue, to}
tx := Transaction{[32]byte{}, []TXInput{txin}, []TXOutput{txout}}
tx.SetID()
@ -76,7 +76,7 @@ func NewCoinbaseTX(to, data string, shardId uint32) *Transaction {
// Used for debuging.
func (txInput *TXInput) String() string {
res := fmt.Sprintf("TxID: %v, ", hex.EncodeToString(txInput.TxID))
res := fmt.Sprintf("TxID: %v, ", hex.EncodeToString(txInput.TxID[:]))
res += fmt.Sprintf("TxOutputIndex: %v, ", txInput.TxOutputIndex)
res += fmt.Sprintf("Address: %v", txInput.Address)
return res

@ -56,7 +56,7 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[s
continue
}
inTxID := hex.EncodeToString(in.TxID)
inTxID := hex.EncodeToString(in.TxID[:])
index := in.TxOutputIndex
// Check if the transaction with the addres is spent or not.
if val, ok := (*spentTXOs)[in.Address][inTxID][index]; ok {
@ -126,7 +126,7 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
continue
}
inTxID := hex.EncodeToString(in.TxID)
inTxID := hex.EncodeToString(in.TxID[:])
utxoPool.DeleteOneBalanceItem(in.Address, inTxID, in.TxOutputIndex)
}

@ -3,13 +3,16 @@ package client
import (
"bytes"
"encoding/gob"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/log"
"sync"
)
// A client represent a entity/user which send transactions and receive responses from the harmony network
type Client struct {
pendingCrossTxs map[[32]byte]*blockchain.Transaction // map of TxId to pending cross shard txs
PendingCrossTxs map[[32]byte]*CrossShardTxAndProofs // map of TxId to pending cross shard txs
PendingCrossTxsMutex sync.Mutex
log log.Logger // Log utility
}
@ -26,14 +29,52 @@ func (client *Client) TransactionMessageHandler(msgPayload []byte) {
client.log.Error("Failed deserializing cross transaction proof list")
}
// TODO: process the proof list
txsToSend := []CrossShardTxAndProofs{}
client.PendingCrossTxsMutex.Lock()
for _, proof := range *proofList {
txAndProofs, ok := client.PendingCrossTxs[proof.TxID]
readyToUnlock := true
if ok {
txAndProofs.Proofs = append(txAndProofs.Proofs, proof)
txInputs := make(map[blockchain.TXInput]bool)
for _, txInput := range txAndProofs.Transaction.TxInput {
txInputs[txInput] = true
}
for _, curProof := range txAndProofs.Proofs {
for _, txInput := range curProof.TxInput {
val, ok := txInputs[*txInput]
if !ok || !val {
readyToUnlock = false
}
}
}
}
if readyToUnlock {
txsToSend = append(txsToSend, *txAndProofs)
}
}
for _, txToSend := range txsToSend {
delete(client.PendingCrossTxs, txToSend.Transaction.ID)
}
client.PendingCrossTxsMutex.Unlock()
if txsToSend != nil {
client.sendCrossShardTxUnlockMessage(&txsToSend)
}
}
}
func (client *Client) sendCrossShardTxUnlockMessage(txsToSend *[]CrossShardTxAndProofs) {
// TODO: Send unlock message back to output shards
fmt.Println("SENDING UNLOCK MESSAGE")
}
// Create a new Node
func NewClient() *Client {
client := Client{}
client.PendingCrossTxs = make(map[[32]byte]*CrossShardTxAndProofs)
// Logger
client.log = log.New()
return &client

@ -22,6 +22,12 @@ const (
CROSS_TX TransactionMessageType = iota // The proof of accept or reject returned by the leader to the cross shard transaction client.
)
// Used to aggregated proofs and unlock utxos in cross shard tx
type CrossShardTxAndProofs struct {
Transaction blockchain.Transaction // The cross shard tx
Proofs []blockchain.CrossShardTxProof // The proofs
}
//ConstructStopMessage is STOP message
func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.CLIENT)})

Loading…
Cancel
Save