Richard Liu 6 years ago
commit 66a3f2bbf0
  1. 20
      blockchain/merkle_tree.go
  2. 21
      blockchain/merkle_tree_test.go
  3. 78
      blockchain/utxopool.go
  4. 1
      client/client.go
  5. 28
      client/txgen/main.go
  6. 2
      go_executable_build.sh
  7. 12
      node/node_handler.go
  8. 6
      utils/utils.go

@ -18,6 +18,9 @@ type MerkleNode struct {
// NewMerkleTree creates a new Merkle tree from a sequence of data // NewMerkleTree creates a new Merkle tree from a sequence of data
func NewMerkleTree(data [][]byte) *MerkleTree { func NewMerkleTree(data [][]byte) *MerkleTree {
if len(data) == 0 {
return nil
}
var nodes []*MerkleNode var nodes []*MerkleNode
for _, datum := range data { for _, datum := range data {
@ -49,15 +52,16 @@ func NewMerkleTree(data [][]byte) *MerkleTree {
func NewMerkleNode(left, right *MerkleNode, data []byte) *MerkleNode { func NewMerkleNode(left, right *MerkleNode, data []byte) *MerkleNode {
mNode := MerkleNode{} mNode := MerkleNode{}
if left == nil && right == nil { prevHashes := []byte{}
hash := sha256.Sum256(data) if left != nil {
mNode.Data = hash[:] prevHashes = append(prevHashes, left.Data...)
} else {
prevHashes := append(left.Data, right.Data...)
hash := sha256.Sum256(prevHashes)
mNode.Data = hash[:]
} }
if right != nil {
prevHashes = append(prevHashes, right.Data...)
}
prevHashes = append(prevHashes, data...)
hash := sha256.Sum256(prevHashes)
mNode.Data = hash[:]
mNode.Left = left mNode.Left = left
mNode.Right = right mNode.Right = right

@ -13,6 +13,7 @@ func TestNewMerkleNode(t *testing.T) {
[]byte("node3"), []byte("node3"),
} }
fmt.Println("TEting")
// Level 1 // Level 1
n1 := NewMerkleNode(nil, nil, data[0]) n1 := NewMerkleNode(nil, nil, data[0])
@ -59,3 +60,23 @@ func TestNewMerkleTree(t *testing.T) {
t.Errorf("Merkle tree root hash is incorrect") t.Errorf("Merkle tree root hash is incorrect")
} }
} }
func TestNewMerkleTree2(t *testing.T) {
data := [][]byte{
[]byte("node1"),
[]byte("node2"),
}
// Level 1
n1 := NewMerkleNode(nil, nil, data[0])
n2 := NewMerkleNode(nil, nil, data[1])
// Level 2
n3 := NewMerkleNode(n1, n2, nil)
rootHash := fmt.Sprintf("%x", n3.Data)
mTree := NewMerkleTree(data)
if rootHash != fmt.Sprintf("%x", mTree.RootNode.Data) {
t.Errorf("Merkle tree root hash is incorrect")
}
}

@ -115,8 +115,9 @@ func (utxoPool *UTXOPool) VerifyStateBlock(stateBlock *Block) bool {
} }
// VerifyOneTransaction verifies if a list of transactions valid. // VerifyOneTransaction verifies if a list of transactions valid.
// Add another sanity check function (e.g. spending the same utxo) called before this one.
func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[20]byte]map[string]map[uint32]bool) (err error, crossShard bool) { func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[20]byte]map[string]map[uint32]bool) (err error, crossShard bool) {
if len(tx.Proofs) != 0 { if len(tx.Proofs) > 1 {
return utxoPool.VerifyUnlockTransaction(tx) return utxoPool.VerifyUnlockTransaction(tx)
} }
@ -223,7 +224,7 @@ func (utxoPool *UTXOPool) Update(transactions []*Transaction) {
// UpdateOneTransaction updates utxoPool in respect to the new Transaction. // UpdateOneTransaction updates utxoPool in respect to the new Transaction.
func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) { func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
isUnlockTx := len(tx.Proofs) != 0 isUnlockTx := len(tx.Proofs) > 1
unlockToCommit := true unlockToCommit := true
if isUnlockTx { if isUnlockTx {
for _, proof := range tx.Proofs { for _, proof := range tx.Proofs {
@ -286,7 +287,6 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:]) inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:])
if _, ok := utxoPool.LockedUtxoMap[in.Address]; !ok { if _, ok := utxoPool.LockedUtxoMap[in.Address]; !ok {
utxoPool.LockedUtxoMap[in.Address] = make(TXHash2Vout2AmountMap) utxoPool.LockedUtxoMap[in.Address] = make(TXHash2Vout2AmountMap)
utxoPool.LockedUtxoMap[in.Address][inTxID] = make(Vout2AmountMap)
} }
if _, ok := utxoPool.LockedUtxoMap[in.Address][inTxID]; !ok { if _, ok := utxoPool.LockedUtxoMap[in.Address][inTxID]; !ok {
utxoPool.LockedUtxoMap[in.Address][inTxID] = make(Vout2AmountMap) utxoPool.LockedUtxoMap[in.Address][inTxID] = make(Vout2AmountMap)
@ -300,16 +300,17 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
// Update // Update
if !isCrossShard || isUnlockTx { if !isCrossShard || isUnlockTx {
if !unlockToCommit { if !unlockToCommit {
if isValidCrossShard { // unlock-to-abort, bring back (unlock) the utxo input
// unlock-to-abort, bring back (unlock) the utxo input for _, in := range tx.TxInput {
for _, in := range tx.TxInput { // Only unlock the input for my own shard.
// Only unlock the input for my own shard. if in.ShardID != utxoPool.ShardID {
if in.ShardID != utxoPool.ShardID { continue
continue }
}
// Simply bring back the locked (removed) utxo inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:])
inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:])
if utxoPool.LockedUtxoExists(in.Address, inTxID, in.PreviousOutPoint.Index) {
// bring back the locked (removed) utxo
if _, ok := utxoPool.UtxoMap[in.Address]; !ok { if _, ok := utxoPool.UtxoMap[in.Address]; !ok {
utxoPool.UtxoMap[in.Address] = make(TXHash2Vout2AmountMap) utxoPool.UtxoMap[in.Address] = make(TXHash2Vout2AmountMap)
utxoPool.UtxoMap[in.Address][inTxID] = make(Vout2AmountMap) utxoPool.UtxoMap[in.Address][inTxID] = make(Vout2AmountMap)
@ -339,6 +340,17 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
} }
utxoPool.UtxoMap[out.Address][txID][uint32(index)] = out.Amount utxoPool.UtxoMap[out.Address][txID][uint32(index)] = out.Amount
} }
if isUnlockTx { // for unlock-to-commit transaction, also need to delete the locked utxo
for _, in := range tx.TxInput {
// Only unlock the input for my own shard.
if in.ShardID != utxoPool.ShardID {
continue
}
inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:])
utxoPool.DeleteOneLockedUtxo(in.Address, inTxID, in.PreviousOutPoint.Index)
}
}
} }
} // If it's a cross shard locking Tx, then don't update so the input UTXOs are locked (removed), and the money is not spendable until unlock-to-commit or unlock-to-abort } // If it's a cross shard locking Tx, then don't update so the input UTXOs are locked (removed), and the money is not spendable until unlock-to-commit or unlock-to-abort
} }
@ -397,12 +409,13 @@ func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transact
if len(selected) < maxNumTxs { if len(selected) < maxNumTxs {
if err == nil || crossShard { if err == nil || crossShard {
selected = append(selected, tx)
if crossShard { if crossShard {
proof := CrossShardTxProof{Accept: err == nil, TxID: tx.ID, TxInput: getShardTxInput(tx, utxoPool.ShardID)} proof := CrossShardTxProof{Accept: err == nil, TxID: tx.ID, TxInput: getShardTxInput(tx, utxoPool.ShardID)}
txAndProof := CrossShardTxAndProof{tx, &proof} txAndProof := CrossShardTxAndProof{tx, &proof}
crossShardTxs = append(crossShardTxs, &txAndProof) crossShardTxs = append(crossShardTxs, &txAndProof)
tx.Proofs = append(tx.Proofs, proof)
} }
selected = append(selected, tx)
} else { } else {
invalid = append(invalid, tx) invalid = append(invalid, tx)
} }
@ -434,6 +447,23 @@ func (utxoPool *UTXOPool) DeleteOneUtxo(address [20]byte, txID string, index uin
} }
} }
// DeleteOneBalanceItem deletes one balance item of UTXOPool and clean up if possible.
func (utxoPool *UTXOPool) LockedUtxoExists(address [20]byte, txID string, index uint32) bool {
_, ok := utxoPool.LockedUtxoMap[address]
if !ok {
return false
}
_, ok = utxoPool.LockedUtxoMap[address][txID]
if !ok {
return false
}
_, ok = utxoPool.LockedUtxoMap[address][txID][index]
if !ok {
return false
}
return true
}
// DeleteOneBalanceItem deletes one balance item of UTXOPool and clean up if possible. // DeleteOneBalanceItem deletes one balance item of UTXOPool and clean up if possible.
func (utxoPool *UTXOPool) DeleteOneLockedUtxo(address [20]byte, txID string, index uint32) { func (utxoPool *UTXOPool) DeleteOneLockedUtxo(address [20]byte, txID string, index uint32) {
delete(utxoPool.LockedUtxoMap[address][txID], index) delete(utxoPool.LockedUtxoMap[address][txID], index)
@ -466,8 +496,17 @@ func (utxoPool *UTXOPool) CleanUp() {
// Used for debugging. // Used for debugging.
func (utxoPool *UTXOPool) String() string { func (utxoPool *UTXOPool) String() string {
return printUtxos(&utxoPool.UtxoMap)
}
// Used for debugging.
func (utxoPool *UTXOPool) StringOfLockedUtxos() string {
return printUtxos(&utxoPool.LockedUtxoMap)
}
func printUtxos(utxos *UtxoMap) string {
res := "" res := ""
for address, v1 := range utxoPool.UtxoMap { for address, v1 := range *utxos {
for txid, v2 := range v1 { for txid, v2 := range v1 {
for index, value := range v2 { for index, value := range v2 {
res += fmt.Sprintf("address: %v, tx id: %v, index: %v, value: %v\n", address, txid, index, value) res += fmt.Sprintf("address: %v, tx id: %v, index: %v, value: %v\n", address, txid, index, value)
@ -490,8 +529,17 @@ func (utxoPool *UTXOPool) GetSizeInByteOfUtxoMap() int {
// A utility func that counts the total number of utxos in a pool. // A utility func that counts the total number of utxos in a pool.
func (utxoPool *UTXOPool) CountNumOfUtxos() int { func (utxoPool *UTXOPool) CountNumOfUtxos() int {
return countNumOfUtxos(&utxoPool.UtxoMap)
}
// A utility func that counts the total number of locked utxos in a pool.
func (utxoPool *UTXOPool) CountNumOfLockedUtxos() int {
return countNumOfUtxos(&utxoPool.LockedUtxoMap)
}
func countNumOfUtxos(utxos *UtxoMap) int {
countAll := 0 countAll := 0
for _, utxoMap := range utxoPool.UtxoMap { for _, utxoMap := range *utxos {
for txIdStr, val := range utxoMap { for txIdStr, val := range utxoMap {
_, err := hex.DecodeString(txIdStr) _, err := hex.DecodeString(txIdStr)
if err != nil { if err != nil {

@ -57,6 +57,7 @@ func (client *Client) TransactionMessageHandler(msgPayload []byte) {
func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTxProof) { func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTxProof) {
txsToSend := []blockchain.Transaction{} txsToSend := []blockchain.Transaction{}
//fmt.Printf("PENDING CLIENT TX - %d\n", len(client.PendingCrossTxs))
// Loop through the newly received list of proofs // Loop through the newly received list of proofs
client.PendingCrossTxsMutex.Lock() client.PendingCrossTxsMutex.Lock()
for _, proof := range *proofs { for _, proof := range *proofs {

@ -74,7 +74,7 @@ type TxInfo struct {
// Returns: // Returns:
// all single-shard txs // all single-shard txs
// all cross-shard txs // all cross-shard txs
func generateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) { func generateSimulatedTransactions(subsetId, numSubset int, shardId int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/* /*
UTXO map structure: UTXO map structure:
address - [ address - [
@ -91,13 +91,13 @@ func generateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNod
utxoPoolMutex.Lock() utxoPoolMutex.Lock()
txInfo := TxInfo{} txInfo := TxInfo{}
txInfo.shardID = shardID txInfo.shardID = shardId
txInfo.dataNodes = dataNodes txInfo.dataNodes = dataNodes
txInfo.txCount = 0 txInfo.txCount = 0
UTXOLOOP: UTXOLOOP:
// Loop over all addresses // Loop over all addresses
for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap { for address, txMap := range dataNodes[shardId].UtxoPool.UtxoMap {
if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time
txInfo.address = address txInfo.address = address
// Loop over all txIds for the address // Loop over all txIds for the address
@ -128,6 +128,8 @@ UTXOLOOP:
} }
} }
} }
//fmt.Printf("UTXO CLIENT - %d\n", shardId)
//fmt.Println(dataNodes[shardId].UtxoPool.CountNumOfUtxos())
utxoPoolMutex.Unlock() utxoPoolMutex.Unlock()
log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs)) log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs))
return txInfo.txs, txInfo.crossTxs return txInfo.txs, txInfo.crossTxs
@ -135,8 +137,14 @@ UTXOLOOP:
func generateCrossShardTx(txInfo *TxInfo) { func generateCrossShardTx(txInfo *TxInfo) {
nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID
crossShardId := nodeShardID
// a random shard to spend money to // a random shard to spend money to
crossShardId := rand.Intn(len(txInfo.dataNodes)) for true {
crossShardId = uint32(rand.Intn(len(txInfo.dataNodes)))
if crossShardId != nodeShardID {
break
}
}
crossShardNode := txInfo.dataNodes[crossShardId] crossShardNode := txInfo.dataNodes[crossShardId]
crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address] crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address]
@ -156,7 +164,7 @@ func generateCrossShardTx(txInfo *TxInfo) {
for crossShardIndex, crossShardValue := range crossShardUtxos { for crossShardIndex, crossShardValue := range crossShardUtxos {
crossUtxoValue = crossShardValue crossUtxoValue = crossShardValue
crossTxin = blockchain.NewTXInput(blockchain.NewOutPoint(&crossTxId, crossShardIndex), txInfo.address, uint32(crossShardId)) crossTxin = blockchain.NewTXInput(blockchain.NewOutPoint(&crossTxId, crossShardIndex), txInfo.address, crossShardId)
break break
} }
if crossTxin != nil { if crossTxin != nil {
@ -180,7 +188,7 @@ func generateCrossShardTx(txInfo *TxInfo) {
// Spend the utxo from the other shard, if any, to a random address in [0 - N) // Spend the utxo from the other shard, if any, to a random address in [0 - N)
if crossTxin != nil { if crossTxin != nil {
crossTxout := blockchain.TXOutput{Amount: crossUtxoValue, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: uint32(crossShardId)} crossTxout := blockchain.TXOutput{Amount: crossUtxoValue, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: crossShardId}
txOutputs = append(txOutputs, crossTxout) txOutputs = append(txOutputs, crossTxout)
} }
@ -232,7 +240,7 @@ func printVersion(me string) {
func main() { func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 100000, "number of transactions to send per message") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 10000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately") numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately")
duration := flag.Int("duration", 60, "duration of the tx generation in second. If it's negative, the experiment runs forever.") duration := flag.Int("duration", 60, "duration of the tx generation in second. If it's negative, the experiment runs forever.")
@ -323,7 +331,7 @@ func main() {
constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort) constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort)
} }
batchCounter++ batchCounter++
time.Sleep(2000 * time.Millisecond) time.Sleep(5000 * time.Millisecond)
} }
// Send a stop message to stop the nodes at the end // Send a stop message to stop the nodes at the end
@ -340,13 +348,15 @@ func constructAndSendTransaction(subsetId, numSubset, shardId int, leaders []p2p
txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes) txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes)
allCrossTxs = append(allCrossTxs, crossTxs...) allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs)) log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs))
msg := proto_node.ConstructTransactionListMessage(txs) msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg) p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch // Note cross shard txs are later sent in batch
if len(allCrossTxs) > 0 { if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs)) log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
//fmt.Printf("SENDING CLIENT TXS: %d\n", shardId)
//fmt.Println(allCrossTxs)
msg := proto_node.ConstructTransactionListMessage(allCrossTxs) msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg) p2p.BroadcastMessage(leaders, msg)

@ -10,7 +10,7 @@ BINDIR=bin
BUCKET=unique-bucket-bin BUCKET=unique-bucket-bin
GOOS=linux GOOS=linux
GOARCH=amd64 GOARCH=amd64
FOLDER= FOLDER=/$(whoami)
function usage function usage
{ {

@ -22,7 +22,7 @@ const (
// The max number of transaction per a block. // The max number of transaction per a block.
MaxNumberOfTransactionsPerBlock = 10000 MaxNumberOfTransactionsPerBlock = 10000
// The number of blocks allowed before generating state block // The number of blocks allowed before generating state block
NumBlocksBeforeStateBlock = 100 NumBlocksBeforeStateBlock = 1000
) )
// NodeHandler handles a new incoming connection. // NodeHandler handles a new incoming connection.
@ -360,4 +360,14 @@ func (node *Node) UpdateUtxoAndState(newBlock *blockchain.Block) {
} }
// Clear transaction-in-Consensus list // Clear transaction-in-Consensus list
node.transactionInConsensus = []*blockchain.Transaction{} node.transactionInConsensus = []*blockchain.Transaction{}
//if node.Consensus.IsLeader {
// fmt.Printf("TX in New BLOCK - %d %s\n", node.UtxoPool.ShardID, newBlock.IsStateBlock())
// //fmt.Println(newBlock.Transactions)
// fmt.Printf("LEADER CURRENT UTXO - %d\n", node.UtxoPool.ShardID)
// fmt.Println(node.UtxoPool.CountNumOfUtxos())
// //fmt.Println(node.UtxoPool)
// fmt.Printf("LEADER LOCKED UTXO - %d\n", node.UtxoPool.ShardID)
// fmt.Println(node.UtxoPool.CountNumOfLockedUtxos())
// //fmt.Println(node.UtxoPool.StringOfLockedUtxos())
//}
} }

@ -3,7 +3,10 @@ package utils
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors"
"fmt"
"log" "log"
"os"
"os/exec" "os/exec"
"regexp" "regexp"
"strconv" "strconv"
@ -34,6 +37,9 @@ func GetUniqueIdFromPeer(peer p2p.Peer) uint16 {
// RunCmd Runs command `name` with arguments `args` // RunCmd Runs command `name` with arguments `args`
func RunCmd(name string, args ...string) error { func RunCmd(name string, args ...string) error {
if _, err := os.Stat(name); os.IsNotExist(err) {
return errors.New(fmt.Sprintf("file: %v doesn't exist", name))
}
cmd := exec.Command(name, args...) cmd := exec.Command(name, args...)
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
log.Fatal(err) log.Fatal(err)

Loading…
Cancel
Save