commit
2086993573
@ -0,0 +1,240 @@ |
||||
package metrics |
||||
|
||||
import ( |
||||
"fmt" |
||||
"math/big" |
||||
"net" |
||||
"net/http" |
||||
"strconv" |
||||
|
||||
"github.com/ethereum/go-ethereum/rpc" |
||||
msg_pb "github.com/harmony-one/harmony/api/proto/message" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
libp2p_peer "github.com/libp2p/go-libp2p-peer" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/promhttp" |
||||
"github.com/prometheus/client_golang/prometheus/push" |
||||
) |
||||
|
||||
// Balance formatting parameters consumed by FormatBalance.
const (
	// BalanceScale is the number of trailing decimal digits that
	// FormatBalance treats as the fractional part of a balance.
	BalanceScale int = 18
	// BalancePrecision is the number of least-significant digits that
	// FormatBalance drops before converting a balance to a float.
	BalancePrecision int = 13
)

// Kinds of notification sent over the internal metrics channel whenever one
// of the Update* helpers records a new value.
const (
	ConnectionsNumberPush int = iota
	BlockHeightPush
	NodeBalancePush
	LastConsensusPush
	BlockRewardPush
)

// metricsServicePortDifference is subtracted from the node port to obtain the
// port on which the metrics HTTP endpoint listens.
const metricsServicePortDifference = 2000
||||
|
||||
// Service is the struct for metrics service.
|
||||
type Service struct { |
||||
BlsPublicKey string |
||||
IP string |
||||
Port string |
||||
PushgatewayIP string |
||||
PushgatewayPort string |
||||
GetNodeIDs func() []libp2p_peer.ID |
||||
storage *Storage |
||||
pusher *push.Pusher |
||||
messageChan chan *msg_pb.Message |
||||
} |
||||
|
||||
// init vars for prometheus
|
||||
var ( |
||||
curBlockHeight = uint64(0) |
||||
curBlocks = uint64(0) |
||||
curBalance = big.NewInt(0) |
||||
curConnectionsNumber = 0 |
||||
lastBlockReward = big.NewInt(0) |
||||
lastConsensusTime = int64(0) |
||||
metricsPush = make(chan int) |
||||
blockHeightCounter = prometheus.NewCounter(prometheus.CounterOpts{ |
||||
Name: "block_height", |
||||
Help: "Get current block height.", |
||||
}) |
||||
blocksAcceptedGauge = prometheus.NewGauge(prometheus.GaugeOpts{ |
||||
Name: "blocks_accepted", |
||||
Help: "Get accepted blocks.", |
||||
}) |
||||
connectionsNumberGauge = prometheus.NewGauge(prometheus.GaugeOpts{ |
||||
Name: "connections_number", |
||||
Help: "Get current connections number for a node.", |
||||
}) |
||||
nodeBalanceCounter = prometheus.NewCounter(prometheus.CounterOpts{ |
||||
Name: "node_balance", |
||||
Help: "Get current node balance.", |
||||
}) |
||||
lastConsensusGauge = prometheus.NewGauge(prometheus.GaugeOpts{ |
||||
Name: "last_consensus", |
||||
Help: "Get last consensus time.", |
||||
}) |
||||
blockRewardGauge = prometheus.NewGauge(prometheus.GaugeOpts{ |
||||
Name: "block_reward", |
||||
Help: "Get last block reward.", |
||||
}) |
||||
) |
||||
|
||||
// ConnectionsLog is a single sample of the connections statistics kept for
// prometheus.
type ConnectionsLog struct {
	// Time is the moment of the sample.
	// NOTE(review): unit is not visible here (presumably unix seconds) - confirm.
	Time int
	// ConnectionsNumber is the number of connections at that moment.
	ConnectionsNumber int
}
||||
|
||||
// ConnectionsStatsHTTP struct for returning all connections logs
|
||||
type ConnectionsStatsHTTP struct { |
||||
ConnectionsLogs []ConnectionsLog |
||||
} |
||||
|
||||
// New returns metrics service.
|
||||
func New(selfPeer *p2p.Peer, blsPublicKey, pushgatewayIP, pushgatewayPort string, GetNodeIDs func() []libp2p_peer.ID) *Service { |
||||
return &Service{ |
||||
BlsPublicKey: blsPublicKey, |
||||
IP: selfPeer.IP, |
||||
Port: selfPeer.Port, |
||||
PushgatewayIP: pushgatewayIP, |
||||
PushgatewayPort: pushgatewayPort, |
||||
GetNodeIDs: GetNodeIDs, |
||||
} |
||||
} |
||||
|
||||
// StartService starts metrics service.
|
||||
func (s *Service) StartService() { |
||||
utils.Logger().Info().Msg("Starting metrics service.") |
||||
s.Run() |
||||
} |
||||
|
||||
// StopService shutdowns metrics service.
|
||||
func (s *Service) StopService() { |
||||
utils.Logger().Info().Msg("Shutting down metrics service.") |
||||
metricsPush <- -1 |
||||
} |
||||
|
||||
// GetMetricsServicePort returns the port serving metrics service dashboard. This port is metricsServicePortDifference less than the node port.
|
||||
func GetMetricsServicePort(nodePort string) string { |
||||
if port, err := strconv.Atoi(nodePort); err == nil { |
||||
return fmt.Sprintf("%d", port-metricsServicePortDifference) |
||||
} |
||||
utils.Logger().Error().Msg("Error on parsing.") |
||||
return "" |
||||
} |
||||
|
||||
// Run is to run http serving metrics service.
|
||||
func (s *Service) Run() { |
||||
// Init local storage for metrics.
|
||||
s.storage = GetStorageInstance(s.IP, s.Port, true) |
||||
// Init address.
|
||||
addr := net.JoinHostPort("", GetMetricsServicePort(s.Port)) |
||||
|
||||
registry := prometheus.NewRegistry() |
||||
registry.MustRegister(blockHeightCounter, connectionsNumberGauge, nodeBalanceCounter, lastConsensusGauge, blockRewardGauge, blocksAcceptedGauge) |
||||
|
||||
s.pusher = push.New("http://"+s.PushgatewayIP+":"+s.PushgatewayPort, "node_metrics").Gatherer(registry).Grouping("instance", s.IP+":"+s.Port).Grouping("bls_key", s.BlsPublicKey) |
||||
go s.PushMetrics() |
||||
// Pull metrics http server
|
||||
utils.Logger().Info().Str("port", GetMetricsServicePort(s.Port)).Msg("Listening.") |
||||
go func() { |
||||
http.Handle("/node_metrics", promhttp.Handler()) |
||||
if err := http.ListenAndServe(addr, nil); err != nil { |
||||
utils.Logger().Warn().Err(err).Msg("http.ListenAndServe()") |
||||
} |
||||
}() |
||||
return |
||||
} |
||||
|
||||
// FormatBalance formats big.Int balance with precision.
|
||||
func FormatBalance(balance *big.Int) float64 { |
||||
stringBalance := balance.String() |
||||
if len(stringBalance) < BalanceScale { |
||||
return 0.0 |
||||
} |
||||
if len(stringBalance) == BalanceScale { |
||||
stringBalance = "0." + stringBalance[len(stringBalance)-BalanceScale:len(stringBalance)-BalancePrecision] |
||||
} else { |
||||
stringBalance = stringBalance[:len(stringBalance)-BalanceScale] + "." + stringBalance[len(stringBalance)-BalanceScale:len(stringBalance)-BalancePrecision] |
||||
} |
||||
if res, err := strconv.ParseFloat(stringBalance, 64); err == nil { |
||||
return res |
||||
} |
||||
return 0.0 |
||||
} |
||||
|
||||
// UpdateBlockHeight updates block height.
|
||||
func UpdateBlockHeight(blockHeight uint64) { |
||||
blockHeightCounter.Add(float64(blockHeight) - float64(curBlockHeight)) |
||||
blocksAcceptedGauge.Set(float64(blockHeight) - float64(curBlockHeight)) |
||||
curBlockHeight = blockHeight |
||||
metricsPush <- BlockHeightPush |
||||
} |
||||
|
||||
// UpdateNodeBalance updates node balance.
|
||||
func UpdateNodeBalance(balance *big.Int) { |
||||
nodeBalanceCounter.Add(FormatBalance(balance) - FormatBalance(curBalance)) |
||||
curBalance = balance |
||||
metricsPush <- NodeBalancePush |
||||
} |
||||
|
||||
// UpdateBlockReward updates block reward.
|
||||
func UpdateBlockReward(blockReward *big.Int) { |
||||
blockRewardGauge.Set(FormatBalance(blockReward)) |
||||
lastBlockReward = blockReward |
||||
metricsPush <- BlockRewardPush |
||||
} |
||||
|
||||
// UpdateLastConsensus updates last consensus time.
|
||||
func UpdateLastConsensus(consensusTime int64) { |
||||
lastConsensusGauge.Set(float64(consensusTime)) |
||||
lastConsensusTime = consensusTime |
||||
metricsPush <- LastConsensusPush |
||||
} |
||||
|
||||
// UpdateConnectionsNumber updates connections number.
|
||||
func UpdateConnectionsNumber(connectionsNumber int) { |
||||
connectionsNumberGauge.Set(float64(connectionsNumber)) |
||||
curConnectionsNumber = connectionsNumber |
||||
metricsPush <- ConnectionsNumberPush |
||||
} |
||||
|
||||
// PushMetrics pushes metrics updates to prometheus pushgateway.
|
||||
func (s *Service) PushMetrics() { |
||||
for metricType := range metricsPush { |
||||
if metricType == -1 { |
||||
break |
||||
} |
||||
if err := s.pusher.Add(); err != nil { |
||||
utils.Logger().Error().Err(err).Msg("Could not push to a prometheus pushgateway.") |
||||
} |
||||
/*switch metricType { |
||||
case ConnectionsNumberPush: |
||||
s.storage.Dump(curConnectionsNumber, ConnectionsNumberPrefix) |
||||
case BlockHeightPush: |
||||
fmt.Println("LOL") |
||||
s.storage.Dump(curBlockHeight, BlockHeightPrefix) |
||||
s.storage.Dump(curBlocks, BlocksPrefix) |
||||
case BlockRewardPush: |
||||
s.storage.Dump(lastBlockReward, BlockHeightPrefix) |
||||
case NodeBalancePush: |
||||
s.storage.Dump(curBalance, BalancePrefix) |
||||
case LastConsensusPush: |
||||
s.storage.Dump(lastConsensusTime, ConsensusTimePrefix) |
||||
}*/ |
||||
} |
||||
return |
||||
} |
||||
|
||||
// NotifyService notify service
|
||||
func (s *Service) NotifyService(params map[string]interface{}) { |
||||
return |
||||
} |
||||
|
||||
// SetMessageChan sets up message channel to service.
|
||||
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { |
||||
s.messageChan = messageChan |
||||
} |
||||
|
||||
// APIs for the services.
|
||||
func (s *Service) APIs() []rpc.API { |
||||
return nil |
||||
} |
@ -0,0 +1,103 @@ |
||||
package metrics |
||||
|
||||
import ( |
||||
"fmt" |
||||
"os" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/ethdb" |
||||
"github.com/ethereum/go-ethereum/rlp" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
) |
||||
|
||||
// Key prefixes used by the metrics storage, one per kind of stored value.
// GetKey combines a prefix with a timestamp to form the database key.
const (
	// BalancePrefix prefixes node balance entries.
	BalancePrefix = "bap"
	// BlockHeightPrefix prefixes block height entries.
	BlockHeightPrefix = "bhp"
	// BlocksPrefix prefixes blocks entries.
	BlocksPrefix = "bp"
	// BlockRewardPrefix prefixes block reward entries.
	BlockRewardPrefix = "brp"
	// ConnectionsNumberPrefix prefixes connections number entries.
	ConnectionsNumberPrefix = "cnp"
	// ConsensusTimePrefix prefixes consensus time entries.
	ConsensusTimePrefix = "ltp"
	// TransactionsPrefix prefixes transactions entries.
	TransactionsPrefix = "tsp"
)
||||
|
||||
// GetKey returns the storage key for a value pushed at the given moment:
// the prefix, an underscore and the moment in decimal notation.
func GetKey(prefix string, moment int64) string {
	suffix := fmt.Sprintf("%d", moment)
	return prefix + "_" + suffix
}
||||
|
||||
// storage instance
|
||||
var storage *Storage |
||||
var onceMetrics sync.Once |
||||
|
||||
// Storage storage dump the block info into leveldb.
|
||||
type Storage struct { |
||||
db *ethdb.LDBDatabase |
||||
} |
||||
|
||||
// GetStorageInstance returns attack model by using singleton pattern.
|
||||
func GetStorageInstance(ip, port string, remove bool) *Storage { |
||||
onceMetrics.Do(func() { |
||||
storage = &Storage{} |
||||
storage.Init(ip, port, remove) |
||||
}) |
||||
return storage |
||||
} |
||||
|
||||
// Init initializes storage.
|
||||
func (storage *Storage) Init(ip, port string, remove bool) { |
||||
dbFileName := "/.hmy/db-metrics-" + ip + "-" + port |
||||
var err error |
||||
if remove { |
||||
var err = os.RemoveAll(dbFileName) |
||||
if err != nil { |
||||
utils.Logger().Error().Err(err).Msg("Failed to remove existing database files.") |
||||
} |
||||
} |
||||
if storage.db, err = ethdb.NewLDBDatabase(dbFileName, 0, 0); err != nil { |
||||
utils.Logger().Error().Err(err).Msg("Failed to create new database.") |
||||
} |
||||
} |
||||
|
||||
// GetDB returns the LDBDatabase of the storage.
|
||||
func (storage *Storage) GetDB() *ethdb.LDBDatabase { |
||||
return storage.db |
||||
} |
||||
|
||||
// Dump data into lvdb by value and prefix.
|
||||
func (storage *Storage) Dump(value interface{}, prefix string) error { |
||||
currentTime := time.Now().Unix() |
||||
utils.Logger().Info().Msgf("Store %s %v at time %d", prefix, value, currentTime) |
||||
if storage.db == nil { |
||||
} |
||||
batch := storage.db.NewBatch() |
||||
// Update database.
|
||||
if err := batch.Put([]byte(GetKey(prefix, currentTime)), []byte(fmt.Sprintf("%v", value))); err != nil { |
||||
utils.Logger().Warn().Err(err).Msgf("Cannot batch %s.", prefix) |
||||
return err |
||||
} |
||||
if err := batch.Write(); err != nil { |
||||
utils.Logger().Warn().Err(err).Msg("Cannot write batch.") |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// Read returns data list of a particular metric by since, until, prefix, interface.
|
||||
func (storage *Storage) Read(since, until int64, prefix string, varType interface{}) []interface{} { |
||||
dataList := make([]interface{}, 0) |
||||
for i := since; i <= until; i++ { |
||||
data, err := storage.db.Get([]byte(GetKey(prefix, i))) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
decodedData := varType |
||||
if rlp.DecodeBytes(data, decodedData) != nil { |
||||
utils.Logger().Error().Msg("Error on getting data from db.") |
||||
os.Exit(1) |
||||
} |
||||
dataList = append(dataList, decodedData) |
||||
} |
||||
return dataList |
||||
} |
@ -0,0 +1,74 @@ |
||||
package node |
||||
|
||||
import ( |
||||
"math/big" |
||||
"time" |
||||
|
||||
metrics "github.com/harmony-one/harmony/api/service/metrics" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
) |
||||
|
||||
// UpdateBlockHeightForMetrics updates block height for metrics service.
|
||||
func (node *Node) UpdateBlockHeightForMetrics(prevBlockHeight uint64) uint64 { |
||||
curBlock := node.Blockchain().CurrentBlock() |
||||
curBlockHeight := curBlock.NumberU64() |
||||
if curBlockHeight == prevBlockHeight { |
||||
return prevBlockHeight |
||||
} |
||||
utils.Logger().Info().Msgf("Updating metrics block height %d", curBlockHeight) |
||||
metrics.UpdateBlockHeight(curBlockHeight) |
||||
blockReward := node.Consensus.GetBlockReward() |
||||
if blockReward != nil { |
||||
utils.Logger().Info().Msgf("Updating metrics block reward %d", blockReward.Uint64()) |
||||
metrics.UpdateBlockReward(blockReward) |
||||
} |
||||
return curBlockHeight |
||||
} |
||||
|
||||
// UpdateConnectionsNumberForMetrics uppdates connections number for metrics service.
|
||||
func (node *Node) UpdateConnectionsNumberForMetrics(prevNumPeers int) int { |
||||
curNumPeers := node.numPeers |
||||
if curNumPeers == prevNumPeers { |
||||
return prevNumPeers |
||||
} |
||||
utils.Logger().Info().Msgf("Updating metrics connections number %d", curNumPeers) |
||||
metrics.UpdateConnectionsNumber(curNumPeers) |
||||
return curNumPeers |
||||
} |
||||
|
||||
// UpdateBalanceForMetrics uppdates node balance for metrics service.
|
||||
func (node *Node) UpdateBalanceForMetrics(prevBalance *big.Int) *big.Int { |
||||
curBalance, err := node.GetBalanceOfAddress(node.Consensus.SelfAddress) |
||||
if err != nil || curBalance.Cmp(prevBalance) == 0 { |
||||
return prevBalance |
||||
} |
||||
utils.Logger().Info().Msgf("Updating metrics node balance %d", curBalance.Uint64()) |
||||
metrics.UpdateNodeBalance(curBalance) |
||||
return curBalance |
||||
} |
||||
|
||||
// UpdateLastConsensusTimeForMetrics uppdates last consensus reached time for metrics service.
|
||||
func (node *Node) UpdateLastConsensusTimeForMetrics(prevLastConsensusTime int64) int64 { |
||||
lastConsensusTime := node.lastConsensusTime |
||||
if lastConsensusTime == prevLastConsensusTime { |
||||
return prevLastConsensusTime |
||||
} |
||||
utils.Logger().Info().Msgf("Updating metrics last consensus time reached %d", lastConsensusTime) |
||||
metrics.UpdateLastConsensus(lastConsensusTime) |
||||
return lastConsensusTime |
||||
} |
||||
|
||||
// CollectMetrics collects metrics: block height, connections number, node balance, block reward, last consensus, accepted blocks.
|
||||
func (node *Node) CollectMetrics() { |
||||
utils.Logger().Info().Msg("[Metrics Service] Update metrics") |
||||
prevNumPeers := 0 |
||||
prevBlockHeight := uint64(0) |
||||
prevBalance := big.NewInt(0) |
||||
prevLastConsensusTime := int64(0) |
||||
for range time.Tick(1000 * time.Millisecond) { |
||||
prevBlockHeight = node.UpdateBlockHeightForMetrics(prevBlockHeight) |
||||
prevNumPeers = node.UpdateConnectionsNumberForMetrics(prevNumPeers) |
||||
prevBalance = node.UpdateBalanceForMetrics(prevBalance) |
||||
prevLastConsensusTime = node.UpdateLastConsensusTimeForMetrics(prevLastConsensusTime) |
||||
} |
||||
} |
Loading…
Reference in new issue