Merge pull request #3419 from rlan35/fix_vc_commit

Fix view change stuck issue
pull/3425/head
Rongjian Lan 4 years ago committed by GitHub
commit d65f64a55c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      api/service/syncing/syncing.go
  2. 10
      consensus/consensus_service.go
  3. 2
      consensus/consensus_v2.go
  4. 10
      consensus/construct_test.go
  5. 2
      consensus/quorum/one-node-one-vote.go
  6. 2
      consensus/quorum/one-node-staked-vote.go
  7. 14
      consensus/quorum/quorom_test.go
  8. 6
      consensus/quorum/quorum.go
  9. 19
      consensus/validator.go
  10. 5
      consensus/view_change.go

@ -32,7 +32,7 @@ const (
TimesToFail = 5 // downloadBlocks service retry limit TimesToFail = 5 // downloadBlocks service retry limit
RegistrationNumber = 3 RegistrationNumber = 3
SyncingPortDifference = 3000 SyncingPortDifference = 3000
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus inSyncThreshold = 1 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncLoopBatchSize uint32 = 1000 // maximum size for one query of block hashes SyncLoopBatchSize uint32 = 1000 // maximum size for one query of block hashes
verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size
SyncLoopFrequency = 1 // unit in second SyncLoopFrequency = 1 // unit in second

@ -421,6 +421,12 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
Msg("[UpdateConsensusInformation] changing committee") Msg("[UpdateConsensusInformation] changing committee")
// take care of possible leader change during the epoch // take care of possible leader change during the epoch
// TODO: in a very rare case, when a M1 view change happened, the block contains coinbase for last leader
// but the new leader is actually recognized by most of the nodes. At this time, if a node sync to this
// exact block and set its leader, it will set with the failed leader as in the coinbase of the block.
// This is a very rare case scenario and not likely to cause any issue in mainnet. But we need to think about
// a solution to take care of this case because the coinbase of the latest block doesn't really represent the
// the real current leader in case of M1 view change.
if !curHeader.IsLastBlockInEpoch() && curHeader.Number().Uint64() != 0 { if !curHeader.IsLastBlockInEpoch() && curHeader.Number().Uint64() != 0 {
leaderPubKey, err := consensus.getLeaderPubKeyFromCoinbase(curHeader) leaderPubKey, err := consensus.getLeaderPubKeyFromCoinbase(curHeader)
if err != nil || leaderPubKey == nil { if err != nil || leaderPubKey == nil {
@ -573,9 +579,9 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
continue continue
} }
if _, err := consensus.Decider.SubmitVote( if _, err := consensus.Decider.AddNewVote(
quorum.Commit, quorum.Commit,
[]bls.SerializedPublicKey{key.Pub.Bytes}, []*bls_cosi.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload), key.Pri.SignHash(commitPayload),
common.BytesToHash(consensus.blockHash[:]), common.BytesToHash(consensus.blockHash[:]),
block.NumberU64(), block.NumberU64(),

@ -517,7 +517,7 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
} }
atomic.AddUint64(&consensus.blockNum, 1) atomic.AddUint64(&consensus.blockNum, 1)
consensus.SetCurBlockViewID(committedMsg.ViewID + 1) consensus.SetViewIDs(committedMsg.ViewID + 1)
consensus.LeaderPubKey = committedMsg.SenderPubkeys[0] consensus.LeaderPubKey = committedMsg.SenderPubkeys[0]
// Update consensus keys at last so the change of leader status doesn't mess up normal flow // Update consensus keys at last so the change of leader status doesn't mess up normal flow
if blk.IsLastBlockInEpoch() { if blk.IsLastBlockInEpoch() {

@ -71,19 +71,21 @@ func TestConstructPreparedMessage(test *testing.T) {
message := "test string" message := "test string"
leaderKey := bls.SerializedPublicKey{} leaderKey := bls.SerializedPublicKey{}
leaderKey.FromLibBLSPublicKey(leaderPubKey) leaderKey.FromLibBLSPublicKey(leaderPubKey)
leaderKeyWrapper := bls.PublicKeyWrapper{Object: leaderPubKey, Bytes: leaderKey}
validatorKey := bls.SerializedPublicKey{} validatorKey := bls.SerializedPublicKey{}
validatorKey.FromLibBLSPublicKey(validatorPubKey) validatorKey.FromLibBLSPublicKey(validatorPubKey)
consensus.Decider.SubmitVote( validatorKeyWrapper := bls.PublicKeyWrapper{Object: validatorPubKey, Bytes: validatorKey}
consensus.Decider.AddNewVote(
quorum.Prepare, quorum.Prepare,
[]bls.SerializedPublicKey{leaderKey}, []*bls.PublicKeyWrapper{&leaderKeyWrapper},
leaderPriKey.Sign(message), leaderPriKey.Sign(message),
common.BytesToHash(consensus.blockHash[:]), common.BytesToHash(consensus.blockHash[:]),
consensus.blockNum, consensus.blockNum,
consensus.GetCurBlockViewID(), consensus.GetCurBlockViewID(),
) )
if _, err := consensus.Decider.SubmitVote( if _, err := consensus.Decider.AddNewVote(
quorum.Prepare, quorum.Prepare,
[]bls.SerializedPublicKey{validatorKey}, []*bls.PublicKeyWrapper{&validatorKeyWrapper},
validatorPriKey.Sign(message), validatorPriKey.Sign(message),
common.BytesToHash(consensus.blockHash[:]), common.BytesToHash(consensus.blockHash[:]),
consensus.blockNum, consensus.blockNum,

@ -36,7 +36,7 @@ func (v *uniformVoteWeight) AddNewVote(
for i, pubKey := range pubKeys { for i, pubKey := range pubKeys {
pubKeysBytes[i] = pubKey.Bytes pubKeysBytes[i] = pubKey.Bytes
} }
return v.SubmitVote(p, pubKeysBytes, sig, headerHash, height, viewID) return v.submitVote(p, pubKeysBytes, sig, headerHash, height, viewID)
} }
// IsQuorumAchieved .. // IsQuorumAchieved ..

@ -82,7 +82,7 @@ func (v *stakedVoteWeight) AddNewVote(
pubKeysBytes[i] = pubKey.Bytes pubKeysBytes[i] = pubKey.Bytes
} }
ballet, err := v.SubmitVote(p, pubKeysBytes, sig, headerHash, height, viewID) ballet, err := v.submitVote(p, pubKeysBytes, sig, headerHash, height, viewID)
if err != nil { if err != nil {
return ballet, err return ballet, err

@ -88,7 +88,7 @@ func TestSubmitVote(test *testing.T) {
decider.UpdateParticipants([]bls.PublicKeyWrapper{pubKeyWrapper1, pubKeyWrapper2}) decider.UpdateParticipants([]bls.PublicKeyWrapper{pubKeyWrapper1, pubKeyWrapper2})
if _, err := decider.SubmitVote( if _, err := decider.submitVote(
Prepare, Prepare,
[]bls.SerializedPublicKey{pubKeyWrapper1.Bytes}, []bls.SerializedPublicKey{pubKeyWrapper1.Bytes},
blsPriKey1.Sign(message), blsPriKey1.Sign(message),
@ -99,7 +99,7 @@ func TestSubmitVote(test *testing.T) {
test.Log(err) test.Log(err)
} }
if _, err := decider.SubmitVote( if _, err := decider.submitVote(
Prepare, Prepare,
[]bls.SerializedPublicKey{pubKeyWrapper2.Bytes}, []bls.SerializedPublicKey{pubKeyWrapper2.Bytes},
blsPriKey2.Sign(message), blsPriKey2.Sign(message),
@ -110,7 +110,7 @@ func TestSubmitVote(test *testing.T) {
test.Log(err) test.Log(err)
} }
if decider.SignersCount(Prepare) != 2 { if decider.SignersCount(Prepare) != 2 {
test.Fatal("SubmitVote failed") test.Fatal("submitVote failed")
} }
aggSig := &bls_core.Sign{} aggSig := &bls_core.Sign{}
@ -145,7 +145,7 @@ func TestSubmitVoteAggregateSig(test *testing.T) {
decider.UpdateParticipants([]bls.PublicKeyWrapper{pubKeyWrapper1, pubKeyWrapper2}) decider.UpdateParticipants([]bls.PublicKeyWrapper{pubKeyWrapper1, pubKeyWrapper2})
decider.SubmitVote( decider.submitVote(
Prepare, Prepare,
[]bls.SerializedPublicKey{pubKeyWrapper1.Bytes}, []bls.SerializedPublicKey{pubKeyWrapper1.Bytes},
blsPriKey1.SignHash(blockHash[:]), blsPriKey1.SignHash(blockHash[:]),
@ -160,7 +160,7 @@ func TestSubmitVoteAggregateSig(test *testing.T) {
aggSig.Add(s) aggSig.Add(s)
} }
} }
if _, err := decider.SubmitVote( if _, err := decider.submitVote(
Prepare, Prepare,
[]bls.SerializedPublicKey{pubKeyWrapper2.Bytes, pubKeyWrapper3.Bytes}, []bls.SerializedPublicKey{pubKeyWrapper2.Bytes, pubKeyWrapper3.Bytes},
aggSig, aggSig,
@ -172,7 +172,7 @@ func TestSubmitVoteAggregateSig(test *testing.T) {
} }
if decider.SignersCount(Prepare) != 3 { if decider.SignersCount(Prepare) != 3 {
test.Fatal("SubmitVote failed") test.Fatal("submitVote failed")
} }
aggSig.Add(blsPriKey1.SignHash(blockHash[:])) aggSig.Add(blsPriKey1.SignHash(blockHash[:]))
@ -180,7 +180,7 @@ func TestSubmitVoteAggregateSig(test *testing.T) {
test.Fatal("AggregateVotes failed") test.Fatal("AggregateVotes failed")
} }
if _, err := decider.SubmitVote( if _, err := decider.submitVote(
Prepare, Prepare,
[]bls.SerializedPublicKey{pubKeyWrapper2.Bytes}, []bls.SerializedPublicKey{pubKeyWrapper2.Bytes},
aggSig, aggSig,

@ -81,7 +81,8 @@ type ParticipantTracker interface {
// SignatoryTracker .. // SignatoryTracker ..
type SignatoryTracker interface { type SignatoryTracker interface {
ParticipantTracker ParticipantTracker
SubmitVote( // This func shouldn't be called directly from outside of quorum. Use AddNewVote instead.
submitVote(
p Phase, pubkeys []bls.SerializedPublicKey, p Phase, pubkeys []bls.SerializedPublicKey,
sig *bls_core.Sign, headerHash common.Hash, sig *bls_core.Sign, headerHash common.Hash,
height, viewID uint64, height, viewID uint64,
@ -118,6 +119,7 @@ type Decider interface {
DependencyInjectionWriter DependencyInjectionWriter
SetVoters(subCommittee *shard.Committee, epoch *big.Int) (*TallyResult, error) SetVoters(subCommittee *shard.Committee, epoch *big.Int) (*TallyResult, error)
Policy() Policy Policy() Policy
// Add new vote will add the signature in the memory and increase the cumulative voting power
AddNewVote( AddNewVote(
p Phase, pubkeys []*bls_cosi.PublicKeyWrapper, p Phase, pubkeys []*bls_cosi.PublicKeyWrapper,
sig *bls_core.Sign, headerHash common.Hash, sig *bls_core.Sign, headerHash common.Hash,
@ -273,7 +275,7 @@ func (s *cIdentities) SignersCount(p Phase) int64 {
} }
} }
func (s *cIdentities) SubmitVote( func (s *cIdentities) submitVote(
p Phase, pubkeys []bls.SerializedPublicKey, p Phase, pubkeys []bls.SerializedPublicKey,
sig *bls_core.Sign, headerHash common.Hash, sig *bls_core.Sign, headerHash common.Hash,
height, viewID uint64, height, viewID uint64,

@ -114,10 +114,6 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
Msg("Wrong BlockNum Received, ignoring!") Msg("Wrong BlockNum Received, ignoring!")
return return
} }
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Warn().Msgf("[OnPrepared] low consensus block number. Spin sync")
consensus.spinUpStateSync()
}
// check validity of prepared signature // check validity of prepared signature
blockHash := recvMsg.BlockHash blockHash := recvMsg.BlockHash
@ -153,6 +149,12 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
if !consensus.onPreparedSanityChecks(&blockObj, recvMsg) { if !consensus.onPreparedSanityChecks(&blockObj, recvMsg) {
return return
} }
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Warn().Msgf("[OnPrepared] low consensus block number. Spin sync")
consensus.spinUpStateSync()
}
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
@ -236,10 +238,6 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
if !consensus.isRightBlockNumCheck(recvMsg) { if !consensus.isRightBlockNumCheck(recvMsg) {
return return
} }
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync")
consensus.spinUpStateSync()
}
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil { if err != nil {
@ -272,6 +270,11 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.FBFTLog.AddMessage(recvMsg) consensus.FBFTLog.AddMessage(recvMsg)
if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync")
consensus.spinUpStateSync()
}
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()

@ -131,9 +131,8 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
if curTimestamp <= blockTimestamp { if curTimestamp <= blockTimestamp {
return consensus.fallbackNextViewID() return consensus.fallbackNextViewID()
} }
totalNode := consensus.Decider.ParticipantsCount() // diff only increases
// diff is at least 1, and it won't exceed the totalNode diff := uint64((curTimestamp - blockTimestamp) / viewChangeTimeout)
diff := uint64(((curTimestamp - blockTimestamp) / viewChangeTimeout) % int64(totalNode))
nextViewID := diff + consensus.GetCurBlockViewID() nextViewID := diff + consensus.GetCurBlockViewID()
consensus.getLogger().Info(). consensus.getLogger().Info().

Loading…
Cancel
Save