package consensus
import (
"bytes"
"github.com/dedis/kyber/sign/schnorr"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
consensus_proto "github.com/harmony-one/harmony/api/consensus"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/internal/attack"
"github.com/harmony-one/harmony/internal/utils"
)
// ProcessMessageValidator dispatches validator's consensus message.
func ( consensus * Consensus ) ProcessMessageValidator ( payload [ ] byte ) {
message := consensus_proto . Message { }
err := message . XXX_Unmarshal ( payload )
if err != nil {
consensus . Log . Error ( "Failed to unmarshal message payload." , "err" , err , "consensus" , consensus )
}
switch message . Type {
case consensus_proto . MessageType_ANNOUNCE :
consensus . processAnnounceMessage ( message )
case consensus_proto . MessageType_CHALLENGE :
consensus . processChallengeMessage ( message , ResponseDone )
case consensus_proto . MessageType_FINAL_CHALLENGE :
consensus . processChallengeMessage ( message , FinalResponseDone )
case consensus_proto . MessageType_COLLECTIVE_SIG :
consensus . processCollectiveSigMessage ( message )
default :
consensus . Log . Error ( "Unexpected message type" , "msgType" , message . Type , "consensus" , consensus )
}
}
// Processes the announce message sent from the leader
func ( consensus * Consensus ) processAnnounceMessage ( message consensus_proto . Message ) {
consensus . Log . Info ( "Received Announce Message" , "nodeID" , consensus . nodeID )
consensusID := message . ConsensusId
blockHash := message . BlockHash
leaderID := message . SenderId
block := message . Payload
signature := message . Signature
copy ( consensus . blockHash [ : ] , blockHash [ : ] )
// Verify block data
// check leader Id
myLeaderID := utils . GetUniqueIDFromPeer ( consensus . leader )
if leaderID != myLeaderID {
consensus . Log . Warn ( "Received message from wrong leader" , "myLeaderID" , myLeaderID , "receivedLeaderId" , leaderID , "consensus" , consensus )
return
}
// Verify signature
message . Signature = nil
messageBytes , err := message . XXX_Marshal ( [ ] byte { } , true )
if err != nil {
consensus . Log . Warn ( "Failed to marshal the announce message" , "error" , err )
}
if schnorr . Verify ( crypto . Ed25519Curve , consensus . leader . PubKey , messageBytes , signature ) != nil {
consensus . Log . Warn ( "Received message with invalid signature" , "leaderKey" , consensus . leader . PubKey , "consensus" , consensus )
return
}
// check block header is valid
var blockObj types . Block
err = rlp . DecodeBytes ( block , & blockObj )
if err != nil {
consensus . Log . Warn ( "Unparseable block header data" , "error" , err )
return
}
consensus . block = block
// Add block to received block cache
consensus . mutex . Lock ( )
consensus . blocksReceived [ consensusID ] = & BlockConsensusStatus { block , consensus . state }
consensus . mutex . Unlock ( )
// Add attack model of IncorrectResponse.
if attack . GetInstance ( ) . IncorrectResponse ( ) {
consensus . Log . Warn ( "IncorrectResponse attacked" )
return
}
// check block hash
hash := blockObj . Hash ( )
if ! bytes . Equal ( blockHash [ : ] , hash [ : ] ) {
consensus . Log . Warn ( "Block hash doesn't match" , "consensus" , consensus )
return
}
// check block data (transactions
if ! consensus . BlockVerifier ( & blockObj ) {
consensus . Log . Warn ( "Block content is not verified successfully" , "consensus" , consensus )
return
}
// Commit and store the commit
secret , msgToSend := consensus . constructCommitMessage ( consensus_proto . MessageType_COMMIT )
consensus . secret [ consensusID ] = secret
consensus . SendMessage ( consensus . leader , msgToSend )
// consensus.Log.Warn("Sending Commit to leader", "state", targetState)
// Set state to CommitDone
consensus . state = CommitDone
}
// Processes the challenge message sent from the leader
func ( consensus * Consensus ) processChallengeMessage ( message consensus_proto . Message , targetState State ) {
consensus . Log . Info ( "Received Challenge Message" , "nodeID" , consensus . nodeID )
consensusID := message . ConsensusId
blockHash := message . BlockHash
leaderID := message . SenderId
messagePayload := message . Payload
signature := message . Signature
//#### Read payload data
// TODO: use BLS-based multi-sig
offset := 0
// 33 byte of aggregated commit
aggreCommit := messagePayload [ offset : offset + 33 ]
offset += 33
// 33 byte of aggregated key
aggreKey := messagePayload [ offset : offset + 33 ]
offset += 33
// 32 byte of challenge
challenge := messagePayload [ offset : offset + 32 ]
offset += 32
// Update readyByConsensus for attack.
attack . GetInstance ( ) . UpdateConsensusReady ( consensusID )
// Verify block data and the aggregated signatures
// check leader Id
myLeaderID := utils . GetUniqueIDFromPeer ( consensus . leader )
if uint32 ( leaderID ) != myLeaderID {
consensus . Log . Warn ( "Received message from wrong leader" , "myLeaderID" , myLeaderID , "receivedLeaderId" , leaderID , "consensus" , consensus )
return
}
// Verify signature
message . Signature = nil
messageBytes , err := message . XXX_Marshal ( [ ] byte { } , true )
if err != nil {
consensus . Log . Warn ( "Failed to marshal the announce message" , "error" , err )
}
if schnorr . Verify ( crypto . Ed25519Curve , consensus . leader . PubKey , messageBytes , signature ) != nil {
consensus . Log . Warn ( "Received message with invalid signature" , "leaderKey" , consensus . leader . PubKey , "consensus" , consensus )
return
}
// Add attack model of IncorrectResponse.
if attack . GetInstance ( ) . IncorrectResponse ( ) {
consensus . Log . Warn ( "IncorrectResponse attacked" )
return
}
consensus . mutex . Lock ( )
defer consensus . mutex . Unlock ( )
// check block hash
if ! bytes . Equal ( blockHash [ : ] , consensus . blockHash [ : ] ) {
consensus . Log . Warn ( "Block hash doesn't match" , "consensus" , consensus )
return
}
aggCommitment := crypto . Ed25519Curve . Point ( )
aggCommitment . UnmarshalBinary ( aggreCommit [ : 32 ] )
aggKey := crypto . Ed25519Curve . Point ( )
aggKey . UnmarshalBinary ( aggreKey [ : 32 ] )
reconstructedChallenge , err := crypto . Challenge ( crypto . Ed25519Curve , aggCommitment , aggKey , blockHash )
if err != nil {
log . Error ( "Failed to reconstruct the challenge from commits and keys" )
return
}
// For now, simply return the private key of this node.
receivedChallenge := crypto . Ed25519Curve . Scalar ( )
err = receivedChallenge . UnmarshalBinary ( challenge )
if err != nil {
log . Error ( "Failed to deserialize challenge" , "err" , err )
return
}
if ! reconstructedChallenge . Equal ( receivedChallenge ) {
log . Error ( "The challenge doesn't match the commitments and keys" )
return
}
response , err := crypto . Response ( crypto . Ed25519Curve , consensus . priKey , consensus . secret [ consensusID ] , receivedChallenge )
if err != nil {
log . Warn ( "validator failed to generate response" , "err" , err , "priKey" , consensus . priKey , "nodeID" , consensus . nodeID , "secret" , consensus . secret [ consensusID ] )
return
}
msgTypeToSend := consensus_proto . MessageType_RESPONSE
if targetState == FinalResponseDone {
msgTypeToSend = consensus_proto . MessageType_FINAL_RESPONSE
}
msgToSend := consensus . constructResponseMessage ( msgTypeToSend , response )
consensus . SendMessage ( consensus . leader , msgToSend )
// consensus.Log.Warn("Sending Response to leader", "state", targetState)
// Set state to target state (ResponseDone, FinalResponseDone)
consensus . state = targetState
if consensus . state == FinalResponseDone {
// TODO: the block catch up logic is a temporary workaround for full failure node catchup. Implement the full node catchup logic
// The logic is to roll up to the latest blocks one by one to try catching up with the leader.
for {
val , ok := consensus . blocksReceived [ consensus . consensusID ]
if ok {
delete ( consensus . blocksReceived , consensus . consensusID )
consensus . blockHash = [ 32 ] byte { }
delete ( consensus . secret , consensusID )
consensus . consensusID = consensusID + 1 // roll up one by one, until the next block is not received yet.
var blockObj types . Block
err := rlp . DecodeBytes ( val . block , & blockObj )
if err != nil {
consensus . Log . Warn ( "Unparseable block header data" , "error" , err )
return
}
if err != nil {
consensus . Log . Debug ( "failed to construct the new block after consensus" )
}
// check block data (transactions
if ! consensus . BlockVerifier ( & blockObj ) {
consensus . Log . Debug ( "[WARNING] Block content is not verified successfully" , "consensusID" , consensus . consensusID )
return
}
consensus . Log . Info ( "Finished Response. Adding block to chain" , "numTx" , len ( blockObj . Transactions ( ) ) )
consensus . OnConsensusDone ( & blockObj )
} else {
break
}
}
}
}
// Processes the collective signature message sent from the leader
func ( consensus * Consensus ) processCollectiveSigMessage ( message consensus_proto . Message ) {
consensusID := message . ConsensusId
blockHash := message . BlockHash
leaderID := message . SenderId
messagePayload := message . Payload
signature := message . Signature
//#### Read payload data
collectiveSig := messagePayload [ 0 : 64 ]
bitmap := messagePayload [ 64 : ]
//#### END: Read payload data
// Verify block data
// check leader Id
myLeaderID := utils . GetUniqueIDFromPeer ( consensus . leader )
if uint32 ( leaderID ) != myLeaderID {
consensus . Log . Warn ( "Received message from wrong leader" , "myLeaderID" , myLeaderID , "receivedLeaderId" , leaderID , "consensus" , consensus )
return
}
// Verify signature
message . Signature = nil
messageBytes , err := message . XXX_Marshal ( [ ] byte { } , true )
if err != nil {
consensus . Log . Warn ( "Failed to marshal the announce message" , "error" , err )
}
if schnorr . Verify ( crypto . Ed25519Curve , consensus . leader . PubKey , messageBytes , signature ) != nil {
consensus . Log . Warn ( "Received message with invalid signature" , "leaderKey" , consensus . leader . PubKey , "consensus" , consensus )
return
}
// Verify collective signature
err = crypto . Verify ( crypto . Ed25519Curve , consensus . PublicKeys , blockHash , append ( collectiveSig , bitmap ... ) , crypto . NewThresholdPolicy ( ( 2 * len ( consensus . PublicKeys ) / 3 ) + 1 ) )
if err != nil {
consensus . Log . Warn ( "Failed to verify the collective sig message" , "consensusID" , consensusID , "err" , err , "bitmap" , bitmap , "NodeID" , consensus . nodeID , "#PK" , len ( consensus . PublicKeys ) )
return
}
// Add attack model of IncorrectResponse.
if attack . GetInstance ( ) . IncorrectResponse ( ) {
consensus . Log . Warn ( "IncorrectResponse attacked" )
return
}
// check consensus Id
if consensusID != consensus . consensusID {
consensus . Log . Warn ( "Received message with wrong consensus Id" , "myConsensusId" , consensus . consensusID , "theirConsensusId" , consensusID , "consensus" , consensus )
return
}
// check block hash
if ! bytes . Equal ( blockHash [ : ] , consensus . blockHash [ : ] ) {
consensus . Log . Warn ( "Block hash doesn't match" , "consensus" , consensus )
return
}
secret , msgToSend := consensus . constructCommitMessage ( consensus_proto . MessageType_FINAL_COMMIT )
// Store the commitment secret
consensus . secret [ consensusID ] = secret
consensus . SendMessage ( consensus . leader , msgToSend )
// Set state to CommitDone
consensus . state = FinalCommitDone
}