You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
373 lines
13 KiB
373 lines
13 KiB
package consensus
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/gob"
|
|
|
|
"github.com/dedis/kyber/sign/schnorr"
|
|
"github.com/harmony-one/harmony/attack"
|
|
"github.com/harmony-one/harmony/blockchain"
|
|
"github.com/harmony-one/harmony/crypto"
|
|
"github.com/harmony-one/harmony/log"
|
|
proto_consensus "github.com/harmony-one/harmony/proto/consensus"
|
|
"github.com/harmony-one/harmony/utils"
|
|
)
|
|
|
|
// ProcessMessageValidator dispatches validator's consensus message.
|
|
func (consensus *Consensus) ProcessMessageValidator(message []byte) {
|
|
msgType, err := proto_consensus.GetConsensusMessageType(message)
|
|
if err != nil {
|
|
consensus.Log.Error("Failed to get consensus message type", "err", err, "consensus", consensus)
|
|
}
|
|
|
|
payload, err := proto_consensus.GetConsensusMessagePayload(message)
|
|
if err != nil {
|
|
consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus)
|
|
}
|
|
|
|
switch msgType {
|
|
case proto_consensus.Announce:
|
|
consensus.processAnnounceMessage(payload)
|
|
case proto_consensus.Challenge:
|
|
consensus.processChallengeMessage(payload, ResponseDone)
|
|
case proto_consensus.FinalChallenge:
|
|
consensus.processChallengeMessage(payload, FinalResponseDone)
|
|
case proto_consensus.CollectiveSig:
|
|
consensus.processCollectiveSigMessage(payload)
|
|
default:
|
|
consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
|
|
}
|
|
}
|
|
|
|
// Processes the announce message sent from the leader
|
|
func (consensus *Consensus) processAnnounceMessage(payload []byte) {
|
|
consensus.Log.Info("Received Announce Message", "Size", len(payload), "nodeID", consensus.nodeID)
|
|
//#### Read payload data
|
|
offset := 0
|
|
// 4 byte consensus id
|
|
consensusID := binary.BigEndian.Uint32(payload[offset : offset+4])
|
|
offset += 4
|
|
|
|
// 32 byte block hash
|
|
blockHash := payload[offset : offset+32]
|
|
offset += 32
|
|
|
|
// 2 byte leader id
|
|
leaderID := binary.BigEndian.Uint16(payload[offset : offset+2])
|
|
offset += 2
|
|
|
|
// n byte of message to cosign
|
|
n := len(payload) - offset - 64 // the number means 64 signature
|
|
blockHeader := payload[offset : offset+n]
|
|
offset += n
|
|
|
|
// 64 byte of signature on previous data
|
|
signature := payload[offset : offset+64]
|
|
offset += 64
|
|
//#### END: Read payload data
|
|
|
|
copy(consensus.blockHash[:], blockHash[:])
|
|
|
|
// Verify block data
|
|
// check leader Id
|
|
myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader)
|
|
if leaderID != myLeaderID {
|
|
consensus.Log.Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
// Verify signature
|
|
if schnorr.Verify(crypto.Ed25519Curve, consensus.leader.PubKey, payload[:offset-64], signature) != nil {
|
|
consensus.Log.Warn("Received message with invalid signature", "leaderKey", consensus.leader.PubKey, "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
// check block header is valid
|
|
txDecoder := gob.NewDecoder(bytes.NewReader(blockHeader))
|
|
var blockHeaderObj blockchain.Block // TODO: separate header from block. Right now, this blockHeader data is actually the whole block
|
|
err := txDecoder.Decode(&blockHeaderObj)
|
|
if err != nil {
|
|
consensus.Log.Warn("Unparseable block header data", "consensus", consensus)
|
|
return
|
|
}
|
|
consensus.blockHeader = blockHeader // TODO: think about remove this field and use blocksReceived instead
|
|
consensus.mutex.Lock()
|
|
consensus.blocksReceived[consensusID] = &BlockConsensusStatus{blockHeader, consensus.state}
|
|
consensus.mutex.Unlock()
|
|
|
|
// Add attack model of IncorrectResponse.
|
|
if attack.GetInstance().IncorrectResponse() {
|
|
consensus.Log.Warn("IncorrectResponse attacked")
|
|
return
|
|
}
|
|
|
|
// check consensus Id
|
|
if consensusID != consensus.consensusID {
|
|
consensus.Log.Warn("Received message with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
// check block hash
|
|
if blockHeaderObj.AccountBlock != nil {
|
|
// TODO: deal with block hash of account model
|
|
} else {
|
|
// TODO: totally switch to account model.
|
|
if !bytes.Equal(blockHash[:], blockHeaderObj.CalculateBlockHash()[:]) || !bytes.Equal(blockHeaderObj.Hash[:], blockHeaderObj.CalculateBlockHash()[:]) {
|
|
consensus.Log.Warn("Block hash doesn't match", "consensus", consensus)
|
|
return
|
|
}
|
|
}
|
|
|
|
// check block data (transactions
|
|
if !consensus.BlockVerifier(&blockHeaderObj) {
|
|
consensus.Log.Warn("Block content is not verified successfully", "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
secret, msgToSend := consensus.constructCommitMessage(proto_consensus.Commit)
|
|
// Store the commitment secret
|
|
consensus.secret[consensusID] = secret
|
|
|
|
consensus.SendMessage(consensus.leader, msgToSend)
|
|
// consensus.Log.Warn("Sending Commit to leader", "state", targetState)
|
|
|
|
// Set state to CommitDone
|
|
consensus.state = CommitDone
|
|
}
|
|
|
|
// Processes the challenge message sent from the leader
|
|
func (consensus *Consensus) processChallengeMessage(payload []byte, targetState State) {
|
|
//#### Read payload data
|
|
offset := 0
|
|
// 4 byte consensus id
|
|
consensusID := binary.BigEndian.Uint32(payload[offset : offset+4])
|
|
offset += 4
|
|
|
|
// 32 byte block hash
|
|
blockHash := payload[offset : offset+32]
|
|
offset += 32
|
|
|
|
// 2 byte leader id
|
|
leaderID := binary.BigEndian.Uint16(payload[offset : offset+2])
|
|
offset += 2
|
|
|
|
// 33 byte of aggregated commit
|
|
aggreCommit := payload[offset : offset+33]
|
|
offset += 33
|
|
|
|
// 33 byte of aggregated key
|
|
aggreKey := payload[offset : offset+33]
|
|
offset += 33
|
|
|
|
// 32 byte of challenge
|
|
challenge := payload[offset : offset+32]
|
|
offset += 32
|
|
|
|
// 64 byte of signature on previous data
|
|
signature := payload[offset : offset+64]
|
|
offset += 64
|
|
//#### END: Read payload data
|
|
|
|
// Update readyByConsensus for attack.
|
|
attack.GetInstance().UpdateConsensusReady(consensusID)
|
|
|
|
// Verify block data and the aggregated signatures
|
|
// check leader Id
|
|
myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader)
|
|
if leaderID != myLeaderID {
|
|
consensus.Log.Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
// Verify signature
|
|
if schnorr.Verify(crypto.Ed25519Curve, consensus.leader.PubKey, payload[:offset-64], signature) != nil {
|
|
consensus.Log.Warn("Received message with invalid signature", "leaderKey", consensus.leader.PubKey, "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
// Add attack model of IncorrectResponse.
|
|
if attack.GetInstance().IncorrectResponse() {
|
|
consensus.Log.Warn("IncorrectResponse attacked")
|
|
return
|
|
}
|
|
|
|
consensus.mutex.Lock()
|
|
defer consensus.mutex.Unlock()
|
|
|
|
// check block hash
|
|
if !bytes.Equal(blockHash[:], consensus.blockHash[:]) {
|
|
consensus.Log.Warn("Block hash doesn't match", "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
// check consensus Id
|
|
if consensusID != consensus.consensusID {
|
|
consensus.Log.Warn("Received message with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
|
|
if _, ok := consensus.blocksReceived[consensus.consensusID]; !ok {
|
|
return
|
|
}
|
|
consensus.Log.Warn("ROLLING UP", "consensus", consensus)
|
|
// If I received previous block (which haven't been processed. I will roll up to current block if everything checks.
|
|
}
|
|
|
|
aggCommitment := crypto.Ed25519Curve.Point()
|
|
aggCommitment.UnmarshalBinary(aggreCommit[:32]) // TODO: figure out whether it's 33 bytes or 32 bytes
|
|
aggKey := crypto.Ed25519Curve.Point()
|
|
aggKey.UnmarshalBinary(aggreKey[:32])
|
|
|
|
reconstructedChallenge, err := crypto.Challenge(crypto.Ed25519Curve, aggCommitment, aggKey, payload[:36]) // Only consensus Id and block hash
|
|
|
|
if err != nil {
|
|
log.Error("Failed to reconstruct the challenge from commits and keys")
|
|
return
|
|
}
|
|
|
|
// For now, simply return the private key of this node.
|
|
receivedChallenge := crypto.Ed25519Curve.Scalar()
|
|
err = receivedChallenge.UnmarshalBinary(challenge)
|
|
if err != nil {
|
|
log.Error("Failed to deserialize challenge", "err", err)
|
|
return
|
|
}
|
|
|
|
if !reconstructedChallenge.Equal(receivedChallenge) {
|
|
log.Error("The challenge doesn't match the commitments and keys")
|
|
return
|
|
}
|
|
|
|
response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret[consensusID], receivedChallenge)
|
|
if err != nil {
|
|
log.Warn("validator failed to generate response", "err", err, "priKey", consensus.priKey, "nodeID", consensus.nodeID, "secret", consensus.secret[consensusID])
|
|
return
|
|
}
|
|
|
|
msgTypeToSend := proto_consensus.Response
|
|
if targetState == FinalResponseDone {
|
|
msgTypeToSend = proto_consensus.FinalResponse
|
|
}
|
|
msgToSend := consensus.constructResponseMessage(msgTypeToSend, response)
|
|
|
|
consensus.SendMessage(consensus.leader, msgToSend)
|
|
// consensus.Log.Warn("Sending Response to leader", "state", targetState)
|
|
// Set state to target state (ResponseDone, FinalResponseDone)
|
|
consensus.state = targetState
|
|
|
|
if consensus.state == FinalResponseDone {
|
|
// BIG TODO: the block catch up logic is basically a mock now. More checks need to be done to make it correct.
|
|
// The logic is to roll up to the latest blocks one by one to try catching up with the leader.
|
|
for {
|
|
val, ok := consensus.blocksReceived[consensus.consensusID]
|
|
if ok {
|
|
delete(consensus.blocksReceived, consensus.consensusID)
|
|
|
|
consensus.blockHash = [32]byte{}
|
|
delete(consensus.secret, consensusID)
|
|
consensus.consensusID++ // roll up one by one, until the next block is not received yet.
|
|
|
|
// TODO: think about when validators know about the consensus is reached.
|
|
// For now, the blockchain is updated right here.
|
|
|
|
// TODO: reconstruct the whole block from header and transactions
|
|
// For now, we used the stored whole block in consensus.blockHeader
|
|
txDecoder := gob.NewDecoder(bytes.NewReader(val.blockHeader))
|
|
var blockHeaderObj blockchain.Block
|
|
err := txDecoder.Decode(&blockHeaderObj)
|
|
if err != nil {
|
|
consensus.Log.Debug("failed to construct the new block after consensus")
|
|
}
|
|
// check block data (transactions
|
|
if !consensus.BlockVerifier(&blockHeaderObj) {
|
|
consensus.Log.Debug("[WARNING] Block content is not verified successfully", "consensusID", consensus.consensusID)
|
|
return
|
|
}
|
|
consensus.Log.Info("Finished Response. Adding block to chain", "numTx", len(blockHeaderObj.Transactions))
|
|
consensus.OnConsensusDone(&blockHeaderObj)
|
|
} else {
|
|
break
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
// Processes the collective signature message sent from the leader
|
|
func (consensus *Consensus) processCollectiveSigMessage(payload []byte) {
|
|
//#### Read payload data
|
|
offset := 0
|
|
// 4 byte consensus id
|
|
consensusID := binary.BigEndian.Uint32(payload[offset : offset+4])
|
|
offset += 4
|
|
|
|
// 32 byte block hash
|
|
blockHash := payload[offset : offset+32]
|
|
offset += 32
|
|
|
|
// 2 byte leader id
|
|
leaderID := binary.BigEndian.Uint16(payload[offset : offset+2])
|
|
offset += 2
|
|
|
|
// 64 byte of collective signature
|
|
collectiveSig := payload[offset : offset+64]
|
|
offset += 64
|
|
|
|
// N byte of bitmap
|
|
n := len(payload) - offset - 64 // the number means 64 signature
|
|
bitmap := payload[offset : offset+n]
|
|
offset += n
|
|
|
|
// 64 byte of signature on previous data
|
|
signature := payload[offset : offset+64]
|
|
offset += 64
|
|
//#### END: Read payload data
|
|
|
|
copy(consensus.blockHash[:], blockHash[:])
|
|
|
|
// Verify block data
|
|
// check leader Id
|
|
myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader)
|
|
if leaderID != myLeaderID {
|
|
consensus.Log.Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
// Verify signature
|
|
if schnorr.Verify(crypto.Ed25519Curve, consensus.leader.PubKey, payload[:offset-64], signature) != nil {
|
|
consensus.Log.Warn("Received message with invalid signature", "leaderKey", consensus.leader.PubKey, "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
// Verify collective signature
|
|
err := crypto.Verify(crypto.Ed25519Curve, consensus.PublicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.PublicKeys)/3)+1))
|
|
if err != nil {
|
|
consensus.Log.Warn("Failed to verify the collective sig message", "consensusID", consensusID, "err", err, "bitmap", bitmap, "NodeID", consensus.nodeID, "#PK", len(consensus.PublicKeys))
|
|
return
|
|
}
|
|
|
|
// Add attack model of IncorrectResponse.
|
|
if attack.GetInstance().IncorrectResponse() {
|
|
consensus.Log.Warn("IncorrectResponse attacked")
|
|
return
|
|
}
|
|
|
|
// check consensus Id
|
|
if consensusID != consensus.consensusID {
|
|
consensus.Log.Warn("Received message with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
// check block hash
|
|
if !bytes.Equal(blockHash[:], consensus.blockHash[:]) {
|
|
consensus.Log.Warn("Block hash doesn't match", "consensus", consensus)
|
|
return
|
|
}
|
|
|
|
secret, msgToSend := consensus.constructCommitMessage(proto_consensus.FinalCommit)
|
|
// Store the commitment secret
|
|
consensus.secret[consensusID] = secret
|
|
|
|
consensus.SendMessage(consensus.leader, msgToSend)
|
|
|
|
// Set state to CommitDone
|
|
consensus.state = FinalCommitDone
|
|
}
|
|
|