Add lock on UTXO pool operations; fix validator catch up logic

pull/10/head
Rongjian Lan 7 years ago
parent 9b5e6df9cb
commit 9baa7c8503
  1. 7
      blockchain/utxopool.go
  2. 16
      consensus/consensus_leader.go
  3. 19
      consensus/consensus_validator.go
  4. 2
      node/node_handler.go

@ -3,6 +3,7 @@ package blockchain
import ( import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"sync"
) )
// UTXOPool stores transactions and balance associated with each address. // UTXOPool stores transactions and balance associated with each address.
@ -25,6 +26,7 @@ type UTXOPool struct {
UtxoMap map[string]map[string]map[int]int UtxoMap map[string]map[string]map[int]int
ShardId uint32 ShardId uint32
mutex sync.Mutex
} }
// VerifyTransactions verifies if a list of transactions valid for this shard. // VerifyTransactions verifies if a list of transactions valid for this shard.
@ -72,11 +74,14 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[s
(*spentTXOs)[in.Address][inTxID][index] = true (*spentTXOs)[in.Address][inTxID][index] = true
// Sum the balance up to the inTotal. // Sum the balance up to the inTotal.
utxoPool.mutex.Lock()
if val, ok := utxoPool.UtxoMap[in.Address][inTxID][index]; ok { if val, ok := utxoPool.UtxoMap[in.Address][inTxID][index]; ok {
inTotal += val inTotal += val
} else { } else {
utxoPool.mutex.Unlock()
return false, crossShard return false, crossShard
} }
utxoPool.mutex.Unlock()
} }
outTotal := 0 outTotal := 0
@ -102,6 +107,8 @@ func (utxoPool *UTXOPool) Update(transactions []*Transaction) {
// UpdateOneTransaction updates utxoPool in respect to the new Transaction. // UpdateOneTransaction updates utxoPool in respect to the new Transaction.
func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) { func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
utxoPool.mutex.Lock()
defer utxoPool.mutex.Unlock()
if utxoPool != nil { if utxoPool != nil {
txID := hex.EncodeToString(tx.ID[:]) txID := hex.EncodeToString(tx.ID[:])

@ -143,32 +143,31 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
_ = signature _ = signature
// check consensus Id // check consensus Id
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
if consensusId != consensus.consensusId { if consensusId != consensus.consensusId {
consensus.Log.Warn("Received COMMIT with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus) consensus.Log.Warn("Received COMMIT with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
return return
} }
if bytes.Compare(blockHash, consensus.blockHash[:]) != 0 { if bytes.Compare(blockHash, consensus.blockHash[:]) != 0 {
consensus.Log.Warn("Received COMMIT with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus) consensus.Log.Warn("Received COMMIT with wrong blockHash", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
return return
} }
// proceed only when the message is not received before // proceed only when the message is not received before
consensus.mutex.Lock()
_, ok := consensus.commits[validatorId] _, ok := consensus.commits[validatorId]
shouldProcess := !ok shouldProcess := !ok
if shouldProcess { if shouldProcess {
consensus.commits[validatorId] = validatorId consensus.commits[validatorId] = validatorId
//consensus.Log.Debug("Number of commits received", "count", len(consensus.commits)) //consensus.Log.Debug("Number of commits received", "consensusId", consensus.consensusId, "count", len(consensus.commits))
} }
consensus.mutex.Unlock()
if !shouldProcess { if !shouldProcess {
return return
} }
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 && consensus.state < CHALLENGE_DONE { if len(consensus.commits) >= (2*len(consensus.validators))/3+1 && consensus.state < CHALLENGE_DONE {
consensus.mutex.Lock()
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 && consensus.state < CHALLENGE_DONE { if len(consensus.commits) >= (2*len(consensus.validators))/3+1 && consensus.state < CHALLENGE_DONE {
consensus.Log.Debug("Enough commits received with signatures", "numOfSignatures", len(consensus.commits)) consensus.Log.Debug("Enough commits received with signatures", "numOfSignatures", len(consensus.commits))
@ -179,7 +178,6 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
// Set state to CHALLENGE_DONE // Set state to CHALLENGE_DONE
consensus.state = CHALLENGE_DONE consensus.state = CHALLENGE_DONE
} }
consensus.mutex.Unlock()
} }
} }
@ -281,11 +279,17 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
consensus.Log.Warn("Received RESPONSE with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus) consensus.Log.Warn("Received RESPONSE with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
} }
if bytes.Compare(blockHash, consensus.blockHash[:]) != 0 {
consensus.Log.Warn("Received RESPONSE with wrong blockHash", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
return
}
// proceed only when the message is not received before // proceed only when the message is not received before
_, ok := consensus.responses[validatorId] _, ok := consensus.responses[validatorId]
shouldProcess = shouldProcess && !ok shouldProcess = shouldProcess && !ok
if shouldProcess { if shouldProcess {
consensus.responses[validatorId] = validatorId consensus.responses[validatorId] = validatorId
//consensus.Log.Debug("Number of responses received", "consensusId", consensus.consensusId, "count", len(consensus.responses))
} }
consensus.mutex.Unlock() consensus.mutex.Unlock()

@ -120,6 +120,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
// TODO: return the signature(commit) to leader // TODO: return the signature(commit) to leader
// For now, simply return the private key of this node. // For now, simply return the private key of this node.
msgToSend := consensus.constructCommitMessage() msgToSend := consensus.constructCommitMessage()
// consensus.Log.Debug("SENDING COMMIT", "consensusId", consensus.consensusId, "consensus", consensus)
p2p.SendMessage(consensus.leader, msgToSend) p2p.SendMessage(consensus.leader, msgToSend)
// Set state to COMMIT_DONE // Set state to COMMIT_DONE
@ -212,16 +213,23 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
return return
} }
consensus.mutex.Lock()
// check block hash // check block hash
if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 { if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 {
consensus.Log.Warn("Block hash doesn't match", "consensus", consensus) consensus.Log.Warn("Block hash doesn't match", "consensus", consensus)
consensus.mutex.Unlock()
return return
} }
// check consensus Id // check consensus Id
if consensusId != consensus.consensusId { if consensusId != consensus.consensusId {
consensus.Log.Warn("Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus) consensus.Log.Warn("Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
return if _, ok := consensus.blocksReceived[consensus.consensusId]; !ok {
consensus.mutex.Unlock()
return
}
consensus.Log.Warn("ROLLING UP", "consensus", consensus)
// If I received previous block (which haven't been processed. I will roll up to current block if everything checks.
} }
// TODO: verify aggregated commits with real schnor cosign verification // TODO: verify aggregated commits with real schnor cosign verification
@ -229,6 +237,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
// TODO: return the signature(response) to leader // TODO: return the signature(response) to leader
// For now, simply return the private key of this node. // For now, simply return the private key of this node.
msgToSend := consensus.constructResponseMessage() msgToSend := consensus.constructResponseMessage()
// consensus.Log.Debug("SENDING RESPONSE", "consensusId", consensus.consensusId, "consensus", consensus)
p2p.SendMessage(consensus.leader, msgToSend) p2p.SendMessage(consensus.leader, msgToSend)
// Set state to RESPONSE_DONE // Set state to RESPONSE_DONE
@ -237,15 +246,12 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
// BIG TODO: the block catch up logic is basically a mock now. More checks need to be done to make it correct. // BIG TODO: the block catch up logic is basically a mock now. More checks need to be done to make it correct.
// The logic is to roll up to the latest blocks one by one to try catching up with the leader. // The logic is to roll up to the latest blocks one by one to try catching up with the leader.
for { for {
consensus.mutex.Lock()
val, ok := consensus.blocksReceived[consensus.consensusId] val, ok := consensus.blocksReceived[consensus.consensusId]
consensus.mutex.Unlock()
if ok { if ok {
consensus.mutex.Lock()
delete(consensus.blocksReceived, consensus.consensusId) delete(consensus.blocksReceived, consensus.consensusId)
consensus.blockHash = [32]byte{}
consensus.consensusId++ // roll up one by one, until the next block is not received yet. consensus.consensusId++ // roll up one by one, until the next block is not received yet.
consensus.mutex.Unlock()
// TODO: think about when validators know about the consensus is reached. // TODO: think about when validators know about the consensus is reached.
// For now, the blockchain is updated right here. // For now, the blockchain is updated right here.
@ -261,6 +267,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
// check block data (transactions // check block data (transactions
if !consensus.BlockVerifier(&blockHeaderObj) { if !consensus.BlockVerifier(&blockHeaderObj) {
consensus.Log.Debug("[WARNING] Block content is not verified successfully", "consensusId", consensus.consensusId) consensus.Log.Debug("[WARNING] Block content is not verified successfully", "consensusId", consensus.consensusId)
consensus.mutex.Unlock()
return return
} }
consensus.OnConsensusDone(&blockHeaderObj) consensus.OnConsensusDone(&blockHeaderObj)
@ -269,7 +276,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
} }
} }
consensus.mutex.Unlock()
} }
// Construct the response message to send to leader (assumption the consensus data is already verified) // Construct the response message to send to leader (assumption the consensus data is already verified)

@ -66,7 +66,7 @@ func (node *Node) NodeHandler(conn net.Conn) {
case common.CONTROL: case common.CONTROL:
controlType := msgPayload[0] controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP { if ControlMessageType(controlType) == STOP {
node.log.Debug("Stopping Node", "node", node, "numTxsProcessed", node.countNumTransactionsInBlockchain()) node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain())
os.Exit(0) os.Exit(0)
} }

Loading…
Cancel
Save