diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index bed0e2272..112d4dda7 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -176,7 +176,7 @@ func (s *Service) GetTotalSupply(w http.ResponseWriter, r *http.Request) { // GetNodeSync returns status code 500 if node is not in sync func (s *Service) GetNodeSync(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - sync := !s.stateSync.IsOutOfSync(s.blockchain) + sync := !s.stateSync.IsOutOfSync(s.blockchain, false) if !sync { w.WriteHeader(http.StatusTeapot) } diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index cf37f1de7..124e45d92 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -899,18 +899,36 @@ func (ss *StateSync) GetMaxPeerHeight() uint64 { } // IsOutOfSync checks whether the node is out of sync from other peers -func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool { +func (ss *StateSync) IsOutOfSync(bc *core.BlockChain, doubleCheck bool) bool { if ss.syncConfig == nil { return true // If syncConfig is not instantiated, return not in sync } - otherHeight := ss.getMaxPeerHeight(false) + otherHeight1 := ss.getMaxPeerHeight(false) + lastHeight := bc.CurrentBlock().NumberU64() + wasOutOfSync := lastHeight+inSyncThreshold < otherHeight1 + + if !doubleCheck { + utils.Logger().Info(). + Uint64("OtherHeight", otherHeight1). + Uint64("lastHeight", lastHeight). + Msg("[SYNC] Checking sync status") + return wasOutOfSync + } + time.Sleep(3 * time.Second) + // double check the sync status after 3 second to confirm (avoid false alarm) + + otherHeight2 := ss.getMaxPeerHeight(false) currentHeight := bc.CurrentBlock().NumberU64() - utils.Logger().Debug(). - Uint64("OtherHeight", otherHeight). - Uint64("MyHeight", currentHeight). - Bool("IsOutOfSync", currentHeight+inSyncThreshold < otherHeight). + + isOutOfSync := currentHeight+inSyncThreshold < otherHeight2 + utils.Logger().Info(). + Uint64("OtherHeight1", otherHeight1). + Uint64("OtherHeight2", otherHeight2). + Uint64("lastHeight", lastHeight). + Uint64("currentHeight", currentHeight). Msg("[SYNC] Checking sync status") - return currentHeight+inSyncThreshold < otherHeight + // Only confirm out of sync when the node has lower height and didn't move in heights for 2 consecutive checks + return wasOutOfSync && isOutOfSync && lastHeight == currentHeight } // SyncLoop will keep syncing with peers until catches up @@ -978,7 +996,7 @@ func (ss *StateSync) addConsensusLastMile(bc *core.BlockChain, consensus *consen return errors.Wrap(err, "failed to InsertChain") } } - consensus.FBFTLog.PruneCacheBeforeBlock(bc.CurrentBlock().NumberU64() + 1) + consensus.FBFTLog.PruneCacheBeforeBlock(bc.CurrentBlock().NumberU64()) return nil } diff --git a/consensus/consensus_msg_sender.go b/consensus/consensus_msg_sender.go index 9d1da5149..bbd63a746 100644 --- a/consensus/consensus_msg_sender.go +++ b/consensus/consensus_msg_sender.go @@ -56,7 +56,7 @@ func (sender *MessageSender) Reset(blockNum uint64) { } // SendWithRetry sends message with retry logic. -func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) error { +func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte, immediate bool) error { if sender.retryTimes != 0 { msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0} atomic.StoreUint32(&msgRetry.isActive, 1) @@ -65,7 +65,10 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa sender.Retry(&msgRetry) }() } - return sender.host.SendMessageToGroups(groups, p2pMsg) + if immediate { + return sender.host.SendMessageToGroups(groups, p2pMsg) + } + return nil } // SendWithoutRetry sends message without retry logic. diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index cb50c6258..3fc719c46 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -149,12 +149,17 @@ func (consensus *Consensus) finalCommit() { } // if leader successfully finalizes the block, send committed message to validators + sendImmediately := false + if !consensus.IsLeader() || block.IsLastBlockInEpoch() { + sendImmediately = true + } if err := consensus.msgSender.SendWithRetry( block.NumberU64(), msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), }, - p2p.ConstructMessage(msgToSend)); err != nil { + p2p.ConstructMessage(msgToSend), + sendImmediately); err != nil { consensus.getLogger().Warn().Err(err).Msg("[finalCommit] Cannot send committed message") } else { consensus.getLogger().Info(). @@ -195,13 +200,22 @@ func (consensus *Consensus) finalCommit() { // If still the leader, send commit sig/bitmap to finish the new block proposal, // else, the block proposal will timeout by itself. if consensus.IsLeader() { - go func() { - select { - case consensus.CommitSigChannel <- commitSigAndBitmap: - case <-time.After(6 * time.Second): - utils.Logger().Error().Err(err).Msg("[finalCommit] channel not received after 6s for commitSigAndBitmap") - } - }() + if block.IsLastBlockInEpoch() { + // No pipelining + go func() { + consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal") + consensus.ReadySignal <- SyncProposal + }() + } else { + // pipelining + go func() { + select { + case consensus.CommitSigChannel <- commitSigAndBitmap: + case <-time.After(6 * time.Second): + utils.Logger().Error().Err(err).Msg("[finalCommit] channel not received after 6s for commitSigAndBitmap") + } + }() + } } } @@ -290,12 +304,14 @@ func (consensus *Consensus) Start( case <-consensus.syncReadyChan: consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") consensus.mutex.Lock() - consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1) - consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1) - mode := consensus.UpdateConsensusInformation() - consensus.current.SetMode(mode) + if consensus.blockNum < consensus.Blockchain.CurrentHeader().Number().Uint64()+1 { + consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1) + consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1) + mode := consensus.UpdateConsensusInformation() + consensus.current.SetMode(mode) + consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") + } consensus.mutex.Unlock() - consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") case <-consensus.syncNotReadyChan: consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") @@ -393,7 +409,7 @@ func (consensus *Consensus) Start( consensus.announce(newBlock) case viewID := <-consensus.commitFinishChan: - consensus.getLogger().Info().Msg("[ConsensusMainLoop] commitFinishChan") + consensus.getLogger().Info().Uint64("viewID", viewID).Msg("[ConsensusMainLoop] commitFinishChan") // Only Leader execute this condition func() { @@ -487,9 +503,26 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { return errors.New("block to pre-commit is nil") } + leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey() + if err != nil { + consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] leader not found") + return err + } + + // Construct committed message consensus.mutex.Lock() - bareMinimumCommit := consensus.constructQuorumSigAndBitmap(quorum.Commit) + network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey}) consensus.mutex.Unlock() + if err != nil { + consensus.getLogger().Warn().Err(err). + Msg("[preCommitAndPropose] Unable to construct Committed message") + return err + } + msgToSend, FBFTMsg := + network.Bytes, + network.FBFTMsg + bareMinimumCommit := FBFTMsg.Payload + consensus.FBFTLog.AddMessage(FBFTMsg) blk.SetCurrentCommitSig(bareMinimumCommit) @@ -498,6 +531,22 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { return err } + // if leader successfully finalizes the block, send committed message to validators + if err := consensus.msgSender.SendWithRetry( + blk.NumberU64(), + msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ + nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), + }, + p2p.ConstructMessage(msgToSend), + true); err != nil { + consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message") + } else { + consensus.getLogger().Info(). + Str("blockHash", blk.Hash().Hex()). + Uint64("blockNum", consensus.blockNum). + Msg("[preCommitAndPropose] Sent Committed Message") + } + // Send signal to Node to propose the new block for consensus consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal") @@ -573,7 +622,7 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess // SetupForNewConsensus sets the state for new consensus func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { - atomic.AddUint64(&consensus.blockNum, 1) + atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) consensus.SetCurBlockViewID(committedMsg.ViewID + 1) consensus.LeaderPubKey = committedMsg.SenderPubkeys[0] // Update consensus keys at last so the change of leader status doesn't mess up normal flow diff --git a/consensus/leader.go b/consensus/leader.go index 7ae69bb59..26c751ff9 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -80,7 +80,7 @@ func (consensus *Consensus) announce(block *types.Block) { if err := consensus.msgSender.SendWithRetry( consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), - }, p2p.ConstructMessage(msgToSend)); err != nil { + }, p2p.ConstructMessage(msgToSend), true); err != nil { consensus.getLogger().Warn(). Str("groupID", string(nodeconfig.NewGroupIDByShardID( nodeconfig.ShardID(consensus.ShardID), @@ -300,10 +300,13 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { if !quorumWasMet && quorumIsMet { logger.Info().Msg("[OnCommit] 2/3 Enough commits received") - go func() { - // TODO: make it synchronized with commitFinishChan - consensus.preCommitAndPropose(blockObj) - }() + if !blockObj.IsLastBlockInEpoch() { + // only do early commit if it's not epoch block to avoid problems + go func() { + // TODO: make it synchronized with commitFinishChan + consensus.preCommitAndPropose(blockObj) + }() + } consensus.getLogger().Info().Msg("[OnCommit] Starting Grace Period") go func(viewID uint64) { diff --git a/consensus/threshold.go b/consensus/threshold.go index 503185f3b..f9ced1139 100644 --- a/consensus/threshold.go +++ b/consensus/threshold.go @@ -74,6 +74,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error { nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), }, p2p.ConstructMessage(msgToSend), + true, ); err != nil { consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message") } else { diff --git a/consensus/validator.go b/consensus/validator.go index 7655da547..a692d3969 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -229,7 +229,11 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { Uint64("MsgViewID", recvMsg.ViewID). Msg("[OnCommitted] Received committed message") - if !consensus.isRightBlockNumCheck(recvMsg) { + // Ok to receive committed from last block since it could have more signatures + if recvMsg.BlockNum < consensus.blockNum-1 { + consensus.getLogger().Debug(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Msg("Wrong BlockNum Received, ignoring!") return } @@ -244,7 +248,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { return } if !consensus.Decider.IsQuorumAchievedByMask(mask) { - consensus.getLogger().Warn().Msgf("[OnCommitted] Quorum Not achieved.") + consensus.getLogger().Warn().Hex("sigbitmap", recvMsg.Payload).Msgf("[OnCommitted] Quorum Not achieved.") return } @@ -272,14 +276,23 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.FBFTLog.AddMessage(recvMsg) - if recvMsg.BlockNum > consensus.blockNum { - consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync") - consensus.spinUpStateSync() - } - consensus.aggregatedCommitSig = aggSig consensus.commitBitmap = mask + // If we already have a committed signature received before, check whether the new one + // has more signatures and if yes, override the old data. + // Otherwise, simply write the commit signature in db. + commitSigBitmap, err := consensus.Blockchain.ReadCommitSig(blockObj.NumberU64()) + if err == nil && len(commitSigBitmap) == len(recvMsg.Payload) { + new := mask.CountEnabled() + mask.SetMask(commitSigBitmap[bls.BLSSignatureSizeInBytes:]) + cur := mask.CountEnabled() + if new > cur { + consensus.getLogger().Info().Hex("old", commitSigBitmap).Hex("new", recvMsg.Payload).Msg("[OnCommitted] Overriding commit signatures!!") + consensus.Blockchain.WriteCommitSig(blockObj.NumberU64(), recvMsg.Payload) + } + } + consensus.tryCatchup() if recvMsg.BlockNum > consensus.blockNum { consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC") diff --git a/consensus/view_change.go b/consensus/view_change.go index 7b8cb42a4..1a94f2892 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -265,6 +265,7 @@ func (consensus *Consensus) startViewChange() { []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, p2p.ConstructMessage(msgToSend), + true, ); err != nil { consensus.getLogger().Err(err). Msg("[startViewChange] could not send out the ViewChange message") @@ -294,6 +295,7 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, p2p.ConstructMessage(msgToSend), + true, ); err != nil { return errors.New("failed to send out the NewView message") } diff --git a/node/node_syncing.go b/node/node_syncing.go index 77d2aa8ca..7e5fab3b6 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -260,7 +260,7 @@ func (node *Node) doSync(bc *core.BlockChain, worker *worker.Worker, willJoinCon utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers") } // TODO: treat fake maximum height - if node.stateSync.IsOutOfSync(bc) { + if node.stateSync.IsOutOfSync(bc, true) { node.IsInSync.UnSet() if willJoinConsensus { node.Consensus.BlocksNotSynchronized() @@ -542,5 +542,5 @@ func (node *Node) GetMaxPeerHeight() uint64 { // IsOutOfSync ... func (node *Node) IsOutOfSync(bc *core.BlockChain) bool { - return node.stateSync.IsOutOfSync(bc) + return node.stateSync.IsOutOfSync(bc, false) }