send committed at 67%

pull/3405/head
Rongjian Lan 4 years ago
parent 0e95cb634b
commit 8db8802eed
  1. 2
      api/service/explorer/service.go
  2. 34
      api/service/syncing/syncing.go
  3. 5
      consensus/consensus_msg_sender.go
  4. 59
      consensus/consensus_v2.go
  5. 5
      consensus/leader.go
  6. 1
      consensus/threshold.go
  7. 27
      consensus/validator.go
  8. 2
      consensus/view_change.go
  9. 4
      node/node_syncing.go

@ -176,7 +176,7 @@ func (s *Service) GetTotalSupply(w http.ResponseWriter, r *http.Request) {
// GetNodeSync returns status code 500 if node is not in sync
func (s *Service) GetNodeSync(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
sync := !s.stateSync.IsOutOfSync(s.blockchain)
sync := !s.stateSync.IsOutOfSync(s.blockchain, false)
if !sync {
w.WriteHeader(http.StatusTeapot)
}

@ -899,18 +899,36 @@ func (ss *StateSync) GetMaxPeerHeight() uint64 {
}
// IsOutOfSync checks whether the node is out of sync from other peers
func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool {
func (ss *StateSync) IsOutOfSync(bc *core.BlockChain, doubleCheck bool) bool {
if ss.syncConfig == nil {
return true // If syncConfig is not instantiated, return not in sync
}
otherHeight := ss.getMaxPeerHeight(false)
otherHeight1 := ss.getMaxPeerHeight(false)
lastHeight := bc.CurrentBlock().NumberU64()
wasOutOfSync := lastHeight+inSyncThreshold < otherHeight1
if !doubleCheck {
utils.Logger().Info().
Uint64("OtherHeight", otherHeight1).
Uint64("lastHeight", lastHeight).
Msg("[SYNC] Checking sync status")
return wasOutOfSync
}
time.Sleep(3 * time.Second)
// double check the sync status after 3 second to confirm (avoid false alarm)
otherHeight2 := ss.getMaxPeerHeight(false)
currentHeight := bc.CurrentBlock().NumberU64()
utils.Logger().Debug().
Uint64("OtherHeight", otherHeight).
Uint64("MyHeight", currentHeight).
Bool("IsOutOfSync", currentHeight+inSyncThreshold < otherHeight).
isOutOfSync := currentHeight+inSyncThreshold < otherHeight2
utils.Logger().Info().
Uint64("OtherHeight1", otherHeight1).
Uint64("OtherHeight2", otherHeight2).
Uint64("lastHeight", lastHeight).
Uint64("currentHeight", currentHeight).
Msg("[SYNC] Checking sync status")
return currentHeight+inSyncThreshold < otherHeight
// Only confirm out of sync when the node has lower height and didn't move in heights for 2 consecutive checks
return wasOutOfSync && isOutOfSync && lastHeight == currentHeight
}
// SyncLoop will keep syncing with peers until catches up
@ -978,7 +996,7 @@ func (ss *StateSync) addConsensusLastMile(bc *core.BlockChain, consensus *consen
return errors.Wrap(err, "failed to InsertChain")
}
}
consensus.FBFTLog.PruneCacheBeforeBlock(bc.CurrentBlock().NumberU64() + 1)
consensus.FBFTLog.PruneCacheBeforeBlock(bc.CurrentBlock().NumberU64())
return nil
}

@ -56,7 +56,7 @@ func (sender *MessageSender) Reset(blockNum uint64) {
}
// SendWithRetry sends message with retry logic.
func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) error {
func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte, immediate bool) error {
if sender.retryTimes != 0 {
msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0}
atomic.StoreUint32(&msgRetry.isActive, 1)
@ -65,7 +65,10 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa
sender.Retry(&msgRetry)
}()
}
if immediate {
return sender.host.SendMessageToGroups(groups, p2pMsg)
}
return nil
}
// SendWithoutRetry sends message without retry logic.

@ -149,12 +149,17 @@ func (consensus *Consensus) finalCommit() {
}
// if leader successfully finalizes the block, send committed message to validators
sendImmediately := false
if !consensus.IsLeader() || block.IsLastBlockInEpoch() {
sendImmediately = true
}
if err := consensus.msgSender.SendWithRetry(
block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend)); err != nil {
p2p.ConstructMessage(msgToSend),
sendImmediately); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[finalCommit] Cannot send committed message")
} else {
consensus.getLogger().Info().
@ -195,6 +200,14 @@ func (consensus *Consensus) finalCommit() {
// If still the leader, send commit sig/bitmap to finish the new block proposal,
// else, the block proposal will timeout by itself.
if consensus.IsLeader() {
if block.IsLastBlockInEpoch() {
// No pipelining
go func() {
consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal")
consensus.ReadySignal <- SyncProposal
}()
} else {
// pipelining
go func() {
select {
case consensus.CommitSigChannel <- commitSigAndBitmap:
@ -203,6 +216,7 @@ func (consensus *Consensus) finalCommit() {
}
}()
}
}
}
// BlockCommitSigs returns the byte array of aggregated
@ -290,12 +304,14 @@ func (consensus *Consensus) Start(
case <-consensus.syncReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan")
consensus.mutex.Lock()
if consensus.blockNum < consensus.Blockchain.CurrentHeader().Number().Uint64()+1 {
consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1)
consensus.SetViewIDs(consensus.Blockchain.CurrentHeader().ViewID().Uint64() + 1)
mode := consensus.UpdateConsensusInformation()
consensus.current.SetMode(mode)
consensus.mutex.Unlock()
consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC")
}
consensus.mutex.Unlock()
case <-consensus.syncNotReadyChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan")
@ -393,7 +409,7 @@ func (consensus *Consensus) Start(
consensus.announce(newBlock)
case viewID := <-consensus.commitFinishChan:
consensus.getLogger().Info().Msg("[ConsensusMainLoop] commitFinishChan")
consensus.getLogger().Info().Uint64("viewID", viewID).Msg("[ConsensusMainLoop] commitFinishChan")
// Only Leader execute this condition
func() {
@ -487,9 +503,26 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
return errors.New("block to pre-commit is nil")
}
leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey()
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] leader not found")
return err
}
// Construct committed message
consensus.mutex.Lock()
bareMinimumCommit := consensus.constructQuorumSigAndBitmap(quorum.Commit)
network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey})
consensus.mutex.Unlock()
if err != nil {
consensus.getLogger().Warn().Err(err).
Msg("[preCommitAndPropose] Unable to construct Committed message")
return err
}
msgToSend, FBFTMsg :=
network.Bytes,
network.FBFTMsg
bareMinimumCommit := FBFTMsg.Payload
consensus.FBFTLog.AddMessage(FBFTMsg)
blk.SetCurrentCommitSig(bareMinimumCommit)
@ -498,6 +531,22 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
return err
}
// if leader successfully finalizes the block, send committed message to validators
if err := consensus.msgSender.SendWithRetry(
blk.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend),
true); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
} else {
consensus.getLogger().Info().
Str("blockHash", blk.Hash().Hex()).
Uint64("blockNum", consensus.blockNum).
Msg("[preCommitAndPropose] Sent Committed Message")
}
// Send signal to Node to propose the new block for consensus
consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal")
@ -573,7 +622,7 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
// SetupForNewConsensus sets the state for new consensus
func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
atomic.AddUint64(&consensus.blockNum, 1)
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1)
consensus.SetCurBlockViewID(committedMsg.ViewID + 1)
consensus.LeaderPubKey = committedMsg.SenderPubkeys[0]
// Update consensus keys at last so the change of leader status doesn't mess up normal flow

@ -80,7 +80,7 @@ func (consensus *Consensus) announce(block *types.Block) {
if err := consensus.msgSender.SendWithRetry(
consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
}, p2p.ConstructMessage(msgToSend)); err != nil {
}, p2p.ConstructMessage(msgToSend), true); err != nil {
consensus.getLogger().Warn().
Str("groupID", string(nodeconfig.NewGroupIDByShardID(
nodeconfig.ShardID(consensus.ShardID),
@ -300,10 +300,13 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
if !quorumWasMet && quorumIsMet {
logger.Info().Msg("[OnCommit] 2/3 Enough commits received")
if !blockObj.IsLastBlockInEpoch() {
// only do early commit if it's not epoch block to avoid problems
go func() {
// TODO: make it synchronized with commitFinishChan
consensus.preCommitAndPropose(blockObj)
}()
}
consensus.getLogger().Info().Msg("[OnCommit] Starting Grace Period")
go func(viewID uint64) {

@ -74,6 +74,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error {
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend),
true,
); err != nil {
consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message")
} else {

@ -229,7 +229,11 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message")
if !consensus.isRightBlockNumCheck(recvMsg) {
// Ok to receive committed from last block since it could have more signatures
if recvMsg.BlockNum < consensus.blockNum-1 {
consensus.getLogger().Debug().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("Wrong BlockNum Received, ignoring!")
return
}
@ -244,7 +248,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
return
}
if !consensus.Decider.IsQuorumAchievedByMask(mask) {
consensus.getLogger().Warn().Msgf("[OnCommitted] Quorum Not achieved.")
consensus.getLogger().Warn().Hex("sigbitmap", recvMsg.Payload).Msgf("[OnCommitted] Quorum Not achieved.")
return
}
@ -272,14 +276,23 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.FBFTLog.AddMessage(recvMsg)
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync")
consensus.spinUpStateSync()
}
consensus.aggregatedCommitSig = aggSig
consensus.commitBitmap = mask
// If we already have a committed signature received before, check whether the new one
// has more signatures and if yes, override the old data.
// Otherwise, simply write the commit signature in db.
commitSigBitmap, err := consensus.Blockchain.ReadCommitSig(blockObj.NumberU64())
if err == nil && len(commitSigBitmap) == len(recvMsg.Payload) {
new := mask.CountEnabled()
mask.SetMask(commitSigBitmap[bls.BLSSignatureSizeInBytes:])
cur := mask.CountEnabled()
if new > cur {
consensus.getLogger().Info().Hex("old", commitSigBitmap).Hex("new", recvMsg.Payload).Msg("[OnCommitted] Overriding commit signatures!!")
consensus.Blockchain.WriteCommitSig(blockObj.NumberU64(), recvMsg.Payload)
}
}
consensus.tryCatchup()
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC")

@ -265,6 +265,7 @@ func (consensus *Consensus) startViewChange() {
[]nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))},
p2p.ConstructMessage(msgToSend),
true,
); err != nil {
consensus.getLogger().Err(err).
Msg("[startViewChange] could not send out the ViewChange message")
@ -294,6 +295,7 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri
[]nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))},
p2p.ConstructMessage(msgToSend),
true,
); err != nil {
return errors.New("failed to send out the NewView message")
}

@ -260,7 +260,7 @@ func (node *Node) doSync(bc *core.BlockChain, worker *worker.Worker, willJoinCon
utils.Logger().Debug().Int("len", node.stateSync.GetActivePeerNumber()).Msg("[SYNC] Get Active Peers")
}
// TODO: treat fake maximum height
if node.stateSync.IsOutOfSync(bc) {
if node.stateSync.IsOutOfSync(bc, true) {
node.IsInSync.UnSet()
if willJoinConsensus {
node.Consensus.BlocksNotSynchronized()
@ -542,5 +542,5 @@ func (node *Node) GetMaxPeerHeight() uint64 {
// IsOutOfSync ...
func (node *Node) IsOutOfSync(bc *core.BlockChain) bool {
return node.stateSync.IsOutOfSync(bc)
return node.stateSync.IsOutOfSync(bc, false)
}

Loading…
Cancel
Save