Synchronization bug fixes on consensus

pull/8/head
Rongjian Lan 7 years ago
parent 31f111ed5d
commit 1b3ceccf2d
  1. 4
      aws-code/transaction_generator.go
  2. 102
      consensus/consensus.go
  3. 90
      consensus/consensus_leader.go
  4. 100
      consensus/consensus_validator.go
  5. 3
      node/node.go
  6. 2
      node/node_handler.go

@ -125,7 +125,7 @@ func main() {
dataNode.AddMoreFakeTransactions()
start := time.Now()
totalTime := 60.0
totalTime := 600.0
time.Sleep(3 * time.Second) // wait for nodes to be ready
for true {
t := time.Now()
@ -140,7 +140,7 @@ func main() {
// Update local utxo pool to mirror the utxo pool of a real node
dataNode.UtxoPool.Update(txsToSend)
time.Sleep(1 * time.Second) // Send a batch of transactions every second
time.Sleep(2 * time.Second) // Send a batch of transactions every second
}
// Send a stop message to stop the nodes at the end

@ -9,58 +9,97 @@ import (
"harmony-benchmark/p2p"
"regexp"
"strconv"
"sync"
)
// Consensus data containing all info related to one consensus process
type Consensus struct {
state ConsensusState
commits map[string]string // Signatures collected from validators
responses map[string]string // Signatures collected from validators
data string // Actual block data to reach consensus on
validators []p2p.Peer // List of validators
leader p2p.Peer // Leader
priKey string // private key of current node
IsLeader bool // Whether I am leader. False means I am validator
nodeId uint16 // Leader or validator Id - 2 byte
consensusId uint32 // Consensus Id (View Id) - 4 byte
blockHash [32]byte // Blockhash - 32 byte
blockHeader []byte // BlockHeader to run consensus on
ShardIDShardID uint32 // Shard Id which this node belongs to
ReadySignal chan int // Signal channel for starting a new consensus process
BlockVerifier func(*blockchain.Block) bool // The verifier func passed from Node object
OnConsensusDone func(*blockchain.Block) // The post-consensus processing func passed from Node object. Called when consensus on a new block is done
msgCategory byte // Network related fields
actionType byte
Log log.Logger
state ConsensusState
// Signatures collected from validators
commits map[string]string
// Signatures collected from validators
responses map[string]string
// Actual block data to reach consensus on
data string
// List of validators
validators []p2p.Peer
// Leader
leader p2p.Peer
// private key of current node
priKey string
// Whether I am leader. False means I am validator
IsLeader bool
// Leader or validator Id - 2 byte
nodeId uint16
// Consensus Id (View Id) - 4 byte
consensusId uint32
// Blockhash - 32 byte
blockHash [32]byte
// BlockHeader to run consensus on
blockHeader []byte
// Shard Id which this node belongs to
ShardIDShardID uint32
// global consensus mutex
mutex sync.Mutex
// Validator specific fields
// Blocks received but not done with consensus yet
blocksReceived map[uint32]*BlockConsensusStatus
// Signal channel for starting a new consensus process
ReadySignal chan int
// The verifier func passed from Node object
BlockVerifier func(*blockchain.Block)bool
// The post-consensus processing func passed from Node object
// Called when consensus on a new block is done
OnConsensusDone func(*blockchain.Block)
//// Network related fields
msgCategory byte
actionType byte
Log log.Logger
}
// This used to keep track of the consensus status of multiple blocks received so far
// This is mainly used in the case that this node is lagging behind and needs to catch up.
// For example, the consensus moved to round N and this node received message(N).
// However, this node may still not finished with round N-1, so the newly received message(N)
// should be stored in this temporary structure. In case the round N-1 finishes, it can catch
// up to the latest state of round N by using this structure.
type BlockConsensusStatus struct {
// BlockHeader to run consensus on
blockHeader []byte
state ConsensusState
}
// Consensus state enum for both leader and validator
// States for leader:
// READY, ANNOUNCE_DONE, CHALLENGE_DONE, FINISHED
// FINISHED, ANNOUNCE_DONE, CHALLENGE_DONE
// States for validator:
// READY, COMMIT_DONE, RESPONSE_DONE, FINISHED
// FINISHED, COMMIT_DONE, RESPONSE_DONE
type ConsensusState int
const (
READY ConsensusState = iota
FINISHED ConsensusState = iota // initial state or state after previous consensus is done.
ANNOUNCE_DONE
COMMIT_DONE
CHALLENGE_DONE
RESPONSE_DONE
FINISHED
)
// Returns string name for the ConsensusState enum
func (state ConsensusState) String() string {
names := [...]string{
"READY",
"FINISHED",
"ANNOUNCE_DONE",
"COMMIT_DONE",
"CHALLENGE_DONE",
"RESPONSE_DONE",
"FINISHED"}
"RESPONSE_DONE"}
if state < READY || state > RESPONSE_DONE {
if state < FINISHED || state > RESPONSE_DONE {
return "Unknown"
}
return names[state]
@ -98,6 +137,9 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) C
}
consensus.ShardIDShardID = uint32(myShardIDShardID)
// For validators
consensus.blocksReceived = make(map[uint32]*BlockConsensusStatus)
// For now use socket address as 16 byte Id
// TODO: populate with correct Id
socketId := reg.ReplaceAllString(consensus.priKey, "")
@ -121,7 +163,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) C
// Reset the state of the consensus
func (consensus *Consensus) ResetState() {
consensus.state = READY
consensus.state = FINISHED
consensus.commits = make(map[string]string)
consensus.responses = make(map[string]string)
}
@ -134,5 +176,5 @@ func (consensus *Consensus) String() string {
} else {
duty = "VLD" // validator
}
return fmt.Sprintf("[%s, %s, %v, %v]", duty, consensus.priKey, consensus.ShardIDShardID, consensus.nodeId)
return fmt.Sprintf("[%s, %s, %v, %v, %s]", duty, consensus.priKey, consensus.ShardIDShardID, consensus.nodeId, consensus.state)
}

@ -1,8 +1,6 @@
package consensus
import (
"sync"
"bytes"
"crypto/sha256"
"encoding/binary"
@ -10,18 +8,21 @@ import (
"harmony-benchmark/p2p"
"strings"
"encoding/gob"
"fmt"
"time"
)
var mutex = &sync.Mutex{}
// WaitForNewBlock waits for a new block.
func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
consensus.Log.Debug("Waiting for block", "consensus", consensus)
for { // keep waiting for new blocks
newBlock := <-blockChannel
// TODO: think about potential race condition
if consensus.state == READY {
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus)
for consensus.state == FINISHED {
time.Sleep(500 * time.Millisecond)
consensus.startConsensus(&newBlock)
break
}
}
}
@ -72,13 +73,15 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
consensus.blockHeader = byteBuffer.Bytes()
msgToSend := consensus.constructAnnounceMessage()
fmt.Printf("BROADCAST ANNOUNCE: %d\n", consensus.consensusId)
p2p.BroadcastMessage(consensus.validators, msgToSend)
fmt.Printf("BROADCAST ANNOUNCE DONE: %d\n", consensus.consensusId)
// Set state to ANNOUNCE_DONE
consensus.state = ANNOUNCE_DONE
p2p.BroadcastMessage(consensus.validators, msgToSend)
}
// Construct the announce message to send to validators
func (consensus Consensus) constructAnnounceMessage() []byte {
func (consensus *Consensus) constructAnnounceMessage() []byte {
buffer := bytes.NewBuffer([]byte{})
// 4 byte consensus id
@ -106,6 +109,7 @@ func (consensus Consensus) constructAnnounceMessage() []byte {
signature := signMessage(buffer.Bytes())
buffer.Write(signature)
consensus.Log.Debug("SENDING ANNOUNCE")
return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes())
}
@ -145,36 +149,45 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
_ = commit
_ = signature
// check consensus Id
if consensusId != consensus.consensusId {
consensus.Log.Debug("[ERROR] Received COMMIT with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
return
}
// proceed only when the message is not received before and this consensus phase is not done.
mutex.Lock()
consensus.mutex.Lock()
_, ok := consensus.commits[validatorId]
shouldProcess := !ok && consensus.state == ANNOUNCE_DONE
shouldProcess := !ok
if shouldProcess {
consensus.commits[validatorId] = validatorId
//consensus.Log.Debug("Number of commits received", "count", len(consensus.commits))
fmt.Printf("Number of COMMITS received %d\n", len(consensus.commits))
}
mutex.Unlock()
consensus.mutex.Unlock()
if !shouldProcess {
return
}
mutex.Lock()
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 {
consensus.Log.Debug("Enough commits received with signatures", "numOfSignatures", len(consensus.commits))
if consensus.state == ANNOUNCE_DONE {
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 && consensus.state < CHALLENGE_DONE {
consensus.mutex.Lock()
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 && consensus.state < CHALLENGE_DONE {
consensus.Log.Debug("Enough commits received with signatures", "numOfSignatures", len(consensus.commits))
// Broadcast challenge
msgToSend := consensus.constructChallengeMessage()
//fmt.Printf("BROADCAST CHALLENGE: %d\n", consensus.consensusId)
p2p.BroadcastMessage(consensus.validators, msgToSend)
//fmt.Printf("BROADCAST CHALLENGE DONE: %d\n", consensus.consensusId)
// Set state to CHALLENGE_DONE
consensus.state = CHALLENGE_DONE
}
// Broadcast challenge
msgToSend := consensus.constructChallengeMessage()
p2p.BroadcastMessage(consensus.validators, msgToSend)
consensus.mutex.Unlock()
}
mutex.Unlock()
}
// Construct the challenge message to send to validators
func (consensus Consensus) constructChallengeMessage() []byte {
func (consensus *Consensus) constructChallengeMessage() []byte {
buffer := bytes.NewBuffer([]byte{})
// 4 byte consensus id
@ -262,31 +275,38 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
_ = blockHash
_ = response
_ = signature
// check consensus Id
if consensusId != consensus.consensusId {
consensus.Log.Debug("[ERROR] Received RESPONSE with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
return
}
// proceed only when the message is not received before and this consensus phase is not done.
mutex.Lock()
consensus.mutex.Lock()
_, ok := consensus.responses[validatorId]
shouldProcess := !ok && consensus.state == CHALLENGE_DONE
shouldProcess := !ok
if shouldProcess {
consensus.responses[validatorId] = validatorId
//consensus.Log.Debug("Number of responses received", "count", len(consensus.responses))
//consensus.Log.Debug("Number of responses received", "count", len(consensus.responses), "consensudId", consensusId)
fmt.Printf("Number of RESPONSES received %d\n", len(consensus.responses))
}
mutex.Unlock()
consensus.mutex.Unlock()
if !shouldProcess {
return
}
mutex.Lock()
if len(consensus.responses) >= (2*len(consensus.validators))/3+1 {
consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(consensus.responses))
if consensus.state == CHALLENGE_DONE {
// Set state to FINISHED
consensus.state = FINISHED
// TODO: do followups on the consensus
consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "numOfNodes", len(consensus.validators))
//consensus.Log.Debug("RECEIVED RESPONSE", "consensusId", consensusId)
if len(consensus.responses) >= (2*len(consensus.validators))/3+1 && consensus.state != FINISHED {
consensus.mutex.Lock()
if len(consensus.responses) >= (2*len(consensus.validators))/3+1 && consensus.state != FINISHED {
consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(consensus.responses))
// Reset state to FINISHED, and clear other data.
consensus.ResetState()
consensus.consensusId++
consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "consensusId", consensus.consensusId)
// TODO: reconstruct the whole block from header and transactions
// For now, we used the stored whole block in consensus.blockHeader
@ -297,12 +317,12 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
consensus.Log.Debug("failed to construct the new block after consensus")
}
consensus.OnConsensusDone(&blockHeaderObj)
consensus.consensusId++
// Send signal to Node so the new block can be added and new round of consensus can be triggered
consensus.ReadySignal <- 1
}
consensus.mutex.Unlock()
// TODO: composes new block and broadcast the new block to validators
}
mutex.Unlock()
}

@ -8,6 +8,7 @@ import (
"regexp"
"encoding/gob"
"harmony-benchmark/blockchain"
"fmt"
)
// Validator's consensus message dispatcher
@ -76,19 +77,13 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
copy(consensus.blockHash[:], blockHash[:])
// Verify block data
// check consensus Id
if consensusId != consensus.consensusId {
consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId)
return
}
// check leader Id
leaderPrivKey := consensus.leader.Ip + consensus.leader.Port
reg, _ := regexp.Compile("[^0-9]+")
socketId := reg.ReplaceAllString(leaderPrivKey, "")
value, _ := strconv.Atoi(socketId)
if leaderId != uint16(value) {
consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId)
consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId, "consensus", consensus)
return
}
@ -97,26 +92,36 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
var blockHeaderObj blockchain.Block // TODO: separate header from block. Right now, this blockHeader data is actually the whole block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("[ERROR] Unparseable block header data")
consensus.Log.Debug("[ERROR] Unparseable block header data", "consensus", consensus)
return
}
consensus.blockHeader = blockHeader // TODO: think about remove this field and use blocksReceived instead
consensus.mutex.Lock()
consensus.blocksReceived[consensusId] = &BlockConsensusStatus{blockHeader, consensus.state}
consensus.mutex.Unlock()
// check consensus Id
if consensusId != consensus.consensusId {
consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
return
}
consensus.blockHeader = blockHeader
// check block hash
if bytes.Compare(blockHash[:], blockHeaderObj.HashTransactions()[:]) != 0 || bytes.Compare(blockHeaderObj.Hash[:], blockHeaderObj.HashTransactions()[:]) != 0 {
consensus.Log.Debug("[ERROR] Block hash doesn't match")
consensus.Log.Debug("[ERROR] Block hash doesn't match", "consensus", consensus)
return
}
// check block data (transactions
if !consensus.BlockVerifier(&blockHeaderObj) {
consensus.Log.Debug("[ERROR] Block content is not verified successfully")
consensus.Log.Debug("[ERROR] Block content is not verified successfully", "consensus", consensus)
return
}
// TODO: return the signature(commit) to leader
// For now, simply return the private key of this node.
msgToSend := consensus.constructCommitMessage()
fmt.Printf("SENDING COMMIT: %d\n", consensus.consensusId)
p2p.SendMessage(consensus.leader, msgToSend)
// Set state to COMMIT_DONE
@ -124,7 +129,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
}
// Construct the commit message to send to leader (assumption the consensus data is already verified)
func (consensus Consensus) constructCommitMessage() []byte {
func (consensus *Consensus) constructCommitMessage() []byte {
buffer := bytes.NewBuffer([]byte{})
// 4 byte consensus id
@ -197,12 +202,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
_ = challenge
_ = signature
// erify block data and the aggregated signatures
// check consensus Id
if consensusId != consensus.consensusId {
consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId)
return
}
// Verify block data and the aggregated signatures
// check leader Id
leaderPrivKey := consensus.leader.Ip + consensus.leader.Port
@ -210,44 +210,78 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
socketId := reg.ReplaceAllString(leaderPrivKey, "")
value, _ := strconv.Atoi(socketId)
if leaderId != uint16(value) {
consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId)
consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId, "consensus", consensus)
return
}
// check block hash
if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 {
consensus.Log.Debug("[ERROR] Block hash doesn't match")
consensus.Log.Debug("[ERROR] Block hash doesn't match", "consensus", consensus)
return
}
// check consensus Id
if consensusId != consensus.consensusId {
consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus)
return
}
// TODO: verify aggregated commits with real schnor cosign verification
// TODO: return the signature(response) to leader
// For now, simply return the private key of this node.
msgToSend := consensus.constructResponseMessage()
//consensus.Log.Debug("sh......")
//time.Sleep(100 * time.Millisecond)
fmt.Printf("SENDING RESPONSE: %d\n", consensus.consensusId)
p2p.SendMessage(consensus.leader, msgToSend)
// Set state to RESPONSE_DONE
consensus.state = RESPONSE_DONE
consensus.consensusId++
// TODO: think about when validators know about the consensus is reached.
// For now, the blockchain is updated right here.
// BIG TODO: the block catch up logic is basically a mock now. More checks need to be done to make it correct.
// The logic is roll up to the latest blocks received one by one to try catching up with the leader.
for {
consensus.mutex.Lock()
val, ok := consensus.blocksReceived[consensus.consensusId]
consensus.mutex.Unlock()
if ok {
consensus.mutex.Lock()
delete(consensus.blocksReceived, consensus.consensusId)
consensus.consensusId++ // roll up one by one, until the next block is not received yet.
consensus.mutex.Unlock()
// TODO: think about when validators know about the consensus is reached.
// For now, the blockchain is updated right here.
// TODO: reconstruct the whole block from header and transactions
// For now, we used the stored whole block in consensus.blockHeader
txDecoder := gob.NewDecoder(bytes.NewReader(val.blockHeader))
var blockHeaderObj blockchain.Block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("failed to construct the new block after consensus")
}
// check block data (transactions
if !consensus.BlockVerifier(&blockHeaderObj) {
consensus.Log.Debug("[ERROR] Block content is not verified successfully", "consensusId", consensus.consensusId)
return
}
consensus.OnConsensusDone(&blockHeaderObj)
} else {
break
}
// TODO: reconstruct the whole block from header and transactions
// For now, we used the stored whole block in consensus.blockHeader
txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader))
var blockHeaderObj blockchain.Block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("failed to construct the new block after consensus")
}
consensus.OnConsensusDone(&blockHeaderObj)
}
// Construct the response message to send to leader (assumption the consensus data is already verified)
func (consensus Consensus) constructResponseMessage() []byte {
func (consensus *Consensus) constructResponseMessage() []byte {
buffer := bytes.NewBuffer([]byte{})
// 4 byte consensus id

@ -26,9 +26,12 @@ type Node struct {
// Add new transactions to the pending transaction list
func (node *Node) addPendingTransactions(newTxs []*blockchain.Transaction) {
pendingTxMutex.Lock()
node.pendingTransactions = append(node.pendingTransactions, newTxs...)
pendingTxMutex.Unlock()
node.log.Debug("Got more transactions", "num", len(newTxs))
node.log.Debug("Total pending transactions", "num", len(node.pendingTransactions))
}
// Take out a subset of valid transactions from the pending transaction list

@ -133,11 +133,11 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) {
}
// If not enough transactions to run consensus,
// periodically check whether we have enough transactions to package into block.
time.Sleep(1 * time.Second)
}
// Send the new block to consensus so it can be confirmed.
node.BlockChannel <- *newBlock
time.Sleep(2 * time.Second)
}
}

Loading…
Cancel
Save