Fixed locks.

pull/4377/head
frozen 2 years ago committed by Casey Gardiner
parent 119c256e5f
commit 8f993111ef
  1. 10
      consensus/consensus_msg_sender.go
  2. 11
      consensus/consensus_service.go
  3. 9
      consensus/consensus_v2.go
  4. 5
      node/node_handler.go

@ -67,7 +67,10 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa
sender.Retry(&msgRetry)
}()
}
return sender.host.SendMessageToGroups(groups, p2pMsg)
// MessageSender lays inside consensus, but internally calls consensus public api.
// Tt would be deadlock if run in current thread.
go sender.host.SendMessageToGroups(groups, p2pMsg)
return nil
}
// DelayedSendWithRetry is similar to SendWithRetry but without the initial message sending but only retries.
@ -86,7 +89,10 @@ func (sender *MessageSender) DelayedSendWithRetry(blockNum uint64, msgType msg_p
// SendWithoutRetry sends message without retry logic.
func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error {
return sender.host.SendMessageToGroups(groups, p2pMsg)
// MessageSender lays inside consensus, but internally calls consensus public api.
// It would be deadlock if run in current thread.
go sender.host.SendMessageToGroups(groups, p2pMsg)
return nil
}
// Retry will retry the consensus message for <RetryTimes> times.

@ -74,6 +74,8 @@ func (consensus *Consensus) signAndMarshalConsensusMessage(message *msg_pb.Messa
// UpdatePublicKeys updates the PublicKeys for
// quorum on current subcommittee, protected by a mutex
func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.Decider.UpdateParticipants(pubKeys, allowlist)
consensus.getLogger().Info().Msg("My Committee updated")
for i := range pubKeys {
@ -94,7 +96,7 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
}
// reset states after update public keys
// TODO: incorporate bitmaps in the decider, so their state can't be inconsistent.
consensus.UpdateBitmaps()
consensus.updateBitmaps()
consensus.resetState()
// do not reset view change state if it is in view changing mode
@ -126,7 +128,7 @@ func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message,
}
// UpdateBitmaps update the bitmaps for prepare and commit phase
func (consensus *Consensus) UpdateBitmaps() {
func (consensus *Consensus) updateBitmaps() {
consensus.getLogger().Debug().
Str("MessageType", consensus.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps")
@ -440,9 +442,8 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
// IsLeader check if the node is a leader or not by comparing the public key of
// the node with the leader public key
func (consensus *Consensus) IsLeader() bool {
// TODO: if remove locks blockchain stucks.
//consensus.mutex.RLock()
//defer consensus.mutex.RUnlock()
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.isLeader()
}

@ -703,11 +703,14 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
}
consensus.FinishFinalityCount()
consensus.PostConsensusJob(blk)
consensus.SetupForNewConsensus(blk, committedMsg)
go func() {
consensus.PostConsensusJob(blk)
}()
consensus.setupForNewConsensus(blk, committedMsg)
utils.Logger().Info().Uint64("blockNum", blk.NumberU64()).
Str("hash", blk.Header().Hash().Hex()).
Msg("Added New Block to Blockchain!!!")
return nil
}
@ -756,7 +759,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) {
}
// SetupForNewConsensus sets the state for new consensus
func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1)
consensus.SetCurBlockViewID(committedMsg.ViewID + 1)
consensus.LeaderPubKey = committedMsg.SenderPubkeys[0]

@ -361,9 +361,8 @@ func VerifyNewBlock(nodeConfig *nodeconfig.ConfigType, blockChain core.BlockChai
}
// PostConsensusProcessing is called by consensus participants, after consensus is done, to:
// 1. add the new block to blockchain
// 2. [leader] send new block to the client
// 3. [leader] send cross shard tx receipts to destination shard
// 1. [leader] send new block to the client
// 2. [leader] send cross shard tx receipts to destination shard
func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
if node.Consensus.IsLeader() {
if node.IsRunningBeaconChain() {

Loading…
Cancel
Save