The core protocol of WoopChain
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
woop/consensus/consensus_validator.go

290 lines
10 KiB

package consensus
import (
"github.com/ethereum/go-ethereum/rlp"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/attack"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
// sendBFTBlockToStateSyncing will send the latest BFT consensus block to state syncing checkingjjkkkkkkkkkkkkkkkjnjk
func (consensus *Consensus) sendBFTBlockToStateSyncing(consensusID uint32) {
// validator send consensus block to state syncing
if val, ok := consensus.blocksReceived[consensusID]; ok {
consensus.mutex.Lock()
delete(consensus.blocksReceived, consensusID)
consensus.mutex.Unlock()
var blockObj types.Block
err := rlp.DecodeBytes(val.block, &blockObj)
if err != nil {
utils.GetLogInstance().Debug("failed to construct the cached block")
return
}
blockInfo := &BFTBlockInfo{Block: &blockObj, ConsensusID: consensusID}
select {
case consensus.ConsensusBlock <- blockInfo:
default:
utils.GetLogInstance().Warn("consensus block unable to sent to state sync", "height", blockObj.NumberU64(), "blockHash", blockObj.Hash().Hex())
}
}
return
}
// IsValidatorMessage checks if a message is to be sent to a validator.
func (consensus *Consensus) IsValidatorMessage(message *msg_pb.Message) bool {
return message.ReceiverType == msg_pb.ReceiverType_VALIDATOR && message.ServiceType == msg_pb.ServiceType_CONSENSUS
}
// ProcessMessageValidator dispatches validator's consensus message.
func (consensus *Consensus) ProcessMessageValidator(payload []byte) {
message := &msg_pb.Message{}
err := protobuf.Unmarshal(payload, message)
if err != nil {
utils.GetLogInstance().Error("Failed to unmarshal message payload.", "err", err, "consensus", consensus)
}
if !consensus.IsValidatorMessage(message) {
return
}
switch message.Type {
case msg_pb.MessageType_ANNOUNCE:
consensus.processAnnounceMessage(message)
case msg_pb.MessageType_PREPARED:
consensus.processPreparedMessage(message)
case msg_pb.MessageType_COMMITTED:
consensus.processCommittedMessage(message)
default:
utils.GetLogInstance().Error("Unexpected message type", "msgType", message.Type, "consensus", consensus)
}
}
// Processes the announce message sent from the leader
func (consensus *Consensus) processAnnounceMessage(message *msg_pb.Message) {
utils.GetLogInstance().Info("Received Announce Message", "nodeID", consensus.nodeID)
consensusMsg := message.GetConsensus()
consensusID := consensusMsg.ConsensusId
blockHash := consensusMsg.BlockHash
block := consensusMsg.Payload
// Add block to received block cache
consensus.mutex.Lock()
consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state}
consensus.mutex.Unlock()
copy(consensus.blockHash[:], blockHash[:])
consensus.block = block
if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil {
utils.GetLogInstance().Debug("Failed to check the leader message")
if err == consensus_engine.ErrConsensusIDNotMatch {
utils.GetLogInstance().Debug("sending bft block to state syncing")
consensus.sendBFTBlockToStateSyncing(consensusID)
}
return
}
// check block header is valid
var blockObj types.Block
err := rlp.DecodeBytes(block, &blockObj)
if err != nil {
utils.GetLogInstance().Warn("Unparseable block header data", "error", err)
return
}
// Add attack model of IncorrectResponse
if attack.GetInstance().IncorrectResponse() {
utils.GetLogInstance().Warn("IncorrectResponse attacked")
return
}
// check block data transactions
if !consensus.BlockVerifier(&blockObj) {
utils.GetLogInstance().Warn("Block content is not verified successfully", "consensus", consensus)
return
}
// Construct and send prepare message
msgToSend := consensus.constructPrepareMessage()
if utils.UseLibP2P {
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
consensus.SendMessage(consensus.Leader, msgToSend)
}
consensus.state = PrepareDone
}
// Processes the prepared message sent from the leader
func (consensus *Consensus) processPreparedMessage(message *msg_pb.Message) {
utils.GetLogInstance().Info("Received Prepared Message", "nodeID", consensus.nodeID)
consensusMsg := message.GetConsensus()
consensusID := consensusMsg.ConsensusId
blockHash := consensusMsg.BlockHash
leaderID := consensusMsg.SenderId
messagePayload := consensusMsg.Payload
//#### Read payload data
offset := 0
// 48 byte of multi-sig
multiSig := messagePayload[offset : offset+48]
offset += 48
// bitmap
bitmap := messagePayload[offset:]
//#### END Read payload data
// Update readyByConsensus for attack.
attack.GetInstance().UpdateConsensusReady(consensusID)
if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil {
utils.GetLogInstance().Debug("processPreparedMessage error", "error", err)
return
}
// Add attack model of IncorrectResponse.
if attack.GetInstance().IncorrectResponse() {
utils.GetLogInstance().Warn("IncorrectResponse attacked")
return
}
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
// Verify the multi-sig for prepare phase
deserializedMultiSig := bls.Sign{}
err := deserializedMultiSig.Deserialize(multiSig)
if err != nil {
utils.GetLogInstance().Warn("Failed to deserialize the multi signature for prepare phase", "Error", err, "leader ID", leaderID)
return
}
mask, err := bls_cosi.NewMask(consensus.PublicKeys, nil)
mask.SetMask(bitmap)
if !deserializedMultiSig.VerifyHash(mask.AggregatePublic, blockHash) || err != nil {
utils.GetLogInstance().Warn("Failed to verify the multi signature for prepare phase", "Error", err, "leader ID", leaderID)
return
}
consensus.aggregatedPrepareSig = &deserializedMultiSig
consensus.prepareBitmap = mask
// Construct and send the commit message
multiSigAndBitmap := append(multiSig, bitmap...)
msgToSend := consensus.constructCommitMessage(multiSigAndBitmap)
if utils.UseLibP2P {
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
consensus.SendMessage(consensus.Leader, msgToSend)
}
consensus.state = CommitDone
}
// Processes the committed message sent from the leader
func (consensus *Consensus) processCommittedMessage(message *msg_pb.Message) {
utils.GetLogInstance().Warn("Received Committed Message", "nodeID", consensus.nodeID)
consensusMsg := message.GetConsensus()
consensusID := consensusMsg.ConsensusId
leaderID := consensusMsg.SenderId
messagePayload := consensusMsg.Payload
//#### Read payload data
offset := 0
// 48 byte of multi-sig
multiSig := messagePayload[offset : offset+48]
offset += 48
// bitmap
bitmap := messagePayload[offset:]
//#### END Read payload data
// Update readyByConsensus for attack.
attack.GetInstance().UpdateConsensusReady(consensusID)
if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil {
utils.GetLogInstance().Debug("processCommittedMessage error", "error", err)
return
}
// Add attack model of IncorrectResponse.
if attack.GetInstance().IncorrectResponse() {
utils.GetLogInstance().Warn("IncorrectResponse attacked")
return
}
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
// Verify the multi-sig for commit phase
deserializedMultiSig := bls.Sign{}
err := deserializedMultiSig.Deserialize(multiSig)
if err != nil {
utils.GetLogInstance().Warn("Failed to deserialize the multi signature for commit phase", "Error", err, "leader ID", leaderID)
return
}
mask, err := bls_cosi.NewMask(consensus.PublicKeys, nil)
mask.SetMask(bitmap)
prepareMultiSigAndBitmap := append(consensus.aggregatedPrepareSig.Serialize(), consensus.prepareBitmap.Bitmap...)
if !deserializedMultiSig.VerifyHash(mask.AggregatePublic, prepareMultiSigAndBitmap) || err != nil {
utils.GetLogInstance().Warn("Failed to verify the multi signature for commit phase", "Error", err, "leader ID", leaderID)
return
}
consensus.aggregatedCommitSig = &deserializedMultiSig
consensus.commitBitmap = mask
consensus.state = CommittedDone
// TODO: the block catch up logic is a temporary workaround for full failure node catchup. Implement the full node catchup logic
// The logic is to roll up to the latest blocks one by one to try catching up with the leader.
// but because of checkConsensusMessage, the catchup logic will never be used here
for {
val, ok := consensus.blocksReceived[consensus.consensusID]
if ok {
delete(consensus.blocksReceived, consensus.consensusID)
consensus.blockHash = [32]byte{}
consensus.consensusID = consensusID + 1 // roll up one by one, until the next block is not received yet.
var blockObj types.Block
err := rlp.DecodeBytes(val.block, &blockObj)
if err != nil {
utils.GetLogInstance().Debug("failed to construct the new block after consensus")
}
// check block data (transactions
if !consensus.BlockVerifier(&blockObj) {
utils.GetLogInstance().Debug("[WARNING] Block content is not verified successfully", "consensusID", consensus.consensusID)
return
}
// Put the signatures into the block
copy(blockObj.Header().PrepareSignature[:], consensus.aggregatedPrepareSig.Serialize()[:])
copy(blockObj.Header().PrepareBitmap[:], consensus.prepareBitmap.Bitmap)
copy(blockObj.Header().CommitSignature[:], consensus.aggregatedCommitSig.Serialize()[:])
copy(blockObj.Header().CommitBitmap[:], consensus.commitBitmap.Bitmap)
utils.GetLogInstance().Info("Adding block to chain", "numTx", len(blockObj.Transactions()))
consensus.OnConsensusDone(&blockObj)
consensus.ResetState()
select {
case consensus.VerifiedNewBlock <- &blockObj:
default:
utils.GetLogInstance().Info("[SYNC] consensus verified block send to chan failed", "blockHash", blockObj.Hash())
continue
}
} else {
break
}
}
}