Replace all logs in censensus module to use zerolog

pull/1192/head
Kai Lee 5 years ago committed by Minh Doan
parent 9c0b47c73a
commit fc22a66037
  1. 20
      consensus/consensus.go
  2. 6
      consensus/consensus_leader_msg.go
  3. 81
      consensus/consensus_service.go
  4. 412
      consensus/consensus_v2.go
  5. 4
      consensus/consensus_validator_msg.go
  6. 17
      consensus/consensus_viewchange_msg.go
  7. 18
      consensus/pbft_log.go
  8. 176
      consensus/view_change.go

@ -9,7 +9,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/common/denominations"
@ -251,9 +250,9 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe
if blsPriKey != nil { if blsPriKey != nil {
consensus.priKey = blsPriKey consensus.priKey = blsPriKey
consensus.PubKey = blsPriKey.GetPublicKey() consensus.PubKey = blsPriKey.GetPublicKey()
utils.GetLogInstance().Info("my pubkey is", "pubkey", consensus.PubKey.SerializeToHexStr()) utils.Logger().Info().Str("publicKey", consensus.PubKey.SerializeToHexStr()).Msg("My Public Key")
} else { } else {
utils.GetLogInstance().Error("the bls key is nil") utils.Logger().Error().Msg("the bls key is nil")
return nil, fmt.Errorf("nil bls key, aborting") return nil, fmt.Errorf("nil bls key, aborting")
} }
@ -281,8 +280,6 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe
func accumulateRewards( func accumulateRewards(
bc consensus_engine.ChainReader, state *state.DB, header *types.Header, bc consensus_engine.ChainReader, state *state.DB, header *types.Header,
) error { ) error {
logger := header.Logger(utils.GetLogInstance())
getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) }
blockNum := header.Number.Uint64() blockNum := header.Number.Uint64()
if blockNum == 0 { if blockNum == 0 {
// Epoch block has no parent to reward. // Epoch block has no parent to reward.
@ -354,10 +351,11 @@ func accumulateRewards(
totalAmount = new(big.Int).Add(totalAmount, diff) totalAmount = new(big.Int).Add(totalAmount, diff)
last = cur last = cur
} }
getLogger().Debug("【Block Reward] Successfully paid out block reward", utils.Logger().Debug().
"NumAccounts", numAccounts, Str("NumAccounts", numAccounts.String()).
"TotalAmount", totalAmount, Str("TotalAmount", totalAmount.String()).
"Signers", signers) Strs("Signers", signers).
Msg("[Block Reward] Successfully paid out block reward")
return nil return nil
} }
@ -377,9 +375,7 @@ func (f *GenesisStakeInfoFinder) FindStakeInfoByNodeKey(
) []*structs.StakeInfo { ) []*structs.StakeInfo {
var pk types.BlsPublicKey var pk types.BlsPublicKey
if err := pk.FromLibBLSPublicKey(key); err != nil { if err := pk.FromLibBLSPublicKey(key); err != nil {
ctxerror.Log15(utils.GetLogInstance().Warn, ctxerror.New( utils.Logger().Warn().Err(err).Msg("cannot convert BLS public key")
"cannot convert BLS public key",
).WithCause(err))
return nil return nil
} }
l, _ := f.byNodeKey[pk] l, _ := f.byNodeKey[pk]

@ -25,7 +25,7 @@ func (consensus *Consensus) constructAnnounceMessage() []byte {
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message) marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message)
if err != nil { if err != nil {
utils.GetLogInstance().Error("Failed to sign and marshal the Announce message", "error", err) utils.Logger().Error().Err(err).Msg("Failed to sign and marshal the Announce message")
} }
return proto.ConstructConsensusMessage(marshaledMessage) return proto.ConstructConsensusMessage(marshaledMessage)
} }
@ -60,7 +60,7 @@ func (consensus *Consensus) constructPreparedMessage() ([]byte, *bls.Sign) {
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message) marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message)
if err != nil { if err != nil {
utils.GetLogInstance().Error("Failed to sign and marshal the Prepared message", "error", err) utils.Logger().Error().Err(err).Msg("Failed to sign and marshal the Prepared message")
} }
return proto.ConstructConsensusMessage(marshaledMessage), aggSig return proto.ConstructConsensusMessage(marshaledMessage), aggSig
} }
@ -93,7 +93,7 @@ func (consensus *Consensus) constructCommittedMessage() ([]byte, *bls.Sign) {
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message) marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message)
if err != nil { if err != nil {
utils.GetLogInstance().Error("Failed to sign and marshal the Committed message", "error", err) utils.Logger().Error().Err(err).Msg("Failed to sign and marshal the Committed message")
} }
return proto.ConstructConsensusMessage(marshaledMessage), aggSig return proto.ConstructConsensusMessage(marshaledMessage), aggSig
} }

@ -10,11 +10,11 @@ import (
"github.com/harmony-one/harmony/crypto/hash" "github.com/harmony-one/harmony/crypto/hash"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
protobuf "github.com/golang/protobuf/proto" protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/bls/ffi/go/bls"
libp2p_peer "github.com/libp2p/go-libp2p-peer" libp2p_peer "github.com/libp2p/go-libp2p-peer"
"github.com/rs/zerolog"
"golang.org/x/crypto/sha3" "golang.org/x/crypto/sha3"
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
@ -73,7 +73,7 @@ func (consensus *Consensus) SealHash(header *types.Header) (hash common.Hash) {
header.Time, header.Time,
header.Extra, header.Extra,
}); err != nil { }); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "rlp.Encode failed") utils.Logger().Warn().Err(err).Msg("rlp.Encode failed")
} }
hasher.Sum(hash[:0]) hasher.Sum(hash[:0])
return hash return hash
@ -109,7 +109,9 @@ func (consensus *Consensus) populateMessageFields(request *msg_pb.ConsensusReque
// sender address // sender address
request.SenderPubkey = consensus.PubKey.Serialize() request.SenderPubkey = consensus.PubKey.Serialize()
consensus.getLogger().Debug("[populateMessageFields]", "SenderKey", consensus.PubKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("senderKey", consensus.PubKey.SerializeToHexStr()).
Msg("[populateMessageFields]")
} }
// Signs the consensus message and returns the marshaled message. // Signs the consensus message and returns the marshaled message.
@ -158,12 +160,11 @@ func (consensus *Consensus) GetViewID() uint64 {
// DebugPrintPublicKeys print all the PublicKeys in string format in Consensus // DebugPrintPublicKeys print all the PublicKeys in string format in Consensus
func (consensus *Consensus) DebugPrintPublicKeys() { func (consensus *Consensus) DebugPrintPublicKeys() {
var keys []string
for _, k := range consensus.PublicKeys { for _, k := range consensus.PublicKeys {
str := fmt.Sprintf("%s", hex.EncodeToString(k.Serialize())) keys = append(keys, hex.EncodeToString(k.Serialize()))
utils.GetLogInstance().Debug("pk:", "string", str)
} }
utils.Logger().Debug().Strs("PublicKeys", keys).Int("count", len(keys)).Msgf("Debug Public Keys")
utils.GetLogInstance().Debug("PublicKeys:", "#", len(consensus.PublicKeys))
} }
// UpdatePublicKeys updates the PublicKeys variable, protected by a mutex // UpdatePublicKeys updates the PublicKeys variable, protected by a mutex
@ -171,16 +172,16 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys []*bls.PublicKey) int {
consensus.pubKeyLock.Lock() consensus.pubKeyLock.Lock()
consensus.PublicKeys = append(pubKeys[:0:0], pubKeys...) consensus.PublicKeys = append(pubKeys[:0:0], pubKeys...)
consensus.CommitteePublicKeys = map[string]bool{} consensus.CommitteePublicKeys = map[string]bool{}
utils.GetLogInstance().Info("My Committee") utils.Logger().Info().Msg("My Committee")
for _, pubKey := range consensus.PublicKeys { for _, pubKey := range consensus.PublicKeys {
utils.GetLogInstance().Info("Member", "BlsPubKey", pubKey.SerializeToHexStr()) utils.Logger().Info().Str("BlsPubKey", pubKey.SerializeToHexStr()).Msg("Member")
consensus.CommitteePublicKeys[pubKey.SerializeToHexStr()] = true consensus.CommitteePublicKeys[pubKey.SerializeToHexStr()] = true
} }
// TODO: use pubkey to identify leader rather than p2p.Peer. // TODO: use pubkey to identify leader rather than p2p.Peer.
consensus.leader = p2p.Peer{ConsensusPubKey: pubKeys[0]} consensus.leader = p2p.Peer{ConsensusPubKey: pubKeys[0]}
consensus.LeaderPubKey = pubKeys[0] consensus.LeaderPubKey = pubKeys[0]
utils.GetLogInstance().Info("My Leader", "info", consensus.LeaderPubKey.SerializeToHexStr()) utils.Logger().Info().Str("info", consensus.LeaderPubKey.SerializeToHexStr()).Msg("My Leader")
consensus.pubKeyLock.Unlock() consensus.pubKeyLock.Unlock()
// reset states after update public keys // reset states after update public keys
consensus.ResetState() consensus.ResetState()
@ -377,7 +378,9 @@ func (consensus *Consensus) GetViewIDSigsArray() []*bls.Sign {
// ResetState resets the state of the consensus // ResetState resets the state of the consensus
func (consensus *Consensus) ResetState() { func (consensus *Consensus) ResetState() {
consensus.getLogger().Debug("[ResetState] Resetting consensus state", "Phase", consensus.phase) consensus.getLogger().Debug().
Str("Phase", consensus.phase.String()).
Msg("[ResetState] Resetting consensus state")
consensus.switchPhase(Announce, true) consensus.switchPhase(Announce, true)
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
consensus.blockHeader = []byte{} consensus.blockHeader = []byte{}
@ -494,8 +497,14 @@ func (consensus *Consensus) checkViewID(msg *PbftMessage) error {
consensus.LeaderPubKey = msg.SenderPubkey consensus.LeaderPubKey = msg.SenderPubkey
consensus.ignoreViewIDCheck = false consensus.ignoreViewIDCheck = false
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
utils.GetLogger().Debug("viewID and leaderKey override", "viewID", consensus.viewID, "leaderKey", consensus.LeaderPubKey.SerializeToHexStr()[:20]) utils.Logger().Debug().
utils.GetLogger().Debug("Start consensus timer", "viewID", consensus.viewID, "block", consensus.blockNum) Uint64("viewID", consensus.viewID).
Str("leaderKey", consensus.LeaderPubKey.SerializeToHexStr()[:20]).
Msg("viewID and leaderKey override")
utils.Logger().Debug().
Uint64("viewID", consensus.viewID).
Uint64("block", consensus.blockNum).
Msg("Start consensus timer")
return nil return nil
} else if msg.ViewID > consensus.viewID { } else if msg.ViewID > consensus.viewID {
return consensus_engine.ErrViewIDNotMatch return consensus_engine.ErrViewIDNotMatch
@ -543,11 +552,11 @@ func readSignatureBitmapByPublicKeys(recvPayload []byte, publicKeys []*bls.Publi
} }
mask, err := bls_cosi.NewMask(publicKeys, nil) mask, err := bls_cosi.NewMask(publicKeys, nil)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("onNewView unable to setup mask for prepared message", "err", err) utils.Logger().Warn().Err(err).Msg("onNewView unable to setup mask for prepared message")
return nil, nil, errors.New("unable to setup mask from payload") return nil, nil, errors.New("unable to setup mask from payload")
} }
if err := mask.SetMask(bitmap); err != nil { if err := mask.SetMask(bitmap); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "mask.SetMask failed") utils.Logger().Warn().Err(err).Msg("mask.SetMask failed")
} }
return &aggSig, mask, nil return &aggSig, mask, nil
} }
@ -557,13 +566,14 @@ func (consensus *Consensus) reportMetrics(block types.Block) {
timeElapsed := endTime.Sub(startTime) timeElapsed := endTime.Sub(startTime)
numOfTxs := len(block.Transactions()) numOfTxs := len(block.Transactions())
tps := float64(numOfTxs) / timeElapsed.Seconds() tps := float64(numOfTxs) / timeElapsed.Seconds()
utils.GetLogInstance().Info("TPS Report", utils.Logger().Info().
"numOfTXs", numOfTxs, Int("numOfTXs", numOfTxs).
"startTime", startTime, Time("startTime", startTime).
"endTime", endTime, Time("endTime", endTime).
"timeElapsed", timeElapsed, Dur("timeElapsed", endTime.Sub(startTime)).
"TPS", tps, Float64("TPS", tps).
"consensus", consensus) Interface("consensus", consensus).
Msg("TPS Report")
// Post metrics // Post metrics
profiler := profiler.GetProfiler() profiler := profiler.GetProfiler()
@ -588,20 +598,15 @@ func (consensus *Consensus) reportMetrics(block types.Block) {
profiler.LogMetrics(metrics) profiler.LogMetrics(metrics)
} }
// logger returns a sub-logger with consensus contexts added.
func (consensus *Consensus) logger(logger log.Logger) log.Logger {
return logger.New(
"myBlock", consensus.blockNum,
"myViewID", consensus.viewID,
"phase", consensus.phase,
"mode", consensus.mode.Mode(),
)
}
// getLogger returns logger for consensus contexts added // getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() log.Logger { func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := consensus.logger(utils.GetLogInstance()) logger := utils.Logger().With().
return logger Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.viewID).
Interface("phase", consensus.phase).
Str("mode", consensus.mode.Mode().String()).
Logger()
return &logger
} }
// retrieve corresponding blsPublicKey from Coinbase Address // retrieve corresponding blsPublicKey from Coinbase Address
@ -644,10 +649,12 @@ func (consensus *Consensus) updateConsensusInformation() {
consensus.SetViewID(header.ViewID.Uint64() + 1) consensus.SetViewID(header.ViewID.Uint64() + 1)
leaderPubKey, err := consensus.getLeaderPubKeyFromCoinbase(header) leaderPubKey, err := consensus.getLeaderPubKeyFromCoinbase(header)
if err != nil || leaderPubKey == nil { if err != nil || leaderPubKey == nil {
consensus.getLogger().Debug("[SYNC] Unable to get leaderPubKey from coinbase", "error", err) consensus.getLogger().Debug().Err(err).Msg("[SYNC] Unable to get leaderPubKey from coinbase")
consensus.ignoreViewIDCheck = true consensus.ignoreViewIDCheck = true
} else { } else {
consensus.getLogger().Debug("[SYNC] Most Recent LeaderPubKey Updated Based on BlockChain", "leaderPubKey", leaderPubKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("leaderPubKey", leaderPubKey.SerializeToHexStr()).
Msg("[SYNC] Most Recent LeaderPubKey Updated Based on BlockChain")
consensus.LeaderPubKey = leaderPubKey consensus.LeaderPubKey = leaderPubKey
} }
} }

@ -28,7 +28,7 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
msg := &msg_pb.Message{} msg := &msg_pb.Message{}
err := protobuf.Unmarshal(payload, msg) err := protobuf.Unmarshal(payload, msg)
if err != nil { if err != nil {
utils.GetLogger().Error("Failed to unmarshal message payload.", "err", err, "consensus", consensus) utils.Logger().Error().Err(err).Interface("consensus", consensus).Msg("Failed to unmarshal message payload.")
return return
} }
@ -40,14 +40,18 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
if msg.Type == msg_pb.MessageType_VIEWCHANGE || msg.Type == msg_pb.MessageType_NEWVIEW { if msg.Type == msg_pb.MessageType_VIEWCHANGE || msg.Type == msg_pb.MessageType_NEWVIEW {
if msg.GetViewchange() != nil && msg.GetViewchange().ShardId != consensus.ShardID { if msg.GetViewchange() != nil && msg.GetViewchange().ShardId != consensus.ShardID {
consensus.getLogger().Warn("Received view change message from different shard", consensus.getLogger().Warn().
"myShardId", consensus.ShardID, "receivedShardId", msg.GetViewchange().ShardId) Uint32("myShardId", consensus.ShardID).
Uint32("receivedShardId", msg.GetViewchange().ShardId).
Msg("Received view change message from different shard")
return return
} }
} else { } else {
if msg.GetConsensus() != nil && msg.GetConsensus().ShardId != consensus.ShardID { if msg.GetConsensus() != nil && msg.GetConsensus().ShardId != consensus.ShardID {
consensus.getLogger().Warn("Received consensus message from different shard", consensus.getLogger().Warn().
"myShardId", consensus.ShardID, "receivedShardId", msg.GetConsensus().ShardId) Uint32("myShardId", consensus.ShardID).
Uint32("receivedShardId", msg.GetConsensus().ShardId).
Msg("Received consensus message from different shard")
return return
} }
} }
@ -68,7 +72,6 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
case msg_pb.MessageType_NEWVIEW: case msg_pb.MessageType_NEWVIEW:
consensus.onNewView(msg) consensus.onNewView(msg)
} }
} }
// TODO: move to consensus_leader.go later // TODO: move to consensus_leader.go later
@ -79,12 +82,12 @@ func (consensus *Consensus) announce(block *types.Block) {
// prepare message and broadcast to validators // prepare message and broadcast to validators
encodedBlock, err := rlp.EncodeToBytes(block) encodedBlock, err := rlp.EncodeToBytes(block)
if err != nil { if err != nil {
consensus.getLogger().Debug("[Announce] Failed encoding block") consensus.getLogger().Debug().Msg("[Announce] Failed encoding block")
return return
} }
encodedBlockHeader, err := rlp.EncodeToBytes(block.Header()) encodedBlockHeader, err := rlp.EncodeToBytes(block.Header())
if err != nil { if err != nil {
consensus.getLogger().Debug("[Announce] Failed encoding block header") consensus.getLogger().Debug().Msg("[Announce] Failed encoding block header")
return return
} }
@ -98,55 +101,73 @@ func (consensus *Consensus) announce(block *types.Block) {
_ = protobuf.Unmarshal(msgPayload, msg) _ = protobuf.Unmarshal(msgPayload, msg)
pbftMsg, err := ParsePbftMessage(msg) pbftMsg, err := ParsePbftMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Warn("[Announce] Unable to parse pbft message", "error", err) consensus.getLogger().Warn().Err(err).Msg("[Announce] Unable to parse pbft message")
return return
} }
consensus.PbftLog.AddMessage(pbftMsg) consensus.PbftLog.AddMessage(pbftMsg)
consensus.getLogger().Debug("[Announce] Added Announce message in pbftLog", "MsgblockHash", pbftMsg.BlockHash, "MsgViewID", pbftMsg.ViewID, "MsgBlockNum", pbftMsg.BlockNum) consensus.getLogger().Debug().
Bytes("MsgblockHash", pbftMsg.BlockHash[:]).
Uint64("MsgViewID", pbftMsg.ViewID).
Uint64("MsgBlockNum", pbftMsg.BlockNum).
Msg("[Announce] Added Announce message in pbftLog")
consensus.PbftLog.AddBlock(block) consensus.PbftLog.AddBlock(block)
// Leader sign the block hash itself // Leader sign the block hash itself
consensus.prepareSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(consensus.blockHash[:]) consensus.prepareSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(consensus.blockHash[:])
if err := consensus.prepareBitmap.SetKey(consensus.PubKey, true); err != nil { if err := consensus.prepareBitmap.SetKey(consensus.PubKey, true); err != nil {
consensus.getLogger().Warn("[Announce] Leader prepareBitmap SetKey failed", "error", err) consensus.getLogger().Warn().Err(err).Msg("[Announce] Leader prepareBitmap SetKey failed")
return return
} }
// Construct broadcast p2p message // Construct broadcast p2p message
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn("[Announce] Cannot send announce message", "groupID", p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))) consensus.getLogger().Warn().
Str("groupID", string(p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))).
Msg("[Announce] Cannot send announce message")
} else { } else {
consensus.getLogger().Info("[Announce] Sent Announce Message!!", "BlockHash", block.Hash(), "BlockNum", block.NumberU64()) consensus.getLogger().Info().
Bytes("BlockHash", block.Hash().Bytes()).
Uint64("BlockNum", block.NumberU64()).
Msg("[Announce] Sent Announce Message!!")
} }
consensus.getLogger().Debug("[Announce] Switching phase", "From", consensus.phase, "To", Prepare) consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", Prepare.String()).
Msg("[Announce] Switching phase")
consensus.switchPhase(Prepare, true) consensus.switchPhase(Prepare, true)
} }
func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
consensus.getLogger().Debug("[OnAnnounce] Receive announce message") consensus.getLogger().Debug().Msg("[OnAnnounce] Receive announce message")
if consensus.PubKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal { if consensus.PubKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal {
return return
} }
senderKey, err := consensus.verifySenderKey(msg) senderKey, err := consensus.verifySenderKey(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug("[OnAnnounce] VerifySenderKey failed", "error", err) consensus.getLogger().Error().Err(err).Msg("[OnAnnounce] VerifySenderKey failed")
return return
} }
if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck { if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck {
consensus.getLogger().Warn("[OnAnnounce] SenderKey not match leader PubKey", "senderKey", senderKey.SerializeToHexStr(), "leaderKey", consensus.LeaderPubKey.SerializeToHexStr()) consensus.getLogger().Warn().
Str("senderKey", senderKey.SerializeToHexStr()).
Str("leaderKey", consensus.LeaderPubKey.SerializeToHexStr()).
Msg("[OnAnnounce] SenderKey not match leader PubKey")
return return
} }
if err = verifyMessageSig(senderKey, msg); err != nil { if err = verifyMessageSig(senderKey, msg); err != nil {
consensus.getLogger().Debug("[OnAnnounce] Failed to verify leader signature", "error", err) consensus.getLogger().Error().Err(err).Msg("[OnAnnounce] Failed to verify leader signature")
return return
} }
recvMsg, err := ParsePbftMessage(msg) recvMsg, err := ParsePbftMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug("[OnAnnounce] Unparseable leader message", "error", err, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Error().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnAnnounce] Unparseable leader message")
return return
} }
@ -155,17 +176,27 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
var headerObj types.Header var headerObj types.Header
err = rlp.DecodeBytes(blockHeader, &headerObj) err = rlp.DecodeBytes(blockHeader, &headerObj)
if err != nil { if err != nil {
consensus.getLogger().Warn("[OnAnnounce] Unparseable block header data", "error", err, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnAnnounce] Unparseable block header data")
return return
} }
if recvMsg.BlockNum < consensus.blockNum || recvMsg.BlockNum != headerObj.Number.Uint64() { if recvMsg.BlockNum < consensus.blockNum || recvMsg.BlockNum != headerObj.Number.Uint64() {
consensus.getLogger().Debug("[OnAnnounce] BlockNum not match", "MsgBlockNum", recvMsg.BlockNum, "BlockNum", headerObj.Number) consensus.getLogger().Debug().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Str("BlockNum", headerObj.Number.String()).
Msg("[OnAnnounce] BlockNum not match")
return return
} }
if consensus.mode.Mode() == Normal { if consensus.mode.Mode() == Normal {
if err = consensus.VerifyHeader(consensus.ChainReader, &headerObj, true); err != nil { if err = consensus.VerifyHeader(consensus.ChainReader, &headerObj, true); err != nil {
consensus.getLogger().Warn("[OnAnnounce] Block content is not verified successfully", "error", err, "inChain", consensus.ChainReader.CurrentHeader().Number, "MsgBlockNum", headerObj.Number) consensus.getLogger().Warn().
Err(err).
Str("inChain", consensus.ChainReader.CurrentHeader().Number.String()).
Str("MsgBlockNum", headerObj.Number.String()).
Msg("[OnAnnounce] Block content is not verified successfully")
return return
} }
} }
@ -173,14 +204,21 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
logMsgs := consensus.PbftLog.GetMessagesByTypeSeqView(msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID) logMsgs := consensus.PbftLog.GetMessagesByTypeSeqView(msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID)
if len(logMsgs) > 0 { if len(logMsgs) > 0 {
if logMsgs[0].BlockHash != recvMsg.BlockHash { if logMsgs[0].BlockHash != recvMsg.BlockHash {
consensus.getLogger().Debug("[OnAnnounce] Leader is malicious", "leaderKey", consensus.LeaderPubKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("leaderKey", consensus.LeaderPubKey.SerializeToHexStr()).
Msg("[OnAnnounce] Leader is malicious")
consensus.startViewChange(consensus.viewID + 1) consensus.startViewChange(consensus.viewID + 1)
} }
consensus.getLogger().Debug("[OnAnnounce] Announce message received again", "leaderKey", consensus.LeaderPubKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("leaderKey", consensus.LeaderPubKey.SerializeToHexStr()).
Msg("[OnAnnounce] Announce message received again")
//return //return
} }
consensus.getLogger().Debug("[OnAnnounce] Announce message Added", "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnAnnounce] Announce message Added")
consensus.PbftLog.AddMessage(recvMsg) consensus.PbftLog.AddMessage(recvMsg)
consensus.mutex.Lock() consensus.mutex.Lock()
@ -190,13 +228,16 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
// we have already added message and block, skip check viewID and send prepare message if is in ViewChanging mode // we have already added message and block, skip check viewID and send prepare message if is in ViewChanging mode
if consensus.mode.Mode() == ViewChanging { if consensus.mode.Mode() == ViewChanging {
consensus.getLogger().Debug("[OnAnnounce] Still in ViewChanging Mode, Exiting !!") consensus.getLogger().Debug().Msg("[OnAnnounce] Still in ViewChanging Mode, Exiting !!")
return return
} }
if consensus.checkViewID(recvMsg) != nil { if consensus.checkViewID(recvMsg) != nil {
if consensus.mode.Mode() == Normal { if consensus.mode.Mode() == Normal {
consensus.getLogger().Debug("[OnAnnounce] ViewID check failed", "MsgViewID", recvMsg.ViewID, "msgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnAnnounce] ViewID check failed")
} }
return return
} }
@ -211,11 +252,16 @@ func (consensus *Consensus) prepare() {
msgToSend := consensus.constructPrepareMessage() msgToSend := consensus.constructPrepareMessage()
// TODO: this will not return immediatey, may block // TODO: this will not return immediatey, may block
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn("[OnAnnounce] Cannot send prepare message") consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message")
} else { } else {
consensus.getLogger().Info("[OnAnnounce] Sent Prepare Message!!", "BlockHash", hex.EncodeToString(consensus.blockHash[:])) consensus.getLogger().Info().
} Str("BlockHash", hex.EncodeToString(consensus.blockHash[:])).
consensus.getLogger().Debug("[Announce] Switching Phase", "From", consensus.phase, "To", Prepare) Msg("[OnAnnounce] Sent Prepare Message!!")
}
consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", Prepare.String()).
Msg("[Announce] Switching Phase")
consensus.switchPhase(Prepare, true) consensus.switchPhase(Prepare, true)
} }
@ -227,28 +273,33 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
senderKey, err := consensus.verifySenderKey(msg) senderKey, err := consensus.verifySenderKey(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug("[OnPrepare] VerifySenderKey failed", "error", err) consensus.getLogger().Error().Err(err).Msg("[OnPrepare] VerifySenderKey failed")
return return
} }
if err = verifyMessageSig(senderKey, msg); err != nil { if err = verifyMessageSig(senderKey, msg); err != nil {
consensus.getLogger().Debug("[OnPrepare] Failed to verify sender's signature", "error", err) consensus.getLogger().Error().Err(err).Msg("[OnPrepare] Failed to verify sender's signature")
return return
} }
recvMsg, err := ParsePbftMessage(msg) recvMsg, err := ParsePbftMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug("[OnPrepare] Unparseable validator message", "error", err) consensus.getLogger().Error().Err(err).Msg("[OnPrepare] Unparseable validator message")
return return
} }
if recvMsg.ViewID != consensus.viewID || recvMsg.BlockNum != consensus.blockNum { if recvMsg.ViewID != consensus.viewID || recvMsg.BlockNum != consensus.blockNum {
consensus.getLogger().Debug("[OnPrepare] Message ViewId or BlockNum not match", consensus.getLogger().Debug().
"MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum) Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnPrepare] Message ViewId or BlockNum not match")
return return
} }
if !consensus.PbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) { if !consensus.PbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) {
consensus.getLogger().Debug("[OnPrepare] No Matching Announce message", "MsgblockHash", recvMsg.BlockHash, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnPrepare] No Matching Announce message")
//return //return
} }
@ -260,15 +311,16 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
logger := consensus.getLogger().With().Str("validatorPubKey", validatorPubKey).Logger()
if len(prepareSigs) >= consensus.Quorum() { if len(prepareSigs) >= consensus.Quorum() {
// already have enough signatures // already have enough signatures
consensus.getLogger().Debug("[OnPrepare] Received Additional Prepare Message", "ValidatorPubKey", validatorPubKey) logger.Debug().Msg("[OnPrepare] Received Additional Prepare Message")
return return
} }
// proceed only when the message is not received before // proceed only when the message is not received before
_, ok := prepareSigs[validatorPubKey] _, ok := prepareSigs[validatorPubKey]
if ok { if ok {
consensus.getLogger().Debug("[OnPrepare] Already Received prepare message from the validator", "ValidatorPubKey", validatorPubKey) logger.Debug().Msg("[OnPrepare] Already Received prepare message from the validator")
return return
} }
@ -276,24 +328,25 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
var sign bls.Sign var sign bls.Sign
err = sign.Deserialize(prepareSig) err = sign.Deserialize(prepareSig)
if err != nil { if err != nil {
consensus.getLogger().Error("[OnPrepare] Failed to deserialize bls signature", "ValidatorPubKey", validatorPubKey) consensus.getLogger().Error().Err(err).Msg("[OnPrepare] Failed to deserialize bls signature")
return return
} }
if !sign.VerifyHash(recvMsg.SenderPubkey, consensus.blockHash[:]) { if !sign.VerifyHash(recvMsg.SenderPubkey, consensus.blockHash[:]) {
consensus.getLogger().Error("[OnPrepare] Received invalid BLS signature", "ValidatorPubKey", validatorPubKey) consensus.getLogger().Error().Msg("[OnPrepare] Received invalid BLS signature")
return return
} }
consensus.getLogger().Info("[OnPrepare] Received New Prepare Signature", "NumReceivedSoFar", len(prepareSigs), "validatorPubKey", validatorPubKey, "PublicKeys", len(consensus.PublicKeys)) logger = logger.With().Int("NumReceivedSoFar", len(prepareSigs)).Int("PublicKeys", len(consensus.PublicKeys)).Logger()
logger.Info().Msg("[OnPrepare] Received New Prepare Signature")
prepareSigs[validatorPubKey] = &sign prepareSigs[validatorPubKey] = &sign
// Set the bitmap indicating that this validator signed. // Set the bitmap indicating that this validator signed.
if err := prepareBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil { if err := prepareBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil {
consensus.getLogger().Warn("[OnPrepare] prepareBitmap.SetKey failed", "error", err) consensus.getLogger().Warn().Err(err).Msg("[OnPrepare] prepareBitmap.SetKey failed")
return return
} }
if len(prepareSigs) >= consensus.Quorum() { if len(prepareSigs) >= consensus.Quorum() {
consensus.getLogger().Debug("[OnPrepare] Received Enough Prepare Signatures", "NumReceivedSoFar", len(prepareSigs), "PublicKeys", len(consensus.PublicKeys)) logger.Debug().Msg("[OnPrepare] Received Enough Prepare Signatures")
// Construct and broadcast prepared message // Construct and broadcast prepared message
msgToSend, aggSig := consensus.constructPreparedMessage() msgToSend, aggSig := consensus.constructPreparedMessage()
consensus.aggregatedPrepareSig = aggSig consensus.aggregatedPrepareSig = aggSig
@ -304,7 +357,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
_ = protobuf.Unmarshal(msgPayload, msg) _ = protobuf.Unmarshal(msgPayload, msg)
pbftMsg, err := ParsePbftMessage(msg) pbftMsg, err := ParsePbftMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Warn("[OnPrepare] Unable to parse pbft message", "error", err) consensus.getLogger().Warn().Err(err).Msg("[OnPrepare] Unable to parse pbft message")
return return
} }
consensus.PbftLog.AddMessage(pbftMsg) consensus.PbftLog.AddMessage(pbftMsg)
@ -315,51 +368,59 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
commitPayload := append(blockNumHash, consensus.blockHash[:]...) commitPayload := append(blockNumHash, consensus.blockHash[:]...)
consensus.commitSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(commitPayload) consensus.commitSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(commitPayload)
if err := consensus.commitBitmap.SetKey(consensus.PubKey, true); err != nil { if err := consensus.commitBitmap.SetKey(consensus.PubKey, true); err != nil {
consensus.getLogger().Debug("[OnPrepare] Leader commit bitmap set failed") consensus.getLogger().Debug().Msg("[OnPrepare] Leader commit bitmap set failed")
return return
} }
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn("[OnPrepare] Cannot send prepared message") consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message")
} else { } else {
consensus.getLogger().Debug("[OnPrepare] Sent Prepared Message!!", "BlockHash", consensus.blockHash, "BlockNum", consensus.blockNum) consensus.getLogger().Debug().
} Bytes("BlockHash", consensus.blockHash[:]).
consensus.getLogger().Debug("[OnPrepare] Switching phase", "From", consensus.phase, "To", Commit) Uint64("BlockNum", consensus.blockNum).
Msg("[OnPrepare] Sent Prepared Message!!")
}
consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", Commit.String()).
Msg("[OnPrepare] Switching phase")
consensus.switchPhase(Commit, true) consensus.switchPhase(Commit, true)
} }
return return
} }
func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
consensus.getLogger().Debug("[OnPrepared] Received Prepared message") consensus.getLogger().Debug().Msg("[OnPrepared] Received Prepared message")
if consensus.PubKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal { if consensus.PubKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal {
return return
} }
senderKey, err := consensus.verifySenderKey(msg) senderKey, err := consensus.verifySenderKey(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug("[OnPrepared] VerifySenderKey failed", "error", err) consensus.getLogger().Debug().Err(err).Msg("[OnPrepared] VerifySenderKey failed")
return return
} }
if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck { if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck {
consensus.getLogger().Warn("[OnPrepared] SenderKey not match leader PubKey") consensus.getLogger().Warn().Msg("[OnPrepared] SenderKey not match leader PubKey")
return return
} }
if err := verifyMessageSig(senderKey, msg); err != nil { if err := verifyMessageSig(senderKey, msg); err != nil {
consensus.getLogger().Debug("[OnPrepared] Failed to verify sender's signature", "error", err) consensus.getLogger().Debug().Err(err).Msg("[OnPrepared] Failed to verify sender's signature")
return return
} }
recvMsg, err := ParsePbftMessage(msg) recvMsg, err := ParsePbftMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug("[OnPrepared] Unparseable validator message", "error", err) consensus.getLogger().Debug().Err(err).Msg("[OnPrepared] Unparseable validator message")
return return
} }
consensus.getLogger().Info("[OnPrepared] Received prepared message", "MsgBlockNum", recvMsg.BlockNum, "MsgViewID", recvMsg.ViewID) consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnPrepared] Received prepared message")
if recvMsg.BlockNum < consensus.blockNum { if recvMsg.BlockNum < consensus.blockNum {
consensus.getLogger().Debug("Old Block Received, ignoring!!", consensus.getLogger().Debug().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("Old Block Received, ignoring!!")
"MsgBlockNum", recvMsg.BlockNum)
return return
} }
@ -367,17 +428,23 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
blockHash := recvMsg.BlockHash blockHash := recvMsg.BlockHash
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil { if err != nil {
consensus.getLogger().Error("ReadSignatureBitmapPayload failed!!", "error", err) consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!!")
return return
} }
if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() { if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() {
consensus.getLogger().Debug("Not enough signatures in the Prepared msg", "Need", consensus.Quorum(), "Got", count) consensus.getLogger().Debug().
Int("Need", consensus.Quorum()).
Int("Got", count).
Msg("Not enough signatures in the Prepared msg")
return return
} }
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) { if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) {
myBlockHash := common.Hash{} myBlockHash := common.Hash{}
myBlockHash.SetBytes(consensus.blockHash[:]) myBlockHash.SetBytes(consensus.blockHash[:])
consensus.getLogger().Warn("[OnPrepared] failed to verify multi signature for prepare phase", "MsgBlockHash", recvMsg.BlockHash, "myBlockHash", myBlockHash) consensus.getLogger().Warn().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnPrepared] failed to verify multi signature for prepare phase")
return return
} }
@ -386,26 +453,40 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
var blockObj types.Block var blockObj types.Block
err = rlp.DecodeBytes(block, &blockObj) err = rlp.DecodeBytes(block, &blockObj)
if err != nil { if err != nil {
consensus.getLogger().Warn("[OnPrepared] Unparseable block header data", "error", err, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Warn().
Err(err).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnPrepared] Unparseable block header data")
return return
} }
if blockObj.NumberU64() != recvMsg.BlockNum || recvMsg.BlockNum < consensus.blockNum { if blockObj.NumberU64() != recvMsg.BlockNum || recvMsg.BlockNum < consensus.blockNum {
consensus.getLogger().Warn("[OnPrepared] BlockNum not match", "MsgBlockNum", recvMsg.BlockNum, "blockNum", blockObj.NumberU64()) consensus.getLogger().Warn().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("blockNum", blockObj.NumberU64()).
Msg("[OnPrepared] BlockNum not match")
return return
} }
if blockObj.Header().Hash() != recvMsg.BlockHash { if blockObj.Header().Hash() != recvMsg.BlockHash {
consensus.getLogger().Warn("[OnPrepared] BlockHash not match", "MsgBlockNum", recvMsg.BlockNum, "MsgBlockHash", recvMsg.BlockHash, "blockObjHash", blockObj.Header().Hash()) consensus.getLogger().Warn().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Bytes("MsgBlockHash", recvMsg.BlockHash[:]).
Bytes("blockObjHash", blockObj.Header().Hash().Bytes()).
Msg("[OnPrepared] BlockHash not match")
return return
} }
if consensus.mode.Mode() == Normal { if consensus.mode.Mode() == Normal {
if err := consensus.VerifyHeader(consensus.ChainReader, blockObj.Header(), true); err != nil { if err := consensus.VerifyHeader(consensus.ChainReader, blockObj.Header(), true); err != nil {
consensus.getLogger().Warn("[OnPrepared] Block header is not verified successfully", "error", err, "inChain", consensus.ChainReader.CurrentHeader().Number, "MsgBlockNum", blockObj.Header().Number) consensus.getLogger().Warn().
Err(err).
Str("inChain", consensus.ChainReader.CurrentHeader().Number.String()).
Str("MsgBlockNum", blockObj.Header().Number.String()).
Msg("[OnPrepared] Block header is not verified successfully")
return return
} }
if consensus.BlockVerifier == nil { if consensus.BlockVerifier == nil {
// do nothing // do nothing
} else if err := consensus.BlockVerifier(&blockObj); err != nil { } else if err := consensus.BlockVerifier(&blockObj); err != nil {
consensus.getLogger().Info("[OnPrepared] Block verification failed", "error", err) consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed")
return return
} }
} }
@ -413,26 +494,34 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
consensus.PbftLog.AddBlock(&blockObj) consensus.PbftLog.AddBlock(&blockObj)
recvMsg.Block = []byte{} // save memory space recvMsg.Block = []byte{} // save memory space
consensus.PbftLog.AddMessage(recvMsg) consensus.PbftLog.AddMessage(recvMsg)
consensus.getLogger().Debug("[OnPrepared] Prepared message and block added", "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum, "blockHash", recvMsg.BlockHash) consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Bytes("blockHash", recvMsg.BlockHash[:]).
Msg("[OnPrepared] Prepared message and block added")
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
consensus.tryCatchup() consensus.tryCatchup()
if consensus.mode.Mode() == ViewChanging { if consensus.mode.Mode() == ViewChanging {
consensus.getLogger().Debug("[OnPrepared] Still in ViewChanging mode, Exiting !!") consensus.getLogger().Debug().Msg("[OnPrepared] Still in ViewChanging mode, Exiting!!")
return return
} }
if consensus.checkViewID(recvMsg) != nil { if consensus.checkViewID(recvMsg) != nil {
if consensus.mode.Mode() == Normal { if consensus.mode.Mode() == Normal {
consensus.getLogger().Debug("[OnPrepared] ViewID check failed", "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnPrepared] ViewID check failed")
} }
return return
} }
if recvMsg.BlockNum > consensus.blockNum { if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Debug("[OnPrepared] Future Block Received, ignoring!!", consensus.getLogger().Debug().
"MsgBlockNum", recvMsg.BlockNum) Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnPrepared] Future Block Received, ignoring!!")
return return
} }
@ -463,12 +552,18 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
} }
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn("[OnPrepared] Cannot send commit message!!") consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!")
} else { } else {
consensus.getLogger().Info("[OnPrepared] Sent Commit Message!!", "BlockHash", consensus.blockHash, "BlockNum", consensus.blockNum) consensus.getLogger().Info().
Uint64("BlockNum", consensus.blockNum).
Bytes("BlockHash", consensus.blockHash[:]).
Msg("[OnPrepared] Sent Commit Message!!")
} }
consensus.getLogger().Debug("[OnPrepared] Switching phase", "From", consensus.phase, "To", Commit) consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", Commit.String()).
Msg("[OnPrepared] Switching phase")
consensus.switchPhase(Commit, true) consensus.switchPhase(Commit, true)
return return
@ -482,32 +577,41 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
senderKey, err := consensus.verifySenderKey(msg) senderKey, err := consensus.verifySenderKey(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug("[OnCommit] VerifySenderKey Failed", "error", err) consensus.getLogger().Debug().Err(err).Msg("[OnCommit] VerifySenderKey Failed")
return return
} }
if err = verifyMessageSig(senderKey, msg); err != nil { if err = verifyMessageSig(senderKey, msg); err != nil {
consensus.getLogger().Debug("[OnCommit] Failed to verify sender's signature", "error", err) consensus.getLogger().Debug().Err(err).Msg("[OnCommit] Failed to verify sender's signature")
return return
} }
recvMsg, err := ParsePbftMessage(msg) recvMsg, err := ParsePbftMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug("[OnCommit] Parse pbft message failed", "error", err) consensus.getLogger().Debug().Err(err).Msg("[OnCommit] Parse pbft message failed")
return return
} }
if recvMsg.ViewID != consensus.viewID || recvMsg.BlockNum != consensus.blockNum { if recvMsg.ViewID != consensus.viewID || recvMsg.BlockNum != consensus.blockNum {
consensus.getLogger().Debug("[OnCommit] BlockNum/viewID not match", "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum, "ValidatorPubKey", recvMsg.SenderPubkey.SerializeToHexStr()) consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Str("ValidatorPubKey", recvMsg.SenderPubkey.SerializeToHexStr()).
Msg("[OnCommit] BlockNum/viewID not match")
return return
} }
if !consensus.PbftLog.HasMatchingAnnounce(consensus.blockNum, recvMsg.BlockHash) { if !consensus.PbftLog.HasMatchingAnnounce(consensus.blockNum, recvMsg.BlockHash) {
consensus.getLogger().Debug("[OnCommit] Cannot find matching blockhash", "MsgBlockHash", recvMsg.BlockHash, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug().
Bytes("MsgBlockHash", recvMsg.BlockHash[:]).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnCommit] Cannot find matching blockhash")
return return
} }
if !consensus.PbftLog.HasMatchingPrepared(consensus.blockNum, recvMsg.BlockHash) { if !consensus.PbftLog.HasMatchingPrepared(consensus.blockNum, recvMsg.BlockHash) {
consensus.getLogger().Debug("[OnCommit] Cannot find matching prepared message", "blockHash", recvMsg.BlockHash) consensus.getLogger().Debug().
Bytes("blockHash", recvMsg.BlockHash[:]).
Msg("[OnCommit] Cannot find matching prepared message")
return return
} }
@ -518,8 +622,9 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
logger := consensus.getLogger().With().Str("validatorPubKey", validatorPubKey).Logger()
if !consensus.IsValidatorInCommittee(recvMsg.SenderPubkey) { if !consensus.IsValidatorInCommittee(recvMsg.SenderPubkey) {
consensus.getLogger().Error("[OnCommit] Invalid validator", "validatorPubKey", validatorPubKey) logger.Error().Msg("[OnCommit] Invalid validator")
return return
} }
@ -529,7 +634,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
// proceed only when the message is not received before // proceed only when the message is not received before
_, ok := commitSigs[validatorPubKey] _, ok := commitSigs[validatorPubKey]
if ok { if ok {
consensus.getLogger().Debug("[OnCommit] Already received commit message from the validator", "validatorPubKey", validatorPubKey) logger.Debug().Msg("[OnCommit] Already received commit message from the validator")
return return
} }
@ -539,22 +644,24 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
var sign bls.Sign var sign bls.Sign
err = sign.Deserialize(commitSig) err = sign.Deserialize(commitSig)
if err != nil { if err != nil {
consensus.getLogger().Debug("[OnCommit] Failed to deserialize bls signature", "validatorPubKey", validatorPubKey) logger.Debug().Msg("[OnCommit] Failed to deserialize bls signature")
return return
} }
blockNumHash := make([]byte, 8) blockNumHash := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumHash, recvMsg.BlockNum) binary.LittleEndian.PutUint64(blockNumHash, recvMsg.BlockNum)
commitPayload := append(blockNumHash, recvMsg.BlockHash[:]...) commitPayload := append(blockNumHash, recvMsg.BlockHash[:]...)
logger = logger.With().Uint64("MsgViewID", recvMsg.ViewID).Uint64("MsgBlockNum", recvMsg.BlockNum).Logger()
if !sign.VerifyHash(recvMsg.SenderPubkey, commitPayload) { if !sign.VerifyHash(recvMsg.SenderPubkey, commitPayload) {
consensus.getLogger().Error("[OnCommit] Cannot verify commit message", "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum) logger.Error().Msg("[OnCommit] Cannot verify commit message")
return return
} }
consensus.getLogger().Info("[OnCommit] Received new commit message", "numReceivedSoFar", len(commitSigs), "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum, "validatorPubKey", validatorPubKey) logger = logger.With().Int("numReceivedSoFar", len(commitSigs)).Logger()
logger.Info().Msg("[OnCommit] Received new commit message")
commitSigs[validatorPubKey] = &sign commitSigs[validatorPubKey] = &sign
// Set the bitmap indicating that this validator signed. // Set the bitmap indicating that this validator signed.
if err := commitBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil { if err := commitBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil {
consensus.getLogger().Warn("[OnCommit] commitBitmap.SetKey failed", "error", err) consensus.getLogger().Warn().Err(err).Msg("[OnCommit] commitBitmap.SetKey failed")
return return
} }
@ -562,10 +669,10 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
rewardThresholdIsMet := len(commitSigs) >= consensus.RewardThreshold() rewardThresholdIsMet := len(commitSigs) >= consensus.RewardThreshold()
if !quorumWasMet && quorumIsMet { if !quorumWasMet && quorumIsMet {
consensus.getLogger().Info("[OnCommit] 2/3 Enough commits received", "NumCommits", len(commitSigs)) logger.Info().Msg("[OnCommit] 2/3 Enough commits received")
go func(viewID uint64) { go func(viewID uint64) {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
consensus.getLogger().Debug("[OnCommit] Commit Grace Period Ended", "NumCommits", len(commitSigs)) logger.Debug().Msg("[OnCommit] Commit Grace Period Ended")
consensus.commitFinishChan <- viewID consensus.commitFinishChan <- viewID
}(consensus.viewID) }(consensus.viewID)
} }
@ -573,13 +680,13 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
if rewardThresholdIsMet { if rewardThresholdIsMet {
go func(viewID uint64) { go func(viewID uint64) {
consensus.commitFinishChan <- viewID consensus.commitFinishChan <- viewID
consensus.getLogger().Info("[OnCommit] 90% Enough commits received", "NumCommits", len(commitSigs)) logger.Info().Msg("[OnCommit] 90% Enough commits received")
}(consensus.viewID) }(consensus.viewID)
} }
} }
func (consensus *Consensus) finalizeCommits() { func (consensus *Consensus) finalizeCommits() {
consensus.getLogger().Info("[Finalizing] Finalizing Block", "NumCommits", len(consensus.commitSigs)) consensus.getLogger().Info().Int("NumCommits", len(consensus.commitSigs)).Msg("[Finalizing] Finalizing Block")
beforeCatchupNum := consensus.blockNum beforeCatchupNum := consensus.blockNum
beforeCatchupViewID := consensus.viewID beforeCatchupViewID := consensus.viewID
@ -594,7 +701,7 @@ func (consensus *Consensus) finalizeCommits() {
_ = protobuf.Unmarshal(msgPayload, msg) _ = protobuf.Unmarshal(msgPayload, msg)
pbftMsg, err := ParsePbftMessage(msg) pbftMsg, err := ParsePbftMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Warn("[FinalizeCommits] Unable to parse pbft message", "error", err) consensus.getLogger().Warn().Err(err).Msg("[FinalizeCommits] Unable to parse pbft message")
return return
} }
consensus.PbftLog.AddMessage(pbftMsg) consensus.PbftLog.AddMessage(pbftMsg)
@ -602,19 +709,26 @@ func (consensus *Consensus) finalizeCommits() {
// find correct block content // find correct block content
block := consensus.PbftLog.GetBlockByHash(consensus.blockHash) block := consensus.PbftLog.GetBlockByHash(consensus.blockHash)
if block == nil { if block == nil {
consensus.getLogger().Warn("[FinalizeCommits] Cannot find block by hash", "blockHash", hex.EncodeToString(consensus.blockHash[:])) consensus.getLogger().Warn().
Str("blockHash", hex.EncodeToString(consensus.blockHash[:])).
Msg("[FinalizeCommits] Cannot find block by hash")
return return
} }
consensus.tryCatchup() consensus.tryCatchup()
if consensus.blockNum-beforeCatchupNum != 1 { if consensus.blockNum-beforeCatchupNum != 1 {
consensus.getLogger().Warn("[FinalizeCommits] Leader cannot provide the correct block for committed message", "beforeCatchupBlockNum", beforeCatchupNum) consensus.getLogger().Warn().
Uint64("beforeCatchupBlockNum", beforeCatchupNum).
Msg("[FinalizeCommits] Leader cannot provide the correct block for committed message")
return return
} }
// if leader success finalize the block, send committed message to validators // if leader success finalize the block, send committed message to validators
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
consensus.getLogger().Warn("[Finalizing] Cannot send committed message", "error", err) consensus.getLogger().Warn().Err(err).Msg("[Finalizing] Cannot send committed message")
} else { } else {
consensus.getLogger().Info("[Finalizing] Sent Committed Message", "BlockHash", consensus.blockHash, "BlockNum", consensus.blockNum) consensus.getLogger().Info().
Bytes("BlockHash", consensus.blockHash[:]).
Uint64("BlockNum", consensus.blockNum).
Msg("[Finalizing] Sent Committed Message")
} }
consensus.reportMetrics(*block) consensus.reportMetrics(*block)
@ -627,20 +741,25 @@ func (consensus *Consensus) finalizeCommits() {
if consensus.consensusTimeout[timeoutBootstrap].IsActive() { if consensus.consensusTimeout[timeoutBootstrap].IsActive() {
consensus.consensusTimeout[timeoutBootstrap].Stop() consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.getLogger().Debug("[Finalizing] Start consensus timer; stop bootstrap timer only once") consensus.getLogger().Debug().Msg("[Finalizing] Start consensus timer; stop bootstrap timer only once")
} else { } else {
consensus.getLogger().Debug("[Finalizing] Start consensus timer") consensus.getLogger().Debug().Msg("[Finalizing] Start consensus timer")
} }
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!", "BlockNum", beforeCatchupNum, "ViewId", beforeCatchupViewID, "BlockHash", block.Hash(), "index", consensus.getIndexOfPubKey(consensus.PubKey)) consensus.getLogger().Info().
Uint64("BlockNum", beforeCatchupNum).
Uint64("ViewId", beforeCatchupViewID).
Str("BlockHash", block.Hash().String()).
Int("index", consensus.getIndexOfPubKey(consensus.PubKey)).
Msg("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!")
// Send signal to Node so the new block can be added and new round of consensus can be triggered // Send signal to Node so the new block can be added and new round of consensus can be triggered
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- struct{}{}
} }
func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.getLogger().Debug("[OnCommitted] Receive committed message") consensus.getLogger().Debug().Msg("[OnCommitted] Receive committed message")
if consensus.PubKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal { if consensus.PubKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal {
return return
@ -648,37 +767,42 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
senderKey, err := consensus.verifySenderKey(msg) senderKey, err := consensus.verifySenderKey(msg)
if err != nil { if err != nil {
consensus.getLogger().Warn("[OnCommitted] verifySenderKey failed", "error", err) consensus.getLogger().Warn().Err(err).Msg("[OnCommitted] verifySenderKey failed")
return return
} }
if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck { if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck {
consensus.getLogger().Warn("[OnCommitted] senderKey not match leader PubKey") consensus.getLogger().Warn().Msg("[OnCommitted] senderKey not match leader PubKey")
return return
} }
if err = verifyMessageSig(senderKey, msg); err != nil { if err = verifyMessageSig(senderKey, msg); err != nil {
consensus.getLogger().Warn("[OnCommitted] Failed to verify sender's signature", "error", err) consensus.getLogger().Warn().Err(err).Msg("[OnCommitted] Failed to verify sender's signature")
return return
} }
recvMsg, err := ParsePbftMessage(msg) recvMsg, err := ParsePbftMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Warn("[OnCommitted] unable to parse msg", "error", err) consensus.getLogger().Warn().Msg("[OnCommitted] unable to parse msg")
return return
} }
if recvMsg.BlockNum < consensus.blockNum { if recvMsg.BlockNum < consensus.blockNum {
consensus.getLogger().Info("[OnCommitted] Received Old Blocks!!", "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnCommitted] Received Old Blocks!!")
return return
} }
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil { if err != nil {
consensus.getLogger().Error("[OnCommitted] readSignatureBitmapPayload failed", "error", err) consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed")
return return
} }
// check has 2f+1 signatures // check has 2f+1 signatures
if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() { if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() {
consensus.getLogger().Warn("[OnCommitted] Not enough signature in committed msg", "need", consensus.Quorum(), "got", count) consensus.getLogger().Warn().
Int("need", consensus.Quorum()).
Int("got", count).
Msg("[OnCommitted] Not enough signature in committed msg")
return return
} }
@ -686,12 +810,17 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
binary.LittleEndian.PutUint64(blockNumBytes, recvMsg.BlockNum) binary.LittleEndian.PutUint64(blockNumBytes, recvMsg.BlockNum)
commitPayload := append(blockNumBytes, recvMsg.BlockHash[:]...) commitPayload := append(blockNumBytes, recvMsg.BlockHash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
consensus.getLogger().Error("[OnCommitted] Failed to verify the multi signature for commit phase", "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Error().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnCommitted] Failed to verify the multi signature for commit phase")
return return
} }
consensus.PbftLog.AddMessage(recvMsg) consensus.PbftLog.AddMessage(recvMsg)
consensus.getLogger().Debug("[OnCommitted] Committed message added", "MsgViewID", recvMsg.ViewID, "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnCommitted] Committed message added")
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
@ -700,7 +829,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.commitBitmap = mask consensus.commitBitmap = mask
if recvMsg.BlockNum-consensus.blockNum > consensusBlockNumBuffer { if recvMsg.BlockNum-consensus.blockNum > consensusBlockNumBuffer {
consensus.getLogger().Debug("[OnCommitted] out of sync", "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] out of sync")
go func() { go func() {
select { select {
case consensus.blockNumLowChan <- struct{}{}: case consensus.blockNumLowChan <- struct{}{}:
@ -721,15 +850,15 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.tryCatchup() consensus.tryCatchup()
if consensus.mode.Mode() == ViewChanging { if consensus.mode.Mode() == ViewChanging {
consensus.getLogger().Debug("[OnCommitted] Still in ViewChanging mode, Exiting !!") consensus.getLogger().Debug().Msg("[OnCommitted] Still in ViewChanging mode, Exiting!!")
return return
} }
if consensus.consensusTimeout[timeoutBootstrap].IsActive() { if consensus.consensusTimeout[timeoutBootstrap].IsActive() {
consensus.consensusTimeout[timeoutBootstrap].Stop() consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.getLogger().Debug("[OnCommitted] Start consensus timer; stop bootstrap timer only once") consensus.getLogger().Debug().Msg("[OnCommitted] Start consensus timer; stop bootstrap timer only once")
} else { } else {
consensus.getLogger().Debug("[OnCommitted] Start consensus timer") consensus.getLogger().Debug().Msg("[OnCommitted] Start consensus timer")
} }
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
return return
@ -757,7 +886,7 @@ func (consensus *Consensus) LastCommitSig() ([]byte, []byte, error) {
// try to catch up if fall behind // try to catch up if fall behind
func (consensus *Consensus) tryCatchup() { func (consensus *Consensus) tryCatchup() {
consensus.getLogger().Info("[TryCatchup] commit new blocks") consensus.getLogger().Info().Msg("[TryCatchup] commit new blocks")
// if consensus.phase != Commit && consensus.mode.Mode() == Normal { // if consensus.phase != Commit && consensus.mode.Mode() == Normal {
// return // return
// } // }
@ -768,9 +897,11 @@ func (consensus *Consensus) tryCatchup() {
break break
} }
if len(msgs) > 1 { if len(msgs) > 1 {
consensus.getLogger().Error("[TryCatchup] DANGER!!! we should only get one committed message for a given blockNum", "numMsgs", len(msgs)) consensus.getLogger().Error().
Int("numMsgs", len(msgs)).
Msg("[TryCatchup] DANGER!!! we should only get one committed message for a given blockNum")
} }
consensus.getLogger().Info("[TryCatchup] committed message found") consensus.getLogger().Info().Msg("[TryCatchup] committed message found")
block := consensus.PbftLog.GetBlockByHash(msgs[0].BlockHash) block := consensus.PbftLog.GetBlockByHash(msgs[0].BlockHash)
if block == nil { if block == nil {
@ -780,43 +911,48 @@ func (consensus *Consensus) tryCatchup() {
if consensus.BlockVerifier == nil { if consensus.BlockVerifier == nil {
// do nothing // do nothing
} else if err := consensus.BlockVerifier(block); err != nil { } else if err := consensus.BlockVerifier(block); err != nil {
consensus.getLogger().Info("[TryCatchup]block verification faied") consensus.getLogger().Info().Msg("[TryCatchup]block verification faied")
return return
} }
if block.ParentHash() != consensus.ChainReader.CurrentHeader().Hash() { if block.ParentHash() != consensus.ChainReader.CurrentHeader().Hash() {
consensus.getLogger().Debug("[TryCatchup] parent block hash not match") consensus.getLogger().Debug().Msg("[TryCatchup] parent block hash not match")
break break
} }
consensus.getLogger().Info("[TryCatchup] block found to commit") consensus.getLogger().Info().Msg("[TryCatchup] block found to commit")
preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, msgs[0].BlockNum, msgs[0].BlockHash) preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, msgs[0].BlockNum, msgs[0].BlockHash)
msg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs) msg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs)
if msg == nil { if msg == nil {
break break
} }
consensus.getLogger().Info("[TryCatchup] prepared message found to commit") consensus.getLogger().Info().Msg("[TryCatchup] prepared message found to commit")
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
consensus.blockNum = consensus.blockNum + 1 consensus.blockNum = consensus.blockNum + 1
consensus.viewID = msgs[0].ViewID + 1 consensus.viewID = msgs[0].ViewID + 1
consensus.LeaderPubKey = msgs[0].SenderPubkey consensus.LeaderPubKey = msgs[0].SenderPubkey
consensus.getLogger().Info("[TryCatchup] Adding block to chain") consensus.getLogger().Info().Msg("[TryCatchup] Adding block to chain")
consensus.OnConsensusDone(block) consensus.OnConsensusDone(block)
consensus.ResetState() consensus.ResetState()
select { select {
case consensus.VerifiedNewBlock <- block: case consensus.VerifiedNewBlock <- block:
default: default:
consensus.getLogger().Info("[TryCatchup] consensus verified block send to chan failed", "blockHash", block.Hash()) consensus.getLogger().Info().
Str("blockHash", block.Hash().String()).
Msg("[TryCatchup] consensus verified block send to chan failed")
continue continue
} }
break break
} }
if currentBlockNum < consensus.blockNum { if currentBlockNum < consensus.blockNum {
consensus.getLogger().Info("[TryCatchup] Catched up!", "From", currentBlockNum, "To", consensus.blockNum) consensus.getLogger().Info().
Uint64("From", currentBlockNum).
Uint64("To", consensus.blockNum).
Msg("[TryCatchup] Catched up!")
consensus.switchPhase(Announce, true) consensus.switchPhase(Announce, true)
} }
// catup up and skip from view change trap // catup up and skip from view change trap
@ -833,7 +969,7 @@ func (consensus *Consensus) tryCatchup() {
func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}, startChannel chan struct{}) { func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}, startChannel chan struct{}) {
go func() { go func() {
if nodeconfig.GetDefaultConfig().IsLeader() { if nodeconfig.GetDefaultConfig().IsLeader() {
consensus.getLogger().Info("[ConsensusMainLoop] Waiting for consensus start", "time", time.Now()) consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Waiting for consensus start")
<-startChannel <-startChannel
if nodeconfig.GetDefaultConfig().IsLeader() { if nodeconfig.GetDefaultConfig().IsLeader() {
@ -844,11 +980,14 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
}() }()
} }
} }
consensus.getLogger().Info("[ConsensusMainLoop] Consensus started", "time", time.Now()) consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started")
defer close(stoppedChan) defer close(stoppedChan)
ticker := time.NewTicker(3 * time.Second) ticker := time.NewTicker(3 * time.Second)
consensus.consensusTimeout[timeoutBootstrap].Start() consensus.consensusTimeout[timeoutBootstrap].Start()
consensus.getLogger().Debug("[ConsensusMainLoop] Start bootstrap timeout (only once)", "viewID", consensus.viewID, "block", consensus.blockNum) consensus.getLogger().Debug().
Uint64("viewID", consensus.viewID).
Uint64("block", consensus.blockNum).
Msg("[ConsensusMainLoop] Start bootstrap timeout (only once)")
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
@ -860,11 +999,11 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
continue continue
} }
if k != timeoutViewChange { if k != timeoutViewChange {
consensus.getLogger().Debug("[ConsensusMainLoop] Ops Consensus Timeout!!!") consensus.getLogger().Debug().Msg("[ConsensusMainLoop] Ops Consensus Timeout!!!")
consensus.startViewChange(consensus.viewID + 1) consensus.startViewChange(consensus.viewID + 1)
break break
} else { } else {
consensus.getLogger().Debug("[ConsensusMainLoop] Ops View Change Timeout!!!") consensus.getLogger().Debug().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!")
viewID := consensus.mode.ViewID() viewID := consensus.mode.ViewID()
consensus.startViewChange(viewID + 1) consensus.startViewChange(viewID + 1)
break break
@ -872,15 +1011,17 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
} }
case <-consensus.syncReadyChan: case <-consensus.syncReadyChan:
consensus.updateConsensusInformation() consensus.updateConsensusInformation()
consensus.getLogger().Info("Node is in sync") consensus.getLogger().Info().Msg("Node is in sync")
case <-consensus.syncNotReadyChan: case <-consensus.syncNotReadyChan:
consensus.SetBlockNum(consensus.ChainReader.CurrentHeader().Number.Uint64() + 1) consensus.SetBlockNum(consensus.ChainReader.CurrentHeader().Number.Uint64() + 1)
consensus.mode.SetMode(Syncing) consensus.mode.SetMode(Syncing)
consensus.getLogger().Info("Node is out of sync") consensus.getLogger().Info().Msg("Node is out of sync")
case newBlock := <-blockChannel: case newBlock := <-blockChannel:
consensus.getLogger().Info("[ConsensusMainLoop] Received Proposed New Block!", "MsgBlockNum", newBlock.NumberU64()) consensus.getLogger().Info().
Uint64("MsgBlockNum", newBlock.NumberU64()).
Msg("[ConsensusMainLoop] Received Proposed New Block!")
if consensus.ShardID == 0 { if consensus.ShardID == 0 {
// TODO ek/rj - re-enable this after fixing DRand // TODO ek/rj - re-enable this after fixing DRand
//if core.IsEpochBlock(newBlock) { // Only beacon chain do randomness generation //if core.IsEpochBlock(newBlock) { // Only beacon chain do randomness generation
@ -902,7 +1043,9 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
if err == nil { if err == nil {
// Verify the randomness // Verify the randomness
_ = blockHash _ = blockHash
consensus.getLogger().Info("[ConsensusMainLoop] Adding randomness into new block", "rnd", rnd) consensus.getLogger().Info().
Bytes("rnd", rnd[:]).
Msg("[ConsensusMainLoop] Adding randomness into new block")
// newBlock.AddVdf([258]byte{}) // TODO(HB): add real vdf // newBlock.AddVdf([258]byte{}) // TODO(HB): add real vdf
} else { } else {
//consensus.getLogger().Info("Failed to get randomness", "error", err) //consensus.getLogger().Info("Failed to get randomness", "error", err)
@ -910,7 +1053,12 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
} }
startTime = time.Now() startTime = time.Now()
consensus.getLogger().Debug("[ConsensusMainLoop] STARTING CONSENSUS", "numTxs", len(newBlock.Transactions()), "consensus", consensus, "startTime", startTime, "publicKeys", len(consensus.PublicKeys)) consensus.getLogger().Debug().
Int("numTxs", len(newBlock.Transactions())).
Interface("consensus", consensus).
Time("startTime", startTime).
Int("publicKeys", len(consensus.PublicKeys)).
Msg("[ConsensusMainLoop] STARTING CONSENSUS")
consensus.announce(newBlock) consensus.announce(newBlock)
case msg := <-consensus.MsgChan: case msg := <-consensus.MsgChan:

@ -27,7 +27,7 @@ func (consensus *Consensus) constructPrepareMessage() []byte {
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message) marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message)
if err != nil { if err != nil {
utils.GetLogInstance().Error("Failed to sign and marshal the Prepare message", "error", err) utils.Logger().Error().Err(err).Msg("Failed to sign and marshal the Prepare message")
} }
return proto.ConstructConsensusMessage(marshaledMessage) return proto.ConstructConsensusMessage(marshaledMessage)
} }
@ -53,7 +53,7 @@ func (consensus *Consensus) constructCommitMessage(commitPayload []byte) []byte
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message) marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message)
if err != nil { if err != nil {
utils.GetLogInstance().Error("Failed to sign and marshal the Commit message", "error", err) utils.Logger().Error().Err(err).Msg("Failed to sign and marshal the Commit message")
} }
return proto.ConstructConsensusMessage(marshaledMessage) return proto.ConstructConsensusMessage(marshaledMessage)
} }

@ -42,13 +42,16 @@ func (consensus *Consensus) constructViewChangeMessage() []byte {
vcMsg.Payload = append(msgToSign[:0:0], msgToSign...) vcMsg.Payload = append(msgToSign[:0:0], msgToSign...)
} }
consensus.getLogger().Debug("[constructViewChangeMessage]", "m1Payload", vcMsg.Payload, "pubKey", consensus.PubKey.SerializeToHexStr()) consensus.getLogger().Debug().
Bytes("m1Payload", vcMsg.Payload).
Str("pubKey", consensus.PubKey.SerializeToHexStr()).
Msg("[constructViewChangeMessage]")
sign := consensus.priKey.SignHash(msgToSign) sign := consensus.priKey.SignHash(msgToSign)
if sign != nil { if sign != nil {
vcMsg.ViewchangeSig = sign.Serialize() vcMsg.ViewchangeSig = sign.Serialize()
} else { } else {
utils.GetLogger().Error("unable to serialize m1/m2 view change message signature") utils.Logger().Error().Msg("unable to serialize m1/m2 view change message signature")
} }
viewIDBytes := make([]byte, 8) viewIDBytes := make([]byte, 8)
@ -57,12 +60,12 @@ func (consensus *Consensus) constructViewChangeMessage() []byte {
if sign1 != nil { if sign1 != nil {
vcMsg.ViewidSig = sign1.Serialize() vcMsg.ViewidSig = sign1.Serialize()
} else { } else {
utils.GetLogger().Error("unable to serialize viewID signature") utils.Logger().Error().Msg("unable to serialize viewID signature")
} }
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message) marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message)
if err != nil { if err != nil {
utils.GetLogInstance().Error("[constructViewChangeMessage] failed to sign and marshal the viewchange message", "error", err) utils.Logger().Error().Err(err).Msg("[constructViewChangeMessage] failed to sign and marshal the viewchange message")
} }
return proto.ConstructConsensusMessage(marshaledMessage) return proto.ConstructConsensusMessage(marshaledMessage)
} }
@ -86,7 +89,7 @@ func (consensus *Consensus) constructNewViewMessage() []byte {
vcMsg.Payload = consensus.m1Payload vcMsg.Payload = consensus.m1Payload
sig2arr := consensus.GetNilSigsArray() sig2arr := consensus.GetNilSigsArray()
consensus.getLogger().Debug("[constructNewViewMessage] M2 (NIL) type signatures", "len", len(sig2arr)) consensus.getLogger().Debug().Int("len", len(sig2arr)).Msg("[constructNewViewMessage] M2 (NIL) type signatures")
if len(sig2arr) > 0 { if len(sig2arr) > 0 {
m2Sig := bls_cosi.AggregateSig(sig2arr) m2Sig := bls_cosi.AggregateSig(sig2arr)
vcMsg.M2Aggsigs = m2Sig.Serialize() vcMsg.M2Aggsigs = m2Sig.Serialize()
@ -94,7 +97,7 @@ func (consensus *Consensus) constructNewViewMessage() []byte {
} }
sig3arr := consensus.GetViewIDSigsArray() sig3arr := consensus.GetViewIDSigsArray()
consensus.getLogger().Debug("[constructNewViewMessage] M3 (ViewID) type signatures", "len", len(sig3arr)) consensus.getLogger().Debug().Int("len", len(sig3arr)).Msg("[constructNewViewMessage] M3 (ViewID) type signatures")
// even we check here for safty, m3 type signatures must >= 2f+1 // even we check here for safty, m3 type signatures must >= 2f+1
if len(sig3arr) > 0 { if len(sig3arr) > 0 {
m3Sig := bls_cosi.AggregateSig(sig3arr) m3Sig := bls_cosi.AggregateSig(sig3arr)
@ -104,7 +107,7 @@ func (consensus *Consensus) constructNewViewMessage() []byte {
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message) marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message)
if err != nil { if err != nil {
utils.GetLogInstance().Error("[constructNewViewMessage] failed to sign and marshal the new view message", "error", err) utils.Logger().Error().Err(err).Msg("[constructNewViewMessage] failed to sign and marshal the new view message")
} }
return proto.ConstructConsensusMessage(marshaledMessage) return proto.ConstructConsensusMessage(marshaledMessage)
} }

@ -257,26 +257,26 @@ func ParseViewChangeMessage(msg *msg_pb.Message) (*PbftMessage, error) {
pubKey, err := bls_cosi.BytesToBlsPublicKey(vcMsg.SenderPubkey) pubKey, err := bls_cosi.BytesToBlsPublicKey(vcMsg.SenderPubkey)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("ParseViewChangeMessage failed to parse senderpubkey", "error", err) utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse senderpubkey")
return nil, err return nil, err
} }
leaderKey, err := bls_cosi.BytesToBlsPublicKey(vcMsg.LeaderPubkey) leaderKey, err := bls_cosi.BytesToBlsPublicKey(vcMsg.LeaderPubkey)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("ParseViewChangeMessage failed to parse leaderpubkey", "error", err) utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse leaderpubkey")
return nil, err return nil, err
} }
vcSig := bls.Sign{} vcSig := bls.Sign{}
err = vcSig.Deserialize(vcMsg.ViewchangeSig) err = vcSig.Deserialize(vcMsg.ViewchangeSig)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("ParseViewChangeMessage failed to deserialize the viewchange signature", "error", err) utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the viewchange signature")
return nil, err return nil, err
} }
vcSig1 := bls.Sign{} vcSig1 := bls.Sign{}
err = vcSig1.Deserialize(vcMsg.ViewidSig) err = vcSig1.Deserialize(vcMsg.ViewidSig)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("ParseViewChangeMessage failed to deserialize the viewid signature", "error", err) utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the viewid signature")
return nil, err return nil, err
} }
pbftMsg.SenderPubkey = pubKey pbftMsg.SenderPubkey = pubKey
@ -303,7 +303,7 @@ func (consensus *Consensus) ParseNewViewMessage(msg *msg_pb.Message) (*PbftMessa
pubKey, err := bls_cosi.BytesToBlsPublicKey(vcMsg.SenderPubkey) pubKey, err := bls_cosi.BytesToBlsPublicKey(vcMsg.SenderPubkey)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("ParseViewChangeMessage failed to parse senderpubkey", "error", err) utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse senderpubkey")
return nil, err return nil, err
} }
pbftMsg.SenderPubkey = pubKey pbftMsg.SenderPubkey = pubKey
@ -312,12 +312,12 @@ func (consensus *Consensus) ParseNewViewMessage(msg *msg_pb.Message) (*PbftMessa
m3Sig := bls.Sign{} m3Sig := bls.Sign{}
err = m3Sig.Deserialize(vcMsg.M3Aggsigs) err = m3Sig.Deserialize(vcMsg.M3Aggsigs)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("ParseViewChangeMessage failed to deserialize the multi signature for M3 viewID signature", "error", err) utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the multi signature for M3 viewID signature")
return nil, err return nil, err
} }
m3mask, err := bls_cosi.NewMask(consensus.PublicKeys, nil) m3mask, err := bls_cosi.NewMask(consensus.PublicKeys, nil)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("ParseViewChangeMessage failed to create mask for multi signature", "error", err) utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to create mask for multi signature")
return nil, err return nil, err
} }
m3mask.SetMask(vcMsg.M3Bitmap) m3mask.SetMask(vcMsg.M3Bitmap)
@ -329,12 +329,12 @@ func (consensus *Consensus) ParseNewViewMessage(msg *msg_pb.Message) (*PbftMessa
m2Sig := bls.Sign{} m2Sig := bls.Sign{}
err = m2Sig.Deserialize(vcMsg.M2Aggsigs) err = m2Sig.Deserialize(vcMsg.M2Aggsigs)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("ParseViewChangeMessage failed to deserialize the multi signature for M2 aggregated signature", "error", err) utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the multi signature for M2 aggregated signature")
return nil, err return nil, err
} }
m2mask, err := bls_cosi.NewMask(consensus.PublicKeys, nil) m2mask, err := bls_cosi.NewMask(consensus.PublicKeys, nil)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("ParseViewChangeMessage failed to create mask for multi signature", "error", err) utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to create mask for multi signature")
return nil, err return nil, err
} }
m2mask.SetMask(vcMsg.M2Bitmap) m2mask.SetMask(vcMsg.M2Bitmap)

@ -120,7 +120,9 @@ func (consensus *Consensus) switchPhase(desirePhase PbftPhase, override bool) {
func (consensus *Consensus) GetNextLeaderKey() *bls.PublicKey { func (consensus *Consensus) GetNextLeaderKey() *bls.PublicKey {
idx := consensus.getIndexOfPubKey(consensus.LeaderPubKey) idx := consensus.getIndexOfPubKey(consensus.LeaderPubKey)
if idx == -1 { if idx == -1 {
consensus.getLogger().Warn("GetNextLeaderKey: currentLeaderKey not found", "key", consensus.LeaderPubKey.SerializeToHexStr()) consensus.getLogger().Warn().
Str("key", consensus.LeaderPubKey.SerializeToHexStr()).
Msg("GetNextLeaderKey: currentLeaderKey not found")
} }
idx = (idx + 1) % len(consensus.PublicKeys) idx = (idx + 1) % len(consensus.PublicKeys)
return consensus.PublicKeys[idx] return consensus.PublicKeys[idx]
@ -137,7 +139,9 @@ func (consensus *Consensus) getIndexOfPubKey(pubKey *bls.PublicKey) int {
// ResetViewChangeState reset the state for viewchange // ResetViewChangeState reset the state for viewchange
func (consensus *Consensus) ResetViewChangeState() { func (consensus *Consensus) ResetViewChangeState() {
consensus.getLogger().Debug("[ResetViewChangeState] Resetting view change state", "Phase", consensus.phase) consensus.getLogger().Debug().
Str("Phase", consensus.phase.String()).
Msg("[ResetViewChangeState] Resetting view change state")
consensus.mode.SetMode(Normal) consensus.mode.SetMode(Normal)
bhpBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil) bhpBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil)
nilBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil) nilBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil)
@ -173,20 +177,26 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
diff := viewID - consensus.viewID diff := viewID - consensus.viewID
duration := time.Duration(int64(diff) * int64(viewChangeDuration)) duration := time.Duration(int64(diff) * int64(viewChangeDuration))
consensus.getLogger().Info("[startViewChange]", "ViewChangingID", viewID, "timeoutDuration", duration, "NextLeader", consensus.LeaderPubKey.SerializeToHexStr()) consensus.getLogger().Info().
Uint64("ViewChangingID", viewID).
Dur("timeoutDuration", duration).
Str("NextLeader", consensus.LeaderPubKey.SerializeToHexStr()).
Msg("[startViewChange]")
msgToSend := consensus.constructViewChangeMessage() msgToSend := consensus.constructViewChangeMessage()
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.consensusTimeout[timeoutViewChange].SetDuration(duration) consensus.consensusTimeout[timeoutViewChange].SetDuration(duration)
consensus.consensusTimeout[timeoutViewChange].Start() consensus.consensusTimeout[timeoutViewChange].Start()
consensus.getLogger().Debug("[startViewChange] start view change timer", "ViewChangingID", consensus.mode.ViewID()) consensus.getLogger().Debug().
Uint64("ViewChangingID", consensus.mode.ViewID()).
Msg("[startViewChange] start view change timer")
} }
func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
recvMsg, err := ParseViewChangeMessage(msg) recvMsg, err := ParseViewChangeMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Warn("[onViewChange] Unable To Parse Viewchange Message") consensus.getLogger().Warn().Msg("[onViewChange] Unable To Parse Viewchange Message")
return return
} }
newLeaderKey := recvMsg.LeaderPubkey newLeaderKey := recvMsg.LeaderPubkey
@ -195,33 +205,44 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
} }
if len(consensus.viewIDSigs) >= consensus.Quorum() { if len(consensus.viewIDSigs) >= consensus.Quorum() {
consensus.getLogger().Debug("[onViewChange] Received Enough View Change Messages", "have", len(consensus.viewIDSigs), "need", consensus.Quorum(), "validatorPubKey", recvMsg.SenderPubkey.SerializeToHexStr()) consensus.getLogger().Debug().
Int("have", len(consensus.viewIDSigs)).
Int("need", consensus.Quorum()).
Str("validatorPubKey", recvMsg.SenderPubkey.SerializeToHexStr()).
Msg("[onViewChange] Received Enough View Change Messages")
return return
} }
senderKey, err := consensus.verifyViewChangeSenderKey(msg) senderKey, err := consensus.verifyViewChangeSenderKey(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug("[onViewChange] VerifySenderKey Failed", "error", err) consensus.getLogger().Debug().Err(err).Msg("[onViewChange] VerifySenderKey Failed")
return return
} }
// TODO: if difference is only one, new leader can still propose the same committed block to avoid another view change // TODO: if difference is only one, new leader can still propose the same committed block to avoid another view change
if consensus.blockNum > recvMsg.BlockNum { if consensus.blockNum > recvMsg.BlockNum {
consensus.getLogger().Debug("[onViewChange] Message BlockNum Is Low", "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[onViewChange] Message BlockNum Is Low")
return return
} }
if consensus.blockNum < recvMsg.BlockNum { if consensus.blockNum < recvMsg.BlockNum {
consensus.getLogger().Warn("[onViewChange] New Leader Has Lower Blocknum", "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Warn().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[onViewChange] New Leader Has Lower Blocknum")
return return
} }
if consensus.mode.Mode() == ViewChanging && consensus.mode.ViewID() > recvMsg.ViewID { if consensus.mode.Mode() == ViewChanging && consensus.mode.ViewID() > recvMsg.ViewID {
consensus.getLogger().Warn("[onViewChange] ViewChanging ID Is Low", "MyViewChangingID", consensus.mode.ViewID(), "MsgViewChangingID", recvMsg.ViewID) consensus.getLogger().Warn().
Uint64("MyViewChangingID", consensus.mode.ViewID()).
Uint64("MsgViewChangingID", recvMsg.ViewID).
Msg("[onViewChange] ViewChanging ID Is Low")
return return
} }
if err = verifyMessageSig(senderKey, msg); err != nil { if err = verifyMessageSig(senderKey, msg); err != nil {
consensus.getLogger().Debug("[onViewChange] Failed To Verify Sender's Signature", "error", err) consensus.getLogger().Debug().Err(err).Msg("[onViewChange] Failed To Verify Sender's Signature")
return return
} }
@ -236,11 +257,11 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeq(msg_pb.MessageType_PREPARED, recvMsg.BlockNum) preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeq(msg_pb.MessageType_PREPARED, recvMsg.BlockNum)
preparedMsg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs) preparedMsg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs)
if preparedMsg == nil { if preparedMsg == nil {
consensus.getLogger().Debug("[onViewChange] add my M2(NIL) type messaage") consensus.getLogger().Debug().Msg("[onViewChange] add my M2(NIL) type messaage")
consensus.nilSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(NIL) consensus.nilSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(NIL)
consensus.nilBitmap.SetKey(consensus.PubKey, true) consensus.nilBitmap.SetKey(consensus.PubKey, true)
} else { } else {
consensus.getLogger().Debug("[onViewChange] add my M1 type messaage") consensus.getLogger().Debug().Msg("[onViewChange] add my M1 type messaage")
msgToSign := append(preparedMsg.BlockHash[:], preparedMsg.Payload...) msgToSign := append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
consensus.bhpSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(msgToSign) consensus.bhpSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(msgToSign)
consensus.bhpBitmap.SetKey(consensus.PubKey, true) consensus.bhpBitmap.SetKey(consensus.PubKey, true)
@ -259,50 +280,63 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
if len(recvMsg.Payload) == 0 { if len(recvMsg.Payload) == 0 {
_, ok := consensus.nilSigs[senderKey.SerializeToHexStr()] _, ok := consensus.nilSigs[senderKey.SerializeToHexStr()]
if ok { if ok {
consensus.getLogger().Debug("[onViewChange] Already Received M2 message from validator", "validatorPubKey", senderKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.SerializeToHexStr()).
Msg("[onViewChange] Already Received M2 message from validator")
return return
} }
if !recvMsg.ViewchangeSig.VerifyHash(senderKey, NIL) { if !recvMsg.ViewchangeSig.VerifyHash(senderKey, NIL) {
consensus.getLogger().Warn("[onViewChange] Failed To Verify Signature For M2 Type Viewchange Message") consensus.getLogger().Warn().Msg("[onViewChange] Failed To Verify Signature For M2 Type Viewchange Message")
return return
} }
consensus.getLogger().Debug("[onViewChange] Add M2 (NIL) type message", "validatorPubKey", senderKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.SerializeToHexStr()).
Msg("[onViewChange] Add M2 (NIL) type message")
consensus.nilSigs[senderKey.SerializeToHexStr()] = recvMsg.ViewchangeSig consensus.nilSigs[senderKey.SerializeToHexStr()] = recvMsg.ViewchangeSig
consensus.nilBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed. consensus.nilBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed.
} else { // m1 type message } else { // m1 type message
_, ok := consensus.bhpSigs[senderKey.SerializeToHexStr()] _, ok := consensus.bhpSigs[senderKey.SerializeToHexStr()]
if ok { if ok {
consensus.getLogger().Debug("[onViewChange] Already Received M1 Message From the Validator", "validatorPubKey", senderKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.SerializeToHexStr()).
Msg("[onViewChange] Already Received M1 Message From the Validator")
return return
} }
if !recvMsg.ViewchangeSig.VerifyHash(recvMsg.SenderPubkey, recvMsg.Payload) { if !recvMsg.ViewchangeSig.VerifyHash(recvMsg.SenderPubkey, recvMsg.Payload) {
consensus.getLogger().Warn("[onViewChange] Failed to Verify Signature for M1 Type Viewchange Message") consensus.getLogger().Warn().Msg("[onViewChange] Failed to Verify Signature for M1 Type Viewchange Message")
return return
} }
// first time receive m1 type message, need verify validity of prepared message // first time receive m1 type message, need verify validity of prepared message
if len(consensus.m1Payload) == 0 || !bytes.Equal(consensus.m1Payload, recvMsg.Payload) { if len(consensus.m1Payload) == 0 || !bytes.Equal(consensus.m1Payload, recvMsg.Payload) {
if len(recvMsg.Payload) <= 32 { if len(recvMsg.Payload) <= 32 {
consensus.getLogger().Debug("[onViewChange] M1 RecvMsg Payload Not Enough Length", "len", len(recvMsg.Payload)) consensus.getLogger().Debug().
Int("len", len(recvMsg.Payload)).
Msg("[onViewChange] M1 RecvMsg Payload Not Enough Length")
return return
} }
blockHash := recvMsg.Payload[:32] blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32) aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil { if err != nil {
consensus.getLogger().Error("[onViewChange] M1 RecvMsg Payload Read Error", "error", err) consensus.getLogger().Error().Err(err).Msg("[onViewChange] M1 RecvMsg Payload Read Error")
return return
} }
// check has 2f+1 signature in m1 type message // check has 2f+1 signature in m1 type message
if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() { if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() {
consensus.getLogger().Debug("[onViewChange] M1 Payload Not Have Enough Signature", "need", consensus.Quorum(), "have", count) consensus.getLogger().Debug().
Int("need", consensus.Quorum()).
Int("have", count).
Msg("[onViewChange] M1 Payload Not Have Enough Signature")
return return
} }
// Verify the multi-sig for prepare phase // Verify the multi-sig for prepare phase
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) { if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) {
consensus.getLogger().Warn("[onViewChange] failed to verify multi signature for m1 prepared payload", "blockHash", blockHash) consensus.getLogger().Warn().
Bytes("blockHash", blockHash).
Msg("[onViewChange] failed to verify multi signature for m1 prepared payload")
return return
} }
@ -316,11 +350,13 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32) preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:]) copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkey = consensus.PubKey preparedMsg.SenderPubkey = consensus.PubKey
consensus.getLogger().Info("[onViewChange] New Leader Prepared Message Added") consensus.getLogger().Info().Msg("[onViewChange] New Leader Prepared Message Added")
consensus.PbftLog.AddMessage(&preparedMsg) consensus.PbftLog.AddMessage(&preparedMsg)
} }
} }
consensus.getLogger().Debug("[onViewChange] Add M1 (prepared) type message", "validatorPubKey", senderKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.SerializeToHexStr()).
Msg("[onViewChange] Add M1 (prepared) type message")
consensus.bhpSigs[senderKey.SerializeToHexStr()] = recvMsg.ViewchangeSig consensus.bhpSigs[senderKey.SerializeToHexStr()] = recvMsg.ViewchangeSig
consensus.bhpBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed. consensus.bhpBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed.
} }
@ -328,19 +364,28 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
// check and add viewID (m3 type) message signature // check and add viewID (m3 type) message signature
_, ok := consensus.viewIDSigs[senderKey.SerializeToHexStr()] _, ok := consensus.viewIDSigs[senderKey.SerializeToHexStr()]
if ok { if ok {
consensus.getLogger().Debug("[onViewChange] Already Received M3(ViewID) message from the validator", "senderKey.SerializeToHexStr()", senderKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.SerializeToHexStr()).
Msg("[onViewChange] Already Received M3(ViewID) message from the validator")
return return
} }
viewIDHash := make([]byte, 8) viewIDHash := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDHash, recvMsg.ViewID) binary.LittleEndian.PutUint64(viewIDHash, recvMsg.ViewID)
if !recvMsg.ViewidSig.VerifyHash(recvMsg.SenderPubkey, viewIDHash) { if !recvMsg.ViewidSig.VerifyHash(recvMsg.SenderPubkey, viewIDHash) {
consensus.getLogger().Warn("[onViewChange] Failed to Verify M3 Message Signature", "MsgViewID", recvMsg.ViewID) consensus.getLogger().Warn().
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[onViewChange] Failed to Verify M3 Message Signature")
return return
} }
consensus.getLogger().Debug("[onViewChange] Add M3 (ViewID) type message", "validatorPubKey", senderKey.SerializeToHexStr()) consensus.getLogger().Debug().
Str("validatorPubKey", senderKey.SerializeToHexStr()).
Msg("[onViewChange] Add M3 (ViewID) type message")
consensus.viewIDSigs[senderKey.SerializeToHexStr()] = recvMsg.ViewidSig consensus.viewIDSigs[senderKey.SerializeToHexStr()] = recvMsg.ViewidSig
consensus.viewIDBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed. consensus.viewIDBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed.
consensus.getLogger().Debug("[onViewChange]", "numSigs", len(consensus.viewIDSigs), "needed", consensus.Quorum()) consensus.getLogger().Debug().
Int("numSigs", len(consensus.viewIDSigs)).
Int("needed", consensus.Quorum()).
Msg("[onViewChange]")
// received enough view change messages, change state to normal consensus // received enough view change messages, change state to normal consensus
if len(consensus.viewIDSigs) >= consensus.Quorum() { if len(consensus.viewIDSigs) >= consensus.Quorum() {
@ -352,12 +397,15 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- struct{}{}
}() }()
} else { } else {
consensus.getLogger().Debug("[OnViewChange] Switching phase", "From", consensus.phase, "To", Commit) consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", Commit.String()).
Msg("[OnViewChange] Switching phase")
consensus.switchPhase(Commit, true) consensus.switchPhase(Commit, true)
copy(consensus.blockHash[:], consensus.m1Payload[:32]) copy(consensus.blockHash[:], consensus.m1Payload[:32])
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32) aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil { if err != nil {
consensus.getLogger().Error("[onViewChange] ReadSignatureBitmapPayload Fail", "error", err) consensus.getLogger().Error().Err(err).Msg("[onViewChange] ReadSignatureBitmapPayload Fail")
return return
} }
consensus.aggregatedPrepareSig = aggSig consensus.aggregatedPrepareSig = aggSig
@ -369,48 +417,57 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
commitPayload := append(blockNumBytes, consensus.blockHash[:]...) commitPayload := append(blockNumBytes, consensus.blockHash[:]...)
consensus.commitSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(commitPayload) consensus.commitSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(commitPayload)
if err = consensus.commitBitmap.SetKey(consensus.PubKey, true); err != nil { if err = consensus.commitBitmap.SetKey(consensus.PubKey, true); err != nil {
consensus.getLogger().Debug("[OnViewChange] New Leader commit bitmap set failed") consensus.getLogger().Debug().Msg("[OnViewChange] New Leader commit bitmap set failed")
return return
} }
} }
consensus.mode.SetViewID(recvMsg.ViewID) consensus.mode.SetViewID(recvMsg.ViewID)
msgToSend := consensus.constructNewViewMessage() msgToSend := consensus.constructNewViewMessage()
consensus.getLogger().Warn("[onViewChange] Sent NewView Message", "len(M1Payload)", len(consensus.m1Payload), "M1Payload", consensus.m1Payload) consensus.getLogger().Warn().
Int("payloadSize", len(consensus.m1Payload)).
Bytes("M1Payload", consensus.m1Payload).
Msg("[onViewChange] Sent NewView Message")
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.viewID = recvMsg.ViewID consensus.viewID = recvMsg.ViewID
consensus.ResetViewChangeState() consensus.ResetViewChangeState()
consensus.consensusTimeout[timeoutViewChange].Stop() consensus.consensusTimeout[timeoutViewChange].Stop()
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Debug("[onViewChange] New Leader Start Consensus Timer and Stop View Change Timer", "viewChangingID", consensus.mode.ViewID()) consensus.getLogger().Debug().
consensus.getLogger().Debug("[onViewChange] I am the New Leader", "myKey", consensus.PubKey.SerializeToHexStr(), "viewID", consensus.viewID, "block", consensus.blockNum) Uint64("viewChangingID", consensus.mode.ViewID()).
Msg("[onViewChange] New Leader Start Consensus Timer and Stop View Change Timer")
consensus.getLogger().Debug().
Str("myKey", consensus.PubKey.SerializeToHexStr()).
Uint64("viewID", consensus.viewID).
Uint64("block", consensus.blockNum).
Msg("[onViewChange] I am the New Leader")
} }
} }
// TODO: move to consensus_leader.go later // TODO: move to consensus_leader.go later
func (consensus *Consensus) onNewView(msg *msg_pb.Message) { func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
consensus.getLogger().Debug("[onNewView] Received NewView Message") consensus.getLogger().Debug().Msg("[onNewView] Received NewView Message")
senderKey, err := consensus.verifyViewChangeSenderKey(msg) senderKey, err := consensus.verifyViewChangeSenderKey(msg)
if err != nil { if err != nil {
consensus.getLogger().Warn("[onNewView] VerifySenderKey Failed", "error", err) consensus.getLogger().Warn().Err(err).Msg("[onNewView] VerifySenderKey Failed")
return return
} }
recvMsg, err := consensus.ParseNewViewMessage(msg) recvMsg, err := consensus.ParseNewViewMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Warn("[onNewView] Unable to Parse NewView Message", "error", err) consensus.getLogger().Warn().Err(err).Msg("[onNewView] Unable to Parse NewView Message")
return return
} }
if err = verifyMessageSig(senderKey, msg); err != nil { if err = verifyMessageSig(senderKey, msg); err != nil {
consensus.getLogger().Error("[onNewView] Failed to Verify New Leader's Signature", "error", err) consensus.getLogger().Error().Err(err).Msg("[onNewView] Failed to Verify New Leader's Signature")
return return
} }
consensus.vcLock.Lock() consensus.vcLock.Lock()
defer consensus.vcLock.Unlock() defer consensus.vcLock.Unlock()
if recvMsg.M3AggSig == nil || recvMsg.M3Bitmap == nil { if recvMsg.M3AggSig == nil || recvMsg.M3Bitmap == nil {
consensus.getLogger().Error("[onNewView] M3AggSig or M3Bitmap is nil") consensus.getLogger().Error().Msg("[onNewView] M3AggSig or M3Bitmap is nil")
return return
} }
m3Sig := recvMsg.M3AggSig m3Sig := recvMsg.M3AggSig
@ -420,21 +477,28 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID) binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
// check total number of sigs >= 2f+1 // check total number of sigs >= 2f+1
if count := utils.CountOneBits(m3Mask.Bitmap); count < consensus.Quorum() { if count := utils.CountOneBits(m3Mask.Bitmap); count < consensus.Quorum() {
consensus.getLogger().Debug("[onNewView] Not Have Enough M3 (ViewID) Signature", "need", consensus.Quorum(), "have", count) consensus.getLogger().Debug().
Int("need", consensus.Quorum()).
Int("have", count).
Msg("[onNewView] Not Have Enough M3 (ViewID) Signature")
return return
} }
if !m3Sig.VerifyHash(m3Mask.AggregatePublic, viewIDBytes) { if !m3Sig.VerifyHash(m3Mask.AggregatePublic, viewIDBytes) {
consensus.getLogger().Warn("[onNewView] Unable to Verify Aggregated Signature of M3 (ViewID) payload", "m3Sig", m3Sig.SerializeToHexStr(), "m3Mask", m3Mask.Bitmap, "MsgViewID", recvMsg.ViewID) consensus.getLogger().Warn().
Str("m3Sig", m3Sig.SerializeToHexStr()).
Bytes("m3Mask", m3Mask.Bitmap).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[onNewView] Unable to Verify Aggregated Signature of M3 (ViewID) payload")
return return
} }
m2Mask := recvMsg.M2Bitmap m2Mask := recvMsg.M2Bitmap
if recvMsg.M2AggSig != nil { if recvMsg.M2AggSig != nil {
consensus.getLogger().Debug("[onNewView] M2AggSig (NIL) is Not Empty") consensus.getLogger().Debug().Msg("[onNewView] M2AggSig (NIL) is Not Empty")
m2Sig := recvMsg.M2AggSig m2Sig := recvMsg.M2AggSig
if !m2Sig.VerifyHash(m2Mask.AggregatePublic, NIL) { if !m2Sig.VerifyHash(m2Mask.AggregatePublic, NIL) {
consensus.getLogger().Warn("[onNewView] Unable to Verify Aggregated Signature of M2 (NIL) payload") consensus.getLogger().Warn().Msg("[onNewView] Unable to Verify Aggregated Signature of M2 (NIL) payload")
return return
} }
} }
@ -442,18 +506,18 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
// check when M3 sigs > M2 sigs, then M1 (recvMsg.Payload) should not be empty // check when M3 sigs > M2 sigs, then M1 (recvMsg.Payload) should not be empty
if m2Mask == nil || m2Mask.Bitmap == nil || (m2Mask != nil && m2Mask.Bitmap != nil && utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) { if m2Mask == nil || m2Mask.Bitmap == nil || (m2Mask != nil && m2Mask.Bitmap != nil && utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) {
if len(recvMsg.Payload) <= 32 { if len(recvMsg.Payload) <= 32 {
consensus.getLogger().Debug("[onNewView] M1 (prepared) Type Payload Not Have Enough Length") consensus.getLogger().Debug().Msg("[onNewView] M1 (prepared) Type Payload Not Have Enough Length")
return return
} }
// m1 is not empty, check it's valid // m1 is not empty, check it's valid
blockHash := recvMsg.Payload[:32] blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32) aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil { if err != nil {
consensus.getLogger().Error("[onNewView] ReadSignatureBitmapPayload Failed", "error", err) consensus.getLogger().Error().Err(err).Msg("[onNewView] ReadSignatureBitmapPayload Failed")
return return
} }
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash) { if !aggSig.VerifyHash(mask.AggregatePublic, blockHash) {
consensus.getLogger().Warn("[onNewView] Failed to Verify Signature for M1 (prepare) message") consensus.getLogger().Warn().Msg("[onNewView] Failed to Verify Signature for M1 (prepare) message")
return return
} }
copy(consensus.blockHash[:], blockHash) copy(consensus.blockHash[:], blockHash)
@ -478,7 +542,10 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
// change view and leaderKey to keep in sync with network // change view and leaderKey to keep in sync with network
if consensus.blockNum != recvMsg.BlockNum { if consensus.blockNum != recvMsg.BlockNum {
consensus.getLogger().Debug("[onNewView] New Leader Changed", "newLeaderKey", consensus.LeaderPubKey.SerializeToHexStr(), "MsgBlockNum", recvMsg.BlockNum) consensus.getLogger().Debug().
Str("newLeaderKey", consensus.LeaderPubKey.SerializeToHexStr()).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[onNewView] New Leader Changed")
return return
} }
@ -490,16 +557,21 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
commitPayload := append(blockNumHash, consensus.blockHash[:]...) commitPayload := append(blockNumHash, consensus.blockHash[:]...)
msgToSend := consensus.constructCommitMessage(commitPayload) msgToSend := consensus.constructCommitMessage(commitPayload)
consensus.getLogger().Info("onNewView === commit") consensus.getLogger().Info().Msg("onNewView === commit")
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.getLogger().Debug("[OnViewChange] Switching phase", "From", consensus.phase, "To", Commit) consensus.getLogger().Debug().
Str("From", consensus.phase.String()).
Str("To", Commit.String()).
Msg("[OnViewChange] Switching phase")
consensus.switchPhase(Commit, true) consensus.switchPhase(Commit, true)
} else { } else {
consensus.ResetState() consensus.ResetState()
consensus.getLogger().Info("onNewView === announce") consensus.getLogger().Info().Msg("onNewView === announce")
} }
consensus.getLogger().Debug("new leader changed", "newLeaderKey", consensus.LeaderPubKey.SerializeToHexStr()) consensus.getLogger().Debug().
consensus.getLogger().Debug("validator start consensus timer and stop view change timer") Str("newLeaderKey", consensus.LeaderPubKey.SerializeToHexStr()).
Msg("new leader changed")
consensus.getLogger().Debug().Msg("validator start consensus timer and stop view change timer")
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
consensus.consensusTimeout[timeoutViewChange].Stop() consensus.consensusTimeout[timeoutViewChange].Stop()
} }

Loading…
Cancel
Save