Fix usage of private methods.

pull/4369/head
frozen 2 years ago committed by Casey Gardiner
parent 1fa5953deb
commit abf5f5aa8c
  1. 2
      consensus/checks.go
  2. 12
      consensus/consensus_service.go
  3. 16
      consensus/consensus_v2.go
  4. 4
      consensus/construct.go
  5. 7
      consensus/debug.go
  6. 8
      consensus/leader.go
  7. 12
      consensus/view_change.go

@ -147,7 +147,7 @@ func (consensus *Consensus) onViewChangeSanityCheck(recvMsg *FBFTMessage) bool {
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Uint64("MsgBlockNum", recvMsg.BlockNum). Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MyViewChangingID", consensus.GetViewChangingID()). Uint64("MyViewChangingID", consensus.getViewChangingID()).
Uint64("MsgViewChangingID", recvMsg.ViewID). Uint64("MsgViewChangingID", recvMsg.ViewID).
Interface("SendPubKeys", recvMsg.SenderPubkeys). Interface("SendPubKeys", recvMsg.SenderPubkeys).
Msg("[onViewChangeSanityCheck]") Msg("[onViewChangeSanityCheck]")

@ -210,6 +210,13 @@ func (consensus *Consensus) SetIsBackup(isBackup bool) {
// Mode returns the mode of consensus // Mode returns the mode of consensus
func (consensus *Consensus) Mode() Mode { func (consensus *Consensus) Mode() Mode {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.mode()
}
// mode returns the mode of consensus
func (consensus *Consensus) mode() Mode {
return consensus.current.Mode() return consensus.current.Mode()
} }
@ -254,6 +261,11 @@ func (consensus *Consensus) SetBlockNum(blockNum uint64) {
atomic.StoreUint64(&consensus.blockNum, blockNum) atomic.StoreUint64(&consensus.blockNum, blockNum)
} }
// SetBlockNum sets the blockNum in consensus object, called at node bootstrap
func (consensus *Consensus) setBlockNum(blockNum uint64) {
atomic.StoreUint64(&consensus.blockNum, blockNum)
}
// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading // ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) { func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) {
consensus.mutex.RLock() consensus.mutex.RLock()

@ -367,7 +367,7 @@ func (consensus *Consensus) syncReadyChan() {
func (consensus *Consensus) syncNotReadyChan() { func (consensus *Consensus) syncNotReadyChan() {
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan")
consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) consensus.setBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1)
consensus.current.SetMode(Syncing) consensus.current.SetMode(Syncing)
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC") consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC")
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc() consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc()
@ -452,14 +452,14 @@ func (consensus *Consensus) BlockChannel(newBlock *types.Block) {
// waitForCommit wait extra 2 seconds for commit phase to finish // waitForCommit wait extra 2 seconds for commit phase to finish
func (consensus *Consensus) waitForCommit() { func (consensus *Consensus) waitForCommit() {
if consensus.Mode() != Normal || consensus.phase.Get() != FBFTCommit { if consensus.mode() != Normal || consensus.phase.Get() != FBFTCommit {
return return
} }
// We only need to wait consensus is in normal commit phase // We only need to wait consensus is in normal commit phase
utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait") utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait")
maxWait := time.Now().Add(2 * consensus.BlockPeriod) maxWait := time.Now().Add(2 * consensus.BlockPeriod)
for time.Now().Before(maxWait) && consensus.GetConsensusPhase() == "Commit" { for time.Now().Before(maxWait) && consensus.getConsensusPhase() == "Commit" {
utils.Logger().Warn().Msg("[shutdown] wait for consensus finished") utils.Logger().Warn().Msg("[shutdown] wait for consensus finished")
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
} }
@ -635,7 +635,7 @@ func (consensus *Consensus) tryCatchup() error {
if consensus.BlockVerifier == nil { if consensus.BlockVerifier == nil {
return errors.New("consensus haven't finished initialization") return errors.New("consensus haven't finished initialization")
} }
initBN := consensus.BlockNum() initBN := consensus.getBlockNum()
defer consensus.postCatchup(initBN) defer consensus.postCatchup(initBN)
blks, msgs, err := consensus.getLastMileBlocksAndMsg(initBN) blks, msgs, err := consensus.getLastMileBlocksAndMsg(initBN)
@ -764,15 +764,15 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
} }
func (consensus *Consensus) postCatchup(initBN uint64) { func (consensus *Consensus) postCatchup(initBN uint64) {
if initBN < consensus.BlockNum() { if initBN < consensus.getBlockNum() {
consensus.getLogger().Info(). consensus.getLogger().Info().
Uint64("From", initBN). Uint64("From", initBN).
Uint64("To", consensus.BlockNum()). Uint64("To", consensus.getBlockNum()).
Msg("[TryCatchup] Caught up!") Msg("[TryCatchup] Caught up!")
consensus.switchPhase("TryCatchup", FBFTAnnounce) consensus.switchPhase("TryCatchup", FBFTAnnounce)
} }
// catch up and skip from view change trap // catch up and skip from view change trap
if initBN < consensus.BlockNum() && consensus.isViewChangingMode() { if initBN < consensus.getBlockNum() && consensus.isViewChangingMode() {
consensus.current.SetMode(Normal) consensus.current.SetMode(Normal)
consensus.consensusTimeout[timeoutViewChange].Stop() consensus.consensusTimeout[timeoutViewChange].Stop()
} }
@ -833,7 +833,7 @@ func (consensus *Consensus) GenerateVdfAndProof(newBlock *types.Block, vrfBlockN
start := time.Now() start := time.Now()
vdf.Execute() vdf.Execute()
duration := time.Since(start) duration := time.Since(start)
consensus.getLogger().Info(). consensus.GetLogger().Info().
Dur("duration", duration). Dur("duration", duration).
Msg("[ConsensusMainLoop] VDF computation finished") Msg("[ConsensusMainLoop] VDF computation finished")
output := <-outputChannel output := <-outputChannel

@ -29,8 +29,8 @@ type NetworkMessage struct {
func (consensus *Consensus) populateMessageFields( func (consensus *Consensus) populateMessageFields(
request *msg_pb.ConsensusRequest, blockHash []byte, request *msg_pb.ConsensusRequest, blockHash []byte,
) *msg_pb.ConsensusRequest { ) *msg_pb.ConsensusRequest {
request.ViewId = consensus.GetCurBlockViewID() request.ViewId = consensus.getCurBlockViewID()
request.BlockNum = consensus.BlockNum() request.BlockNum = consensus.getBlockNum()
request.ShardId = consensus.ShardID request.ShardId = consensus.ShardID
// 32 byte block hash // 32 byte block hash
request.BlockHash = blockHash request.BlockHash = blockHash

@ -2,6 +2,13 @@ package consensus
// GetConsensusPhase returns the current phase of the consensus. // GetConsensusPhase returns the current phase of the consensus.
func (consensus *Consensus) GetConsensusPhase() string { func (consensus *Consensus) GetConsensusPhase() string {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.getConsensusPhase()
}
// GetConsensusPhase returns the current phase of the consensus.
func (consensus *Consensus) getConsensusPhase() string {
return consensus.phase.String() return consensus.phase.String()
} }

@ -74,7 +74,7 @@ func (consensus *Consensus) announce(block *types.Block) {
} }
// Construct broadcast p2p message // Construct broadcast p2p message
if err := consensus.msgSender.SendWithRetry( if err := consensus.msgSender.SendWithRetry(
consensus.BlockNum(), msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{ consensus.getBlockNum(), msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
}, p2p.ConstructMessage(msgToSend)); err != nil { }, p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn(). consensus.getLogger().Warn().
@ -95,7 +95,7 @@ func (consensus *Consensus) announce(block *types.Block) {
func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
// TODO(audit): make FBFT lookup using map instead of looping through all items. // TODO(audit): make FBFT lookup using map instead of looping through all items.
if !consensus.FBFTLog.HasMatchingViewAnnounce( if !consensus.FBFTLog.HasMatchingViewAnnounce(
consensus.BlockNum(), consensus.GetCurBlockViewID(), recvMsg.BlockHash, consensus.getBlockNum(), consensus.getCurBlockViewID(), recvMsg.BlockHash,
) { ) {
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgViewID", recvMsg.ViewID).
@ -280,7 +280,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
//// Write - End //// Write - End
//// Read - Start //// Read - Start
viewID := consensus.GetCurBlockViewID() viewID := consensus.getCurBlockViewID()
if consensus.Decider.IsAllSigsCollected() { if consensus.Decider.IsAllSigsCollected() {
logger.Info().Msg("[OnCommit] 100% Enough commits received") logger.Info().Msg("[OnCommit] 100% Enough commits received")
@ -315,7 +315,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
if viewID == consensus.GetCurBlockViewID() { if viewID == consensus.getCurBlockViewID() {
consensus.finalCommit() consensus.finalCommit()
} }
}(viewID) }(viewID)

@ -88,14 +88,14 @@ func (pm *State) SetIsBackup(isBackup bool) {
// fallbackNextViewID return the next view ID and duration when there is an exception // fallbackNextViewID return the next view ID and duration when there is an exception
// to calculate the time-based viewId // to calculate the time-based viewId
func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) { func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) {
diff := int64(consensus.GetViewChangingID() + 1 - consensus.GetCurBlockViewID()) diff := int64(consensus.getViewChangingID() + 1 - consensus.getCurBlockViewID())
if diff <= 0 { if diff <= 0 {
diff = int64(1) diff = int64(1)
} }
consensus.getLogger().Error(). consensus.getLogger().Error().
Int64("diff", diff). Int64("diff", diff).
Msg("[fallbackNextViewID] use legacy viewID algorithm") Msg("[fallbackNextViewID] use legacy viewID algorithm")
return consensus.GetViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration)) return consensus.getViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration))
} }
// getNextViewID return the next view ID based on the timestamp // getNextViewID return the next view ID based on the timestamp
@ -152,7 +152,7 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper { func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper {
gap := 1 gap := 1
cur := consensus.GetCurBlockViewID() cur := consensus.getCurBlockViewID()
if viewID > cur { if viewID > cur {
gap = int(viewID - cur) gap = int(viewID - cur)
} }
@ -196,7 +196,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrappe
Str("leaderPubKey", consensus.LeaderPubKey.Bytes.Hex()). Str("leaderPubKey", consensus.LeaderPubKey.Bytes.Hex()).
Int("gap", gap). Int("gap", gap).
Uint64("newViewID", viewID). Uint64("newViewID", viewID).
Uint64("myCurBlockViewID", consensus.GetCurBlockViewID()). Uint64("myCurBlockViewID", consensus.getCurBlockViewID()).
Msg("[getNextLeaderKey] got leaderPubKey from coinbase") Msg("[getNextLeaderKey] got leaderPubKey from coinbase")
// wasFound, next := consensus.Decider.NthNext(lastLeaderPubKey, gap) // wasFound, next := consensus.Decider.NthNext(lastLeaderPubKey, gap)
// FIXME: rotate leader on harmony nodes only before fully externalization // FIXME: rotate leader on harmony nodes only before fully externalization
@ -234,7 +234,7 @@ func createTimeout() map[TimeoutType]*utils.Timeout {
// startViewChange start the view change process // startViewChange start the view change process
func (consensus *Consensus) startViewChange() { func (consensus *Consensus) startViewChange() {
if consensus.disableViewChange || consensus.IsBackup() { if consensus.disableViewChange || consensus.isBackup {
return return
} }
@ -242,7 +242,7 @@ func (consensus *Consensus) startViewChange() {
consensus.consensusTimeout[timeoutBootstrap].Stop() consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.current.SetMode(ViewChanging) consensus.current.SetMode(ViewChanging)
nextViewID, duration := consensus.getNextViewID() nextViewID, duration := consensus.getNextViewID()
consensus.SetViewChangingID(nextViewID) consensus.setViewChangingID(nextViewID)
// TODO: set the Leader PubKey to the next leader for view change // TODO: set the Leader PubKey to the next leader for view change
// this is dangerous as the leader change is not succeeded yet // this is dangerous as the leader change is not succeeded yet
// we use it this way as in many code we validate the messages // we use it this way as in many code we validate the messages

Loading…
Cancel
Save