Fix cross shard tx unlocking issue, fix txgen random shardId duplication issue, add some more utility funcs in utxoPool

pull/69/head
Rongjian Lan 6 years ago
parent 23f953492d
commit e4c6c303c8
  1. 78
      blockchain/utxopool.go
  2. 1
      client/client.go
  3. 44
      client/txgen/main.go
  4. 12
      node/node_handler.go

@ -115,8 +115,9 @@ func (utxoPool *UTXOPool) VerifyStateBlock(stateBlock *Block) bool {
} }
// VerifyOneTransaction verifies if a list of transactions valid. // VerifyOneTransaction verifies if a list of transactions valid.
// Add another sanity check function (e.g. spending the same utxo) called before this one.
func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[20]byte]map[string]map[uint32]bool) (err error, crossShard bool) { func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[20]byte]map[string]map[uint32]bool) (err error, crossShard bool) {
if len(tx.Proofs) != 0 { if len(tx.Proofs) > 1 {
return utxoPool.VerifyUnlockTransaction(tx) return utxoPool.VerifyUnlockTransaction(tx)
} }
@ -223,7 +224,7 @@ func (utxoPool *UTXOPool) Update(transactions []*Transaction) {
// UpdateOneTransaction updates utxoPool in respect to the new Transaction. // UpdateOneTransaction updates utxoPool in respect to the new Transaction.
func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) { func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
isUnlockTx := len(tx.Proofs) != 0 isUnlockTx := len(tx.Proofs) > 1
unlockToCommit := true unlockToCommit := true
if isUnlockTx { if isUnlockTx {
for _, proof := range tx.Proofs { for _, proof := range tx.Proofs {
@ -286,7 +287,6 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:]) inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:])
if _, ok := utxoPool.LockedUtxoMap[in.Address]; !ok { if _, ok := utxoPool.LockedUtxoMap[in.Address]; !ok {
utxoPool.LockedUtxoMap[in.Address] = make(TXHash2Vout2AmountMap) utxoPool.LockedUtxoMap[in.Address] = make(TXHash2Vout2AmountMap)
utxoPool.LockedUtxoMap[in.Address][inTxID] = make(Vout2AmountMap)
} }
if _, ok := utxoPool.LockedUtxoMap[in.Address][inTxID]; !ok { if _, ok := utxoPool.LockedUtxoMap[in.Address][inTxID]; !ok {
utxoPool.LockedUtxoMap[in.Address][inTxID] = make(Vout2AmountMap) utxoPool.LockedUtxoMap[in.Address][inTxID] = make(Vout2AmountMap)
@ -300,16 +300,17 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
// Update // Update
if !isCrossShard || isUnlockTx { if !isCrossShard || isUnlockTx {
if !unlockToCommit { if !unlockToCommit {
if isValidCrossShard { // unlock-to-abort, bring back (unlock) the utxo input
// unlock-to-abort, bring back (unlock) the utxo input for _, in := range tx.TxInput {
for _, in := range tx.TxInput { // Only unlock the input for my own shard.
// Only unlock the input for my own shard. if in.ShardID != utxoPool.ShardID {
if in.ShardID != utxoPool.ShardID { continue
continue }
}
// Simply bring back the locked (removed) utxo inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:])
inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:])
if utxoPool.LockedUtxoExists(in.Address, inTxID, in.PreviousOutPoint.Index) {
// bring back the locked (removed) utxo
if _, ok := utxoPool.UtxoMap[in.Address]; !ok { if _, ok := utxoPool.UtxoMap[in.Address]; !ok {
utxoPool.UtxoMap[in.Address] = make(TXHash2Vout2AmountMap) utxoPool.UtxoMap[in.Address] = make(TXHash2Vout2AmountMap)
utxoPool.UtxoMap[in.Address][inTxID] = make(Vout2AmountMap) utxoPool.UtxoMap[in.Address][inTxID] = make(Vout2AmountMap)
@ -339,6 +340,17 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
} }
utxoPool.UtxoMap[out.Address][txID][uint32(index)] = out.Amount utxoPool.UtxoMap[out.Address][txID][uint32(index)] = out.Amount
} }
if isUnlockTx { // for unlock-to-commit transaction, also need to delete the locked utxo
for _, in := range tx.TxInput {
// Only unlock the input for my own shard.
if in.ShardID != utxoPool.ShardID {
continue
}
inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:])
utxoPool.DeleteOneLockedUtxo(in.Address, inTxID, in.PreviousOutPoint.Index)
}
}
} }
} // If it's a cross shard locking Tx, then don't update so the input UTXOs are locked (removed), and the money is not spendable until unlock-to-commit or unlock-to-abort } // If it's a cross shard locking Tx, then don't update so the input UTXOs are locked (removed), and the money is not spendable until unlock-to-commit or unlock-to-abort
} }
@ -397,12 +409,13 @@ func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transact
if len(selected) < maxNumTxs { if len(selected) < maxNumTxs {
if err == nil || crossShard { if err == nil || crossShard {
selected = append(selected, tx)
if crossShard { if crossShard {
proof := CrossShardTxProof{Accept: err == nil, TxID: tx.ID, TxInput: getShardTxInput(tx, utxoPool.ShardID)} proof := CrossShardTxProof{Accept: err == nil, TxID: tx.ID, TxInput: getShardTxInput(tx, utxoPool.ShardID)}
txAndProof := CrossShardTxAndProof{tx, &proof} txAndProof := CrossShardTxAndProof{tx, &proof}
crossShardTxs = append(crossShardTxs, &txAndProof) crossShardTxs = append(crossShardTxs, &txAndProof)
tx.Proofs = append(tx.Proofs, proof)
} }
selected = append(selected, tx)
} else { } else {
invalid = append(invalid, tx) invalid = append(invalid, tx)
} }
@ -434,6 +447,23 @@ func (utxoPool *UTXOPool) DeleteOneUtxo(address [20]byte, txID string, index uin
} }
} }
// DeleteOneBalanceItem deletes one balance item of UTXOPool and clean up if possible.
func (utxoPool *UTXOPool) LockedUtxoExists(address [20]byte, txID string, index uint32) bool {
_, ok := utxoPool.LockedUtxoMap[address]
if !ok {
return false
}
_, ok = utxoPool.LockedUtxoMap[address][txID]
if !ok {
return false
}
_, ok = utxoPool.LockedUtxoMap[address][txID][index]
if !ok {
return false
}
return true
}
// DeleteOneBalanceItem deletes one balance item of UTXOPool and clean up if possible. // DeleteOneBalanceItem deletes one balance item of UTXOPool and clean up if possible.
func (utxoPool *UTXOPool) DeleteOneLockedUtxo(address [20]byte, txID string, index uint32) { func (utxoPool *UTXOPool) DeleteOneLockedUtxo(address [20]byte, txID string, index uint32) {
delete(utxoPool.LockedUtxoMap[address][txID], index) delete(utxoPool.LockedUtxoMap[address][txID], index)
@ -466,8 +496,17 @@ func (utxoPool *UTXOPool) CleanUp() {
// Used for debugging. // Used for debugging.
func (utxoPool *UTXOPool) String() string { func (utxoPool *UTXOPool) String() string {
return printUtxos(&utxoPool.UtxoMap)
}
// Used for debugging.
func (utxoPool *UTXOPool) StringOfLockedUtxos() string {
return printUtxos(&utxoPool.LockedUtxoMap)
}
func printUtxos(utxos *UtxoMap) string {
res := "" res := ""
for address, v1 := range utxoPool.UtxoMap { for address, v1 := range *utxos {
for txid, v2 := range v1 { for txid, v2 := range v1 {
for index, value := range v2 { for index, value := range v2 {
res += fmt.Sprintf("address: %v, tx id: %v, index: %v, value: %v\n", address, txid, index, value) res += fmt.Sprintf("address: %v, tx id: %v, index: %v, value: %v\n", address, txid, index, value)
@ -490,8 +529,17 @@ func (utxoPool *UTXOPool) GetSizeInByteOfUtxoMap() int {
// A utility func that counts the total number of utxos in a pool. // A utility func that counts the total number of utxos in a pool.
func (utxoPool *UTXOPool) CountNumOfUtxos() int { func (utxoPool *UTXOPool) CountNumOfUtxos() int {
return countNumOfUtxos(&utxoPool.UtxoMap)
}
// A utility func that counts the total number of locked utxos in a pool.
func (utxoPool *UTXOPool) CountNumOfLockedUtxos() int {
return countNumOfUtxos(&utxoPool.LockedUtxoMap)
}
func countNumOfUtxos(utxos *UtxoMap) int {
countAll := 0 countAll := 0
for _, utxoMap := range utxoPool.UtxoMap { for _, utxoMap := range *utxos {
for txIdStr, val := range utxoMap { for txIdStr, val := range utxoMap {
_, err := hex.DecodeString(txIdStr) _, err := hex.DecodeString(txIdStr)
if err != nil { if err != nil {

@ -57,6 +57,7 @@ func (client *Client) TransactionMessageHandler(msgPayload []byte) {
func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTxProof) { func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTxProof) {
txsToSend := []blockchain.Transaction{} txsToSend := []blockchain.Transaction{}
//fmt.Printf("PENDING CLIENT TX - %d\n", len(client.PendingCrossTxs))
// Loop through the newly received list of proofs // Loop through the newly received list of proofs
client.PendingCrossTxsMutex.Lock() client.PendingCrossTxsMutex.Lock()
for _, proof := range *proofs { for _, proof := range *proofs {

@ -73,7 +73,7 @@ type TxInfo struct {
// Returns: // Returns:
// all single-shard txs // all single-shard txs
// all cross-shard txs // all cross-shard txs
func generateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) { func generateSimulatedTransactions(subsetId, numSubset int, shardId int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/* /*
UTXO map structure: UTXO map structure:
address - [ address - [
@ -90,13 +90,13 @@ func generateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNod
utxoPoolMutex.Lock() utxoPoolMutex.Lock()
txInfo := TxInfo{} txInfo := TxInfo{}
txInfo.shardID = shardID txInfo.shardID = shardId
txInfo.dataNodes = dataNodes txInfo.dataNodes = dataNodes
txInfo.txCount = 0 txInfo.txCount = 0
UTXOLOOP: UTXOLOOP:
// Loop over all addresses // Loop over all addresses
for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap { for address, txMap := range dataNodes[shardId].UtxoPool.UtxoMap {
if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time
txInfo.address = address txInfo.address = address
// Loop over all txIds for the address // Loop over all txIds for the address
@ -115,18 +115,22 @@ UTXOLOOP:
randNum := rand.Intn(100) randNum := rand.Intn(100)
if setting.crossShard && randNum < 30 { // 1/3 cross shard transactions: add another txinput from another shard if randNum < 30 {
generateCrossShardTx(&txInfo) if setting.crossShard && randNum < 10 { // 1/3 cross shard transactions: add another txinput from another shard
} else { generateCrossShardTx(&txInfo)
generateSingleShardTx(&txInfo) } else {
} generateSingleShardTx(&txInfo)
if txInfo.txCount >= setting.maxNumTxsPerBatch { }
break UTXOLOOP if txInfo.txCount >= setting.maxNumTxsPerBatch {
break UTXOLOOP
}
} }
} }
} }
} }
} }
//fmt.Printf("UTXO CLIENT - %d\n", shardId)
//fmt.Println(dataNodes[shardId].UtxoPool.CountNumOfUtxos())
utxoPoolMutex.Unlock() utxoPoolMutex.Unlock()
log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs)) log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs))
return txInfo.txs, txInfo.crossTxs return txInfo.txs, txInfo.crossTxs
@ -134,8 +138,14 @@ UTXOLOOP:
func generateCrossShardTx(txInfo *TxInfo) { func generateCrossShardTx(txInfo *TxInfo) {
nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID
crossShardId := nodeShardID
// a random shard to spend money to // a random shard to spend money to
crossShardId := rand.Intn(len(txInfo.dataNodes)) for true {
crossShardId = uint32(rand.Intn(len(txInfo.dataNodes)))
if crossShardId != nodeShardID {
break
}
}
crossShardNode := txInfo.dataNodes[crossShardId] crossShardNode := txInfo.dataNodes[crossShardId]
crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address] crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address]
@ -155,7 +165,7 @@ func generateCrossShardTx(txInfo *TxInfo) {
for crossShardIndex, crossShardValue := range crossShardUtxos { for crossShardIndex, crossShardValue := range crossShardUtxos {
crossUtxoValue = crossShardValue crossUtxoValue = crossShardValue
crossTxin = blockchain.NewTXInput(blockchain.NewOutPoint(&crossTxId, crossShardIndex), txInfo.address, uint32(crossShardId)) crossTxin = blockchain.NewTXInput(blockchain.NewOutPoint(&crossTxId, crossShardIndex), txInfo.address, crossShardId)
break break
} }
if crossTxin != nil { if crossTxin != nil {
@ -179,7 +189,7 @@ func generateCrossShardTx(txInfo *TxInfo) {
// Spend the utxo from the other shard, if any, to a random address in [0 - N) // Spend the utxo from the other shard, if any, to a random address in [0 - N)
if crossTxin != nil { if crossTxin != nil {
crossTxout := blockchain.TXOutput{Amount: crossUtxoValue, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: uint32(crossShardId)} crossTxout := blockchain.TXOutput{Amount: crossUtxoValue, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: crossShardId}
txOutputs = append(txOutputs, crossTxout) txOutputs = append(txOutputs, crossTxout)
} }
@ -231,7 +241,7 @@ func printVersion(me string) {
func main() { func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 100000, "number of transactions to send per message") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 10000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately") numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately")
duration := flag.Int("duration", 60, "duration of the tx generation in second") duration := flag.Int("duration", 60, "duration of the tx generation in second")
@ -320,7 +330,7 @@ func main() {
constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort) constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort)
} }
batchCounter++ batchCounter++
time.Sleep(2000 * time.Millisecond) time.Sleep(5000 * time.Millisecond)
} }
// Send a stop message to stop the nodes at the end // Send a stop message to stop the nodes at the end
@ -337,13 +347,15 @@ func constructAndSendTransaction(subsetId, numSubset, shardId int, leaders []p2p
txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes) txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes)
allCrossTxs = append(allCrossTxs, crossTxs...) allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs)) log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs))
msg := proto_node.ConstructTransactionListMessage(txs) msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg) p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch // Note cross shard txs are later sent in batch
if len(allCrossTxs) > 0 { if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs)) log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
//fmt.Printf("SENDING CLIENT TXS: %d\n", shardId)
//fmt.Println(allCrossTxs)
msg := proto_node.ConstructTransactionListMessage(allCrossTxs) msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg) p2p.BroadcastMessage(leaders, msg)

@ -22,7 +22,7 @@ const (
// The max number of transaction per a block. // The max number of transaction per a block.
MaxNumberOfTransactionsPerBlock = 10000 MaxNumberOfTransactionsPerBlock = 10000
// The number of blocks allowed before generating state block // The number of blocks allowed before generating state block
NumBlocksBeforeStateBlock = 100 NumBlocksBeforeStateBlock = 1000
) )
// NodeHandler handles a new incoming connection. // NodeHandler handles a new incoming connection.
@ -360,4 +360,14 @@ func (node *Node) UpdateUtxoAndState(newBlock *blockchain.Block) {
} }
// Clear transaction-in-Consensus list // Clear transaction-in-Consensus list
node.transactionInConsensus = []*blockchain.Transaction{} node.transactionInConsensus = []*blockchain.Transaction{}
//if node.Consensus.IsLeader {
// fmt.Printf("TX in New BLOCK - %d %s\n", node.UtxoPool.ShardID, newBlock.IsStateBlock())
// //fmt.Println(newBlock.Transactions)
// fmt.Printf("LEADER CURRENT UTXO - %d\n", node.UtxoPool.ShardID)
// fmt.Println(node.UtxoPool.CountNumOfUtxos())
// //fmt.Println(node.UtxoPool)
// fmt.Printf("LEADER LOCKED UTXO - %d\n", node.UtxoPool.ShardID)
// fmt.Println(node.UtxoPool.CountNumOfLockedUtxos())
// //fmt.Println(node.UtxoPool.StringOfLockedUtxos())
//}
} }

Loading…
Cancel
Save