You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
265 lines
8.0 KiB
265 lines
8.0 KiB
package metrics
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"math/big"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
msg_pb "github.com/harmony-one/harmony/api/proto/message"
|
|
"github.com/harmony-one/harmony/internal/utils"
|
|
"github.com/harmony-one/harmony/p2p"
|
|
libp2p_peer "github.com/libp2p/go-libp2p-peer"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/prometheus/client_golang/prometheus/push"
|
|
)
|
|
|
|
// Balance conversion settings for the metrics service.
const (
	// BalanceScale is the power of ten by which FormatBalance divides a raw balance.
	BalanceScale int = 18
	// BalancePrecision is a precision setting for balances; nothing in this file reads it.
	BalancePrecision int = 13
)

// Identifiers sent over the metricsPush channel. Each one tells PushMetrics
// which metric was just updated, and therefore which value to dump to local
// storage if the push to the pushgateway fails.
const (
	ConnectionsNumberPush int = iota // 0
	BlockHeightPush                  // 1
	NodeBalancePush                  // 2
	LastConsensusPush                // 3
	BlockRewardPush                  // 4
	TxPoolPush                       // 5
	IsLeaderPush                     // 6
)

// metricsServicePortDifference is subtracted from the node port to obtain the
// port the metrics service listens on (see GetMetricsServicePort).
const metricsServicePortDifference = 2000
|
|
|
|
// Service is the struct for metrics service.
|
|
type Service struct {
|
|
BlsPublicKey string
|
|
IP string
|
|
Port string
|
|
PushgatewayIP string
|
|
PushgatewayPort string
|
|
GetNodeIDs func() []libp2p_peer.ID
|
|
storage *Storage
|
|
pusher *push.Pusher
|
|
messageChan chan *msg_pb.Message
|
|
}
|
|
|
|
// init vars for prometheus
|
|
var (
|
|
curTxPoolSize = uint64(0)
|
|
curBlockHeight = uint64(0)
|
|
curBlocks = uint64(0)
|
|
curBalance = big.NewInt(0)
|
|
curConnectionsNumber = 0
|
|
curIsLeader = false
|
|
lastBlockReward = big.NewInt(0)
|
|
lastConsensusTime = int64(0)
|
|
metricsPush = make(chan int)
|
|
blockHeightCounter = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "block_height",
|
|
Help: "Get current block height.",
|
|
})
|
|
txPoolGauge = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "tx_pool_size",
|
|
Help: "Get current tx pool size.",
|
|
})
|
|
isLeaderGauge = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "is_leader",
|
|
Help: "Is node a leader now.",
|
|
})
|
|
blocksAcceptedGauge = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "blocks_accepted",
|
|
Help: "Get accepted blocks.",
|
|
})
|
|
connectionsNumberGauge = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "connections_number",
|
|
Help: "Get current connections number for a node.",
|
|
})
|
|
nodeBalanceCounter = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "node_balance",
|
|
Help: "Get current node balance.",
|
|
})
|
|
lastConsensusGauge = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "last_consensus",
|
|
Help: "Get last consensus time.",
|
|
})
|
|
blockRewardGauge = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "block_reward",
|
|
Help: "Get last block reward.",
|
|
})
|
|
)
|
|
|
|
// ConnectionsLog is a single connections-count sample for prometheus.
type ConnectionsLog struct {
	// Time is when the sample was taken.
	// NOTE(review): the unit is not visible in this file; confirm with the producer.
	Time int
	// ConnectionsNumber is the number of connections recorded in the sample.
	ConnectionsNumber int
}
|
|
|
|
// ConnectionsStatsHTTP struct for returning all connections logs
|
|
type ConnectionsStatsHTTP struct {
|
|
ConnectionsLogs []ConnectionsLog
|
|
}
|
|
|
|
// New returns metrics service.
|
|
func New(selfPeer *p2p.Peer, blsPublicKey, pushgatewayIP, pushgatewayPort string, GetNodeIDs func() []libp2p_peer.ID) *Service {
|
|
return &Service{
|
|
BlsPublicKey: blsPublicKey,
|
|
IP: selfPeer.IP,
|
|
Port: selfPeer.Port,
|
|
PushgatewayIP: pushgatewayIP,
|
|
PushgatewayPort: pushgatewayPort,
|
|
GetNodeIDs: GetNodeIDs,
|
|
}
|
|
}
|
|
|
|
// StartService starts metrics service.
|
|
func (s *Service) StartService() {
|
|
utils.Logger().Info().Msg("Starting metrics service.")
|
|
s.Run()
|
|
}
|
|
|
|
// StopService shutdowns metrics service.
|
|
func (s *Service) StopService() {
|
|
utils.Logger().Info().Msg("Shutting down metrics service.")
|
|
metricsPush <- -1
|
|
}
|
|
|
|
// GetMetricsServicePort returns the port serving metrics service dashboard. This port is metricsServicePortDifference less than the node port.
|
|
func GetMetricsServicePort(nodePort string) string {
|
|
if port, err := strconv.Atoi(nodePort); err == nil {
|
|
return fmt.Sprintf("%d", port-metricsServicePortDifference)
|
|
}
|
|
utils.Logger().Error().Msg("Error on parsing.")
|
|
return ""
|
|
}
|
|
|
|
// Run is to run http serving metrics service.
|
|
func (s *Service) Run() {
|
|
// Init local storage for metrics.
|
|
s.storage = GetStorageInstance(s.IP, s.Port, true)
|
|
// Init address.
|
|
addr := net.JoinHostPort("", GetMetricsServicePort(s.Port))
|
|
|
|
registry := prometheus.NewRegistry()
|
|
registry.MustRegister(blockHeightCounter, connectionsNumberGauge, nodeBalanceCounter, lastConsensusGauge, blockRewardGauge, blocksAcceptedGauge, txPoolGauge, isLeaderGauge)
|
|
|
|
s.pusher = push.New("http://"+s.PushgatewayIP+":"+s.PushgatewayPort, "node_metrics").Gatherer(registry).Grouping("instance", s.IP+":"+s.Port).Grouping("bls_key", s.BlsPublicKey)
|
|
go s.PushMetrics()
|
|
// Pull metrics http server
|
|
utils.Logger().Info().Str("port", GetMetricsServicePort(s.Port)).Msg("Listening.")
|
|
go func() {
|
|
http.Handle("/node_metrics", promhttp.Handler())
|
|
if err := http.ListenAndServe(addr, nil); err != nil {
|
|
utils.Logger().Warn().Err(err).Msg("http.ListenAndServe()")
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
|
|
// FormatBalance formats big.Int balance with precision.
|
|
func FormatBalance(balance *big.Int) float64 {
|
|
scaledBalance := new(big.Float).Quo(new(big.Float).SetInt(balance), new(big.Float).SetFloat64(math.Pow10(BalanceScale)))
|
|
floatBalance, _ := scaledBalance.Float64()
|
|
return floatBalance
|
|
}
|
|
|
|
// UpdateBlockHeight updates block height.
|
|
func UpdateBlockHeight(blockHeight uint64) {
|
|
blockHeightCounter.Add(float64(blockHeight) - float64(curBlockHeight))
|
|
blocksAcceptedGauge.Set(float64(blockHeight) - float64(curBlockHeight))
|
|
curBlockHeight = blockHeight
|
|
metricsPush <- BlockHeightPush
|
|
}
|
|
|
|
// UpdateNodeBalance updates node balance.
|
|
func UpdateNodeBalance(balance *big.Int) {
|
|
nodeBalanceCounter.Add(FormatBalance(balance) - FormatBalance(curBalance))
|
|
curBalance = balance
|
|
metricsPush <- NodeBalancePush
|
|
}
|
|
|
|
// UpdateTxPoolSize updates tx pool size.
|
|
func UpdateTxPoolSize(txPoolSize uint64) {
|
|
txPoolGauge.Set(float64(txPoolSize))
|
|
curTxPoolSize = txPoolSize
|
|
metricsPush <- TxPoolPush
|
|
}
|
|
|
|
// UpdateBlockReward updates block reward.
|
|
func UpdateBlockReward(blockReward *big.Int) {
|
|
blockRewardGauge.Set(FormatBalance(blockReward))
|
|
lastBlockReward = blockReward
|
|
metricsPush <- BlockRewardPush
|
|
}
|
|
|
|
// UpdateLastConsensus updates last consensus time.
|
|
func UpdateLastConsensus(consensusTime int64) {
|
|
lastConsensusGauge.Set(float64(consensusTime))
|
|
lastConsensusTime = consensusTime
|
|
metricsPush <- LastConsensusPush
|
|
}
|
|
|
|
// UpdateConnectionsNumber updates connections number.
|
|
func UpdateConnectionsNumber(connectionsNumber int) {
|
|
connectionsNumberGauge.Set(float64(connectionsNumber))
|
|
curConnectionsNumber = connectionsNumber
|
|
metricsPush <- ConnectionsNumberPush
|
|
}
|
|
|
|
// UpdateIsLeader updates if node is a leader.
|
|
func UpdateIsLeader(isLeader bool) {
|
|
if isLeader {
|
|
isLeaderGauge.Set(1.0)
|
|
} else {
|
|
isLeaderGauge.Set(0.0)
|
|
}
|
|
curIsLeader = isLeader
|
|
metricsPush <- IsLeaderPush
|
|
}
|
|
|
|
// PushMetrics pushes metrics updates to prometheus pushgateway.
|
|
func (s *Service) PushMetrics() {
|
|
for metricType := range metricsPush {
|
|
if metricType == -1 {
|
|
break
|
|
}
|
|
if err := s.pusher.Add(); err != nil {
|
|
utils.Logger().Error().Err(err).Msg("Could not push to a prometheus pushgateway.")
|
|
// Dump metrics to db if couldn't push to prometheus
|
|
switch metricType {
|
|
case ConnectionsNumberPush:
|
|
s.storage.Dump(curConnectionsNumber, ConnectionsNumberPrefix)
|
|
case BlockHeightPush:
|
|
s.storage.Dump(curBlockHeight, BlockHeightPrefix)
|
|
s.storage.Dump(curBlocks, BlocksPrefix)
|
|
case BlockRewardPush:
|
|
s.storage.Dump(lastBlockReward, BlockHeightPrefix)
|
|
case NodeBalancePush:
|
|
s.storage.Dump(curBalance, BalancePrefix)
|
|
case LastConsensusPush:
|
|
s.storage.Dump(lastConsensusTime, ConsensusTimePrefix)
|
|
case TxPoolPush:
|
|
s.storage.Dump(curTxPoolSize, TxPoolPrefix)
|
|
case IsLeaderPush:
|
|
s.storage.Dump(curIsLeader, IsLeaderPrefix)
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// NotifyService notify service
|
|
func (s *Service) NotifyService(params map[string]interface{}) {
|
|
return
|
|
}
|
|
|
|
// SetMessageChan sets up message channel to service.
|
|
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
|
|
s.messageChan = messageChan
|
|
}
|
|
|
|
// APIs for the services.
|
|
func (s *Service) APIs() []rpc.API {
|
|
return nil
|
|
}
|
|
|