Merge pull request #922 from harmony-ek/add_signature_grace_period

Add signature grace period
pull/924/head
Eugene Kim 6 years ago committed by GitHub
commit 0d013c17fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      consensus/consensus.go
  2. 8
      consensus/consensus_leader.go
  3. 1
      consensus/consensus_service.go
  4. 104
      consensus/consensus_v2.go
  5. 6
      consensus/view_change.go

@ -37,6 +37,12 @@ var (
// Consensus is the main struct with all states and data related to consensus process.
type Consensus struct {
ConsensusVersion string
// Consensus round. Increments every time state is reset.
// Useful for delayed processing for the current round,
// such as commit finalization.
round uint64
// pbftLog stores the pbft messages and blocks during PBFT process
pbftLog *PbftLog
// phase: different phase of PBFT protocol: pre-prepare, prepare, commit, finish etc
@ -152,6 +158,9 @@ type Consensus struct {
// If true, this consensus will not propose view change.
disableViewChange bool
// Consensus rounds whose commit phase finished
commitFinishChan chan uint64
}
// StakeInfoFinder returns the stake information finder instance this
@ -182,6 +191,11 @@ func (consensus *Consensus) BlocksSynchronized() {
consensus.syncReadyChan <- struct{}{}
}
// Quorum returns the consensus quorum of the current committee (2f+1).
func (consensus *Consensus) Quorum() int {
return len(consensus.PublicKeys)*2/3 + 1
}
// StakeInfoFinder finds the staking account for the given consensus key.
type StakeInfoFinder interface {
// FindStakeInfoByNodeKey returns a list of staking information matching
@ -250,6 +264,7 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe
consensus.MsgChan = make(chan []byte)
consensus.syncReadyChan = make(chan struct{})
consensus.commitFinishChan = make(chan uint64)
// For validators to keep track of all blocks received but not yet committed, so as to catch up to latest consensus if lagged behind.
consensus.blocksReceived = make(map[uint32]*BlockConsensusStatus)

@ -179,7 +179,7 @@ func (consensus *Consensus) processPrepareMessage(message *msg_pb.Message) {
return
}
if len(prepareSigs) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
if len(prepareSigs) >= consensus.Quorum() {
utils.GetLogInstance().Debug("Received additional prepare message", "validatorAddress", validatorAddress)
return
}
@ -202,7 +202,7 @@ func (consensus *Consensus) processPrepareMessage(message *msg_pb.Message) {
prepareBitmap.SetKey(validatorPubKey, true) // Set the bitmap indicating that this validator signed.
targetState := PreparedDone
if len(prepareSigs) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state < targetState {
if len(prepareSigs) >= consensus.Quorum() && consensus.state < targetState {
utils.GetLogInstance().Debug("Enough prepares received with signatures", "num", len(prepareSigs), "state", consensus.state)
// Construct and broadcast prepared message
@ -258,7 +258,7 @@ func (consensus *Consensus) processCommitMessage(message *msg_pb.Message) {
return
}
if len((commitSigs)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
if len(commitSigs) >= consensus.Quorum() {
utils.GetLogInstance().Debug("Received additional new commit message", "validatorAddress", validatorAddress)
return
}
@ -282,7 +282,7 @@ func (consensus *Consensus) processCommitMessage(message *msg_pb.Message) {
commitBitmap.SetKey(validatorPubKey, true)
targetState := CommittedDone
if len(commitSigs) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state != targetState {
if len(commitSigs) >= consensus.Quorum() && consensus.state != targetState {
utils.GetLogInstance().Info("Enough commits received!", "num", len(commitSigs), "state", consensus.state)
// Construct and broadcast committed message

@ -333,6 +333,7 @@ func (consensus *Consensus) GetNilSigsArray() []*bls.Sign {
// ResetState resets the state of the consensus
func (consensus *Consensus) ResetState() {
consensus.round++
consensus.state = Finished
consensus.phase = Announce
consensus.prepareSigs = map[common.Address]*bls.Sign{}

@ -242,7 +242,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
if len(prepareSigs) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
if len(prepareSigs) >= consensus.Quorum() {
// already have enough signatures
return
}
@ -270,7 +270,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
prepareSigs[validatorAddress] = &sign
prepareBitmap.SetKey(validatorPubKey, true) // Set the bitmap indicating that this validator signed.
if len(prepareSigs) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
if len(prepareSigs) >= consensus.Quorum() {
consensus.switchPhase(Commit)
// Construct and broadcast prepared message
@ -431,9 +431,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
return
}
if len((commitSigs)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
return
}
quorumWasMet := len(commitSigs) >= consensus.Quorum()
// Verify the signature on prepare multi-sig and bitmap is correct
var sign bls.Sign
@ -453,54 +451,64 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
// Set the bitmap indicating that this validator signed.
commitBitmap.SetKey(validatorPubKey, true)
if len(commitSigs) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
quorumIsMet := len(commitSigs) >= consensus.Quorum()
if !quorumWasMet && quorumIsMet {
utils.GetLogInstance().Info("Enough commits received!", "num", len(commitSigs), "state", consensus.state)
consensus.switchPhase(Announce)
go func(round uint64) {
time.Sleep(1 * time.Second)
utils.GetLogger().Debug("Commit grace period ended", "round", round)
consensus.commitFinishChan <- round
}(consensus.round)
}
}
// Construct and broadcast committed message
msgToSend, aggSig := consensus.constructCommittedMessage()
consensus.aggregatedCommitSig = aggSig
func (consensus *Consensus) finalizeCommits() {
utils.GetLogger().Info("finalizing block", "num", len(consensus.commitSigs), "state", consensus.state)
consensus.switchPhase(Announce)
utils.GetLogInstance().Warn("[Consensus]", "sent committed message", len(msgToSend))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
// Construct and broadcast committed message
msgToSend, aggSig := consensus.constructCommittedMessage()
consensus.aggregatedCommitSig = aggSig
var blockObj types.Block
err := rlp.DecodeBytes(consensus.block, &blockObj)
if err != nil {
utils.GetLogInstance().Debug("failed to construct the new block after consensus")
}
utils.GetLogInstance().Warn("[Consensus]", "sent committed message", len(msgToSend))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
// Sign the block
blockObj.SetPrepareSig(
consensus.aggregatedPrepareSig.Serialize(),
consensus.prepareBitmap.Bitmap)
blockObj.SetCommitSig(
consensus.aggregatedCommitSig.Serialize(),
consensus.commitBitmap.Bitmap)
select {
case consensus.VerifiedNewBlock <- &blockObj:
default:
utils.GetLogInstance().Info("[SYNC] Failed to send consensus verified block for state sync", "blockHash", blockObj.Hash())
}
var blockObj types.Block
err := rlp.DecodeBytes(consensus.block, &blockObj)
if err != nil {
utils.GetLogInstance().Debug("failed to construct the new block after consensus")
}
// Sign the block
blockObj.SetPrepareSig(
consensus.aggregatedPrepareSig.Serialize(),
consensus.prepareBitmap.Bitmap)
blockObj.SetCommitSig(
consensus.aggregatedCommitSig.Serialize(),
consensus.commitBitmap.Bitmap)
consensus.reportMetrics(blockObj)
select {
case consensus.VerifiedNewBlock <- &blockObj:
default:
utils.GetLogInstance().Info("[SYNC] Failed to send consensus verified block for state sync", "blockHash", blockObj.Hash())
}
// Dump new block into level db.
explorer.GetStorageInstance(consensus.leader.IP, consensus.leader.Port, true).Dump(&blockObj, consensus.viewID)
consensus.reportMetrics(blockObj)
// Reset state to Finished, and clear other data.
consensus.ResetState()
consensus.viewID++
consensus.blockNum++
// Dump new block into level db.
explorer.GetStorageInstance(consensus.leader.IP, consensus.leader.Port, true).Dump(&blockObj, consensus.viewID)
consensus.OnConsensusDone(&blockObj)
utils.GetLogInstance().Debug("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!", "viewID", consensus.viewID, "numOfSignatures", len(commitSigs))
// Reset state to Finished, and clear other data.
consensus.ResetState()
consensus.viewID++
consensus.blockNum++
// Send signal to Node so the new block can be added and new round of consensus can be triggered
consensus.ReadySignal <- struct{}{}
}
return
consensus.OnConsensusDone(&blockObj)
utils.GetLogInstance().Debug("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!", "viewID", consensus.viewID, "numOfSignatures", len(consensus.commitSigs))
// Send signal to Node so the new block can be added and new round of consensus can be triggered
consensus.ReadySignal <- struct{}{}
}
func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
@ -737,6 +745,16 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
case msg := <-consensus.MsgChan:
consensus.handleMessageUpdate(msg)
case round := <-consensus.commitFinishChan:
func() {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
if round == consensus.round {
consensus.finalizeCommits()
}
}()
case <-stopChan:
return
}

@ -224,7 +224,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
}
}
if (len(consensus.bhpSigs) + len(consensus.nilSigs)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
if (len(consensus.bhpSigs) + len(consensus.nilSigs)) >= consensus.Quorum() {
return
}
@ -281,7 +281,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.bhpBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed.
}
if (len(consensus.bhpSigs) + len(consensus.nilSigs)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
if (len(consensus.bhpSigs) + len(consensus.nilSigs)) >= consensus.Quorum() {
consensus.SetMode(Normal)
consensus.LeaderPubKey = consensus.PubKey
if len(consensus.m1Payload) == 0 {
@ -315,7 +315,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.consensusTimeout[timeoutViewChange].Stop()
}
utils.GetLogInstance().Debug("onViewChange", "numSigs", len(consensus.bhpSigs)+len(consensus.nilSigs), "needed", (len(consensus.PublicKeys)*2)/3+1)
utils.GetLogInstance().Debug("onViewChange", "numSigs", len(consensus.bhpSigs)+len(consensus.nilSigs), "needed", consensus.Quorum())
}
// TODO: move to consensus_leader.go later

Loading…
Cancel
Save