Fixed lock.

pull/4509/head
frozen 2 years ago committed by Casey Gardiner
parent 69ae68c744
commit 53dfc9d357
  1. 37
      consensus/consensus_service.go
  2. 10
      consensus/consensus_v2.go
  3. 16
      internal/utils/singleton.go

@ -76,6 +76,10 @@ func (consensus *Consensus) signAndMarshalConsensusMessage(message *msg_pb.Messa
func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.updatePublicKeys(pubKeys, allowlist)
}
func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 {
consensus.Decider.UpdateParticipants(pubKeys, allowlist)
consensus.getLogger().Info().Msg("My Committee updated")
for i := range pubKeys {
@ -172,6 +176,13 @@ func (consensus *Consensus) isValidatorInCommittee(pubKey bls.SerializedPublicKe
// SetMode sets the mode of consensus
func (consensus *Consensus) SetMode(m Mode) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.setMode(m)
}
// SetMode sets the mode of consensus
func (consensus *Consensus) setMode(m Mode) {
if m == Normal && consensus.isBackup {
m = NormalBackup
}
@ -212,8 +223,8 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {
if consensus.IgnoreViewIDCheck.IsSet() {
//in syncing mode, node accepts incoming messages without viewID/leaderKey checking
//so only set mode to normal when new node enters consensus and need checking viewID
consensus.SetMode(Normal)
consensus.SetViewIDs(msg.ViewID)
consensus.setMode(Normal)
consensus.setViewIDs(msg.ViewID)
if !msg.HasSingleSender() {
return errors.New("Leader message can not have multiple sender keys")
}
@ -268,6 +279,12 @@ func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offse
// (b) node in committed but has any err during processing: Syncing mode
// (c) node in committed and everything looks good: Normal mode
func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.updateConsensusInformation()
}
func (consensus *Consensus) updateConsensusInformation() Mode {
curHeader := consensus.Blockchain().CurrentHeader()
curEpoch := curHeader.Epoch()
nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1)
@ -372,7 +389,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.getLogger().Info().
Int("numPubKeys", len(pubKeys)).
Msg("[UpdateConsensusInformation] Successfully updated public keys")
consensus.UpdatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist())
consensus.updatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist())
// Update voters in the committee
if _, err := consensus.Decider.SetVoters(
@ -485,8 +502,8 @@ func (consensus *Consensus) SetViewIDs(height uint64) {
// SetViewIDs set both current view ID and view changing ID to the height
// of the blockchain. It is used during client startup to recover the state
func (consensus *Consensus) setViewIDs(height uint64) {
consensus.SetCurBlockViewID(height)
consensus.SetViewChangingID(height)
consensus.setCurBlockViewID(height)
consensus.setViewChangingID(height)
}
// SetCurBlockViewID set the current view ID
@ -494,11 +511,21 @@ func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
}
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) setCurBlockViewID(viewID uint64) {
consensus.current.SetCurBlockViewID(viewID)
}
// SetViewChangingID set the current view change ID
func (consensus *Consensus) SetViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
}
// SetViewChangingID set the current view change ID
func (consensus *Consensus) setViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
}
// StartFinalityCount set the finality counter to current time
func (consensus *Consensus) StartFinalityCount() {
consensus.finalityCounter.Store(time.Now().UnixNano())

@ -348,7 +348,7 @@ func (consensus *Consensus) syncReadyChan() {
if consensus.BlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 {
consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1)
consensus.SetViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1)
mode := consensus.UpdateConsensusInformation()
mode := consensus.updateConsensusInformation()
consensus.current.SetMode(mode)
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
@ -357,8 +357,8 @@ func (consensus *Consensus) syncReadyChan() {
} else if consensus.Mode() == Syncing {
// Corner case where sync is triggered before `onCommitted` and there is a race
// for block insertion between consensus and downloader.
mode := consensus.UpdateConsensusInformation()
consensus.SetMode(mode)
mode := consensus.updateConsensusInformation()
consensus.setMode(mode)
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc()
@ -761,7 +761,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) {
// SetupForNewConsensus sets the state for new consensus
func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1)
consensus.SetCurBlockViewID(committedMsg.ViewID + 1)
consensus.setCurBlockViewID(committedMsg.ViewID + 1)
consensus.LeaderPubKey = committedMsg.SenderPubkeys[0]
var epoch *big.Int
if blk.IsLastBlockInEpoch() {
@ -775,7 +775,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
// Update consensus keys at last so the change of leader status doesn't mess up normal flow
if blk.IsLastBlockInEpoch() {
consensus.SetMode(consensus.UpdateConsensusInformation())
consensus.setMode(consensus.updateConsensusInformation())
}
consensus.FBFTLog.PruneCacheBeforeBlock(blk.NumberU64())
consensus.resetState()

@ -230,3 +230,19 @@ func AssertNoLongerThan0[E any](t time.Duration, f func() E) E {
return f()
}
func AssertNoLongerThan(t time.Duration, f func()) {
ch := make(chan struct{})
defer close(ch)
stack := debug.Stack()
go func() {
select {
case <-time.After(t):
panic("AssertNoLongerThan0: " + string(stack))
case <-ch:
return
}
}()
f()
}

Loading…
Cancel
Save