Fix utxopool udpate bug for invalid cross shard tx (finally the consensus is stable with 30% cross shard tx); reset consensus before retry

pull/16/head
Rongjian Lan 7 years ago
parent 0b4c365b2f
commit c3f65d4402
  1. 2
      aws-code/txgen/main.go
  2. 102
      blockchain/utxopool.go
  3. 2
      consensus/consensus.go
  4. 1
      node/node_handler.go

@ -219,7 +219,7 @@ func main() {
config := readConfigFile(*configFile) config := readConfigFile(*configFile)
leaders, shardIds := getLeadersAndShardIds(&config) leaders, shardIds := getLeadersAndShardIds(&config)
crossShard := false //len(shardIds) > 1 crossShard := len(shardIds) > 1
// Setup a logger to stdout and log file. // Setup a logger to stdout and log file.
logFileName := fmt.Sprintf("./%v/tx-generator.log", *logFolder) logFileName := fmt.Sprintf("./%v/tx-generator.log", *logFolder)

@ -155,6 +155,21 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
} }
} }
isValidCrossShard := true
if isCrossShard {
// Check whether for this shard this cross transaction is valid or not.
for _, in := range tx.TxInput {
// Only check the input for my own shard.
if in.ShardId != utxoPool.ShardId {
continue
}
inTxID := hex.EncodeToString(in.TxID[:])
if _, ok := utxoPool.UtxoMap[in.Address][inTxID][in.TxOutputIndex]; !ok {
isValidCrossShard = false
}
}
}
utxoPool.mutex.Lock() utxoPool.mutex.Lock()
defer utxoPool.mutex.Unlock() defer utxoPool.mutex.Unlock()
if utxoPool != nil { if utxoPool != nil {
@ -162,27 +177,29 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
// Remove // Remove
if !isUnlockTx { if !isUnlockTx {
for _, in := range tx.TxInput { if isValidCrossShard {
// Only check the input for my own shard. for _, in := range tx.TxInput {
if in.ShardId != utxoPool.ShardId { // Only check the input for my own shard.
continue if in.ShardId != utxoPool.ShardId {
} continue
}
// NOTE: for the locking phase of cross tx, the utxo is simply removed from the pool. // NOTE: for the locking phase of cross tx, the utxo is simply removed from the pool.
inTxID := hex.EncodeToString(in.TxID[:])
value := utxoPool.UtxoMap[in.Address][inTxID][in.TxOutputIndex]
utxoPool.DeleteOneUtxo(in.Address, inTxID, in.TxOutputIndex)
if isCrossShard {
// put the delete (locked) utxo into a separate locked utxo pool
inTxID := hex.EncodeToString(in.TxID[:]) inTxID := hex.EncodeToString(in.TxID[:])
if _, ok := utxoPool.LockedUtxoMap[in.Address]; !ok { value := utxoPool.UtxoMap[in.Address][inTxID][in.TxOutputIndex]
utxoPool.LockedUtxoMap[in.Address] = make(map[string]map[int]int) utxoPool.DeleteOneUtxo(in.Address, inTxID, in.TxOutputIndex)
utxoPool.LockedUtxoMap[in.Address][inTxID] = make(map[int]int) if isCrossShard {
// put the delete (locked) utxo into a separate locked utxo pool
inTxID := hex.EncodeToString(in.TxID[:])
if _, ok := utxoPool.LockedUtxoMap[in.Address]; !ok {
utxoPool.LockedUtxoMap[in.Address] = make(map[string]map[int]int)
utxoPool.LockedUtxoMap[in.Address][inTxID] = make(map[int]int)
}
if _, ok := utxoPool.LockedUtxoMap[in.Address][inTxID]; !ok {
utxoPool.LockedUtxoMap[in.Address][inTxID] = make(map[int]int)
}
utxoPool.LockedUtxoMap[in.Address][inTxID][in.TxOutputIndex] = value
} }
if _, ok := utxoPool.LockedUtxoMap[in.Address][inTxID]; !ok {
utxoPool.LockedUtxoMap[in.Address][inTxID] = make(map[int]int)
}
utxoPool.LockedUtxoMap[in.Address][inTxID][in.TxOutputIndex] = value
} }
} }
} }
@ -190,26 +207,28 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
// Update // Update
if !isCrossShard || isUnlockTx { if !isCrossShard || isUnlockTx {
if !unlockToCommit { if !unlockToCommit {
// unlock-to-abort, bring back (unlock) the utxo input if isValidCrossShard {
for _, in := range tx.TxInput { // unlock-to-abort, bring back (unlock) the utxo input
// Only unlock the input for my own shard. for _, in := range tx.TxInput {
if in.ShardId != utxoPool.ShardId { // Only unlock the input for my own shard.
continue if in.ShardId != utxoPool.ShardId {
continue
}
// Simply bring back the locked (removed) utxo
inTxID := hex.EncodeToString(in.TxID[:])
if _, ok := utxoPool.UtxoMap[in.Address]; !ok {
utxoPool.UtxoMap[in.Address] = make(map[string]map[int]int)
utxoPool.UtxoMap[in.Address][inTxID] = make(map[int]int)
}
if _, ok := utxoPool.UtxoMap[in.Address][inTxID]; !ok {
utxoPool.UtxoMap[in.Address][inTxID] = make(map[int]int)
}
value := utxoPool.LockedUtxoMap[in.Address][inTxID][in.TxOutputIndex]
utxoPool.UtxoMap[in.Address][inTxID][in.TxOutputIndex] = value
utxoPool.DeleteOneLockedUtxo(in.Address, inTxID, in.TxOutputIndex)
} }
// Simply bring back the locked (removed) utxo
inTxID := hex.EncodeToString(in.TxID[:])
if _, ok := utxoPool.UtxoMap[in.Address]; !ok {
utxoPool.UtxoMap[in.Address] = make(map[string]map[int]int)
utxoPool.UtxoMap[in.Address][inTxID] = make(map[int]int)
}
if _, ok := utxoPool.UtxoMap[in.Address][inTxID]; !ok {
utxoPool.UtxoMap[in.Address][inTxID] = make(map[int]int)
}
value := utxoPool.LockedUtxoMap[in.Address][inTxID][in.TxOutputIndex]
utxoPool.UtxoMap[in.Address][inTxID][in.TxOutputIndex] = value
utxoPool.DeleteOneLockedUtxo(in.Address, inTxID, in.TxOutputIndex)
} }
} else { } else {
// normal utxo output update // normal utxo output update
@ -237,16 +256,9 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
// VerifyOneTransactionAndUpdate verifies and update a valid transaction. // VerifyOneTransactionAndUpdate verifies and update a valid transaction.
// Return false if the transaction is not valid. // Return false if the transaction is not valid.
func (utxoPool *UTXOPool) VerifyOneTransactionAndUpdate(tx *Transaction) bool { func (utxoPool *UTXOPool) VerifyOneTransactionAndUpdate(tx *Transaction) bool {
if valid, crossShard := utxoPool.VerifyOneTransaction(tx, nil); valid { if valid, _ := utxoPool.VerifyOneTransaction(tx, nil); valid {
utxoPool.UpdateOneTransaction(tx) utxoPool.UpdateOneTransaction(tx)
if crossShard {
// TODO: send proof-of-accceptance
}
return true return true
} else if crossShard {
if crossShard {
// TODO: send proof-of-rejection
}
} }
return false return false
} }

@ -19,8 +19,6 @@ type Consensus struct {
commits map[string]string commits map[string]string
// Signatures collected from validators // Signatures collected from validators
responses map[string]string responses map[string]string
// Actual block data to reach consensus on
data string
// List of validators // List of validators
validators []p2p.Peer validators []p2p.Peer
// Leader // Leader

@ -190,6 +190,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) {
time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up. time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up.
case <-time.After(8 * time.Second): case <-time.After(8 * time.Second):
retry = true retry = true
node.Consensus.ResetState()
timeoutCount++ timeoutCount++
node.log.Debug("Consensus timeout, retry!", "count", timeoutCount, "node", node) node.log.Debug("Consensus timeout, retry!", "count", timeoutCount, "node", node)
} }

Loading…
Cancel
Save