Merge pull request #359 from harmony-one/rj_branch

Refactor basic message checking code in consensus
pull/363/head
Rongjian Lan 6 years ago committed by GitHub
commit dd17e4c60a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 55
      consensus/consensus.go
  2. 91
      consensus/consensus_leader.go
  3. 40
      consensus/consensus_leader_msg.go
  4. 71
      consensus/consensus_validator.go
  5. 18
      consensus/consensus_validator_msg.go

@ -2,6 +2,7 @@
package consensus // consensus
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
@ -195,6 +196,46 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons
return &consensus
}
// Checks the basic meta of a consensus message.
func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Message, publicKey *bls.PublicKey) bool {
consensusID := message.ConsensusId
blockHash := message.BlockHash
// Verify message signature
err := verifyMessageSig(publicKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err)
return false
}
// check consensus Id
if consensusID != consensus.consensusID {
utils.GetLogInstance().Warn("Wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
return false
}
if !bytes.Equal(blockHash, consensus.blockHash[:]) {
utils.GetLogInstance().Warn("Wrong blockHash", "consensus", consensus)
return false
}
return true
}
// Gets the validator peer based on validator ID.
func (consensus *Consensus) getValidatorPeerByID(validatorID uint32) *p2p.Peer {
v, ok := consensus.validators.Load(validatorID)
if !ok {
utils.GetLogInstance().Warn("Unrecognized validator", "validatorID", validatorID, "consensus", consensus)
return nil
}
value, ok := v.(p2p.Peer)
if !ok {
utils.GetLogInstance().Warn("Invalid validator", "validatorID", validatorID, "consensus", consensus)
return nil
}
return &value
}
// Verify the signature of the message are valid from the signer's public key.
func verifyMessageSig(signerPubKey *bls.PublicKey, message consensus_proto.Message) error {
signature := message.Signature
@ -229,6 +270,20 @@ func (consensus *Consensus) signMessage(message []byte) []byte {
return signature.Serialize()
}
// Sign on the consensus message signature field.
func (consensus *Consensus) signConsensusMessage(message *consensus_proto.Message) error {
message.Signature = nil
// TODO: use custom serialization method rather than protobuf
marshaledMessage, err := protobuf.Marshal(message)
if err != nil {
return err
}
// 64 byte of signature on previous data
signature := consensus.signMessage(marshaledMessage)
message.Signature = signature
return nil
}
// GetValidatorPeers returns list of validator peers.
func (consensus *Consensus) GetValidatorPeers() []p2p.Peer {
validatorPeers := make([]p2p.Peer, 0)

@ -1,7 +1,6 @@
package consensus
import (
"bytes"
"encoding/hex"
"strconv"
"time"
@ -15,7 +14,6 @@ import (
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/profiler"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
@ -46,12 +44,14 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block) {
startTime = time.Now()
utils.GetLogInstance().Debug("STARTING CONSENSUS", "numTxs", len(newBlock.Transactions()), "consensus", consensus, "startTime", startTime, "publicKeys", len(consensus.PublicKeys))
for consensus.state == Finished {
// time.Sleep(500 * time.Millisecond)
for { // Wait until last consensus is finished
if consensus.state == Finished {
consensus.ResetState()
consensus.startConsensus(newBlock)
break
}
time.Sleep(500 * time.Millisecond)
}
}
}
@ -103,48 +103,24 @@ func (consensus *Consensus) startConsensus(newBlock *types.Block) {
// processPrepareMessage processes the prepare message sent from validators
func (consensus *Consensus) processPrepareMessage(message consensus_proto.Message) {
consensusID := message.ConsensusId
blockHash := message.BlockHash
validatorID := message.SenderId
prepareSig := message.Payload
// Verify signature
v, ok := consensus.validators.Load(validatorID)
if !ok {
utils.GetLogInstance().Warn("Received message from unrecognized validator", "validatorID", validatorID, "consensus", consensus)
return
}
value, ok := v.(p2p.Peer)
if !ok {
utils.GetLogInstance().Warn("Invalid validator", "validatorID", validatorID, "consensus", consensus)
return
}
// Verify message signature
err := verifyMessageSig(value.PubKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err, "validatorID", validatorID)
return
}
prepareSigs := consensus.prepareSigs
prepareBitmap := consensus.prepareBitmap
// check consensus Id
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
if consensusID != consensus.consensusID {
utils.GetLogInstance().Warn("Received Commit with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
return
}
if !bytes.Equal(blockHash, consensus.blockHash[:]) {
utils.GetLogInstance().Warn("Received Commit with wrong blockHash", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
validatorPeer := consensus.getValidatorPeerByID(validatorID)
if !consensus.checkConsensusMessage(message, validatorPeer.PubKey) {
utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID)
return
}
prepareSigs := consensus.prepareSigs
prepareBitmap := consensus.prepareBitmap
// proceed only when the message is not received before
_, ok = (*prepareSigs)[validatorID]
_, ok := (*prepareSigs)[validatorID]
if ok {
utils.GetLogInstance().Debug("Already received prepare message from the validator", "validatorID", validatorID)
return
@ -157,13 +133,13 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag
// Check BLS signature for the multi-sig
var sign bls.Sign
err = sign.Deserialize(prepareSig)
err := sign.Deserialize(prepareSig)
if err != nil {
utils.GetLogInstance().Error("Failed to deserialize bls signature", "validatorID", validatorID)
return
}
if !sign.VerifyHash(value.PubKey, consensus.blockHash[:]) {
if !sign.VerifyHash(validatorPeer.PubKey, consensus.blockHash[:]) {
utils.GetLogInstance().Error("Received invalid BLS signature", "validatorID", validatorID)
return
}
@ -172,7 +148,7 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag
utils.GetLogInstance().Debug("Received new prepare signature", "numReceivedSoFar", len(*prepareSigs), "validatorID", validatorID, "PublicKeys", len(consensus.PublicKeys))
// Set the bitmap indicate this validate signed.
prepareBitmap.SetKey(value.PubKey, true)
prepareBitmap.SetKey(validatorPeer.PubKey, true)
targetState := PreparedDone
if len((*prepareSigs)) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state < targetState {
@ -196,41 +172,16 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag
// Processes the commit message sent from validators
func (consensus *Consensus) processCommitMessage(message consensus_proto.Message) {
consensusID := message.ConsensusId
blockHash := message.BlockHash
validatorID := message.SenderId
commitSig := message.Payload
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
// check consensus Id
if consensusID != consensus.consensusID {
utils.GetLogInstance().Warn("Received Commit with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
return
}
if !bytes.Equal(blockHash, consensus.blockHash[:]) {
utils.GetLogInstance().Warn("Received Commit with wrong blockHash", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
return
}
// Verify signature
v, ok := consensus.validators.Load(validatorID)
if !ok {
utils.GetLogInstance().Warn("Received message from unrecognized validator", "validatorID", validatorID, "consensus", consensus)
return
}
value, ok := v.(p2p.Peer)
if !ok {
utils.GetLogInstance().Warn("Invalid validator", "validatorID", validatorID, "consensus", consensus)
return
}
validatorPeer := consensus.getValidatorPeerByID(validatorID)
// Verify message signature
err := verifyMessageSig(value.PubKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err, "validatorID", validatorID)
if !consensus.checkConsensusMessage(message, validatorPeer.PubKey) {
utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID)
return
}
@ -238,7 +189,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
commitBitmap := consensus.commitBitmap
// proceed only when the message is not received before
_, ok = (*commitSigs)[validatorID]
_, ok := (*commitSigs)[validatorID]
if ok {
utils.GetLogInstance().Debug("Already received commit message from the validator", "validatorID", validatorID)
return
@ -249,7 +200,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
}
var sign bls.Sign
err = sign.Deserialize(commitSig)
err := sign.Deserialize(commitSig)
if err != nil {
utils.GetLogInstance().Debug("Failed to deserialize bls signature", "validatorID", validatorID)
return
@ -257,7 +208,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
// Verify the signature on prepare multi-sig and bitmap is correct
aggSig := bls_cosi.AggregateSig(consensus.GetPrepareSigsArray())
if !sign.VerifyHash(value.PubKey, append(aggSig.Serialize(), consensus.prepareBitmap.Bitmap...)) {
if !sign.VerifyHash(validatorPeer.PubKey, append(aggSig.Serialize(), consensus.prepareBitmap.Bitmap...)) {
utils.GetLogInstance().Error("Received invalid BLS signature", "validatorID", validatorID)
return
}
@ -265,7 +216,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
(*commitSigs)[validatorID] = &sign
utils.GetLogInstance().Debug("Received new commit message", "numReceivedSoFar", len(*commitSigs), "validatorID", strconv.Itoa(int(validatorID)))
// Set the bitmap indicate this validate signed.
commitBitmap.SetKey(value.PubKey, true)
commitBitmap.SetKey(validatorPeer.PubKey, true)
targetState := CommittedDone
if len(*commitSigs) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state != targetState {

@ -26,21 +26,17 @@ func (consensus *Consensus) constructAnnounceMessage() []byte {
message.SenderId = uint32(consensus.nodeID)
// n byte of block header
message.Payload = consensus.block
message.Payload = consensus.block // TODO: send only block header in the announce phase.
marshaledMessage, err := protobuf.Marshal(&message)
err := consensus.signConsensusMessage(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Announce message", "error", err)
utils.GetLogInstance().Debug("Failed to sign the Announce message", "error", err)
}
// 64 byte of signature on previous data
signature := consensus.signMessage(marshaledMessage)
message.Signature = signature
marshaledMessage, err = protobuf.Marshal(&message)
marshaledMessage, err := protobuf.Marshal(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Announce message", "error", err)
utils.GetLogInstance().Debug("Failed to marshal the Announce message", "error", err)
}
utils.GetLogInstance().Info("New Announce", "NodeID", consensus.nodeID)
return proto.ConstructConsensusMessage(marshaledMessage)
}
@ -71,20 +67,15 @@ func (consensus *Consensus) constructPreparedMessage() ([]byte, *bls.Sign) {
message.Payload = buffer.Bytes()
//// END Payload
// TODO: use custom serialization method rather than protobuf
marshaledMessage, err := protobuf.Marshal(&message)
err := consensus.signConsensusMessage(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Prepared message", "error", err)
utils.GetLogInstance().Debug("Failed to sign the Prepared message", "error", err)
}
// 48 byte of signature on previous data
signature := consensus.signMessage(marshaledMessage)
message.Signature = signature
marshaledMessage, err = protobuf.Marshal(&message)
marshaledMessage, err := protobuf.Marshal(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Prepared message", "error", err)
utils.GetLogInstance().Debug("Failed to marshal the Prepared message", "error", err)
}
utils.GetLogInstance().Info("New Prepared Message", "NodeID", consensus.nodeID, "bitmap", consensus.prepareBitmap)
return proto.ConstructConsensusMessage(marshaledMessage), aggSig
}
@ -114,19 +105,14 @@ func (consensus *Consensus) constructCommittedMessage() ([]byte, *bls.Sign) {
message.Payload = buffer.Bytes()
//// END Payload
// TODO: use custom serialization method rather than protobuf
marshaledMessage, err := protobuf.Marshal(&message)
err := consensus.signConsensusMessage(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Committed message", "error", err)
utils.GetLogInstance().Debug("Failed to sign the Committed message", "error", err)
}
// 48 byte of signature on previous data
signature := consensus.signMessage(marshaledMessage)
message.Signature = signature
marshaledMessage, err = protobuf.Marshal(&message)
marshaledMessage, err := protobuf.Marshal(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Committed message", "error", err)
utils.GetLogInstance().Debug("Failed to marshal the Committed message", "error", err)
}
utils.GetLogInstance().Info("New Prepared Message", "NodeID", consensus.nodeID, "bitmap", consensus.commitBitmap)
return proto.ConstructConsensusMessage(marshaledMessage), aggSig
}

@ -1,8 +1,6 @@
package consensus
import (
"bytes"
"github.com/harmony-one/bls/ffi/go/bls"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
@ -40,29 +38,18 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa
consensusID := message.ConsensusId
blockHash := message.BlockHash
leaderID := message.SenderId
block := message.Payload
copy(consensus.blockHash[:], blockHash[:])
// Verify block data
// check leader Id
myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader)
if leaderID != myLeaderID {
utils.GetLogInstance().Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus)
return
}
// Verify message signature
err := verifyMessageSig(consensus.leader.PubKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err, "leader ID", leaderID)
if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) {
utils.GetLogInstance().Debug("Failed to check the leader message")
return
}
// check block header is valid
var blockObj types.Block
err = rlp.DecodeBytes(block, &blockObj)
err := rlp.DecodeBytes(block, &blockObj)
if err != nil {
utils.GetLogInstance().Warn("Unparseable block header data", "error", err)
return
@ -81,13 +68,6 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa
return
}
// check block hash
hash := blockObj.Hash()
if !bytes.Equal(blockHash[:], hash[:]) {
utils.GetLogInstance().Warn("Block hash doesn't match", "consensus", consensus)
return
}
// check block data (transactions
if !consensus.BlockVerifier(&blockObj) {
utils.GetLogInstance().Warn("Block content is not verified successfully", "consensus", consensus)
@ -125,18 +105,8 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa
// Update readyByConsensus for attack.
attack.GetInstance().UpdateConsensusReady(consensusID)
// Verify block data and the aggregated signatures
// check leader Id
myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader)
if uint32(leaderID) != myLeaderID {
utils.GetLogInstance().Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus)
return
}
// Verify message signature
err := verifyMessageSig(consensus.leader.PubKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err, "leader ID", leaderID)
if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) {
utils.GetLogInstance().Debug("Failed to check the leader message")
return
}
@ -149,14 +119,8 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
// check block hash
if !bytes.Equal(blockHash[:], consensus.blockHash[:]) {
utils.GetLogInstance().Warn("Block hash doesn't match", "consensus", consensus)
return
}
deserializedMultiSig := bls.Sign{}
err = deserializedMultiSig.Deserialize(multiSig)
err := deserializedMultiSig.Deserialize(multiSig)
if err != nil {
utils.GetLogInstance().Warn("Failed to deserialize the multi signature for prepare phase", "Error", err, "leader ID", leaderID)
return
@ -185,7 +149,6 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess
utils.GetLogInstance().Warn("Received Prepared Message", "nodeID", consensus.nodeID)
consensusID := message.ConsensusId
blockHash := message.BlockHash
leaderID := message.SenderId
messagePayload := message.Payload
@ -201,18 +164,8 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess
// Update readyByConsensus for attack.
attack.GetInstance().UpdateConsensusReady(consensusID)
// Verify block data and the aggregated signatures
// check leader Id
myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader)
if uint32(leaderID) != myLeaderID {
utils.GetLogInstance().Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus)
return
}
// Verify message signature
err := verifyMessageSig(consensus.leader.PubKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err, "leader ID", leaderID)
if !consensus.checkConsensusMessage(message, consensus.leader.PubKey) {
utils.GetLogInstance().Debug("Failed to check the leader message")
return
}
@ -232,14 +185,8 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess
return
}
// check block hash
if !bytes.Equal(blockHash[:], consensus.blockHash[:]) {
utils.GetLogInstance().Warn("Block hash doesn't match", "consensus", consensus)
return
}
deserializedMultiSig := bls.Sign{}
err = deserializedMultiSig.Deserialize(multiSig)
err := deserializedMultiSig.Deserialize(multiSig)
if err != nil {
utils.GetLogInstance().Warn("Failed to deserialize the multi signature for commit phase", "Error", err, "leader ID", leaderID)
return

@ -27,15 +27,12 @@ func (consensus *Consensus) constructPrepareMessage() []byte {
message.Payload = sign.Serialize()
}
marshaledMessage, err := protobuf.Marshal(&message)
err := consensus.signConsensusMessage(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Prepare message", "error", err)
utils.GetLogInstance().Debug("Failed to sign the Prepare message", "error", err)
}
// 64 byte of signature on previous data
signature := consensus.signMessage(marshaledMessage)
message.Signature = signature
marshaledMessage, err = protobuf.Marshal(&message)
marshaledMessage, err := protobuf.Marshal(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Prepare message", "error", err)
}
@ -63,15 +60,12 @@ func (consensus *Consensus) constructCommitMessage(multiSigAndBitmap []byte) []b
message.Payload = sign.Serialize()
}
marshaledMessage, err := protobuf.Marshal(&message)
err := consensus.signConsensusMessage(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Commit message", "error", err)
utils.GetLogInstance().Debug("Failed to sign the Commit message", "error", err)
}
// 64 byte of signature on previous data
signature := consensus.signMessage(marshaledMessage)
message.Signature = signature
marshaledMessage, err = protobuf.Marshal(&message)
marshaledMessage, err := protobuf.Marshal(&message)
if err != nil {
utils.GetLogInstance().Debug("Failed to marshal Commit message", "error", err)
}

Loading…
Cancel
Save