From 59859250f1b65748cd0e4f9dfb05398607dc82ec Mon Sep 17 00:00:00 2001 From: Edgar Aroutiounian Date: Sat, 8 Feb 2020 16:27:07 -0800 Subject: [PATCH] =?UTF-8?q?[consensus]=20Remove=20unnecessary=20usage=20of?= =?UTF-8?q?=20bandwidth=20in=20network=20messages=E2=80=A6=20(#2226)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [consensus] Remove unnecessary usage of bandwidth in network messages, small adjustments after consensus code review * [consensus] Factor out behavior of when quorum prepare reached * [consensus] Push code from another scratch branch * [consensus] Still do need to accept higher than blocknum message --- consensus/checks.go | 40 +++++++++++++++++++ consensus/construct.go | 3 +- consensus/leader.go | 53 +----------------------- consensus/threshold.go | 75 ++++++++++++++++++++++++++++++++++ consensus/validator.go | 91 ++++-------------------------------------- 5 files changed, 125 insertions(+), 137 deletions(-) create mode 100644 consensus/threshold.go diff --git a/consensus/checks.go b/consensus/checks.go index 62514691a..75b646b9a 100644 --- a/consensus/checks.go +++ b/consensus/checks.go @@ -88,6 +88,46 @@ func (consensus *Consensus) onCommitSanityChecks( return true } +func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool { + logMsgs := consensus.FBFTLog.GetMessagesByTypeSeqView( + msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID, + ) + if len(logMsgs) > 0 { + if logMsgs[0].BlockHash != recvMsg.BlockHash && + logMsgs[0].SenderPubkey.IsEqual(recvMsg.SenderPubkey) { + consensus.getLogger().Debug(). + Str("logMsgSenderKey", logMsgs[0].SenderPubkey.SerializeToHexStr()). + Str("logMsgBlockHash", logMsgs[0].BlockHash.Hex()). + Str("recvMsg.SenderPubkey", recvMsg.SenderPubkey.SerializeToHexStr()). + Uint64("recvMsg.BlockNum", recvMsg.BlockNum). + Uint64("recvMsg.ViewID", recvMsg.ViewID). + Str("recvMsgBlockHash", recvMsg.BlockHash.Hex()). + Str("LeaderKey", consensus.LeaderPubKey.SerializeToHexStr()). + Msg("[OnAnnounce] Leader is malicious") + if consensus.current.Mode() == ViewChanging { + viewID := consensus.current.ViewID() + consensus.startViewChange(viewID + 1) + } else { + consensus.startViewChange(consensus.viewID + 1) + } + } + consensus.getLogger().Debug(). + Str("leaderKey", consensus.LeaderPubKey.SerializeToHexStr()). + Msg("[OnAnnounce] Announce message received again") + } + return consensus.isRightBlockNumCheck(recvMsg) +} + +func (consensus *Consensus) isRightBlockNumCheck(recvMsg *FBFTMessage) bool { + if recvMsg.BlockNum < consensus.blockNum { + consensus.getLogger().Debug(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Msg("Wrong BlockNum Received, ignoring!") + return false + } + return true +} + func (consensus *Consensus) onPreparedSanityChecks( blockObj *types.Block, recvMsg *FBFTMessage, ) bool { diff --git a/consensus/construct.go b/consensus/construct.go index 7c112feb0..866c83a0d 100644 --- a/consensus/construct.go +++ b/consensus/construct.go @@ -57,7 +57,6 @@ func (consensus *Consensus) construct( } case msg_pb.MessageType_COMMIT: if s := consensus.priKey.SignHash(payloadForSignOverride); s != nil { - consensusMsg.Block = consensus.block consensusMsg.Payload = s.Serialize() } case msg_pb.MessageType_COMMITTED: @@ -69,7 +68,7 @@ func (consensus *Consensus) construct( buffer.Write(consensus.commitBitmap.Bitmap) consensusMsg.Payload = buffer.Bytes() case msg_pb.MessageType_ANNOUNCE: - consensusMsg.Payload = consensus.blockHeader + consensusMsg.Payload = consensus.blockHash[:] } marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message) diff --git a/consensus/leader.go b/consensus/leader.go index 23b9912cc..b3e502d14 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -164,60 +164,9 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { } if consensus.Decider.IsQuorumAchieved(quorum.Prepare) { - logger.Debug().Msg("[OnPrepare] Received Enough Prepare Signatures") - // Construct and broadcast prepared message - network, err := consensus.construct(msg_pb.MessageType_PREPARED, nil) - if err != nil { - consensus.getLogger().Err(err). - Str("message-type", msg_pb.MessageType_PREPARED.String()). - Msg("failed constructing message") + if err := consensus.didReachPrepareQuorum(); err != nil { return } - msgToSend, FBFTMsg, aggSig := - network.Bytes, - network.FBFTMsg, - network.OptionalAggregateSignature - - consensus.aggregatedPrepareSig = aggSig - consensus.FBFTLog.AddMessage(FBFTMsg) - // Leader add commit phase signature - blockNumHash := make([]byte, 8) - binary.LittleEndian.PutUint64(blockNumHash, consensus.blockNum) - commitPayload := append(blockNumHash, consensus.blockHash[:]...) - consensus.Decider.SubmitVote( - quorum.Commit, - consensus.PubKey, - consensus.priKey.SignHash(commitPayload), - consensus.block[:], - ) - - if err := consensus.commitBitmap.SetKey(consensus.PubKey, true); err != nil { - consensus.getLogger().Debug().Msg("[OnPrepare] Leader commit bitmap set failed") - return - } - - if err := consensus.msgSender.SendWithRetry( - consensus.blockNum, - msg_pb.MessageType_PREPARED, []nodeconfig.GroupID{ - nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), - }, - host.ConstructP2pMessage(byte(17), msgToSend), - ); err != nil { - consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message") - } else { - consensus.getLogger().Debug(). - Hex("blockHash", consensus.blockHash[:]). - Uint64("blockNum", consensus.blockNum). - Msg("[OnPrepare] Sent Prepared Message!!") - } - consensus.msgSender.StopRetry(msg_pb.MessageType_ANNOUNCE) - // Stop retry committed msg of last consensus - consensus.msgSender.StopRetry(msg_pb.MessageType_COMMITTED) - - consensus.getLogger().Debug(). - Str("From", consensus.phase.String()). - Str("To", FBFTCommit.String()). - Msg("[OnPrepare] Switching phase") consensus.switchPhase(FBFTCommit, true) } } diff --git a/consensus/threshold.go b/consensus/threshold.go new file mode 100644 index 000000000..0249d8172 --- /dev/null +++ b/consensus/threshold.go @@ -0,0 +1,75 @@ +package consensus + +import ( + "encoding/binary" + + msg_pb "github.com/harmony-one/harmony/api/proto/message" + "github.com/harmony-one/harmony/consensus/quorum" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p/host" +) + +func (consensus *Consensus) didReachPrepareQuorum() error { + logger := utils.Logger() + logger.Debug().Msg("[OnPrepare] Received Enough Prepare Signatures") + // Construct and broadcast prepared message + network, err := consensus.construct(msg_pb.MessageType_PREPARED, nil) + if err != nil { + consensus.getLogger().Err(err). + Str("message-type", msg_pb.MessageType_PREPARED.String()). + Msg("failed constructing message") + return err + } + msgToSend, FBFTMsg, aggSig := + network.Bytes, + network.FBFTMsg, + network.OptionalAggregateSignature + + consensus.aggregatedPrepareSig = aggSig + consensus.FBFTLog.AddMessage(FBFTMsg) + // Leader add commit phase signature + blockNumHash := make([]byte, 8) + binary.LittleEndian.PutUint64(blockNumHash, consensus.blockNum) + commitPayload := append(blockNumHash, consensus.blockHash[:]...) + + // so by this point, everyone has committed to the blockhash of this block + // in prepare and so this is the actual block. + + consensus.Decider.SubmitVote( + quorum.Commit, + consensus.PubKey, + consensus.priKey.SignHash(commitPayload), + consensus.block[:], + ) + + if err := consensus.commitBitmap.SetKey(consensus.PubKey, true); err != nil { + consensus.getLogger().Debug().Msg("[OnPrepare] Leader commit bitmap set failed") + return err + } + + if err := consensus.msgSender.SendWithRetry( + consensus.blockNum, + msg_pb.MessageType_PREPARED, []nodeconfig.GroupID{ + nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), + }, + host.ConstructP2pMessage(byte(17), msgToSend), + ); err != nil { + consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message") + } else { + consensus.getLogger().Debug(). + Hex("blockHash", consensus.blockHash[:]). + Uint64("blockNum", consensus.blockNum). + Msg("[OnPrepare] Sent Prepared Message!!") + } + consensus.msgSender.StopRetry(msg_pb.MessageType_ANNOUNCE) + // Stop retry committed msg of last consensus + consensus.msgSender.StopRetry(msg_pb.MessageType_COMMITTED) + + consensus.getLogger().Debug(). + Str("From", consensus.phase.String()). + Str("To", FBFTCommit.String()). + Msg("[OnPrepare] Switching phase") + + return nil +} diff --git a/consensus/validator.go b/consensus/validator.go index 5c5a17007..59a666643 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -9,9 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" msg_pb "github.com/harmony-one/harmony/api/proto/message" - "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/internal/chain" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/p2p/host" ) @@ -26,81 +24,11 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { return } - // verify validity of block header object - // TODO: think about just sending the block hash instead of the header. - encodedHeader := recvMsg.Payload - header := new(block.Header) - if err := rlp.DecodeBytes(encodedHeader, header); err != nil { - consensus.getLogger().Warn(). - Err(err). - Uint64("MsgBlockNum", recvMsg.BlockNum). - Msg("[OnAnnounce] Unparseable block header data") - return - } - - if recvMsg.BlockNum < consensus.blockNum || - recvMsg.BlockNum != header.Number().Uint64() { - consensus.getLogger().Debug(). - Uint64("MsgBlockNum", recvMsg.BlockNum). - Uint64("hdrBlockNum", header.Number().Uint64()). - Uint64("consensuBlockNum", consensus.blockNum). - Msg("[OnAnnounce] BlockNum does not match") + // NOTE let it handle its own logs + if !consensus.onAnnounceSanityChecks(recvMsg) { return } - if err := chain.Engine.VerifyHeader(consensus.ChainReader, header, true); err != nil { - consensus.getLogger().Warn(). - Err(err). - Str("inChain", consensus.ChainReader.CurrentHeader().Number().String()). - Str("MsgBlockNum", header.Number().String()). - Msg("[OnAnnounce] Block content is not verified successfully") - return - } - - //VRF/VDF is only generated in the beach chain - if consensus.NeedsRandomNumberGeneration(header.Epoch()) { - //validate the VRF with proof if a non zero VRF is found in header - if len(header.Vrf()) > 0 { - if !consensus.ValidateVrfAndProof(header) { - return - } - } - - //validate the VDF with proof if a non zero VDF is found in header - if len(header.Vdf()) > 0 { - if !consensus.ValidateVdfAndProof(header) { - return - } - } - } - - logMsgs := consensus.FBFTLog.GetMessagesByTypeSeqView( - msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID, - ) - if len(logMsgs) > 0 { - if logMsgs[0].BlockHash != recvMsg.BlockHash && - logMsgs[0].SenderPubkey.IsEqual(recvMsg.SenderPubkey) { - consensus.getLogger().Debug(). - Str("logMsgSenderKey", logMsgs[0].SenderPubkey.SerializeToHexStr()). - Str("logMsgBlockHash", logMsgs[0].BlockHash.Hex()). - Str("recvMsg.SenderPubkey", recvMsg.SenderPubkey.SerializeToHexStr()). - Uint64("recvMsg.BlockNum", recvMsg.BlockNum). - Uint64("recvMsg.ViewID", recvMsg.ViewID). - Str("recvMsgBlockHash", recvMsg.BlockHash.Hex()). - Str("LeaderKey", consensus.LeaderPubKey.SerializeToHexStr()). - Msg("[OnAnnounce] Leader is malicious") - if consensus.current.Mode() == ViewChanging { - viewID := consensus.current.ViewID() - consensus.startViewChange(viewID + 1) - } else { - consensus.startViewChange(consensus.viewID + 1) - } - } - consensus.getLogger().Debug(). - Str("leaderKey", consensus.LeaderPubKey.SerializeToHexStr()). - Msg("[OnAnnounce] Announce message received again") - } - consensus.getLogger().Debug(). Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgBlockNum", recvMsg.BlockNum). @@ -130,21 +58,20 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { } func (consensus *Consensus) prepare() { - network, err := consensus.construct(msg_pb.MessageType_PREPARE, nil) + networkMessage, err := consensus.construct(msg_pb.MessageType_PREPARE, nil) if err != nil { consensus.getLogger().Err(err). Str("message-type", msg_pb.MessageType_PREPARE.String()). Msg("could not construct message") return } - msgToSend := network.Bytes // TODO: this will not return immediatey, may block if err := consensus.msgSender.SendWithoutRetry( []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), }, - host.ConstructP2pMessage(byte(17), msgToSend), + host.ConstructP2pMessage(byte(17), networkMessage.Bytes), ); err != nil { consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") } else { @@ -174,7 +101,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { if recvMsg.BlockNum < consensus.blockNum { consensus.getLogger().Debug().Uint64("MsgBlockNum", recvMsg.BlockNum). - Msg("Old Block Received, ignoring!!") + Msg("Wrong BlockNum Received, ignoring!") return } @@ -182,7 +109,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { blockHash := recvMsg.BlockHash aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) if err != nil { - consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!!") + consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!") return } @@ -265,13 +192,11 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { } blockNumBytes := make([]byte, 8) binary.LittleEndian.PutUint64(blockNumBytes, consensus.blockNum) - network, _ := consensus.construct( + networkMessage, _ := consensus.construct( // TODO: should only sign on block hash msg_pb.MessageType_COMMIT, append(blockNumBytes, consensus.blockHash[:]...), ) - msgToSend := network.Bytes - // TODO: genesis account node delay for 1 second, // this is a temp fix for allows FN nodes to earning reward if consensus.delayCommit > 0 { @@ -281,7 +206,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { if err := consensus.msgSender.SendWithoutRetry( []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, - host.ConstructP2pMessage(byte(17), msgToSend), + host.ConstructP2pMessage(byte(17), networkMessage.Bytes), ); err != nil { consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") } else {