Thread safe Decider. (#4610)

pull/4612/head
Konstantin 11 months ago committed by GitHub
parent 1dd67a801c
commit a31b4f5640
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 12
      consensus/consensus.go
  2. 26
      consensus/consensus_service.go
  3. 3
      consensus/consensus_test.go
  4. 12
      consensus/consensus_v2.go
  5. 4
      consensus/construct.go
  6. 4
      consensus/construct_test.go
  7. 2
      consensus/double_sign.go
  8. 26
      consensus/leader.go
  9. 179
      consensus/quorum/thread_safe_decider.go
  10. 2
      consensus/threshold.go
  11. 6
      consensus/validator.go
  12. 28
      consensus/view_change.go
  13. 6
      consensus/view_change_test.go
  14. 2
      node/api.go
  15. 2
      node/node.go
  16. 2
      node/node_explorer.go

@ -46,7 +46,7 @@ type DownloadAsync interface {
// Consensus is the main struct with all states and data related to consensus process. // Consensus is the main struct with all states and data related to consensus process.
type Consensus struct { type Consensus struct {
Decider quorum.Decider decider quorum.Decider
// FBFTLog stores the pbft messages and blocks during FBFT process // FBFTLog stores the pbft messages and blocks during FBFT process
fBFTLog *FBFTLog fBFTLog *FBFTLog
// phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc // phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc
@ -200,7 +200,9 @@ func (consensus *Consensus) BlocksNotSynchronized(reason string) {
// VdfSeedSize returns the number of VRFs for VDF computation // VdfSeedSize returns the number of VRFs for VDF computation
func (consensus *Consensus) VdfSeedSize() int { func (consensus *Consensus) VdfSeedSize() int {
return int(consensus.Decider.ParticipantsCount()) * 2 / 3 consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return int(consensus.decider.ParticipantsCount()) * 2 / 3
} }
// GetPublicKeys returns the public keys // GetPublicKeys returns the public keys
@ -275,7 +277,7 @@ func New(
fBFTLog: NewFBFTLog(), fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce, phase: FBFTAnnounce,
current: State{mode: Normal}, current: State{mode: Normal},
Decider: Decider, decider: Decider,
registry: registry, registry: registry,
MinPeers: minPeers, MinPeers: minPeers,
AggregateSig: aggregateSig, AggregateSig: aggregateSig,
@ -322,6 +324,10 @@ func (consensus *Consensus) Registry() *registry.Registry {
return consensus.registry return consensus.registry
} }
func (consensus *Consensus) Decider() quorum.Decider {
return quorum.NewThreadSafeDecider(consensus.decider, consensus.mutex)
}
// InitConsensusWithValidators initialize shard state // InitConsensusWithValidators initialize shard state
// from latest epoch and update committee pub // from latest epoch and update committee pub
// keys for consensus // keys for consensus

@ -82,7 +82,7 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
} }
func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 { func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 {
consensus.Decider.UpdateParticipants(pubKeys, allowlist) consensus.decider.UpdateParticipants(pubKeys, allowlist)
consensus.getLogger().Info().Msg("My Committee updated") consensus.getLogger().Info().Msg("My Committee updated")
for i := range pubKeys { for i := range pubKeys {
consensus.getLogger().Info(). consensus.getLogger().Info().
@ -91,7 +91,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
Msg("Member") Msg("Member")
} }
allKeys := consensus.Decider.Participants() allKeys := consensus.decider.Participants()
if len(allKeys) != 0 { if len(allKeys) != 0 {
consensus.LeaderPubKey = &allKeys[0] consensus.LeaderPubKey = &allKeys[0]
consensus.getLogger().Info(). consensus.getLogger().Info().
@ -115,7 +115,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
if !consensus.isViewChangingMode() { if !consensus.isViewChangingMode() {
consensus.resetViewChangeState() consensus.resetViewChangeState()
} }
return consensus.Decider.ParticipantsCount() return consensus.decider.ParticipantsCount()
} }
// Sign on the hash of the message // Sign on the hash of the message
@ -144,7 +144,7 @@ func (consensus *Consensus) updateBitmaps() {
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Str("MessageType", consensus.phase.String()). Str("MessageType", consensus.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps") Msg("[UpdateBitmaps] Updating consensus bitmaps")
members := consensus.Decider.Participants() members := consensus.decider.Participants()
prepareBitmap := bls_cosi.NewMask(members) prepareBitmap := bls_cosi.NewMask(members)
commitBitmap := bls_cosi.NewMask(members) commitBitmap := bls_cosi.NewMask(members)
multiSigBitmap := bls_cosi.NewMask(members) multiSigBitmap := bls_cosi.NewMask(members)
@ -160,7 +160,7 @@ func (consensus *Consensus) resetState() {
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
consensus.block = []byte{} consensus.block = []byte{}
consensus.Decider.ResetPrepareAndCommitVotes() consensus.decider.ResetPrepareAndCommitVotes()
if consensus.prepareBitmap != nil { if consensus.prepareBitmap != nil {
consensus.prepareBitmap.Clear() consensus.prepareBitmap.Clear()
} }
@ -179,7 +179,7 @@ func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKe
} }
func (consensus *Consensus) isValidatorInCommittee(pubKey bls.SerializedPublicKey) bool { func (consensus *Consensus) isValidatorInCommittee(pubKey bls.SerializedPublicKey) bool {
return consensus.Decider.IndexOf(pubKey) != -1 return consensus.decider.IndexOf(pubKey) != -1
} }
// SetMode sets the mode of consensus // SetMode sets the mode of consensus
@ -271,7 +271,7 @@ func (consensus *Consensus) setBlockNum(blockNum uint64) {
// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading // ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) { func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) {
consensus.mutex.RLock() consensus.mutex.RLock()
members := consensus.Decider.Participants() members := consensus.decider.Participants()
consensus.mutex.RUnlock() consensus.mutex.RUnlock()
return consensus.readSignatureBitmapPayload(recvPayload, offset, members) return consensus.readSignatureBitmapPayload(recvPayload, offset, members)
} }
@ -334,12 +334,12 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
isFirstTimeStaking := consensus.Blockchain().Config().IsStaking(nextEpoch) && isFirstTimeStaking := consensus.Blockchain().Config().IsStaking(nextEpoch) &&
curHeader.IsLastBlockInEpoch() && !consensus.Blockchain().Config().IsStaking(curEpoch) curHeader.IsLastBlockInEpoch() && !consensus.Blockchain().Config().IsStaking(curEpoch)
haventUpdatedDecider := consensus.Blockchain().Config().IsStaking(curEpoch) && haventUpdatedDecider := consensus.Blockchain().Config().IsStaking(curEpoch) &&
consensus.Decider.Policy() != quorum.SuperMajorityStake consensus.decider.Policy() != quorum.SuperMajorityStake
// Only happens once, the flip-over to a new Decider policy // Only happens once, the flip-over to a new Decider policy
if isFirstTimeStaking || haventUpdatedDecider { if isFirstTimeStaking || haventUpdatedDecider {
decider := quorum.NewDecider(quorum.SuperMajorityStake, consensus.ShardID) decider := quorum.NewDecider(quorum.SuperMajorityStake, consensus.ShardID)
consensus.Decider = decider consensus.decider = decider
} }
var committeeToSet *shard.Committee var committeeToSet *shard.Committee
@ -412,7 +412,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
consensus.updatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist()) consensus.updatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist())
// Update voters in the committee // Update voters in the committee
if _, err := consensus.Decider.SetVoters( if _, err := consensus.decider.SetVoters(
committeeToSet, epochToSet, committeeToSet, epochToSet,
); err != nil { ); err != nil {
consensus.getLogger().Error(). consensus.getLogger().Error().
@ -582,7 +582,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
return errGetPreparedBlock return errGetPreparedBlock
} }
aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.Decider.Participants()) aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.decider.Participants())
if err != nil { if err != nil {
return errReadBitmapPayload return errReadBitmapPayload
} }
@ -606,7 +606,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
continue continue
} }
if _, err := consensus.Decider.AddNewVote( if _, err := consensus.decider.AddNewVote(
quorum.Commit, quorum.Commit,
[]*bls_cosi.PublicKeyWrapper{key.Pub}, []*bls_cosi.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload), key.Pri.SignHash(commitPayload),
@ -628,7 +628,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 { func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0) count := uint32(0)
consensus.mutex.Lock() consensus.mutex.Lock()
members := consensus.Decider.Participants() members := consensus.decider.Participants()
pubKeys := consensus.getPublicKeys() pubKeys := consensus.getPublicKeys()
consensus.mutex.Unlock() consensus.mutex.Unlock()

@ -18,7 +18,7 @@ import (
) )
func TestConsensusInitialization(t *testing.T) { func TestConsensusInitialization(t *testing.T) {
host, multiBLSPrivateKey, consensus, decider, err := GenerateConsensusForTesting() host, multiBLSPrivateKey, consensus, _, err := GenerateConsensusForTesting()
assert.NoError(t, err) assert.NoError(t, err)
messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec} messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
@ -30,7 +30,6 @@ func TestConsensusInitialization(t *testing.T) {
expectedTimeouts[timeoutViewChange] = viewChangeDuration expectedTimeouts[timeoutViewChange] = viewChangeDuration
expectedTimeouts[timeoutBootstrap] = bootstrapDuration expectedTimeouts[timeoutBootstrap] = bootstrapDuration
assert.Equal(t, decider, consensus.Decider)
assert.Equal(t, host, consensus.host) assert.Equal(t, host, consensus.host)
assert.Equal(t, messageSender, consensus.msgSender) assert.Equal(t, messageSender, consensus.msgSender)

@ -91,7 +91,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p
case t == msg_pb.MessageType_VIEWCHANGE: case t == msg_pb.MessageType_VIEWCHANGE:
fbftMsg, err = ParseViewChangeMessage(msg) fbftMsg, err = ParseViewChangeMessage(msg)
case t == msg_pb.MessageType_NEWVIEW: case t == msg_pb.MessageType_NEWVIEW:
members := consensus.Decider.Participants() members := consensus.decider.Participants()
fbftMsg, err = ParseNewViewMessage(msg, members) fbftMsg, err = ParseNewViewMessage(msg, members)
default: default:
fbftMsg, err = consensus.parseFBFTMessage(msg) fbftMsg, err = consensus.parseFBFTMessage(msg)
@ -138,7 +138,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p
} }
func (consensus *Consensus) finalCommit() { func (consensus *Consensus) finalCommit() {
numCommits := consensus.Decider.SignersCount(quorum.Commit) numCommits := consensus.decider.SignersCount(quorum.Commit)
consensus.getLogger().Info(). consensus.getLogger().Info().
Int64("NumCommits", numCommits). Int64("NumCommits", numCommits).
@ -441,7 +441,7 @@ func (consensus *Consensus) BlockChannel(newBlock *types.Block) {
Int("numTxs", len(newBlock.Transactions())). Int("numTxs", len(newBlock.Transactions())).
Int("numStakingTxs", len(newBlock.StakingTransactions())). Int("numStakingTxs", len(newBlock.StakingTransactions())).
Time("startTime", startTime). Time("startTime", startTime).
Int64("publicKeys", consensus.Decider.ParticipantsCount()). Int64("publicKeys", consensus.decider.ParticipantsCount()).
Msg("[ConsensusMainLoop] STARTING CONSENSUS") Msg("[ConsensusMainLoop] STARTING CONSENSUS")
consensus.announce(newBlock) consensus.announce(newBlock)
}) })
@ -741,16 +741,16 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper {
for i := 0; i < len(committee.Slots); i++ { for i := 0; i < len(committee.Slots); i++ {
if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) { if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.Decider.NthNextValidator(committee.Slots, leader, offset) wasFound, next = consensus.decider.NthNextValidator(committee.Slots, leader, offset)
} else { } else {
wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset) wasFound, next = consensus.decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset)
} }
if !wasFound { if !wasFound {
utils.Logger().Error().Msg("Failed to get next leader") utils.Logger().Error().Msg("Failed to get next leader")
// Seems like nothing we can do here. // Seems like nothing we can do here.
return nil return nil
} }
members := consensus.Decider.Participants() members := consensus.decider.Participants()
mask := bls.NewMask(members) mask := bls.NewMask(members)
skipped := 0 skipped := 0
for i := 0; i < blocksCountAliveness; i++ { for i := 0; i < blocksCountAliveness; i++ {

@ -82,7 +82,7 @@ func (consensus *Consensus) construct(
) )
} else { } else {
// TODO: use a persistent bitmap to report bitmap // TODO: use a persistent bitmap to report bitmap
mask := bls.NewMask(consensus.Decider.Participants()) mask := bls.NewMask(consensus.decider.Participants())
for _, key := range priKeys { for _, key := range priKeys {
mask.SetKey(key.Pub.Bytes, true) mask.SetKey(key.Pub.Bytes, true)
} }
@ -161,7 +161,7 @@ func (consensus *Consensus) construct(
func (consensus *Consensus) constructQuorumSigAndBitmap(p quorum.Phase) []byte { func (consensus *Consensus) constructQuorumSigAndBitmap(p quorum.Phase) []byte {
buffer := bytes.Buffer{} buffer := bytes.Buffer{}
// 96 bytes aggregated signature // 96 bytes aggregated signature
aggSig := consensus.Decider.AggregateVotes(p) aggSig := consensus.decider.AggregateVotes(p)
buffer.Write(aggSig.Serialize()) buffer.Write(aggSig.Serialize())
// Bitmap // Bitmap
if p == quorum.Prepare { if p == quorum.Prepare {

@ -81,7 +81,7 @@ func TestConstructPreparedMessage(test *testing.T) {
validatorKey := bls.SerializedPublicKey{} validatorKey := bls.SerializedPublicKey{}
validatorKey.FromLibBLSPublicKey(validatorPubKey) validatorKey.FromLibBLSPublicKey(validatorPubKey)
validatorKeyWrapper := bls.PublicKeyWrapper{Object: validatorPubKey, Bytes: validatorKey} validatorKeyWrapper := bls.PublicKeyWrapper{Object: validatorPubKey, Bytes: validatorKey}
consensus.Decider.AddNewVote( consensus.Decider().AddNewVote(
quorum.Prepare, quorum.Prepare,
[]*bls.PublicKeyWrapper{&leaderKeyWrapper}, []*bls.PublicKeyWrapper{&leaderKeyWrapper},
leaderPriKey.Sign(message), leaderPriKey.Sign(message),
@ -89,7 +89,7 @@ func TestConstructPreparedMessage(test *testing.T) {
consensus.BlockNum(), consensus.BlockNum(),
consensus.GetCurBlockViewID(), consensus.GetCurBlockViewID(),
) )
if _, err := consensus.Decider.AddNewVote( if _, err := consensus.Decider().AddNewVote(
quorum.Prepare, quorum.Prepare,
[]*bls.PublicKeyWrapper{&validatorKeyWrapper}, []*bls.PublicKeyWrapper{&validatorKeyWrapper},
validatorPriKey.Sign(message), validatorPriKey.Sign(message),

@ -17,7 +17,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
if consensus.couldThisBeADoubleSigner(recvMsg) { if consensus.couldThisBeADoubleSigner(recvMsg) {
addrSet := map[common.Address]struct{}{} addrSet := map[common.Address]struct{}{}
for _, pubKey2 := range recvMsg.SenderPubkeys { for _, pubKey2 := range recvMsg.SenderPubkeys {
if alreadyCastBallot := consensus.Decider.ReadBallot( if alreadyCastBallot := consensus.decider.ReadBallot(
quorum.Commit, pubKey2.Bytes, quorum.Commit, pubKey2.Bytes,
); alreadyCastBallot != nil { ); alreadyCastBallot != nil {
for _, pubKey1 := range alreadyCastBallot.SignerPubKeys { for _, pubKey1 := range alreadyCastBallot.SignerPubKeys {

@ -62,7 +62,7 @@ func (consensus *Consensus) announce(block *types.Block) {
continue continue
} }
if _, err := consensus.Decider.AddNewVote( if _, err := consensus.decider.AddNewVote(
quorum.Prepare, quorum.Prepare,
[]*bls.PublicKeyWrapper{key.Pub}, []*bls.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(consensus.blockHash[:]), key.Pri.SignHash(consensus.blockHash[:]),
@ -112,7 +112,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
prepareBitmap := consensus.prepareBitmap prepareBitmap := consensus.prepareBitmap
// proceed only when the message is not received before // proceed only when the message is not received before
for _, signer := range recvMsg.SenderPubkeys { for _, signer := range recvMsg.SenderPubkeys {
signed := consensus.Decider.ReadBallot(quorum.Prepare, signer.Bytes) signed := consensus.decider.ReadBallot(quorum.Prepare, signer.Bytes)
if signed != nil { if signed != nil {
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Str("validatorPubKey", signer.Bytes.Hex()). Str("validatorPubKey", signer.Bytes.Hex()).
@ -121,14 +121,14 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
} }
} }
if consensus.Decider.IsQuorumAchieved(quorum.Prepare) { if consensus.decider.IsQuorumAchieved(quorum.Prepare) {
// already have enough signatures // already have enough signatures
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Interface("validatorPubKeys", recvMsg.SenderPubkeys). Interface("validatorPubKeys", recvMsg.SenderPubkeys).
Msg("[OnPrepare] Received Additional Prepare Message") Msg("[OnPrepare] Received Additional Prepare Message")
return return
} }
signerCount := consensus.Decider.SignersCount(quorum.Prepare) signerCount := consensus.decider.SignersCount(quorum.Prepare)
//// Read - End //// Read - End
// Check BLS signature for the multi-sig // Check BLS signature for the multi-sig
@ -161,11 +161,11 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Int64("NumReceivedSoFar", signerCount). Int64("NumReceivedSoFar", signerCount).
Int64("PublicKeys", consensus.Decider.ParticipantsCount()). Int64("PublicKeys", consensus.decider.ParticipantsCount()).
Msg("[OnPrepare] Received New Prepare Signature") Msg("[OnPrepare] Received New Prepare Signature")
//// Write - Start //// Write - Start
if _, err := consensus.Decider.AddNewVote( if _, err := consensus.decider.AddNewVote(
quorum.Prepare, recvMsg.SenderPubkeys, quorum.Prepare, recvMsg.SenderPubkeys,
&sign, recvMsg.BlockHash, &sign, recvMsg.BlockHash,
recvMsg.BlockNum, recvMsg.ViewID, recvMsg.BlockNum, recvMsg.ViewID,
@ -181,7 +181,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
//// Write - End //// Write - End
//// Read - Start //// Read - Start
if consensus.Decider.IsQuorumAchieved(quorum.Prepare) { if consensus.decider.IsQuorumAchieved(quorum.Prepare) {
// NOTE Let it handle its own logs // NOTE Let it handle its own logs
if err := consensus.didReachPrepareQuorum(); err != nil { if err := consensus.didReachPrepareQuorum(); err != nil {
return return
@ -199,7 +199,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
} }
// proceed only when the message is not received before // proceed only when the message is not received before
for _, signer := range recvMsg.SenderPubkeys { for _, signer := range recvMsg.SenderPubkeys {
signed := consensus.Decider.ReadBallot(quorum.Commit, signer.Bytes) signed := consensus.decider.ReadBallot(quorum.Commit, signer.Bytes)
if signed != nil { if signed != nil {
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Str("validatorPubKey", signer.Bytes.Hex()). Str("validatorPubKey", signer.Bytes.Hex()).
@ -211,9 +211,9 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
commitBitmap := consensus.commitBitmap commitBitmap := consensus.commitBitmap
// has to be called before verifying signature // has to be called before verifying signature
quorumWasMet := consensus.Decider.IsQuorumAchieved(quorum.Commit) quorumWasMet := consensus.decider.IsQuorumAchieved(quorum.Commit)
signerCount := consensus.Decider.SignersCount(quorum.Commit) signerCount := consensus.decider.SignersCount(quorum.Commit)
//// Read - End //// Read - End
// Verify the signature on commitPayload is correct // Verify the signature on commitPayload is correct
@ -267,7 +267,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
return return
} }
*/ */
if _, err := consensus.Decider.AddNewVote( if _, err := consensus.decider.AddNewVote(
quorum.Commit, recvMsg.SenderPubkeys, quorum.Commit, recvMsg.SenderPubkeys,
&sign, recvMsg.BlockHash, &sign, recvMsg.BlockHash,
recvMsg.BlockNum, recvMsg.ViewID, recvMsg.BlockNum, recvMsg.ViewID,
@ -285,7 +285,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
//// Read - Start //// Read - Start
viewID := consensus.getCurBlockViewID() viewID := consensus.getCurBlockViewID()
if consensus.Decider.IsAllSigsCollected() { if consensus.decider.IsAllSigsCollected() {
logger.Info().Msg("[OnCommit] 100% Enough commits received") logger.Info().Msg("[OnCommit] 100% Enough commits received")
consensus.finalCommit() consensus.finalCommit()
@ -293,7 +293,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
return return
} }
quorumIsMet := consensus.Decider.IsQuorumAchieved(quorum.Commit) quorumIsMet := consensus.decider.IsQuorumAchieved(quorum.Commit)
//// Read - End //// Read - End
if !quorumWasMet && quorumIsMet { if !quorumWasMet && quorumIsMet {

@ -0,0 +1,179 @@
package quorum
import (
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/votepower"
"github.com/harmony-one/harmony/crypto/bls"
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/numeric"
"github.com/harmony-one/harmony/shard"
)
var _ Decider = threadSafeDeciderImpl{}
type threadSafeDeciderImpl struct {
mu *sync.RWMutex
decider Decider
}
func NewThreadSafeDecider(decider Decider, mu *sync.RWMutex) Decider {
return threadSafeDeciderImpl{
mu: mu,
decider: decider,
}
}
func (a threadSafeDeciderImpl) String() string {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.String()
}
func (a threadSafeDeciderImpl) Participants() multibls.PublicKeys {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.Participants()
}
func (a threadSafeDeciderImpl) IndexOf(key bls.SerializedPublicKey) int {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.IndexOf(key)
}
func (a threadSafeDeciderImpl) ParticipantsCount() int64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.ParticipantsCount()
}
func (a threadSafeDeciderImpl) NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.NthNextValidator(slotList, pubKey, next)
}
func (a threadSafeDeciderImpl) NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.NthNextHmy(instance, pubkey, next)
}
func (a threadSafeDeciderImpl) NthNextHmyExt(instance shardingconfig.Instance, wrapper *bls.PublicKeyWrapper, i int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.NthNextHmyExt(instance, wrapper, i)
}
func (a threadSafeDeciderImpl) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.FirstParticipant(instance)
}
func (a threadSafeDeciderImpl) UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
a.decider.UpdateParticipants(pubKeys, allowlist)
}
func (a threadSafeDeciderImpl) submitVote(p Phase, pubkeys []bls.SerializedPublicKey, sig *bls_core.Sign, headerHash common.Hash, height, viewID uint64) (*votepower.Ballot, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.submitVote(p, pubkeys, sig, headerHash, height, viewID)
}
func (a threadSafeDeciderImpl) SignersCount(phase Phase) int64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.SignersCount(phase)
}
func (a threadSafeDeciderImpl) reset(phases []Phase) {
a.mu.Lock()
defer a.mu.Unlock()
a.decider.reset(phases)
}
func (a threadSafeDeciderImpl) ReadBallot(p Phase, pubkey bls.SerializedPublicKey) *votepower.Ballot {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.ReadBallot(p, pubkey)
}
func (a threadSafeDeciderImpl) TwoThirdsSignersCount() int64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.TwoThirdsSignersCount()
}
func (a threadSafeDeciderImpl) AggregateVotes(p Phase) *bls_core.Sign {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.AggregateVotes(p)
}
func (a threadSafeDeciderImpl) SetVoters(subCommittee *shard.Committee, epoch *big.Int) (*TallyResult, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.SetVoters(subCommittee, epoch)
}
func (a threadSafeDeciderImpl) Policy() Policy {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.Policy()
}
func (a threadSafeDeciderImpl) AddNewVote(p Phase, pubkeys []*bls.PublicKeyWrapper, sig *bls_core.Sign, headerHash common.Hash, height, viewID uint64) (*votepower.Ballot, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.AddNewVote(p, pubkeys, sig, headerHash, height, viewID)
}
func (a threadSafeDeciderImpl) IsQuorumAchievedByMask(mask *bls.Mask) bool {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.IsQuorumAchievedByMask(mask)
}
func (a threadSafeDeciderImpl) QuorumThreshold() numeric.Dec {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.QuorumThreshold()
}
func (a threadSafeDeciderImpl) IsAllSigsCollected() bool {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.IsAllSigsCollected()
}
func (a threadSafeDeciderImpl) ResetPrepareAndCommitVotes() {
a.mu.Lock()
defer a.mu.Unlock()
a.decider.ResetPrepareAndCommitVotes()
}
func (a threadSafeDeciderImpl) ResetViewChangeVotes() {
a.mu.Lock()
defer a.mu.Unlock()
a.decider.ResetViewChangeVotes()
}
func (a threadSafeDeciderImpl) CurrentTotalPower(p Phase) (*numeric.Dec, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.CurrentTotalPower(p)
}
func (a threadSafeDeciderImpl) IsQuorumAchieved(p Phase) bool {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.IsQuorumAchieved(p)
}

@ -57,7 +57,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error {
continue continue
} }
if _, err := consensus.Decider.AddNewVote( if _, err := consensus.decider.AddNewVote(
quorum.Commit, quorum.Commit,
[]*bls.PublicKeyWrapper{key.Pub}, []*bls.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload), key.Pri.SignHash(commitPayload),

@ -199,12 +199,12 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) {
// check validity of prepared signature // check validity of prepared signature
blockHash := recvMsg.BlockHash blockHash := recvMsg.BlockHash
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.Decider.Participants()) aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.decider.Participants())
if err != nil { if err != nil {
consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!") consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!")
return return
} }
if !consensus.Decider.IsQuorumAchievedByMask(mask) { if !consensus.decider.IsQuorumAchievedByMask(mask) {
consensus.getLogger().Warn().Msgf("[OnPrepared] Quorum Not achieved.") consensus.getLogger().Warn().Msgf("[OnPrepared] Quorum Not achieved.")
return return
} }
@ -335,7 +335,7 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) {
return return
} }
aggSig, mask, err := chain.DecodeSigBitmap(sigBytes, bitmap, consensus.Decider.Participants()) aggSig, mask, err := chain.DecodeSigBitmap(sigBytes, bitmap, consensus.decider.Participants())
if err != nil { if err != nil {
consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed") consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed")
return return

@ -187,7 +187,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
// it can still sync with other validators. // it can still sync with other validators.
if curHeader.IsLastBlockInEpoch() { if curHeader.IsLastBlockInEpoch() {
consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch") consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch")
lastLeaderPubKey = consensus.Decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch)) lastLeaderPubKey = consensus.decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch))
} }
} }
} }
@ -204,18 +204,18 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
var next *bls.PublicKeyWrapper var next *bls.PublicKeyWrapper
if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) { if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) {
if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) { if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.Decider.NthNextValidator( wasFound, next = consensus.decider.NthNextValidator(
committee.Slots, committee.Slots,
lastLeaderPubKey, lastLeaderPubKey,
gap) gap)
} else { } else {
wasFound, next = consensus.Decider.NthNextHmy( wasFound, next = consensus.decider.NthNextHmy(
shard.Schedule.InstanceForEpoch(epoch), shard.Schedule.InstanceForEpoch(epoch),
lastLeaderPubKey, lastLeaderPubKey,
gap) gap)
} }
} else { } else {
wasFound, next = consensus.Decider.NthNextHmy( wasFound, next = consensus.decider.NthNextHmy(
shard.Schedule.InstanceForEpoch(epoch), shard.Schedule.InstanceForEpoch(epoch),
lastLeaderPubKey, lastLeaderPubKey,
gap) gap)
@ -281,7 +281,7 @@ func (consensus *Consensus) startViewChange() {
defer consensus.consensusTimeout[timeoutViewChange].Start() defer consensus.consensusTimeout[timeoutViewChange].Start()
// update the dictionary key if the viewID is first time received // update the dictionary key if the viewID is first time received
members := consensus.Decider.Participants() members := consensus.decider.Participants()
consensus.vc.AddViewIDKeyIfNotExist(nextViewID, members) consensus.vc.AddViewIDKeyIfNotExist(nextViewID, members)
// init my own payload // init my own payload
@ -386,10 +386,10 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
return return
} }
if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) { if consensus.decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) {
consensus.getLogger().Info(). consensus.getLogger().Info().
Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)). Int64("have", consensus.decider.SignersCount(quorum.ViewChange)).
Int64("need", consensus.Decider.TwoThirdsSignersCount()). Int64("need", consensus.decider.TwoThirdsSignersCount()).
Interface("SenderPubkeys", recvMsg.SenderPubkeys). Interface("SenderPubkeys", recvMsg.SenderPubkeys).
Str("newLeaderKey", newLeaderKey.Bytes.Hex()). Str("newLeaderKey", newLeaderKey.Bytes.Hex()).
Msg("[onViewChange] Received Enough View Change Messages") Msg("[onViewChange] Received Enough View Change Messages")
@ -404,7 +404,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
senderKey := recvMsg.SenderPubkeys[0] senderKey := recvMsg.SenderPubkeys[0]
// update the dictionary key if the viewID is first time received // update the dictionary key if the viewID is first time received
members := consensus.Decider.Participants() members := consensus.decider.Participants()
consensus.vc.AddViewIDKeyIfNotExist(recvMsg.ViewID, members) consensus.vc.AddViewIDKeyIfNotExist(recvMsg.ViewID, members)
// do it once only per viewID/Leader // do it once only per viewID/Leader
@ -420,7 +420,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
return return
} }
err = consensus.vc.ProcessViewChangeMsg(consensus.fBFTLog, consensus.Decider, recvMsg, consensus.verifyBlock) err = consensus.vc.ProcessViewChangeMsg(consensus.fBFTLog, consensus.decider, recvMsg, consensus.verifyBlock)
if err != nil { if err != nil {
consensus.getLogger().Error().Err(err). consensus.getLogger().Error().Err(err).
Uint64("viewID", recvMsg.ViewID). Uint64("viewID", recvMsg.ViewID).
@ -431,7 +431,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
} }
// received enough view change messages, change state to normal consensus // received enough view change messages, change state to normal consensus
if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.isViewChangingMode() { if consensus.decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.isViewChangingMode() {
// no previous prepared message, go straight to normal mode // no previous prepared message, go straight to normal mode
// and start proposing new block // and start proposing new block
if consensus.vc.IsM1PayloadEmpty() { if consensus.vc.IsM1PayloadEmpty() {
@ -495,7 +495,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) {
} }
m3Mask := recvMsg.M3Bitmap m3Mask := recvMsg.M3Bitmap
if !consensus.Decider.IsQuorumAchievedByMask(m3Mask) { if !consensus.decider.IsQuorumAchievedByMask(m3Mask) {
consensus.getLogger().Warn(). consensus.getLogger().Warn().
Msgf("[onNewView] Quorum Not achieved") Msgf("[onNewView] Quorum Not achieved")
return return
@ -507,7 +507,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) {
utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) { utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) {
// m1 is not empty, check it's valid // m1 is not empty, check it's valid
blockHash := recvMsg.Payload[:32] blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.Decider.Participants()) aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.decider.Participants())
if err != nil { if err != nil {
consensus.getLogger().Error().Err(err). consensus.getLogger().Error().Err(err).
Msg("[onNewView] ReadSignatureBitmapPayload Failed") Msg("[onNewView] ReadSignatureBitmapPayload Failed")
@ -584,5 +584,5 @@ func (consensus *Consensus) resetViewChangeState() {
Msg("[ResetViewChangeState] Resetting view change state") Msg("[ResetViewChangeState] Resetting view change state")
consensus.current.SetMode(Normal) consensus.current.SetMode(Normal)
consensus.vc.Reset() consensus.vc.Reset()
consensus.Decider.ResetViewChangeVotes() consensus.decider.ResetViewChangeVotes()
} }

@ -94,7 +94,7 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
_, _, consensus, _, err := GenerateConsensusForTesting() _, _, consensus, _, err := GenerateConsensusForTesting()
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, int64(0), consensus.Decider.ParticipantsCount()) assert.Equal(t, int64(0), consensus.Decider().ParticipantsCount())
blsKeys := []*bls_core.PublicKey{} blsKeys := []*bls_core.PublicKey{}
wrappedBLSKeys := []bls.PublicKeyWrapper{} wrappedBLSKeys := []bls.PublicKeyWrapper{}
@ -111,8 +111,8 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
wrappedBLSKeys = append(wrappedBLSKeys, wrapped) wrappedBLSKeys = append(wrappedBLSKeys, wrapped)
} }
consensus.Decider.UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{}) consensus.Decider().UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{})
assert.Equal(t, keyCount, consensus.Decider.ParticipantsCount()) assert.Equal(t, keyCount, consensus.Decider().ParticipantsCount())
consensus.LeaderPubKey = &wrappedBLSKeys[0] consensus.LeaderPubKey = &wrappedBLSKeys[0]
nextKey := consensus.getNextLeaderKey(uint64(1), nil) nextKey := consensus.getNextLeaderKey(uint64(1), nil)

@ -177,7 +177,7 @@ func (node *Node) GetConfig() rpc_common.Config {
// GetLastSigningPower get last signed power // GetLastSigningPower get last signed power
func (node *Node) GetLastSigningPower() (float64, error) { func (node *Node) GetLastSigningPower() (float64, error) {
power, err := node.Consensus.Decider.CurrentTotalPower(quorum.Commit) power, err := node.Consensus.Decider().CurrentTotalPower(quorum.Commit)
if err != nil { if err != nil {
return 0, err return 0, err
} }

@ -655,7 +655,7 @@ func validateShardBoundMessage(consensus *consensus.Consensus, peer libp2p_peer.
return nil, nil, true, errors.WithStack(shard.ErrValidNotInCommittee) return nil, nil, true, errors.WithStack(shard.ErrValidNotInCommittee)
} }
} else { } else {
count := consensus.Decider.ParticipantsCount() count := consensus.Decider().ParticipantsCount()
if (count+7)>>3 != int64(len(senderBitmap)) { if (count+7)>>3 != int64(len(senderBitmap)) {
nodeConsensusMessageCounterVec.With(prometheus.Labels{"type": "invalid_participant_count"}).Inc() nodeConsensusMessageCounterVec.With(prometheus.Labels{"type": "invalid_participant_count"}).Inc()
return nil, nil, true, errors.WithStack(errWrongSizeOfBitmap) return nil, nil, true, errors.WithStack(errWrongSizeOfBitmap)

@ -53,7 +53,7 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag
return err return err
} }
if !node.Consensus.Decider.IsQuorumAchievedByMask(mask) { if !node.Consensus.Decider().IsQuorumAchievedByMask(mask) {
utils.Logger().Error().Msg("[Explorer] not have enough signature power") utils.Logger().Error().Msg("[Explorer] not have enough signature power")
return nil return nil
} }

Loading…
Cancel
Save