From 9cc1d5c14ef705c8474b8ad2a1c3b187c26890d6 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 3 Nov 2020 17:32:53 -0800 Subject: [PATCH] Disable pipelining during epoch change --- consensus/consensus_v2.go | 45 +++++++++++++++++---------------------- consensus/leader.go | 12 +++++++---- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 26a8d9d30..6f16b9cdf 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -136,12 +136,12 @@ func (consensus *Consensus) finalCommit() { } block.SetCurrentCommitSig(commitSigAndBitmap) - consensus.commitBlock(block, FBFTMsg) + err = consensus.commitBlock(block, FBFTMsg) - if consensus.blockNum-beforeCatchupNum != 1 { - consensus.getLogger().Warn(). + if err != nil || consensus.blockNum-beforeCatchupNum != 1 { + consensus.getLogger().Err(err). Uint64("beforeCatchupBlockNum", beforeCatchupNum). - Msg("[finalCommit] Leader cannot provide the correct block for committed message") + Msg("[finalCommit] Leader failed to commit the confirmed block") return } @@ -170,6 +170,10 @@ func (consensus *Consensus) finalCommit() { Msg("[finalCommit] Sent Committed Message") } + if consensus.IsLeader() && block.IsLastBlockInEpoch() { + consensus.ReadySignal <- AsyncProposal + } + // Dump new block into level db // In current code, we add signatures in block in tryCatchup, the block dump to explorer does not contains signatures // but since explorer doesn't need signatures, it should be fine @@ -507,27 +511,18 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { network.FBFTMsg consensus.FBFTLog.AddMessage(FBFTMsg) - blk.SetCurrentCommitSig(FBFTMsg.Payload) - if err := consensus.OnConsensusDone(blk); err != nil { - consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain") - return err - } - - // If it's not the epoch block, do pipelining and send committed message to validators now at 67% committed. - if !blk.IsLastBlockInEpoch() { - if err := consensus.msgSender.SendWithRetry( - blk.NumberU64(), - msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ - nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), - }, - p2p.ConstructMessage(msgToSend), true); err != nil { - consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message") - } else { - consensus.getLogger().Info(). - Str("blockHash", blk.Hash().Hex()). - Uint64("blockNum", consensus.blockNum). - Msg("[preCommitAndPropose] Sent Committed Message") - } + if err := consensus.msgSender.SendWithRetry( + blk.NumberU64(), + msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ + nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), + }, + p2p.ConstructMessage(msgToSend), true); err != nil { + consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message") + } else { + consensus.getLogger().Info(). + Str("blockHash", blk.Hash().Hex()). + Uint64("blockNum", consensus.blockNum). + Msg("[preCommitAndPropose] Sent Committed Message") } // Send signal to Node to propose the new block for consensus diff --git a/consensus/leader.go b/consensus/leader.go index 5e3480fd5..b827aed6d 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -300,10 +300,14 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { if !quorumWasMet && quorumIsMet { logger.Info().Msg("[OnCommit] 2/3 Enough commits received") - go func() { - // TODO: make it synchronized with commitFinishChan - consensus.preCommitAndPropose(blockObj) - }() + + // If it's not the epoch block, do pipelining and send committed message to validators now at 67% committed. + if !blockObj.IsLastBlockInEpoch() { + go func() { + // TODO: make it synchronized with commitFinishChan + consensus.preCommitAndPropose(blockObj) + }() + } consensus.getLogger().Info().Msg("[OnCommit] Starting Grace Period") go func(viewID uint64) {