Merge pull request #3459 from LeoHChen/prometheus_consensus

Prometheus consensus
pull/3464/head
Leo Chen 4 years ago committed by GitHub
commit 37b62bba8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      consensus/consensus_service.go
  2. 11
      consensus/consensus_v2.go
  3. 85
      consensus/metrics.go
  4. 2
      consensus/validator.go
  5. 3
      consensus/view_change.go
  6. 27
      node/node_handler.go

@ -8,6 +8,7 @@ import (
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/ethereum/go-ethereum/common"
@ -512,6 +513,7 @@ func (consensus *Consensus) StartFinalityCount() {
func (consensus *Consensus) FinishFinalityCount() {
d := time.Now().UnixNano()
consensus.finality = (d - consensus.finalityCounter) / 1000000
consensusFinalityHistogram.Observe(float64(consensus.finality))
}
// GetFinality returns the finality time in milliseconds of previous consensus
@ -596,6 +598,27 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
return nil
}
// NumSignaturesIncludedInBlock returns the number of signatures included in the block
func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0)
members := consensus.Decider.Participants()
// TODO(audit): do not reconstruct the Mask
mask, err := bls.NewMask(members, nil)
if err != nil {
return count
}
err = mask.SetMask(block.Header().LastCommitBitmap())
if err != nil {
return count
}
for _, key := range consensus.GetPublicKeys() {
if ok, err := mask.KeyEnabled(key.Bytes); err == nil && ok {
count++
}
}
return count
}
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger {
logOnce.Do(func() {

@ -22,6 +22,7 @@ import (
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/vdf/src/vdf_go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
var (
@ -112,8 +113,10 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb
}
func (consensus *Consensus) finalCommit() {
numCommits := consensus.Decider.SignersCount(quorum.Commit)
consensus.getLogger().Info().
Int64("NumCommits", consensus.Decider.SignersCount(quorum.Commit)).
Int64("NumCommits", numCommits).
Msg("[finalCommit] Finalizing Consensus")
beforeCatchupNum := consensus.blockNum
@ -212,6 +215,8 @@ func (consensus *Consensus) finalCommit() {
Int("numStakingTxns", len(block.StakingTransactions())).
Msg("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!")
consensus.UpdateLeaderMetrics(float64(numCommits), float64(block.NumberU64()))
// If still the leader, send commit sig/bitmap to finish the new block proposal,
// else, the block proposal will timeout by itself.
if consensus.IsLeader() {
@ -326,6 +331,7 @@ func (consensus *Consensus) Start(
consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer")
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC")
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc()
}
consensus.mutex.Unlock()
@ -334,6 +340,7 @@ func (consensus *Consensus) Start(
consensus.SetBlockNum(consensus.Blockchain.CurrentHeader().Number().Uint64() + 1)
consensus.current.SetMode(Syncing)
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC")
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc()
case newBlock := <-blockChannel:
consensus.getLogger().Info().
@ -625,9 +632,9 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
return errIncorrectSender
}
consensus.FinishFinalityCount()
consensus.PostConsensusJob(blk)
consensus.SetupForNewConsensus(blk, committedMsg)
consensus.FinishFinalityCount()
utils.Logger().Info().Uint64("blockNum", blk.NumberU64()).
Str("hash", blk.Header().Hash().Hex()).
Msg("Added New Block to Blockchain!!!")

@ -0,0 +1,85 @@
package consensus
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// consensusCounterVec is used to keep track of consensus reached
consensusCounterVec = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "consensus",
Name: "bingo",
Help: "counter of consensus",
},
[]string{
"consensus",
},
)
// consensusVCCounterVec is used to keep track of number of view change
consensusVCCounterVec = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "consensus",
Name: "viewchange",
Help: "counter of view chagne",
},
[]string{
"viewchange",
},
)
// consensusSyncCounterVec is used to keep track of consensus syncing state
consensusSyncCounterVec = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "consensus",
Name: "sync",
Help: "counter of blockchain syncing state",
},
[]string{
"consensus",
},
)
// consensusGaugeVec is used to keep track of gauge number of the consensus
consensusGaugeVec = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "consensus",
Name: "signatures",
Help: "number of signatures or commits",
},
[]string{
"consensus",
},
)
// consensusFinalityHistogram is used to keep track of finality
// 10 ExponentialBuckets are in the unit of millisecond:
// 800, 1000, 1250, 1562, 1953, 2441, 3051, 3814, 4768, 5960, inf
consensusFinalityHistogram = promauto.NewHistogram(
prometheus.HistogramOpts{
Namespace: "hmy",
Subsystem: "consensus",
Name: "finality",
Help: "the latency of the finality",
Buckets: prometheus.ExponentialBuckets(800, 1.25, 10),
},
)
)
// UpdateValidatorMetrics will udpate validator metrics
func (consensus *Consensus) UpdateValidatorMetrics(numSig float64, blockNum float64) {
consensusCounterVec.With(prometheus.Labels{"consensus": "bingo"}).Inc()
consensusGaugeVec.With(prometheus.Labels{"consensus": "signatures"}).Set(numSig)
consensusCounterVec.With(prometheus.Labels{"consensus": "signatures"}).Add(numSig)
consensusGaugeVec.With(prometheus.Labels{"consensus": "block_num"}).Set(blockNum)
}
// UpdateLeaderMetrics will udpate leader metrics
func (consensus *Consensus) UpdateLeaderMetrics(numCommits float64, blockNum float64) {
consensusCounterVec.With(prometheus.Labels{"consensus": "hooray"}).Inc()
consensusGaugeVec.With(prometheus.Labels{"consensus": "block_num"}).Set(blockNum)
consensusCounterVec.With(prometheus.Labels{"consensus": "num_commits"}).Add(numCommits)
consensusGaugeVec.With(prometheus.Labels{"consensus": "num_commits"}).Set(numCommits)
}

@ -29,6 +29,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
if !consensus.onAnnounceSanityChecks(recvMsg) {
return
}
consensus.StartFinalityCount()
consensus.getLogger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
@ -55,7 +56,6 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
}
return
}
consensus.StartFinalityCount()
consensus.prepare()
}

@ -15,6 +15,7 @@ import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
// MaxViewIDDiff limits the received view ID to only 249 further from the current view ID
@ -240,6 +241,7 @@ func (consensus *Consensus) startViewChange() {
Dur("timeoutDuration", duration).
Str("NextLeader", consensus.LeaderPubKey.Bytes.Hex()).
Msg("[startViewChange]")
consensusVCCounterVec.With(prometheus.Labels{"viewchange": "started"}).Inc()
consensus.consensusTimeout[timeoutViewChange].SetDuration(duration)
defer consensus.consensusTimeout[timeoutViewChange].Start()
@ -544,6 +546,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
Str("newLeaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Msg("new leader changed")
consensus.consensusTimeout[timeoutConsensus].Start()
consensusVCCounterVec.With(prometheus.Labels{"viewchange": "finished"}).Inc()
}
// ResetViewChangeState resets the view change structure

@ -12,7 +12,6 @@ import (
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core/types"
internal_bls "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
@ -328,26 +327,6 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
return nil
}
func (node *Node) numSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0)
members := node.Consensus.Decider.Participants()
// TODO(audit): do not reconstruct the Mask
mask, err := internal_bls.NewMask(members, nil)
if err != nil {
return count
}
err = mask.SetMask(block.Header().LastCommitBitmap())
if err != nil {
return count
}
for _, key := range node.Consensus.GetPublicKeys() {
if ok, err := mask.KeyEnabled(key.Bytes); err == nil && ok {
count++
}
}
return count
}
// PostConsensusProcessing is called by consensus participants, after consensus is done, to:
// 1. add the new block to blockchain
// 2. [leader] send new block to the client
@ -367,8 +346,12 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
Str("blockHash", newBlock.Hash().String()).
Int("numTxns", len(newBlock.Transactions())).
Int("numStakingTxns", len(newBlock.StakingTransactions())).
Uint32("numSignatures", node.numSignaturesIncludedInBlock(newBlock)).
Uint32("numSignatures", node.Consensus.NumSignaturesIncludedInBlock(newBlock)).
Msg("BINGO !!! Reached Consensus")
numSig := float64(node.Consensus.NumSignaturesIncludedInBlock(newBlock))
node.Consensus.UpdateValidatorMetrics(numSig, float64(newBlock.NumberU64()))
// 1% of the validator also need to do broadcasting
rand.Seed(time.Now().UTC().UnixNano())
rnd := rand.Intn(100)

Loading…
Cancel
Save