From d997b4271fb509a59d9f43bdae88ec6ad5ba1f6d Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Thu, 13 Sep 2018 20:04:57 -0700 Subject: [PATCH] Lower the threshold from 1/1 to 2/3 --- consensus/consensus.go | 20 +++++++++++++++----- consensus/consensus_leader.go | 32 ++++++++++++++++++++++---------- consensus/consensus_validator.go | 21 ++++++++++----------- p2p/helper.go | 4 ++-- p2p/peer.go | 2 +- 5 files changed, 50 insertions(+), 29 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index fc188610f..83e84c334 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -67,7 +67,7 @@ type Consensus struct { // Blocks received but not done with consensus yet blocksReceived map[uint32]*BlockConsensusStatus // Commitment secret - secret kyber.Scalar + secret map[uint32]kyber.Scalar // Signal channel for starting a new consensus process ReadySignal chan struct{} @@ -116,7 +116,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * // Initialize cosign bitmap allPublicKeys := make([]kyber.Point, 0) - for _, validatorPeer := range consensus.validators { + for _, validatorPeer := range peers { allPublicKeys = append(allPublicKeys, validatorPeer.PubKey) } allPublicKeys = append(allPublicKeys, leader.PubKey) @@ -132,6 +132,8 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * consensus.bitmap = mask consensus.finalBitmap = finalMask + consensus.secret = map[uint32]kyber.Scalar{} + // For now use socket address as 16 byte Id // TODO: populate with correct Id consensus.nodeId = utils.GetUniqueIdFromPeer(p2p.Peer{Ip: ip, Port: port}) @@ -184,12 +186,20 @@ func (consensus *Consensus) GetValidatorPeers() []p2p.Peer { func (consensus *Consensus) ResetState() { consensus.state = FINISHED consensus.commitments = &map[uint16]kyber.Point{} - consensus.bitmap.SetMask([]byte{}) consensus.finalCommitments = &map[uint16]kyber.Point{} - consensus.finalBitmap.SetMask([]byte{}) consensus.responses = &map[uint16]kyber.Scalar{} consensus.finalResponses = &map[uint16]kyber.Scalar{} - consensus.secret = nil + + mask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) + finalMask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) + consensus.bitmap = mask + consensus.finalBitmap = finalMask + consensus.bitmap.SetMask([]byte{}) + consensus.finalBitmap.SetMask([]byte{}) + + consensus.aggregatedCommitment = nil + consensus.aggregatedFinalCommitment = nil + consensus.secret = map[uint32]kyber.Scalar{} } // Returns a string representation of this consensus diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index c84b035db..43c372b4e 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -96,7 +96,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { func (consensus *Consensus) commitByLeader(firstRound bool) { // Generate leader's own commitment secret, commitment := crypto.Commit(crypto.Ed25519Curve) - consensus.secret = secret + consensus.secret[consensus.consensusId] = secret if firstRound { (*consensus.commitments)[consensus.nodeId] = commitment consensus.bitmap.SetKey(consensus.pubKey, true) @@ -164,6 +164,9 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con // proceed only when the message is not received before _, ok = (*commitments)[validatorId] shouldProcess := !ok + if len((*commitments)) >= ((len(consensus.publicKeys)*2)/3 + 1) { + shouldProcess = false + } if shouldProcess { point := crypto.Ed25519Curve.Point() point.UnmarshalBinary(commitment) @@ -177,7 +180,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con return } - if len((*commitments)) >= len(consensus.publicKeys) && consensus.state < targetState { + if len((*commitments)) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state < targetState { consensus.Log.Debug("Enough commitments received with signatures", "num", len(*commitments), "state", consensus.state) // Broadcast challenge @@ -213,7 +216,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con // Leader commit to the message itself before receiving others commits func (consensus *Consensus) responseByLeader(challenge kyber.Scalar, firstRound bool) { // Generate leader's own commitment - response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret, challenge) + response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret[consensus.consensusId], challenge) if err == nil { if firstRound { (*consensus.responses)[consensus.nodeId] = response @@ -229,6 +232,7 @@ func (consensus *Consensus) responseByLeader(challenge kyber.Scalar, firstRound // Processes the response message sent from validators func (consensus *Consensus) processResponseMessage(payload []byte, targetState ConsensusState) { + consensus.Log.Warn("Received RESPONSE 1") //#### Read payload data offset := 0 // 4 byte consensus id @@ -254,6 +258,8 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C shouldProcess := true consensus.mutex.Lock() + defer consensus.mutex.Unlock() + // check consensus Id if consensusId != consensus.consensusId { shouldProcess = false @@ -276,6 +282,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C return } + consensus.Log.Warn("Received RESPONSE 2") commitments := consensus.commitments // targetState == COLLECTIVE_SIG_DONE responses := consensus.responses bitmap := consensus.bitmap @@ -288,6 +295,12 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C // proceed only when the message is not received before _, ok = (*responses)[validatorId] shouldProcess = shouldProcess && !ok + + if len((*responses)) >= ((len(consensus.publicKeys)*2)/3 + 1) { + consensus.Log.Warn("quiting 3") + shouldProcess = false + } + if shouldProcess { // verify the response matches the received commit responseScalar := crypto.Ed25519Curve.Scalar() @@ -300,19 +313,18 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C (*responses)[validatorId] = responseScalar consensus.Log.Debug("Received new response message", "num", len(*responses)) // Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages - consensus.bitmap.SetKey(value.PubKey, true) + bitmap.SetKey(value.PubKey, true) } - } - consensus.mutex.Unlock() if !shouldProcess { + consensus.Log.Warn("returning 3") return } - if len(*responses) >= len(consensus.publicKeys) && consensus.state != targetState { - consensus.mutex.Lock() - if len(*responses) >= len(consensus.publicKeys) && consensus.state != targetState { + consensus.Log.Warn("Received RESPONSE 3") + if len(*responses) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state != targetState { + if len(*responses) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state != targetState { consensus.Log.Debug("Enough responses received with signatures", "num", len(*responses), "state", consensus.state) // Aggregate responses responseScalars := []kyber.Scalar{} @@ -329,6 +341,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C if targetState == FINISHED { aggregatedCommitment = consensus.aggregatedFinalCommitment } + collectiveSigAndBitmap, err := crypto.Sign(crypto.Ed25519Curve, aggregatedCommitment, aggregatedResponse, bitmap) if err != nil { @@ -377,7 +390,6 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C consensus.ReadySignal <- struct{}{} } } - consensus.mutex.Unlock() } } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 63ac9c614..bdb36f938 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -124,7 +124,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { secret, msgToSend := consensus.constructCommitMessage(proto_consensus.COMMIT) // Store the commitment secret - consensus.secret = secret + consensus.secret[consensusId] = secret p2p.SendMessage(consensus.leader, msgToSend) @@ -182,19 +182,18 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState return } - consensus.mutex.Lock() - // Add attack model of IncorrectResponse. if attack.GetInstance().IncorrectResponse() { consensus.Log.Warn("IncorrectResponse attacked") - consensus.mutex.Unlock() return } + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + // check block hash if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 { consensus.Log.Warn("Block hash doesn't match", "consensus", consensus) - consensus.mutex.Unlock() return } @@ -202,7 +201,6 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState if consensusId != consensus.consensusId { consensus.Log.Warn("Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId, "consensus", consensus) if _, ok := consensus.blocksReceived[consensus.consensusId]; !ok { - consensus.mutex.Unlock() return } consensus.Log.Warn("ROLLING UP", "consensus", consensus) @@ -234,7 +232,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState return } - response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret, receivedChallenge) + response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret[consensusId], receivedChallenge) if err != nil { log.Warn("Failed to generate response", "err", err) return @@ -246,7 +244,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState msgToSend := consensus.constructResponseMessage(msgTypeToSend, response) p2p.SendMessage(consensus.leader, msgToSend) - + consensus.Log.Warn("Sending Response", "state", targetState) // Set state to target state (RESPONSE_DONE, FINAL_RESPONSE_DONE) consensus.state = targetState @@ -259,6 +257,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState delete(consensus.blocksReceived, consensus.consensusId) consensus.blockHash = [32]byte{} + delete(consensus.secret, consensusId) consensus.consensusId++ // roll up one by one, until the next block is not received yet. // TODO: think about when validators know about the consensus is reached. @@ -275,9 +274,9 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState // check block data (transactions if !consensus.BlockVerifier(&blockHeaderObj) { consensus.Log.Debug("[WARNING] Block content is not verified successfully", "consensusId", consensus.consensusId) - consensus.mutex.Unlock() return } + consensus.Log.Info("Finished Response. Adding block to chain", "numTx", len(blockHeaderObj.Transactions)) consensus.OnConsensusDone(&blockHeaderObj) } else { break @@ -285,7 +284,6 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState } } - consensus.mutex.Unlock() } // Processes the collective signature message sent from the leader @@ -338,6 +336,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) { err := crypto.Verify(crypto.Ed25519Curve, consensus.publicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.publicKeys)/3)+1)) if err != nil { consensus.Log.Warn("Failed to verify the collective sig message", "consensusId", consensusId, "err", err) + return } // Add attack model of IncorrectResponse. @@ -360,7 +359,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) { secret, msgToSend := consensus.constructCommitMessage(proto_consensus.FINAL_COMMIT) // Store the commitment secret - consensus.secret = secret + consensus.secret[consensusId] = secret p2p.SendMessage(consensus.leader, msgToSend) diff --git a/p2p/helper.go b/p2p/helper.go index 3e52aa37d..d748a0aae 100644 --- a/p2p/helper.go +++ b/p2p/helper.go @@ -66,11 +66,11 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) { bytesToRead := binary.BigEndian.Uint32(fourBytes) //log.Printf("The content size is %d bytes.", bytesToRead) - //// Read the content in chunk of 1024 bytes + //// Read the content in chunk of 16 * 1024 bytes tmpBuf := make([]byte, BATCH_SIZE) ILOOP: for { - timeoutDuration := 1 * time.Second + timeoutDuration := 10 * time.Second conn.SetReadDeadline(time.Now().Add(timeoutDuration)) if bytesToRead < BATCH_SIZE { // Read the last number of bytes less than 1024 diff --git a/p2p/peer.go b/p2p/peer.go index ecc913ad0..91f962c6c 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -53,7 +53,7 @@ func BroadcastMessage(peers []Peer, msg []byte) { }() } wg.Wait() - log.Info("Broadcasting Done", "time spent", time.Now().Sub(start).Seconds()) + log.Info("Broadcasting Done", "time spent(s)", time.Now().Sub(start).Seconds()) } func SelectMyPeers(peers []Peer, min int, max int) []Peer {