Add valid transaction simulator based on current utxo pool

pull/7/head
Rongjian Lan 7 years ago
parent d5214f3b3d
commit b1d2eaa52b
  1. 79
      aws-code/transaction_generator.go
  2. 1
      benchmark_main.go
  3. 63
      blockchain/utxopool.go
  4. 9
      node/message.go
  5. 21
      node/node.go
  6. 6
      node/node_handler.go
  7. 2
      node/node_test.go

@ -12,15 +12,63 @@ import (
"os" "os"
"strings" "strings"
"time" "time"
"harmony-benchmark/consensus"
"encoding/hex"
"strconv"
) )
func newRandTransaction() blockchain.Transaction { // Get numTxs number of Fake transactions based on the existing UtxoPool.
txin := blockchain.TXInput{[]byte{}, rand.Intn(100), string(rand.Uint64())} // The transactions are generated by going through the existing utxos and
txout := blockchain.TXOutput{rand.Intn(100), string(rand.Uint64())} // randomly select a subset of them as input to new transactions. The output
// address of the new transaction are randomly selected from 1 - 1000.
// NOTE: the genesis block should contain 1000 coinbase transactions adding
// value to each address in [1 - 1000]. See node.AddMoreFakeTransactions()
func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Transaction {
/*
address - [
txId1 - [
outputIndex1 - value1
outputIndex2 - value2
]
txId2 - [
outputIndex1 - value1
outputIndex2 - value2
]
]
*/
var outputs []*blockchain.Transaction
count := 0
countAll := 0
for address, txMap := range dataNode.UtxoPool.UtxoMap {
for txIdStr, utxoMap := range txMap {
txId, err := hex.DecodeString(txIdStr)
if err != nil {
continue
}
for index, value := range utxoMap {
countAll++
if rand.Intn(100) <= 20 { // 20% sample rate to select UTXO to use for new transactions
// Spend the money of current UTXO to a random address in [1 - 1000]
txin := blockchain.TXInput{txId, index, address}
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(1000))}
tx := blockchain.Transaction{nil, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}} tx := blockchain.Transaction{nil, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}}
tx.SetID() tx.SetID()
return tx if count >= numTxs {
continue
}
outputs = append(outputs, &tx)
count++
}
}
}
}
log.Debug("UTXO", "poolSize", countAll, "numTxsToSend", numTxs)
return outputs
} }
func getValidators(config string) []p2p.Peer { func getValidators(config string) []p2p.Peer {
@ -63,28 +111,35 @@ func readConfigFile(configFile string) [][]string {
} }
func main() { func main() {
// Setup a stdout logger
h := log.CallerFileHandler(log.StdoutHandler)
log.Root().SetHandler(h)
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
numTxsPerBatch := flag.Int("num_txs_per_batch", 100, "number of transactions to send per message") numTxsPerBatch := flag.Int("num_txs_per_batch", 1000, "number of transactions to send per message")
flag.Parse() flag.Parse()
config := readConfigFile(*configFile) config := readConfigFile(*configFile)
// Testing node
dataNode := node.NewNode(&consensus.Consensus{})
dataNode.AddMoreFakeTransactions()
start := time.Now() start := time.Now()
totalTime := 60.0 totalTime := 60.0
txs := make([]blockchain.Transaction, *numTxsPerBatch)
leaders := getLeaders(&config) leaders := getLeaders(&config)
time.Sleep(5 * time.Second) // wait for nodes to run
for true { for true {
t := time.Now() t := time.Now()
if t.Sub(start).Seconds() >= totalTime { if t.Sub(start).Seconds() >= totalTime {
fmt.Println(int(t.Sub(start)), start, totalTime) fmt.Println(int(t.Sub(start)), start, totalTime)
break break
} }
for i := range txs { txsToSend := getNewFakeTransactions(&dataNode, *numTxsPerBatch)
txs[i] = newRandTransaction() msg := node.ConstructTransactionListMessage(txsToSend)
log.Debug("[Generator] Sending txs...", "numTxs", len(txsToSend))
}
msg := node.ConstructTransactionListMessage(txs)
log.Debug("[Generator] Sending txs to leader[s]", "numOfLeader", len(leaders))
p2p.BroadcastMessage(leaders, msg) p2p.BroadcastMessage(leaders, msg)
dataNode.UtxoPool.Update(txsToSend)
time.Sleep(1 * time.Second) // 10 transactions per second time.Sleep(1 * time.Second) // 10 transactions per second
} }
msg := node.ConstructStopMessage() msg := node.ConstructStopMessage()

@ -76,6 +76,7 @@ func main() {
consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader) consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader)
node := node.NewNode(&consensus) node := node.NewNode(&consensus)
node.AddMoreFakeTransactions()
if consensus.IsLeader { if consensus.IsLeader {
// Let consensus run // Let consensus run

@ -14,7 +14,20 @@ const (
type UTXOPool struct { type UTXOPool struct {
// Mapping from address to a map of transaction id to a map of the index of output // Mapping from address to a map of transaction id to a map of the index of output
// array in that transaction to that balance. // array in that transaction to that balance.
utxo map[string]map[string]map[int]int /*
The 3-d map's structure:
address - [
txId1 - [
outputIndex1 - value1
outputIndex2 - value2
]
txId2 - [
outputIndex1 - value1
outputIndex2 - value2
]
]
*/
UtxoMap map[string]map[string]map[int]int
} }
// VerifyTransactions verifies if a list of transactions valid. // VerifyTransactions verifies if a list of transactions valid.
@ -43,7 +56,7 @@ func (utxoPool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool {
spentTXOs[in.Address][inTxID][index] = true spentTXOs[in.Address][inTxID][index] = true
// Sum the balance up to the inTotal. // Sum the balance up to the inTotal.
if val, ok := utxoPool.utxo[in.Address][inTxID][index]; ok { if val, ok := utxoPool.UtxoMap[in.Address][inTxID][index]; ok {
inTotal += val inTotal += val
} else { } else {
return false return false
@ -73,7 +86,7 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction) bool {
inTxID := hex.EncodeToString(in.TxID) inTxID := hex.EncodeToString(in.TxID)
index := in.TxOutputIndex index := in.TxOutputIndex
// Check if the transaction with the addres is spent or not. // Check if the transaction with the addres is spent or not.
if val, ok := utxoPool.utxo[in.Address][inTxID][index]; ok { if val, ok := utxoPool.UtxoMap[in.Address][inTxID][index]; ok {
inTotal += val inTotal += val
} else { } else {
return false return false
@ -113,19 +126,19 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
// Remove // Remove
for _, in := range tx.TxInput { for _, in := range tx.TxInput {
inTxID := hex.EncodeToString(in.TxID) inTxID := hex.EncodeToString(in.TxID)
delete(utxoPool.utxo[in.Address][inTxID], in.TxOutputIndex) delete(utxoPool.UtxoMap[in.Address][inTxID], in.TxOutputIndex)
} }
// Update // Update
for index, out := range tx.TxOutput { for index, out := range tx.TxOutput {
if _, ok := utxoPool.utxo[out.Address]; !ok { if _, ok := utxoPool.UtxoMap[out.Address]; !ok {
utxoPool.utxo[out.Address] = make(map[string]map[int]int) utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int)
utxoPool.utxo[out.Address][txID] = make(map[int]int) utxoPool.UtxoMap[out.Address][txID] = make(map[int]int)
} }
if _, ok := utxoPool.utxo[out.Address][txID]; !ok { if _, ok := utxoPool.UtxoMap[out.Address][txID]; !ok {
utxoPool.utxo[out.Address][txID] = make(map[int]int) utxoPool.UtxoMap[out.Address][txID] = make(map[int]int)
} }
utxoPool.utxo[out.Address][txID][index] = out.Value utxoPool.UtxoMap[out.Address][txID][index] = out.Value
} }
} }
} }
@ -149,7 +162,7 @@ func (utxoPool *UTXOPool) VerifyAndUpdate(transactions []*Transaction) bool {
return false return false
} }
// Update utxo balances with a list of new transactions. // Update Utxo balances with a list of new transactions.
func (utxoPool *UTXOPool) Update(transactions []*Transaction) { func (utxoPool *UTXOPool) Update(transactions []*Transaction) {
if utxoPool != nil { if utxoPool != nil {
for _, tx := range transactions { for _, tx := range transactions {
@ -158,38 +171,38 @@ func (utxoPool *UTXOPool) Update(transactions []*Transaction) {
// Remove // Remove
for _, in := range tx.TxInput { for _, in := range tx.TxInput {
inTxID := hex.EncodeToString(in.TxID) inTxID := hex.EncodeToString(in.TxID)
delete(utxoPool.utxo[in.Address][inTxID], in.TxOutputIndex) delete(utxoPool.UtxoMap[in.Address][inTxID], in.TxOutputIndex)
} }
// Update // Update
for index, out := range tx.TxOutput { for index, out := range tx.TxOutput {
if _, ok := utxoPool.utxo[out.Address]; !ok { if _, ok := utxoPool.UtxoMap[out.Address]; !ok {
utxoPool.utxo[out.Address] = make(map[string]map[int]int) utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int)
utxoPool.utxo[out.Address][curTxID] = make(map[int]int) utxoPool.UtxoMap[out.Address][curTxID] = make(map[int]int)
} }
if _, ok := utxoPool.utxo[out.Address][curTxID]; !ok { if _, ok := utxoPool.UtxoMap[out.Address][curTxID]; !ok {
utxoPool.utxo[out.Address][curTxID] = make(map[int]int) utxoPool.UtxoMap[out.Address][curTxID] = make(map[int]int)
} }
utxoPool.utxo[out.Address][curTxID][index] = out.Value utxoPool.UtxoMap[out.Address][curTxID][index] = out.Value
} }
} }
} }
} }
// CreateUTXOPoolFromTransaction a utxo pool from a genesis transaction. // CreateUTXOPoolFromTransaction a Utxo pool from a genesis transaction.
func CreateUTXOPoolFromTransaction(tx *Transaction) *UTXOPool { func CreateUTXOPoolFromTransaction(tx *Transaction) *UTXOPool {
var utxoPool UTXOPool var utxoPool UTXOPool
txID := hex.EncodeToString(tx.ID) txID := hex.EncodeToString(tx.ID)
utxoPool.utxo = make(map[string]map[string]map[int]int) utxoPool.UtxoMap = make(map[string]map[string]map[int]int)
for index, out := range tx.TxOutput { for index, out := range tx.TxOutput {
utxoPool.utxo[out.Address] = make(map[string]map[int]int) utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int)
utxoPool.utxo[out.Address][txID] = make(map[int]int) utxoPool.UtxoMap[out.Address][txID] = make(map[int]int)
utxoPool.utxo[out.Address][txID][index] = out.Value utxoPool.UtxoMap[out.Address][txID][index] = out.Value
} }
return &utxoPool return &utxoPool
} }
// CreateUTXOPoolFromGenesisBlockChain a utxo pool from a genesis blockchain. // CreateUTXOPoolFromGenesisBlockChain a Utxo pool from a genesis blockchain.
func CreateUTXOPoolFromGenesisBlockChain(bc *Blockchain) *UTXOPool { func CreateUTXOPoolFromGenesisBlockChain(bc *Blockchain) *UTXOPool {
tx := bc.Blocks[0].Transactions[0] tx := bc.Blocks[0].Transactions[0]
return CreateUTXOPoolFromTransaction(tx) return CreateUTXOPoolFromTransaction(tx)
@ -211,7 +224,7 @@ func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transact
// Used for debugging. // Used for debugging.
func (utxoPool *UTXOPool) String() string { func (utxoPool *UTXOPool) String() string {
res := "" res := ""
for address, v1 := range utxoPool.utxo { for address, v1 := range utxoPool.UtxoMap {
for txid, v2 := range v1 { for txid, v2 := range v1 {
for index, value := range v2 { for index, value := range v2 {
res += fmt.Sprintf("address: %v, tx id: %v, index: %v, value: %v\n", address, txid, index, value) res += fmt.Sprintf("address: %v, tx id: %v, index: %v, value: %v\n", address, txid, index, value)

@ -21,12 +21,17 @@ const (
) )
//ConstructTransactionListMessage constructs serialized transactions //ConstructTransactionListMessage constructs serialized transactions
func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte { func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.TRANSACTION)) byteBuffer.WriteByte(byte(common.TRANSACTION))
byteBuffer.WriteByte(byte(SEND)) byteBuffer.WriteByte(byte(SEND))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(transactions) // Copy over the tx data
txs := make([]blockchain.Transaction, len(transactions))
for i := range txs {
txs[i] = *transactions[i]
}
encoder.Encode(txs)
return byteBuffer.Bytes() return byteBuffer.Bytes()
} }

@ -7,6 +7,7 @@ import (
"net" "net"
"os" "os"
"sync" "sync"
"strconv"
) )
var pendingTxMutex = &sync.Mutex{} var pendingTxMutex = &sync.Mutex{}
@ -18,7 +19,7 @@ type Node struct {
pendingTransactions []*blockchain.Transaction pendingTransactions []*blockchain.Transaction
transactionInConsensus []*blockchain.Transaction transactionInConsensus []*blockchain.Transaction
blockchain *blockchain.Blockchain blockchain *blockchain.Blockchain
utxoPool *blockchain.UTXOPool UtxoPool *blockchain.UTXOPool
log log.Logger log log.Logger
} }
@ -30,7 +31,7 @@ func (node *Node) addPendingTransactions(newTxs []*blockchain.Transaction) {
func (node *Node) getTransactionsForNewBlock() []*blockchain.Transaction { func (node *Node) getTransactionsForNewBlock() []*blockchain.Transaction {
pendingTxMutex.Lock() pendingTxMutex.Lock()
selected, unselected := node.utxoPool.SelectTransactionsForNewBlock(node.pendingTransactions) selected, unselected := node.UtxoPool.SelectTransactionsForNewBlock(node.pendingTransactions)
node.pendingTransactions = unselected node.pendingTransactions = unselected
pendingTxMutex.Unlock() pendingTxMutex.Unlock()
return selected return selected
@ -38,6 +39,7 @@ func (node *Node) getTransactionsForNewBlock() []*blockchain.Transaction {
// Start a server and process the request by a handler. // Start a server and process the request by a handler.
func (node *Node) StartServer(port string) { func (node *Node) StartServer(port string) {
node.log.Debug("Starting server", "node", node)
node.listenOnPort(port) node.listenOnPort(port)
} }
@ -62,6 +64,18 @@ func (node *Node) String() string {
return node.consensus.String() return node.consensus.String()
} }
// Testing code. Should be deleted for production
// Create in genesis block 1000 transactions assigning 1000 token to each address in [1 - 1000]
func (node *Node) AddMoreFakeTransactions() {
txs := make([]*blockchain.Transaction, 1000)
for i := range txs {
txs[i] = blockchain.NewCoinbaseTX(strconv.Itoa(i), "")
}
node.blockchain.Blocks[0].Transactions = append(node.blockchain.Blocks[0].Transactions, txs...)
node.UtxoPool.Update(txs)
}
// Create a new Node // Create a new Node
func NewNode(consensus *consensus.Consensus) Node { func NewNode(consensus *consensus.Consensus) Node {
node := Node{} node := Node{}
@ -78,10 +92,9 @@ func NewNode(consensus *consensus.Consensus) Node {
node.blockchain = genesisBlock node.blockchain = genesisBlock
// UTXO pool from Genesis block // UTXO pool from Genesis block
node.utxoPool = blockchain.CreateUTXOPoolFromGenesisBlockChain(node.blockchain) node.UtxoPool = blockchain.CreateUTXOPoolFromGenesisBlockChain(node.blockchain)
// Logger // Logger
node.log = node.consensus.Log node.log = node.consensus.Log
node.log.Debug("New node", "node", node)
return node return node
} }

@ -123,14 +123,14 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) {
// create a new block // create a new block
newBlock := new(blockchain.Block) newBlock := new(blockchain.Block)
for { for {
if len(node.pendingTransactions) >= 100 { if len(node.pendingTransactions) >= 10 {
selectedTxs := node.getTransactionsForNewBlock() selectedTxs := node.getTransactionsForNewBlock()
if len(selectedTxs) == 0 { if len(selectedTxs) == 0 {
node.log.Debug("No transactions is selected for consensus", "pendingTx", len(node.pendingTransactions)) node.log.Debug("No transactions is selected for consensus", "pendingTx", len(node.pendingTransactions))
} else { } else {
node.log.Debug("Creating new block", "node", node) node.log.Debug("Creating new block", "numTxs", len(selectedTxs), "pendingTxs", len(node.pendingTransactions))
newBlock = blockchain.NewBlock(selectedTxs, []byte{}) newBlock = blockchain.NewBlock(selectedTxs, []byte{})
node.log.Debug("Num of Pending Tx", "count", len(node.pendingTransactions))
break break
} }
} }

@ -28,7 +28,7 @@ func TestNewNewNode(test *testing.T) {
test.Error("Coinbase TX is not initialized for the node") test.Error("Coinbase TX is not initialized for the node")
} }
if node.utxoPool == nil { if node.UtxoPool == nil {
test.Error("Utxo pool is not initialized for the node") test.Error("Utxo pool is not initialized for the node")
} }
} }

Loading…
Cancel
Save