check leader for N blocks

pull/4377/head
frozen 2 years ago committed by Casey Gardiner
parent a6d7bdaad1
commit be0f339930
  1. 5
      consensus/consensus.go
  2. 29
      consensus/consensus_service.go
  3. 66
      consensus/consensus_v2.go
  4. 2
      consensus/view_change.go

@ -181,6 +181,11 @@ func (consensus *Consensus) GetLeaderPubKey() *bls_cosi.PublicKeyWrapper {
defer consensus.pubKeyLock.Unlock() defer consensus.pubKeyLock.Unlock()
return consensus.LeaderPubKey return consensus.LeaderPubKey
} }
func (consensus *Consensus) SetLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
consensus.pubKeyLock.Lock()
consensus.LeaderPubKey = pub
consensus.pubKeyLock.Unlock()
}
func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys { func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys {
return consensus.priKey return consensus.priKey

@ -77,8 +77,20 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
// TODO: use mutex for updating public keys pointer. No need to lock on all these logic. // TODO: use mutex for updating public keys pointer. No need to lock on all these logic.
consensus.pubKeyLock.Lock() consensus.pubKeyLock.Lock()
consensus.Decider.UpdateParticipants(pubKeys, allowlist) consensus.Decider.UpdateParticipants(pubKeys, allowlist)
allKeys := consensus.Decider.Participants()
consensus.pubKeyLock.Unlock() consensus.pubKeyLock.Unlock()
consensus.getLogger().Info().Msg("My Committee updated") if len(allKeys) != 0 {
first := consensus.Decider.FirstParticipant(
shard.Schedule.InstanceForEpoch(consensus.Blockchain.CurrentHeader().Epoch()))
consensus.pubKeyLock.Lock()
consensus.LeaderPubKey = first
consensus.pubKeyLock.Unlock()
consensus.getLogger().Info().
Str("info", consensus.LeaderPubKey.Bytes.Hex()).Msg("My Leader")
} else {
consensus.getLogger().Error().
Msg("[UpdatePublicKeys] Participants is empty")
}
for i := range pubKeys { for i := range pubKeys {
consensus.getLogger().Info(). consensus.getLogger().Info().
Int("index", i). Int("index", i).
@ -87,20 +99,6 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
} }
if consensus.Blockchain.Config().IsLeaderRotation(consensus.GetCurEpoch()) { if consensus.Blockchain.Config().IsLeaderRotation(consensus.GetCurEpoch()) {
consensus.updateLeader() consensus.updateLeader()
} else {
consensus.pubKeyLock.Lock()
allKeys := consensus.Decider.Participants()
consensus.pubKeyLock.Unlock()
if len(allKeys) != 0 {
consensus.pubKeyLock.Lock()
consensus.LeaderPubKey = &allKeys[0]
consensus.pubKeyLock.Unlock()
consensus.getLogger().Info().
Str("info", consensus.LeaderPubKey.Bytes.Hex()).Msg("My Leader")
} else {
consensus.getLogger().Error().
Msg("[UpdatePublicKeys] Participants is empty")
}
} }
// reset states after update public keys // reset states after update public keys
@ -483,7 +481,6 @@ func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
} }
func (consensus *Consensus) SetCurEpoch(epoch uint64) { func (consensus *Consensus) SetCurEpoch(epoch uint64) {
fmt.Println("SetCurEpoch", epoch)
atomic.StoreUint64(&consensus.epoch, epoch) atomic.StoreUint64(&consensus.epoch, epoch)
} }

@ -4,13 +4,13 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/hex" "encoding/hex"
"fmt"
"math/big" "math/big"
"sync/atomic" "sync/atomic"
"time" "time"
bls2 "github.com/harmony-one/bls/ffi/go/bls" bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/signature" "github.com/harmony-one/harmony/consensus/signature"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
@ -393,7 +393,6 @@ func (consensus *Consensus) Start(
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc() consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc()
case newBlock := <-blockChannel: case newBlock := <-blockChannel:
//consensus.ReshardingNextLeader(newBlock)
consensus.getLogger().Info(). consensus.getLogger().Info().
Uint64("MsgBlockNum", newBlock.NumberU64()). Uint64("MsgBlockNum", newBlock.NumberU64()).
Msg("[ConsensusMainLoop] Received Proposed New Block!") Msg("[ConsensusMainLoop] Received Proposed New Block!")
@ -405,7 +404,6 @@ func (consensus *Consensus) Start(
} }
// Sleep to wait for the full block time // Sleep to wait for the full block time
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time") consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time")
<-time.After(time.Until(consensus.NextBlockDue)) <-time.After(time.Until(consensus.NextBlockDue))
consensus.StartFinalityCount() consensus.StartFinalityCount()
@ -530,7 +528,6 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl
// preCommitAndPropose commit the current block with 67% commit signatures and start // preCommitAndPropose commit the current block with 67% commit signatures and start
// proposing new block which will wait on the full commit signatures to finish // proposing new block which will wait on the full commit signatures to finish
func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
//fmt.Println("preCommitAndPropose", utils.GetPort(), blk.NumberU64())
if blk == nil { if blk == nil {
return errors.New("block to pre-commit is nil") return errors.New("block to pre-commit is nil")
} }
@ -687,30 +684,48 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
} }
func (consensus *Consensus) updateLeader() { func (consensus *Consensus) updateLeader() {
curBlockViewID := consensus.GetCurBlockViewID()
prev := consensus.GetLeaderPubKey() prev := consensus.GetLeaderPubKey()
epoch := consensus.GetCurEpoch() epoch := consensus.GetCurEpoch()
curNumber := consensus.Blockchain.CurrentHeader().Number().Uint64()
if consensus.Blockchain.Config().IsLeaderRotation(epoch) { if consensus.Blockchain.Config().IsLeaderRotation(epoch) {
epochBlockViewID, err := consensus.getEpochFirstBlockViewID(epoch) leader := consensus.GetLeaderPubKey()
if err != nil { for i := uint64(0); i < 5; i++ {
consensus.getLogger().Error().Err(err).Msgf("[SetupForNewConsensus] Failed to get epoch block viewID for epoch %d", epoch) header := consensus.Blockchain.GetHeaderByNumber(curNumber - i)
return if header == nil {
return
}
// Previous epoch, we should not change leader.
if header.Epoch().Uint64() != epoch.Uint64() {
return
}
// Check if the same leader.
pub, err := chain.GetLeaderPubKeyFromCoinbase(consensus.Blockchain, header)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to get leader public key from coinbase")
return
}
if !pub.Object.IsEqual(leader.Object) {
// Another leader.
return
}
}
// Passed all checks, we can change leader.
var (
wasFound bool
next *bls.PublicKeyWrapper
)
// The same leader for N blocks.
if consensus.Blockchain.Config().IsAllowlistEpoch(epoch) {
wasFound, next = consensus.Decider.NthNextHmyExt(shard.Schedule.InstanceForEpoch(epoch), leader, 1)
} else {
wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, 1)
} }
if epochBlockViewID > curBlockViewID { if !wasFound {
consensus.getLogger().Error().Msg("[SetupForNewConsensus] Epoch block viewID is greater than current block viewID") utils.Logger().Error().Msg("Failed to get next leader")
return return
} else {
consensus.SetLeaderPubKey(next)
} }
diff := curBlockViewID - epochBlockViewID
pps := consensus.Decider.Participants()
fmt.Println("diff ", diff, "epochBlockViewID: ", curBlockViewID, "epochBlockViewID", epochBlockViewID, (int(diff) / 5), len(pps), epoch)
idx := (int(diff) / 5) % len(pps)
consensus.pubKeyLock.Lock()
//fmt.Println("(int(diff)/3)%len(pps) == ", idx)
consensus.LeaderPubKey = &pps[idx]
//fmt.Printf("SetupForNewConsensus :%d idx: %d future v%d new: %s prev: %s %v\n", utils.GetPort(), idx, curBlockViewID, consensus.LeaderPubKey.Bytes.Hex(), prev.Bytes.Hex(), consensus.isLeader())
consensus.pubKeyLock.Unlock()
if consensus.IsLeader() && !consensus.GetLeaderPubKey().Object.IsEqual(prev.Object) { if consensus.IsLeader() && !consensus.GetLeaderPubKey().Object.IsEqual(prev.Object) {
// leader changed // leader changed
go func() { go func() {
@ -729,6 +744,9 @@ func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg
} else { } else {
consensus.SetCurEpoch(blk.Epoch().Uint64()) consensus.SetCurEpoch(blk.Epoch().Uint64())
} }
consensus.pubKeyLock.Lock()
consensus.LeaderPubKey = committedMsg.SenderPubkeys[0]
consensus.pubKeyLock.Unlock()
//prev := consensus.GetLeaderPubKey() //prev := consensus.GetLeaderPubKey()
if consensus.Blockchain.Config().IsLeaderRotation(consensus.GetCurEpoch()) { if consensus.Blockchain.Config().IsLeaderRotation(consensus.GetCurEpoch()) {
//consensus.updateLeader() //consensus.updateLeader()
@ -761,10 +779,6 @@ func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg
}() }()
} }
}*/ }*/
} else {
consensus.pubKeyLock.Lock()
consensus.LeaderPubKey = committedMsg.SenderPubkeys[0]
consensus.pubKeyLock.Unlock()
} }
// Update consensus keys at last so the change of leader status doesn't mess up normal flow // Update consensus keys at last so the change of leader status doesn't mess up normal flow

@ -176,7 +176,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrappe
cur := consensus.GetCurBlockViewID() cur := consensus.GetCurBlockViewID()
if viewID > cur { if viewID > cur {
gap = int(viewID - consensus.GetCurBlockViewID()) gap = int(viewID - cur)
} }
var lastLeaderPubKey *bls.PublicKeyWrapper var lastLeaderPubKey *bls.PublicKeyWrapper
var err error var err error

Loading…
Cancel
Save