From 282cd1314cb52824721b994f0eb6e44c589510f1 Mon Sep 17 00:00:00 2001 From: flicker-harmony Date: Tue, 6 Aug 2019 22:36:07 +0300 Subject: [PATCH] Add tx pool size metric --- api/service/metrics/service.go | 17 ++++++++++++++++- api/service/metrics/storage.go | 4 ++-- core/tx_pool.go | 5 +++++ node/node_metrics.go | 9 ++++++++- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/api/service/metrics/service.go b/api/service/metrics/service.go index 4337365d0..63e37cfbd 100644 --- a/api/service/metrics/service.go +++ b/api/service/metrics/service.go @@ -26,6 +26,7 @@ const ( NodeBalancePush int = 2 LastConsensusPush int = 3 BlockRewardPush int = 4 + TxPoolPush int = 5 metricsServicePortDifference = 2000 ) @@ -44,6 +45,7 @@ type Service struct { // init vars for prometheus var ( + curTxPoolSize = uint64(0) curBlockHeight = uint64(0) curBlocks = uint64(0) curBalance = big.NewInt(0) @@ -55,6 +57,10 @@ var ( Name: "block_height", Help: "Get current block height.", }) + txPoolGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tx_pool_size", + Help: "Get current tx pool size.", + }) blocksAcceptedGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "blocks_accepted", Help: "Get accepted blocks.", @@ -129,7 +135,7 @@ func (s *Service) Run() { addr := net.JoinHostPort("", GetMetricsServicePort(s.Port)) registry := prometheus.NewRegistry() - registry.MustRegister(blockHeightCounter, connectionsNumberGauge, nodeBalanceCounter, lastConsensusGauge, blockRewardGauge, blocksAcceptedGauge) + registry.MustRegister(blockHeightCounter, connectionsNumberGauge, nodeBalanceCounter, lastConsensusGauge, blockRewardGauge, blocksAcceptedGauge, txPoolGauge) s.pusher = push.New("http://"+s.PushgatewayIP+":"+s.PushgatewayPort, "node_metrics").Gatherer(registry).Grouping("instance", s.IP+":"+s.Port).Grouping("bls_key", s.BlsPublicKey) go s.PushMetrics() @@ -176,6 +182,13 @@ func UpdateNodeBalance(balance *big.Int) { metricsPush <- NodeBalancePush } +// UpdateTxPoolSize updates tx pool size. +func UpdateTxPoolSize(txPoolSize uint64) { + txPoolGauge.Set(float64(txPoolSize)) + curTxPoolSize = txPoolSize + metricsPush <- TxPoolPush +} + // UpdateBlockReward updates block reward. func UpdateBlockReward(blockReward *big.Int) { blockRewardGauge.Set(FormatBalance(blockReward)) @@ -219,6 +232,8 @@ func (s *Service) PushMetrics() { s.storage.Dump(curBalance, BalancePrefix) case LastConsensusPush: s.storage.Dump(lastConsensusTime, ConsensusTimePrefix) + case TxPoolPush: + s.storage.Dump(curTxPoolSize, TxPoolPrefix) }*/ } return diff --git a/api/service/metrics/storage.go b/api/service/metrics/storage.go index bf57939bb..5b7fc9d7c 100644 --- a/api/service/metrics/storage.go +++ b/api/service/metrics/storage.go @@ -19,7 +19,7 @@ const ( BlockRewardPrefix = "brp" ConnectionsNumberPrefix = "cnp" ConsensusTimePrefix = "ltp" - TransactionsPrefix = "tsp" + TxPoolPrefix = "tpp" ) // GetKey returns key by prefix and pushed time momemnt. @@ -73,7 +73,7 @@ func (storage *Storage) Dump(value interface{}, prefix string) error { } batch := storage.db.NewBatch() // Update database. - if err := batch.Put([]byte(GetKey(prefix, currentTime)), []byte(fmt.Sprintf("%v", value))); err != nil { + if err := batch.Put([]byte(GetKey(prefix, currentTime)), []byte(fmt.Sprintf("%v", value.(interface{})))); err != nil { utils.Logger().Warn().Err(err).Msgf("Cannot batch %s.", prefix) return err } diff --git a/core/tx_pool.go b/core/tx_pool.go index ef10cb834..8042a4856 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -463,6 +463,11 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { pool.promoteExecutables(nil) } +// GetTxPoolSize returns tx pool size. +func (pool *TxPool) GetTxPoolSize() uint64 { + return uint64(len(pool.pending)) + uint64(len(pool.queue)) +} + // Stop terminates the transaction pool. func (pool *TxPool) Stop() { // Unsubscribe all subscriptions registered from txpool diff --git a/node/node_metrics.go b/node/node_metrics.go index 704e4fcc1..23e62ab6d 100644 --- a/node/node_metrics.go +++ b/node/node_metrics.go @@ -36,6 +36,12 @@ func (node *Node) UpdateConnectionsNumberForMetrics(prevNumPeers int) int { return curNumPeers } +// UpdateTxPoolSizeForMetrics updates tx pool size for metrics service. +func (node *Node) UpdateTxPoolSizeForMetrics(txPoolSize uint64) { + utils.Logger().Info().Msgf("Updating metrics tx pool size %d", txPoolSize) + metrics.UpdateTxPoolSize(txPoolSize) +} + // UpdateBalanceForMetrics uppdates node balance for metrics service. func (node *Node) UpdateBalanceForMetrics(prevBalance *big.Int) *big.Int { curBalance, err := node.GetBalanceOfAddress(node.Consensus.SelfAddress) @@ -65,10 +71,11 @@ func (node *Node) CollectMetrics() { prevBlockHeight := uint64(0) prevBalance := big.NewInt(0) prevLastConsensusTime := int64(0) - for range time.Tick(1000 * time.Millisecond) { + for range time.Tick(100 * time.Millisecond) { prevBlockHeight = node.UpdateBlockHeightForMetrics(prevBlockHeight) prevNumPeers = node.UpdateConnectionsNumberForMetrics(prevNumPeers) prevBalance = node.UpdateBalanceForMetrics(prevBalance) prevLastConsensusTime = node.UpdateLastConsensusTimeForMetrics(prevLastConsensusTime) + node.UpdateTxPoolSizeForMetrics(node.TxPool.GetTxPoolSize()) } }