Merge branch 'master' of github.com:harmony-one/harmony into rj_fork

pull/1016/head
Rongjian Lan, 6 years ago
commit d2cdd00043

Files changed:
  1. consensus/consensus_service.go (17)
  2. consensus/consensus_v2.go (217)
  3. consensus/view_change.go (91)

@ -11,6 +11,7 @@ import (
common2 "github.com/harmony-one/harmony/internal/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
@ -638,3 +639,19 @@ func (consensus *Consensus) reportMetrics(block types.Block) {
}
profiler.LogMetrics(metrics)
}
// logger returns a sub-logger with consensus contexts added.
func (consensus *Consensus) logger(logger log.Logger) log.Logger {
return logger.New(
"myBlock", consensus.blockNum,
"myViewID", consensus.viewID,
"phase", consensus.phase,
"mode", consensus.mode.Mode(),
)
}
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() log.Logger {
logger := consensus.logger(utils.GetLogInstance())
return utils.WithCallerSkip(logger, 1)
}
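Note: the new logger/getLogger helpers attach the node's consensus context (block number, view ID, phase, mode) once, so the call sites rewritten throughout this commit no longer repeat those fields. A minimal standalone sketch of the same pattern against go-ethereum's log package follows; the consensusContext type and the field values are illustrative only, not Harmony code.

package main

import "github.com/ethereum/go-ethereum/log"

// consensusContext carries the fields that the logger() helper above attaches.
type consensusContext struct {
	blockNum uint64
	viewID   uint32
	phase    string
	mode     string
}

// logger returns a sub-logger with the consensus context pre-attached,
// mirroring Consensus.logger() in the diff above.
func (c *consensusContext) logger(base log.Logger) log.Logger {
	return base.New(
		"myBlock", c.blockNum,
		"myViewID", c.viewID,
		"phase", c.phase,
		"mode", c.mode,
	)
}

func main() {
	c := &consensusContext{blockNum: 42, viewID: 7, phase: "Announce", mode: "Normal"}
	l := c.logger(log.Root())
	// Every call now carries the context fields without repeating them at the call site.
	l.Debug("sent announce message")
	l.Warn("cannot send prepare message", "error", "timeout")
}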

@ -60,11 +60,11 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
func (consensus *Consensus) tryAnnounce(block *types.Block) {
// here we assume the leader should always be up to date
if block.NumberU64() != consensus.blockNum {
utils.GetLogger().Debug("tryAnnounce blockNum not match", "blockNum", block.NumberU64(), "myBlockNum", consensus.blockNum)
consensus.getLogger().Debug("tryAnnounce blockNum not match", "blockNum", block.NumberU64())
return
}
if !consensus.PubKey.IsEqual(consensus.LeaderPubKey) {
utils.GetLogger().Debug("tryAnnounce key not match", "myKey", consensus.PubKey, "leaderKey", consensus.LeaderPubKey)
consensus.getLogger().Debug("tryAnnounce key not match", "myKey", consensus.PubKey, "leaderKey", consensus.LeaderPubKey)
return
}
blockHash := block.Hash()
@ -73,7 +73,7 @@ func (consensus *Consensus) tryAnnounce(block *types.Block) {
// prepare message and broadcast to validators
encodedBlock, err := rlp.EncodeToBytes(block)
if err != nil {
utils.GetLogger().Debug("tryAnnounce Failed encoding block")
consensus.getLogger().Debug("tryAnnounce Failed encoding block")
return
}
consensus.block = encodedBlock
@ -86,7 +86,7 @@ func (consensus *Consensus) tryAnnounce(block *types.Block) {
_ = protobuf.Unmarshal(msgPayload, msg)
pbftMsg, err := ParsePbftMessage(msg)
if err != nil {
utils.GetLogger().Warn("tryAnnounce unable to parse pbft message", "error", err)
consensus.getLogger().Warn("tryAnnounce unable to parse pbft message", "error", err)
return
}
@ -97,37 +97,36 @@ func (consensus *Consensus) tryAnnounce(block *types.Block) {
consensus.prepareSigs[consensus.SelfAddress] = consensus.priKey.SignHash(consensus.blockHash[:])
// Construct broadcast p2p message
logger := utils.GetLogger().New("viewID", consensus.viewID, "block", consensus.blockNum, "groupID", p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(logger, err, "cannot send announce message")
consensus.getLogger().Warn("cannot send announce message", "groupID", p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))
} else {
logger.Debug("sent announce message")
consensus.getLogger().Debug("sent announce message")
}
}
func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
utils.GetLogger().Debug("receive announce message", "phase", consensus.phase, "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("receive announce message")
if consensus.PubKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal {
return
}
senderKey, err := consensus.verifySenderKey(msg)
if err != nil {
utils.GetLogger().Debug("onAnnounce verifySenderKey failed", "error", err)
consensus.getLogger().Debug("onAnnounce verifySenderKey failed", "error", err)
return
}
if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck {
utils.GetLogger().Warn("onAnnounce senderKey not match leader PubKey", "senderKey", senderKey.SerializeToHexStr()[:10], "leaderKey", consensus.LeaderPubKey.SerializeToHexStr()[:10])
consensus.getLogger().Warn("onAnnounce senderKey not match leader PubKey", "senderKey", senderKey.SerializeToHexStr(), "leaderKey", consensus.LeaderPubKey.SerializeToHexStr())
return
}
if err = verifyMessageSig(senderKey, msg); err != nil {
utils.GetLogger().Debug("onAnnounce Failed to verify leader signature", "error", err)
consensus.getLogger().Debug("onAnnounce Failed to verify leader signature", "error", err)
return
}
recvMsg, err := ParsePbftMessage(msg)
if err != nil {
utils.GetLogger().Debug("onAnnounce Unparseable leader message", "error", err)
consensus.getLogger().Debug("onAnnounce Unparseable leader message", "error", err)
return
}
block := recvMsg.Payload
@ -136,19 +135,19 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
var blockObj types.Block
err = rlp.DecodeBytes(block, &blockObj)
if err != nil {
utils.GetLogger().Warn("onAnnounce Unparseable block header data", "error", err)
consensus.getLogger().Warn("onAnnounce Unparseable block header data", "error", err)
return
}
if blockObj.NumberU64() != recvMsg.BlockNum || recvMsg.BlockNum < consensus.blockNum {
utils.GetLogger().Warn("blockNum not match", "recvBlockNum", recvMsg.BlockNum, "blockObjNum", blockObj.NumberU64(), "myBlockNum", consensus.blockNum)
consensus.getLogger().Warn("blockNum not match", "msgBlock", recvMsg.BlockNum, "blockNum", blockObj.NumberU64())
return
}
if consensus.mode.Mode() == Normal {
// skip verify header when node is in Syncing mode
if err := consensus.VerifyHeader(consensus.ChainReader, blockObj.Header(), false); err != nil {
utils.GetLogger().Warn("onAnnounce block content is not verified successfully", "error", err, "inChain", consensus.ChainReader.CurrentHeader().Number, "got", blockObj.Header().Number)
consensus.getLogger().Warn("onAnnounce block content is not verified successfully", "error", err, "inChain", consensus.ChainReader.CurrentHeader().Number, "got", blockObj.Header().Number)
return
}
}
@ -164,11 +163,11 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
ctxerror.Log15(utils.GetLogger().Warn, err)
return
}
//blockObj.Logger(utils.GetLogger()).Debug("received announce", "viewID", recvMsg.ViewID, "msgBlockNum", recvMsg.BlockNum)
//blockObj.Logger(consensus.getLogger()).Debug("received announce", "viewID", recvMsg.ViewID, "msgBlockNum", recvMsg.BlockNum)
logMsgs := consensus.pbftLog.GetMessagesByTypeSeqView(msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID)
if len(logMsgs) > 0 {
if logMsgs[0].BlockHash != blockObj.Header().Hash() {
utils.GetLogger().Debug("onAnnounce leader is malicious", "leaderKey", utils.GetBlsAddress(consensus.LeaderPubKey))
consensus.getLogger().Debug("onAnnounce leader is malicious", "leaderKey", consensus.LeaderPubKey)
consensus.startViewChange(consensus.viewID + 1)
}
return
@ -177,7 +176,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
copy(blockPayload[:], block[:])
consensus.block = blockPayload
consensus.blockHash = recvMsg.BlockHash
utils.GetLogger().Debug("announce block added", "phase", consensus.phase, "myViewID", consensus.viewID, "myBlock", consensus.blockNum, "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
consensus.getLogger().Debug("announce block added", "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
consensus.pbftLog.AddMessage(recvMsg)
consensus.pbftLog.AddBlock(&blockObj)
@ -192,7 +191,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
defer consensus.mutex.Unlock()
if consensus.checkViewID(recvMsg) != nil {
utils.GetLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID)
consensus.getLogger().Debug("viewID check failed", "msgViewID", recvMsg.ViewID, "msgBlockNum", recvMsg.BlockNum)
return
}
consensus.tryPrepare(blockObj.Header().Hash())
@ -210,7 +209,7 @@ func (consensus *Consensus) tryPrepare(blockHash common.Hash) {
}
if consensus.blockNum != block.NumberU64() || !consensus.pbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, hash) {
utils.GetLogger().Debug("not match", "myPhase", consensus.phase, "myBlock", consensus.blockNum, "viewID", consensus.viewID)
consensus.getLogger().Debug("blockNum or announce message not match")
return
}
@ -218,12 +217,11 @@ func (consensus *Consensus) tryPrepare(blockHash common.Hash) {
// Construct and send prepare message
msgToSend := consensus.constructPrepareMessage()
logger := utils.GetLogger().New("viewID", consensus.viewID, "block", consensus.blockNum)
// TODO: this will not return immediately, may block
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(logger, err, "cannot send prepare message")
consensus.getLogger().Warn("cannot send prepare message")
} else {
logger.Info("sent prepare message")
consensus.getLogger().Info("sent prepare message")
}
}
@ -235,28 +233,28 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
senderKey, err := consensus.verifySenderKey(msg)
if err != nil {
utils.GetLogger().Debug("onPrepare verifySenderKey failed", "error", err)
consensus.getLogger().Debug("onPrepare verifySenderKey failed", "error", err)
return
}
if err = verifyMessageSig(senderKey, msg); err != nil {
utils.GetLogger().Debug("onPrepare Failed to verify sender's signature", "error", err)
consensus.getLogger().Debug("onPrepare Failed to verify sender's signature", "error", err)
return
}
recvMsg, err := ParsePbftMessage(msg)
if err != nil {
utils.GetLogger().Debug("[Consensus] onPrepare Unparseable validator message", "error", err)
consensus.getLogger().Debug("[Consensus] onPrepare Unparseable validator message", "error", err)
return
}
if recvMsg.ViewID != consensus.viewID || recvMsg.BlockNum != consensus.blockNum {
utils.GetLogger().Debug("onPrepare message not match", "myPhase", consensus.phase, "myViewID", consensus.viewID,
"msgViewID", recvMsg.ViewID, "myBlockNum", consensus.blockNum, "msgBlockNum", recvMsg.BlockNum)
consensus.getLogger().Debug("onPrepare message not match",
"msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
return
}
if !consensus.pbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) {
utils.GetLogger().Debug("onPrepare no matching announce message", "blockNum", consensus.blockNum, "viewID", consensus.viewID, "blockHash", recvMsg.BlockHash)
consensus.getLogger().Debug("onPrepare no matching announce message", "blockHash", recvMsg.BlockHash)
return
}
@ -278,7 +276,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
// proceed only when the message is not received before
_, ok := prepareSigs[validatorAddress]
if ok {
utils.GetLogger().Debug("Already received prepare message from the validator", "validatorAddress", validatorAddress)
consensus.getLogger().Debug("Already received prepare message from the validator", "validatorAddress", validatorAddress)
return
}
@ -286,19 +284,19 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
var sign bls.Sign
err = sign.Deserialize(prepareSig)
if err != nil {
utils.GetLogger().Error("Failed to deserialize bls signature", "validatorAddress", validatorAddress)
consensus.getLogger().Error("Failed to deserialize bls signature", "validatorAddress", validatorAddress)
return
}
if !sign.VerifyHash(validatorPubKey, consensus.blockHash[:]) {
utils.GetLogger().Error("Received invalid BLS signature", "validatorAddress", validatorAddress)
consensus.getLogger().Error("Received invalid BLS signature", "validatorAddress", validatorAddress)
return
}
utils.GetLogger().Debug("Received new prepare signature", "numReceivedSoFar", len(prepareSigs), "validatorAddress", validatorAddress, "PublicKeys", len(consensus.PublicKeys))
consensus.getLogger().Debug("Received new prepare signature", "numReceivedSoFar", len(prepareSigs), "validatorAddress", validatorAddress, "PublicKeys", len(consensus.PublicKeys))
prepareSigs[validatorAddress] = &sign
// Set the bitmap indicating that this validator signed.
if err := prepareBitmap.SetKey(validatorPubKey, true); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "prepareBitmap.SetKey failed")
ctxerror.Warn(consensus.getLogger(), err, "prepareBitmap.SetKey failed")
}
if len(prepareSigs) >= consensus.Quorum() {
@ -313,16 +311,15 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
_ = protobuf.Unmarshal(msgPayload, msg)
pbftMsg, err := ParsePbftMessage(msg)
if err != nil {
utils.GetLogger().Warn("onPrepare unable to parse pbft message", "error", err)
consensus.getLogger().Warn("onPrepare unable to parse pbft message", "error", err)
return
}
consensus.pbftLog.AddMessage(pbftMsg)
logger := utils.GetLogger().New("viewID", consensus.viewID, "block", consensus.blockNum)
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(logger, err, "cannot send prepared message")
consensus.getLogger().Warn("cannot send prepared message")
} else {
logger.Debug("sent prepared message")
consensus.getLogger().Debug("sent prepared message")
}
// Leader add commit phase signature
@ -335,43 +332,42 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
}
func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
utils.GetLogger().Debug("receive prepared message", "phase", consensus.phase, "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("receive prepared message")
if consensus.PubKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal {
return
}
senderKey, err := consensus.verifySenderKey(msg)
if err != nil {
utils.GetLogger().Debug("onPrepared verifySenderKey failed", "error", err)
consensus.getLogger().Debug("onPrepared verifySenderKey failed", "error", err)
return
}
if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck {
utils.GetLogger().Warn("onPrepared senderKey not match leader PubKey")
consensus.getLogger().Warn("onPrepared senderKey not match leader PubKey")
return
}
if err := verifyMessageSig(senderKey, msg); err != nil {
utils.GetLogger().Debug("onPrepared Failed to verify sender's signature", "error", err)
consensus.getLogger().Debug("onPrepared Failed to verify sender's signature", "error", err)
return
}
recvMsg, err := ParsePbftMessage(msg)
if err != nil {
utils.GetLogger().Debug("onPrepared Unparseable validator message", "error", err)
consensus.getLogger().Debug("onPrepared unparseable validator message", "error", err)
return
}
utils.GetLogger().Info("onPrepared received prepared message", "myBlock", consensus.blockNum, "myViewID", consensus.viewID, "recvBlock", recvMsg.BlockNum, "msgViewID", recvMsg.ViewID)
consensus.getLogger().Info("onPrepared received prepared message", "msgBlock", recvMsg.BlockNum, "msgViewID", recvMsg.ViewID)
if recvMsg.BlockNum < consensus.blockNum {
utils.GetLogger().Debug("old block received, ignoring",
"receivedNumber", recvMsg.BlockNum,
"expectedNumber", consensus.blockNum)
consensus.getLogger().Debug("old block received, ignoring",
"msgBlock", recvMsg.BlockNum)
return
}
blockHash := recvMsg.BlockHash
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil {
utils.GetLogger().Error("readSignatureBitmapPayload failed", "error", err)
consensus.getLogger().Error("readSignatureBitmapPayload failed", "error", err)
return
}
@ -380,35 +376,34 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
// check has 2f+1 signatures
if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() {
utils.GetLogger().Debug("not have enough signature", "need", consensus.Quorum(), "have", count)
consensus.getLogger().Debug("not have enough signature", "need", consensus.Quorum(), "have", count)
return
}
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) {
myBlockHash := common.Hash{}
myBlockHash.SetBytes(consensus.blockHash[:])
utils.GetLogger().Warn("onPrepared failed to verify multi signature for prepare phase", "blockHash", blockHash, "myBlockHash", myBlockHash)
consensus.getLogger().Warn("onPrepared failed to verify multi signature for prepare phase", "blockHash", blockHash, "myBlockHash", myBlockHash)
return
}
utils.GetLogger().Debug("prepared message added", "phase", consensus.phase, "myViewID", consensus.viewID, "myBlock", consensus.blockNum, "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
consensus.getLogger().Debug("prepared message added", "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
consensus.pbftLog.AddMessage(recvMsg)
if consensus.mode.Mode() == ViewChanging {
utils.GetLogger().Debug("viewchanging mode just exist after viewchanging")
consensus.getLogger().Debug("viewchanging mode just exist after viewchanging")
return
}
consensus.tryCatchup()
if consensus.checkViewID(recvMsg) != nil {
utils.GetLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID)
consensus.getLogger().Debug("viewID check failed", "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
return
}
if recvMsg.BlockNum > consensus.blockNum {
utils.GetLogger().Debug("future block received, ignoring",
"receivedNumber", recvMsg.BlockNum,
"expectedNumber", consensus.blockNum)
consensus.getLogger().Debug("future block received, ignoring",
"msgBlock", recvMsg.BlockNum)
return
}
@ -420,11 +415,11 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
binary.LittleEndian.PutUint64(blockNumHash, consensus.blockNum)
commitPayload := append(blockNumHash, consensus.blockHash[:]...)
msgToSend := consensus.constructCommitMessage(commitPayload)
logger := utils.GetLogger().New("viewID", consensus.viewID, "block", consensus.blockNum)
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(logger, err, "cannot send commit message")
consensus.getLogger().Warn("cannot send commit message")
} else {
logger.Debug("sent commit message")
consensus.getLogger().Debug("sent commit message")
}
consensus.switchPhase(Commit, true)
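Note: the payload signed in the commit phase, as the lines above construct it, is the 8-byte little-endian block number followed by the 32-byte block hash. A minimal standalone sketch of that layout follows; the helper name buildCommitPayload is mine for illustration, not a function from the repository.

package main

import (
	"encoding/binary"
	"fmt"
)

// buildCommitPayload mirrors the commit-phase payload layout used in the diff above:
// an 8-byte little-endian block number followed by the 32-byte block hash.
func buildCommitPayload(blockNum uint64, blockHash [32]byte) []byte {
	blockNumHash := make([]byte, 8)
	binary.LittleEndian.PutUint64(blockNumHash, blockNum)
	return append(blockNumHash, blockHash[:]...)
}

func main() {
	var hash [32]byte
	payload := buildCommitPayload(42, hash)
	fmt.Println(len(payload)) // 40 bytes: 8 (block number) + 32 (hash)
}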
@ -440,32 +435,32 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
senderKey, err := consensus.verifySenderKey(msg)
if err != nil {
utils.GetLogger().Debug("onCommit verifySenderKey failed", "error", err)
consensus.getLogger().Debug("onCommit verifySenderKey failed", "error", err)
return
}
if err = verifyMessageSig(senderKey, msg); err != nil {
utils.GetLogger().Debug("onCommit Failed to verify sender's signature", "error", err)
consensus.getLogger().Debug("onCommit Failed to verify sender's signature", "error", err)
return
}
recvMsg, err := ParsePbftMessage(msg)
if err != nil {
utils.GetLogger().Debug("onCommit parse pbft message failed", "error", err)
consensus.getLogger().Debug("onCommit parse pbft message failed", "error", err)
return
}
if recvMsg.ViewID != consensus.viewID || recvMsg.BlockNum != consensus.blockNum {
utils.GetLogger().Debug("not match", "myViewID", consensus.viewID, "viewID", recvMsg.ViewID, "myBlock", consensus.blockNum, "block", recvMsg.BlockNum, "myPhase", consensus.phase, "phase", Commit)
consensus.getLogger().Debug("blockNum/viewID not match", "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
return
}
if !consensus.pbftLog.HasMatchingAnnounce(consensus.blockNum, recvMsg.BlockHash) {
utils.GetLogger().Debug("cannot find matching blockhash")
consensus.getLogger().Debug("cannot find matching blockhash")
return
}
if !consensus.pbftLog.HasMatchingViewPrepared(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) {
utils.GetLogger().Debug("cannot find matching prepared message")
consensus.getLogger().Debug("cannot find matching prepared message")
return
}
@ -479,7 +474,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
defer consensus.mutex.Unlock()
if !consensus.IsValidatorInCommittee(validatorAddress) {
utils.GetLogger().Error("Invalid validator", "validatorAddress", validatorAddress)
consensus.getLogger().Error("Invalid validator", "validatorAddress", validatorAddress)
return
}
@ -489,7 +484,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
// proceed only when the message is not received before
_, ok := commitSigs[validatorAddress]
if ok {
utils.GetLogger().Debug("Already received commit message from the validator", "validatorAddress", validatorAddress)
consensus.getLogger().Debug("Already received commit message from the validator", "validatorAddress", validatorAddress)
return
}
@ -502,32 +497,32 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
var sign bls.Sign
err = sign.Deserialize(commitSig)
if err != nil {
utils.GetLogger().Debug("Failed to deserialize bls signature", "validatorAddress", validatorAddress)
consensus.getLogger().Debug("Failed to deserialize bls signature", "validatorAddress", validatorAddress)
return
}
blockNumHash := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumHash, recvMsg.BlockNum)
commitPayload := append(blockNumHash, recvMsg.BlockHash[:]...)
if !sign.VerifyHash(validatorPubKey, commitPayload) {
utils.GetLogger().Error("cannot verify commit message", "viewID", recvMsg.ViewID, "block", recvMsg.BlockNum)
consensus.getLogger().Error("cannot verify commit message", "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
return
}
utils.GetLogger().Debug("Received new commit message", "numReceivedSoFar", len(commitSigs), "viewID", recvMsg.ViewID, "block", recvMsg.BlockNum, "validatorAddress", validatorAddress)
consensus.getLogger().Debug("Received new commit message", "numReceivedSoFar", len(commitSigs), "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum, "validatorAddress", validatorAddress)
commitSigs[validatorAddress] = &sign
// Set the bitmap indicating that this validator signed.
if err := commitBitmap.SetKey(validatorPubKey, true); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "commitBitmap.SetKey failed")
ctxerror.Warn(consensus.getLogger(), err, "commitBitmap.SetKey failed")
}
if len(commitSigs) >= consensus.Quorum() {
utils.GetLogger().Info("Enough commits received!", "num", len(commitSigs), "phase", consensus.phase)
consensus.getLogger().Info("Enough commits received!", "num", len(commitSigs))
consensus.finalizeCommits()
}
}
func (consensus *Consensus) finalizeCommits() {
utils.GetLogger().Info("finalizing block", "num", len(consensus.commitSigs), "phase", consensus.phase)
consensus.getLogger().Info("finalizing block", "num", len(consensus.commitSigs))
consensus.switchPhase(Announce, true)
// Construct and broadcast committed message
@ -535,15 +530,15 @@ func (consensus *Consensus) finalizeCommits() {
consensus.aggregatedCommitSig = aggSig
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot send committed message")
ctxerror.Warn(consensus.getLogger(), err, "cannot send committed message")
} else {
utils.GetLogger().Debug("sent committed message", "len", len(msgToSend))
consensus.getLogger().Debug("sent committed message", "len", len(msgToSend))
}
var blockObj types.Block
err := rlp.DecodeBytes(consensus.block, &blockObj)
if err != nil {
utils.GetLogger().Debug("failed to construct the new block after consensus")
consensus.getLogger().Debug("failed to construct the new block after consensus")
}
// Sign the block
@ -557,7 +552,7 @@ func (consensus *Consensus) finalizeCommits() {
select {
case consensus.VerifiedNewBlock <- &blockObj:
default:
utils.GetLogger().Info("[SYNC] Failed to send consensus verified block for state sync", "blockHash", blockObj.Hash())
consensus.getLogger().Info("[SYNC] Failed to send consensus verified block for state sync", "blockHash", blockObj.Hash())
}
consensus.reportMetrics(blockObj)
@ -572,21 +567,21 @@ func (consensus *Consensus) finalizeCommits() {
if consensus.consensusTimeout[timeoutBootstrap].IsActive() {
consensus.consensusTimeout[timeoutBootstrap].Stop()
utils.GetLogger().Debug("start consensus timeout; stop bootstrap timeout only once", "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("start consensus timeout; stop bootstrap timeout only once")
} else {
utils.GetLogger().Debug("start consensus timeout", "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("start consensus timeout")
}
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.OnConsensusDone(&blockObj)
utils.GetLogger().Debug("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!", "viewID", consensus.viewID, "numOfSignatures", len(consensus.commitSigs))
consensus.getLogger().Debug("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!", "numOfSignatures", len(consensus.commitSigs))
// Send signal to Node so the new block can be added and new round of consensus can be triggered
consensus.ReadySignal <- struct{}{}
}
func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
utils.GetLogger().Debug("receive committed message", "phase", consensus.phase, "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("receive committed message")
if consensus.PubKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal {
return
@ -594,21 +589,21 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
senderKey, err := consensus.verifySenderKey(msg)
if err != nil {
utils.GetLogger().Warn("onCommitted verifySenderKey failed", "error", err)
consensus.getLogger().Warn("onCommitted verifySenderKey failed", "error", err)
return
}
if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck {
utils.GetLogger().Warn("onCommitted senderKey not match leader PubKey")
consensus.getLogger().Warn("onCommitted senderKey not match leader PubKey")
return
}
if err = verifyMessageSig(senderKey, msg); err != nil {
utils.GetLogger().Warn("onCommitted Failed to verify sender's signature", "error", err)
consensus.getLogger().Warn("onCommitted Failed to verify sender's signature", "error", err)
return
}
recvMsg, err := ParsePbftMessage(msg)
if err != nil {
utils.GetLogger().Warn("onCommitted unable to parse msg", "error", err)
consensus.getLogger().Warn("onCommitted unable to parse msg", "error", err)
return
}
if recvMsg.BlockNum < consensus.blockNum {
@ -617,13 +612,13 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil {
utils.GetLogger().Error("readSignatureBitmapPayload failed", "error", err)
consensus.getLogger().Error("readSignatureBitmapPayload failed", "error", err)
return
}
// check has 2f+1 signatures
if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() {
utils.GetLogger().Debug("not have enough signature", "need", consensus.Quorum(), "have", count)
consensus.getLogger().Debug("not have enough signature", "need", consensus.Quorum(), "have", count)
return
}
@ -631,16 +626,16 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
binary.LittleEndian.PutUint64(blockNumHash, recvMsg.BlockNum)
commitPayload := append(blockNumHash, recvMsg.BlockHash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
utils.GetLogger().Error("Failed to verify the multi signature for commit phase", "blockNum", recvMsg.BlockNum)
consensus.getLogger().Error("Failed to verify the multi signature for commit phase", "msgBlock", recvMsg.BlockNum)
return
}
consensus.aggregatedCommitSig = aggSig
consensus.commitBitmap = mask
utils.GetLogger().Debug("committed message added", "phase", consensus.phase, "myViewID", consensus.viewID, "myBlock", consensus.blockNum, "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
consensus.getLogger().Debug("committed message added", "msgViewID", recvMsg.ViewID, "msgBlock", recvMsg.BlockNum)
consensus.pbftLog.AddMessage(recvMsg)
if recvMsg.BlockNum-consensus.blockNum > consensusBlockNumBuffer {
utils.GetLogger().Debug("onCommitted out of sync", "myBlock", consensus.blockNum, "msgBlock", recvMsg.BlockNum)
consensus.getLogger().Debug("onCommitted out of sync", "msgBlock", recvMsg.BlockNum)
go func() {
select {
case consensus.blockNumLowChan <- struct{}{}:
@ -655,7 +650,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
}
// if consensus.checkViewID(recvMsg) != nil {
// utils.GetLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID)
// consensus.getLogger().Debug("viewID check failed", "viewID", recvMsg.ViewID, "myViewID", consensus.viewID)
// return
// }
@ -665,9 +660,9 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
if consensus.consensusTimeout[timeoutBootstrap].IsActive() {
consensus.consensusTimeout[timeoutBootstrap].Stop()
utils.GetLogger().Debug("start consensus timeout; stop bootstrap timeout only once", "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("start consensus timeout; stop bootstrap timeout only once")
} else {
utils.GetLogger().Debug("start consensus timeout", "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("start consensus timeout")
}
consensus.consensusTimeout[timeoutConsensus].Start()
return
@ -675,7 +670,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
// try to catch up if fall behind
func (consensus *Consensus) tryCatchup() {
utils.GetLogger().Info("tryCatchup: commit new blocks", "blockNum", consensus.blockNum)
consensus.getLogger().Info("tryCatchup: commit new blocks")
// if consensus.phase != Commit && consensus.mode.Mode() == Normal {
// return
// }
@ -686,9 +681,9 @@ func (consensus *Consensus) tryCatchup() {
break
}
if len(msgs) > 1 {
utils.GetLogger().Error("[PBFT] DANGER!!! we should only get one committed message for a given blockNum", "blockNum", consensus.blockNum, "numMsgs", len(msgs))
consensus.getLogger().Error("[PBFT] DANGER!!! we should only get one committed message for a given blockNum", "numMsgs", len(msgs))
}
utils.GetLogger().Info("committed message found", "block", consensus.blockNum)
consensus.getLogger().Info("committed message found")
block := consensus.pbftLog.GetBlockByHash(msgs[0].BlockHash)
if block == nil {
@ -696,17 +691,17 @@ func (consensus *Consensus) tryCatchup() {
}
if block.ParentHash() != consensus.ChainReader.CurrentHeader().Hash() {
utils.GetLogger().Debug("[PBFT] parent block hash not match", "blockNum", consensus.blockNum)
consensus.getLogger().Debug("[PBFT] parent block hash not match")
break
}
utils.GetLogger().Info("block found to commit", "block", consensus.blockNum)
consensus.getLogger().Info("block found to commit")
preparedMsgs := consensus.pbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, msgs[0].BlockNum, msgs[0].BlockHash)
msg := consensus.pbftLog.FindMessageByMaxViewID(preparedMsgs)
if msg == nil {
break
}
utils.GetLogger().Info("prepared message found to commit", "block", consensus.blockNum)
consensus.getLogger().Info("prepared message found to commit")
consensus.blockHash = [32]byte{}
consensus.blockNum = consensus.blockNum + 1
@ -735,14 +730,14 @@ func (consensus *Consensus) tryCatchup() {
block.SetPrepareSig(prepareSig, prepareBitmap)
block.SetCommitSig(aggSig, bitmap)
utils.GetLogger().Info("Adding block to chain", "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Info("Adding block to chain")
consensus.OnConsensusDone(block)
consensus.ResetState()
select {
case consensus.VerifiedNewBlock <- block:
default:
utils.GetLogger().Info("[SYNC] consensus verified block send to chan failed", "blockHash", block.Hash())
consensus.getLogger().Info("[SYNC] consensus verified block send to chan failed", "blockHash", block.Hash())
continue
}
@ -767,11 +762,11 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
<-startChannel
}
go func() {
utils.GetLogger().Info("start consensus", "time", time.Now())
consensus.getLogger().Info("start consensus", "time", time.Now())
defer close(stoppedChan)
ticker := time.NewTicker(3 * time.Second)
consensus.consensusTimeout[timeoutBootstrap].Start()
utils.GetLogger().Debug("start bootstrap timeout only once", "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("start bootstrap timeout only once", "viewID", consensus.viewID, "block", consensus.blockNum)
for {
select {
case <-ticker.C:
@ -783,11 +778,11 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
continue
}
if k != timeoutViewChange {
utils.GetLogger().Debug("ops", "phase", k, "mode", consensus.mode.Mode(), "viewChangeID", consensus.viewID)
consensus.getLogger().Debug("ops consensus timeout")
consensus.startViewChange(consensus.viewID + 1)
break
} else {
utils.GetLogger().Debug("ops", "phase", k, "mode", consensus.mode.Mode(), "viewChangeID", consensus.mode.ViewID())
consensus.getLogger().Debug("ops view change timeout")
viewID := consensus.mode.ViewID()
consensus.startViewChange(viewID + 1)
break
@ -800,14 +795,14 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
consensus.ignoreViewIDCheck = true
case newBlock := <-blockChannel:
utils.GetLogger().Info("receive newBlock", "blockNum", newBlock.NumberU64())
consensus.getLogger().Info("receive newBlock", "msgBlock", newBlock.NumberU64())
if consensus.ShardID == 0 {
// TODO ek/rj - re-enable this after fixing DRand
//if core.IsEpochBlock(newBlock) { // Only beacon chain do randomness generation
// // Receive pRnd from DRG protocol
// utils.GetLogger().Debug("[DRG] Waiting for pRnd")
// consensus.getLogger().Debug("[DRG] Waiting for pRnd")
// pRndAndBitmap := <-consensus.PRndChannel
// utils.GetLogger().Debug("[DRG] Got pRnd", "pRnd", pRndAndBitmap)
// consensus.getLogger().Debug("[DRG] Got pRnd", "pRnd", pRndAndBitmap)
// pRnd := [32]byte{}
// copy(pRnd[:], pRndAndBitmap[:32])
// bitmap := pRndAndBitmap[32:]
@ -822,15 +817,15 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
if err == nil {
// Verify the randomness
_ = blockHash
utils.GetLogger().Info("Adding randomness into new block", "rnd", rnd)
consensus.getLogger().Info("Adding randomness into new block", "rnd", rnd)
newBlock.AddVdf([258]byte{}) // TODO(HB): add real vdf
} else {
utils.GetLogger().Info("Failed to get randomness", "error", err)
consensus.getLogger().Info("Failed to get randomness", "error", err)
}
}
startTime = time.Now()
utils.GetLogger().Debug("STARTING CONSENSUS", "numTxs", len(newBlock.Transactions()), "consensus", consensus, "startTime", startTime, "publicKeys", len(consensus.PublicKeys))
consensus.getLogger().Debug("STARTING CONSENSUS", "numTxs", len(newBlock.Transactions()), "consensus", consensus, "startTime", startTime, "publicKeys", len(consensus.PublicKeys))
consensus.tryAnnounce(newBlock)
case msg := <-consensus.MsgChan:

@ -97,7 +97,6 @@ func (pm *PbftMode) GetViewID() uint32 {
// switchPhase will switch PbftPhase to nextPhase if the desirePhase equals the nextPhase
func (consensus *Consensus) switchPhase(desirePhase PbftPhase, override bool) {
utils.GetLogInstance().Debug("switchPhase: ", "desirePhase", desirePhase, "myPhase", consensus.phase, "override", override)
if override {
consensus.phase = desirePhase
return
@ -121,7 +120,7 @@ func (consensus *Consensus) switchPhase(desirePhase PbftPhase, override bool) {
func (consensus *Consensus) GetNextLeaderKey() *bls.PublicKey {
idx := consensus.getIndexOfPubKey(consensus.LeaderPubKey)
if idx == -1 {
utils.GetLogInstance().Warn("GetNextLeaderKey: currentLeaderKey not found", "key", consensus.LeaderPubKey.SerializeToHexStr())
consensus.getLogger().Warn("GetNextLeaderKey: currentLeaderKey not found", "key", consensus.LeaderPubKey.SerializeToHexStr())
}
idx = (idx + 1) % len(consensus.PublicKeys)
return consensus.PublicKeys[idx]
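Note: GetNextLeaderKey above picks the next leader by round-robin over the committee's public keys, falling back to index 0 when the current leader is not found. A minimal standalone sketch of the same rotation follows; the names here are illustrative and not Harmony's API.

package main

import "fmt"

// nextLeader returns the key that follows current in a round-robin rotation.
// If current is not found, idx stays -1 and rotation restarts at index 0,
// mirroring the fallthrough in GetNextLeaderKey above.
func nextLeader(keys []string, current string) string {
	idx := -1
	for i, k := range keys {
		if k == current {
			idx = i
			break
		}
	}
	idx = (idx + 1) % len(keys)
	return keys[idx]
}

func main() {
	committee := []string{"keyA", "keyB", "keyC"}
	fmt.Println(nextLeader(committee, "keyB")) // keyC
	fmt.Println(nextLeader(committee, "keyC")) // keyA (wraps around)
}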
@ -173,36 +172,26 @@ func (consensus *Consensus) startViewChange(viewID uint32) {
diff := viewID - consensus.viewID
duration := time.Duration(int64(diff) * int64(viewChangeDuration))
utils.GetLogInstance().Info("startViewChange", "viewID", viewID, "timeoutDuration", duration, "nextLeader", consensus.LeaderPubKey.SerializeToHexStr()[:10])
consensus.getLogger().Info("startViewChange", "viewID", viewID, "timeoutDuration", duration, "nextLeader", consensus.LeaderPubKey.SerializeToHexStr())
msgToSend := consensus.constructViewChangeMessage()
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.consensusTimeout[timeoutViewChange].SetDuration(duration)
consensus.consensusTimeout[timeoutViewChange].Start()
utils.GetLogger().Debug("start view change timeout", "viewID", consensus.viewID, "block", consensus.blockNum, "viewChangingID", consensus.mode.ViewID())
}
// new leader send new view message
func (consensus *Consensus) startNewView() {
utils.GetLogInstance().Info("startNewView", "viewID", consensus.mode.GetViewID())
consensus.mode.SetMode(Normal)
consensus.switchPhase(Announce, false)
msgToSend := consensus.constructNewViewMessage()
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.getLogger().Debug("start view change timeout", "viewChangingID", consensus.mode.ViewID())
}
func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
senderKey, validatorAddress, err := consensus.verifyViewChangeSenderKey(msg)
if err != nil {
utils.GetLogInstance().Debug("onViewChange verifySenderKey failed", "error", err)
consensus.getLogger().Debug("onViewChange verifySenderKey failed", "error", err)
return
}
recvMsg, err := ParseViewChangeMessage(msg)
if err != nil {
utils.GetLogInstance().Warn("onViewChange unable to parse viewchange message")
consensus.getLogger().Warn("onViewChange unable to parse viewchange message")
return
}
newLeaderKey := recvMsg.LeaderPubkey
@ -210,14 +199,14 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
return
}
utils.GetLogInstance().Warn("onViewChange received", "viewChangeID", recvMsg.ViewID, "myCurrentID", consensus.viewID, "ValidatorAddress", consensus.SelfAddress)
consensus.getLogger().Warn("onViewChange received", "msgViewID", recvMsg.ViewID)
if consensus.blockNum > recvMsg.BlockNum {
return
}
if consensus.blockNum < recvMsg.BlockNum {
utils.GetLogger().Warn("new leader has lower blocknum", "have", consensus.blockNum, "got", recvMsg.BlockNum)
consensus.getLogger().Warn("new leader has lower blocknum", "msgBlock", recvMsg.BlockNum)
return
}
@ -225,7 +214,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
return
}
if err = verifyMessageSig(senderKey, msg); err != nil {
utils.GetLogInstance().Debug("onViewChange Failed to verify sender's signature", "error", err)
consensus.getLogger().Debug("onViewChange Failed to verify sender's signature", "error", err)
return
}
@ -267,12 +256,12 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
if len(recvMsg.Payload) == 0 {
_, ok := consensus.nilSigs[validatorAddress]
if ok {
utils.GetLogInstance().Debug("onViewChange already received m2 message from the validator", "validatorAddress", validatorAddress)
consensus.getLogger().Debug("onViewChange already received m2 message from the validator", "validatorAddress", validatorAddress)
return
}
if !recvMsg.ViewchangeSig.VerifyHash(senderKey, NIL) {
utils.GetLogInstance().Warn("onViewChange failed to verify signature for m2 type viewchange message")
consensus.getLogger().Warn("onViewChange failed to verify signature for m2 type viewchange message")
return
}
consensus.nilSigs[validatorAddress] = recvMsg.ViewchangeSig
@ -280,35 +269,35 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
} else { // m1 type message
_, ok := consensus.bhpSigs[validatorAddress]
if ok {
utils.GetLogInstance().Debug("onViewChange already received m1 message from the validator", "validatorAddress", validatorAddress)
consensus.getLogger().Debug("onViewChange already received m1 message from the validator", "validatorAddress", validatorAddress)
return
}
if !recvMsg.ViewchangeSig.VerifyHash(recvMsg.SenderPubkey, recvMsg.Payload) {
utils.GetLogInstance().Warn("onViewChange failed to verify signature for m1 type viewchange message")
consensus.getLogger().Warn("onViewChange failed to verify signature for m1 type viewchange message")
return
}
// first time receive m1 type message, need verify validity of prepared message
if len(consensus.m1Payload) == 0 || !bytes.Equal(consensus.m1Payload, recvMsg.Payload) {
if len(recvMsg.Payload) <= 32 {
utils.GetLogger().Debug("m1 recvMsg payload not enough length", "len", len(recvMsg.Payload))
consensus.getLogger().Debug("m1 recvMsg payload not enough length", "len", len(recvMsg.Payload))
return
}
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil {
utils.GetLogger().Error("m1 recvMsg payload read error", "error", err)
consensus.getLogger().Error("m1 recvMsg payload read error", "error", err)
return
}
// check has 2f+1 signature in m1 type message
if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() {
utils.GetLogger().Debug("not have enough signature", "need", consensus.Quorum(), "have", count)
consensus.getLogger().Debug("not have enough signature", "need", consensus.Quorum(), "have", count)
return
}
// Verify the multi-sig for prepare phase
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) {
utils.GetLogInstance().Warn("onViewChange failed to verify multi signature for m1 prepared payload", "blockHash", blockHash)
consensus.getLogger().Warn("onViewChange failed to verify multi signature for m1 prepared payload", "blockHash", blockHash)
return
}
if len(consensus.m1Payload) == 0 {
@ -322,13 +311,13 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
// check and add viewID (m3 type) message signature
_, ok := consensus.viewIDSigs[validatorAddress]
if ok {
utils.GetLogInstance().Debug("onViewChange already received m3 viewID message from the validator", "validatorAddress", validatorAddress)
consensus.getLogger().Debug("onViewChange already received m3 viewID message from the validator", "validatorAddress", validatorAddress)
return
}
viewIDHash := make([]byte, 4)
binary.LittleEndian.PutUint32(viewIDHash, recvMsg.ViewID)
if !recvMsg.ViewidSig.VerifyHash(recvMsg.SenderPubkey, viewIDHash) {
utils.GetLogInstance().Warn("onViewChange failed to verify viewID signature", "viewID", recvMsg.ViewID)
consensus.getLogger().Warn("onViewChange failed to verify viewID signature", "msgViewID", recvMsg.ViewID)
return
}
consensus.viewIDSigs[validatorAddress] = recvMsg.ViewidSig
@ -347,7 +336,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
copy(consensus.blockHash[:], consensus.m1Payload[:32])
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil {
utils.GetLogger().Error("readSignatureBitmapPayload fail", "error", err)
consensus.getLogger().Error("readSignatureBitmapPayload fail", "error", err)
return
}
consensus.aggregatedPrepareSig = aggSig
@ -363,43 +352,43 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.mode.SetViewID(recvMsg.ViewID)
msgToSend := consensus.constructNewViewMessage()
utils.GetLogInstance().Warn("onViewChange", "sent newview message", len(msgToSend))
consensus.getLogger().Warn("onViewChange sent newview message")
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.viewID = recvMsg.ViewID
consensus.ResetViewChangeState()
consensus.consensusTimeout[timeoutViewChange].Stop()
consensus.consensusTimeout[timeoutConsensus].Start()
utils.GetLogger().Debug("new leader start consensus timeout and stop view change timeout", "viewID", consensus.viewID, "block", consensus.blockNum, "viewChangingID", consensus.mode.ViewID())
utils.GetLogger().Debug("I am the new leader", "myKey", consensus.PubKey.SerializeToHexStr()[:20], "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("new leader start consensus timeout and stop view change timeout", "viewChangingID", consensus.mode.ViewID())
consensus.getLogger().Debug("I am the new leader", "myKey", consensus.PubKey.SerializeToHexStr(), "viewID", consensus.viewID, "block", consensus.blockNum)
}
utils.GetLogInstance().Debug("onViewChange", "numSigs", len(consensus.viewIDSigs), "needed", consensus.Quorum())
consensus.getLogger().Debug("onViewChange", "numSigs", len(consensus.viewIDSigs), "needed", consensus.Quorum())
}
// TODO: move to consensus_leader.go later
func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
utils.GetLogInstance().Debug("onNewView received new view message")
consensus.getLogger().Debug("onNewView received new view message")
senderKey, _, err := consensus.verifyViewChangeSenderKey(msg)
if err != nil {
utils.GetLogInstance().Warn("onNewView verifySenderKey failed", "error", err)
consensus.getLogger().Warn("onNewView verifySenderKey failed", "error", err)
return
}
recvMsg, err := consensus.ParseNewViewMessage(msg)
if err != nil {
utils.GetLogInstance().Warn("onViewChange unable to parse viewchange message")
consensus.getLogger().Warn("onViewChange unable to parse viewchange message")
return
}
if err = verifyMessageSig(senderKey, msg); err != nil {
utils.GetLogInstance().Error("onNewView failed to verify new leader's signature", "error", err)
consensus.getLogger().Error("onNewView failed to verify new leader's signature", "error", err)
return
}
consensus.vcLock.Lock()
defer consensus.vcLock.Unlock()
if recvMsg.M3AggSig == nil || recvMsg.M3Bitmap == nil {
utils.GetLogInstance().Error("onNewView M3AggSig or M3Bitmap is nil")
consensus.getLogger().Error("onNewView M3AggSig or M3Bitmap is nil")
return
}
m3Sig := recvMsg.M3AggSig
@ -408,12 +397,12 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
binary.LittleEndian.PutUint32(viewIDHash, recvMsg.ViewID)
// check total number of sigs >= 2f+1
if count := utils.CountOneBits(m3Mask.Bitmap); count < consensus.Quorum() {
utils.GetLogger().Debug("not have enough signature", "need", consensus.Quorum(), "have", count)
consensus.getLogger().Debug("not have enough signature", "need", consensus.Quorum(), "have", count)
return
}
if !m3Sig.VerifyHash(m3Mask.AggregatePublic, viewIDHash) {
utils.GetLogInstance().Warn("onNewView unable to verify aggregated signature of m3 payload", "m3Sig", m3Sig.SerializeToHexStr()[:10], "m3Mask", m3Mask.Bitmap, "viewID", recvMsg.ViewID)
consensus.getLogger().Warn("onNewView unable to verify aggregated signature of m3 payload", "m3Sig", m3Sig.SerializeToHexStr(), "m3Mask", m3Mask.Bitmap, "msgViewID", recvMsg.ViewID)
return
}
@ -421,20 +410,20 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
if recvMsg.M2AggSig != nil {
m2Sig := recvMsg.M2AggSig
if !m2Sig.VerifyHash(m2Mask.AggregatePublic, NIL) {
utils.GetLogInstance().Warn("onNewView unable to verify aggregated signature of m2 payload")
consensus.getLogger().Warn("onNewView unable to verify aggregated signature of m2 payload")
return
}
}
if m3Mask == nil || m2Mask == nil {
utils.GetLogInstance().Error("onNewView m3Mask or m2Mask is nil")
consensus.getLogger().Error("onNewView m3Mask or m2Mask is nil")
return
}
// check when M3 sigs > M2 sigs, then M1 (recvMsg.Payload) should not be empty
if utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap) {
if len(recvMsg.Payload) <= 32 {
utils.GetLogger().Debug("we should have m1 message payload non-empty and valid")
consensus.getLogger().Debug("we should have m1 message payload non-empty and valid")
return
}
}
@ -444,11 +433,11 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil {
utils.GetLogger().Error("unable to read signature/bitmap", "error", err)
consensus.getLogger().Error("unable to read signature/bitmap", "error", err)
return
}
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash) {
utils.GetLogInstance().Warn("onNewView failed to verify signature for prepared message")
consensus.getLogger().Warn("onNewView failed to verify signature for prepared message")
return
}
copy(consensus.blockHash[:], blockHash)
@ -473,7 +462,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
// change view and leaderKey to keep in sync with network
if consensus.blockNum != recvMsg.BlockNum {
utils.GetLogger().Debug("new leader changed", "newLeaderKey", consensus.LeaderPubKey.SerializeToHexStr()[:20], "viewID", consensus.viewID, "myBlock", consensus.blockNum, "newViewBlockNum", recvMsg.BlockNum)
consensus.getLogger().Debug("new leader changed", "newLeaderKey", consensus.LeaderPubKey.SerializeToHexStr(), "msgBlock", recvMsg.BlockNum)
return
}
@ -484,15 +473,15 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
commitPayload := append(blockNumHash, consensus.blockHash[:]...)
msgToSend := consensus.constructCommitMessage(commitPayload)
utils.GetLogInstance().Info("onNewView === commit", "sent commit message", len(msgToSend), "viewID", consensus.viewID)
consensus.getLogger().Info("onNewView === commit")
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.switchPhase(Commit, true)
} else {
consensus.ResetState()
utils.GetLogInstance().Info("onNewView === announce")
consensus.getLogger().Info("onNewView === announce")
}
utils.GetLogger().Debug("new leader changed", "newLeaderKey", consensus.LeaderPubKey.SerializeToHexStr()[:20], "viewID", consensus.viewID, "block", consensus.blockNum)
utils.GetLogger().Debug("validator start consensus timeout and stop view change timeout", "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.getLogger().Debug("new leader changed", "newLeaderKey", consensus.LeaderPubKey.SerializeToHexStr())
consensus.getLogger().Debug("validator start consensus timeout and stop view change timeout")
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.consensusTimeout[timeoutViewChange].Stop()
}
