Merge pull request #1296 from flicker-harmony/pr_metrics_service_update

Metrics service update
pull/1303/head
Leo Chen 5 years ago committed by GitHub
commit d57acb9319
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      api/service/config.go
  2. 81
      api/service/metrics/service.go
  3. 11
      api/service/metrics/storage.go
  4. 9
      cmd/harmony/main.go
  5. 5
      core/tx_pool.go
  6. 11
      internal/configs/node/config.go
  7. 6
      node/node.go
  8. 21
      node/node_metrics.go
  9. 13
      node/service_setup.go
  10. 14
      scripts/node.sh

@ -20,6 +20,7 @@ type NodeConfig struct {
Actions map[p2p.GroupID]p2p.ActionType // actions on the groups
PushgatewayIP string // prometheus pushgateway ip
PushgatewayPort string // prometheus pushgateway port
MetricsFlag bool // flag to collect metrics or not
}
// GroupIDShards is a map of ShardGroupID ID

@ -2,6 +2,7 @@ package metrics
import (
"fmt"
"math"
"math/big"
"net"
"net/http"
@ -26,6 +27,8 @@ const (
NodeBalancePush int = 2
LastConsensusPush int = 3
BlockRewardPush int = 4
TxPoolPush int = 5
IsLeaderPush int = 6
metricsServicePortDifference = 2000
)
@ -44,10 +47,12 @@ type Service struct {
// init vars for prometheus
var (
curTxPoolSize = uint64(0)
curBlockHeight = uint64(0)
curBlocks = uint64(0)
curBalance = big.NewInt(0)
curConnectionsNumber = 0
curIsLeader = false
lastBlockReward = big.NewInt(0)
lastConsensusTime = int64(0)
metricsPush = make(chan int)
@ -55,6 +60,14 @@ var (
Name: "block_height",
Help: "Get current block height.",
})
txPoolGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tx_pool_size",
Help: "Get current tx pool size.",
})
isLeaderGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "is_leader",
Help: "Is node a leader now.",
})
blocksAcceptedGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "blocks_accepted",
Help: "Get accepted blocks.",
@ -129,7 +142,7 @@ func (s *Service) Run() {
addr := net.JoinHostPort("", GetMetricsServicePort(s.Port))
registry := prometheus.NewRegistry()
registry.MustRegister(blockHeightCounter, connectionsNumberGauge, nodeBalanceCounter, lastConsensusGauge, blockRewardGauge, blocksAcceptedGauge)
registry.MustRegister(blockHeightCounter, connectionsNumberGauge, nodeBalanceCounter, lastConsensusGauge, blockRewardGauge, blocksAcceptedGauge, txPoolGauge, isLeaderGauge)
s.pusher = push.New("http://"+s.PushgatewayIP+":"+s.PushgatewayPort, "node_metrics").Gatherer(registry).Grouping("instance", s.IP+":"+s.Port).Grouping("bls_key", s.BlsPublicKey)
go s.PushMetrics()
@ -146,19 +159,9 @@ func (s *Service) Run() {
// FormatBalance converts a raw big.Int balance into a float64 expressed in
// whole units by dividing it by 10^BalanceScale.
//
// The division is carried out with big.Float, so a balance with fewer than
// BalanceScale decimal digits is reported as the corresponding fraction
// rather than as zero. A nil balance yields 0. The result is rounded to the
// nearest float64, so very large balances lose their low-order digits.
func FormatBalance(balance *big.Int) float64 {
	if balance == nil {
		return 0.0
	}
	scale := new(big.Float).SetFloat64(math.Pow10(BalanceScale))
	scaledBalance := new(big.Float).Quo(new(big.Float).SetInt(balance), scale)
	// The accuracy indicator is deliberately dropped: a rounded value is
	// sufficient for a metrics gauge.
	floatBalance, _ := scaledBalance.Float64()
	return floatBalance
}
// UpdateBlockHeight updates block height.
@ -176,6 +179,13 @@ func UpdateNodeBalance(balance *big.Int) {
metricsPush <- NodeBalancePush
}
// UpdateTxPoolSize updates tx pool size.
func UpdateTxPoolSize(txPoolSize uint64) {
txPoolGauge.Set(float64(txPoolSize))
curTxPoolSize = txPoolSize
metricsPush <- TxPoolPush
}
// UpdateBlockReward updates block reward.
func UpdateBlockReward(blockReward *big.Int) {
blockRewardGauge.Set(FormatBalance(blockReward))
@ -197,6 +207,17 @@ func UpdateConnectionsNumber(connectionsNumber int) {
metricsPush <- ConnectionsNumberPush
}
// UpdateIsLeader records whether the node is currently a leader. The
// is_leader gauge is set to 1 for true and 0 for false, the flag is kept in
// curIsLeader, and IsLeaderPush is sent on metricsPush to request a push.
func UpdateIsLeader(isLeader bool) {
	leaderValue := 0.0
	if isLeader {
		leaderValue = 1.0
	}
	isLeaderGauge.Set(leaderValue)
	curIsLeader = isLeader
	metricsPush <- IsLeaderPush
}
// PushMetrics pushes metrics updates to prometheus pushgateway.
func (s *Service) PushMetrics() {
for metricType := range metricsPush {
@ -205,21 +226,25 @@ func (s *Service) PushMetrics() {
}
if err := s.pusher.Add(); err != nil {
utils.Logger().Error().Err(err).Msg("Could not push to a prometheus pushgateway.")
// The push to the pushgateway failed, so persist the metric that triggered
// this iteration to local storage, each kind under its own key prefix.
switch metricType {
case ConnectionsNumberPush:
	s.storage.Dump(curConnectionsNumber, ConnectionsNumberPrefix)
case BlockHeightPush:
	s.storage.Dump(curBlockHeight, BlockHeightPrefix)
	s.storage.Dump(curBlocks, BlocksPrefix)
case BlockRewardPush:
	// Use the block reward prefix here; storing rewards under
	// BlockHeightPrefix would mix them in with the block height records.
	s.storage.Dump(lastBlockReward, BlockRewardPrefix)
case NodeBalancePush:
	s.storage.Dump(curBalance, BalancePrefix)
case LastConsensusPush:
	s.storage.Dump(lastConsensusTime, ConsensusTimePrefix)
case TxPoolPush:
	s.storage.Dump(curTxPoolSize, TxPoolPrefix)
case IsLeaderPush:
	s.storage.Dump(curIsLeader, IsLeaderPrefix)
}
}
/*switch metricType {
case ConnectionsNumberPush:
s.storage.Dump(curConnectionsNumber, ConnectionsNumberPrefix)
case BlockHeightPush:
fmt.Println("LOL")
s.storage.Dump(curBlockHeight, BlockHeightPrefix)
s.storage.Dump(curBlocks, BlocksPrefix)
case BlockRewardPush:
s.storage.Dump(lastBlockReward, BlockHeightPrefix)
case NodeBalancePush:
s.storage.Dump(curBalance, BalancePrefix)
case LastConsensusPush:
s.storage.Dump(lastConsensusTime, ConsensusTimePrefix)
}*/
}
return
}

@ -19,7 +19,8 @@ const (
BlockRewardPrefix = "brp"
ConnectionsNumberPrefix = "cnp"
ConsensusTimePrefix = "ltp"
TransactionsPrefix = "tsp"
IsLeaderPrefix = "ilp"
TxPoolPrefix = "tpp"
)
// GetKey returns key by prefix and pushed time moment.
@ -47,7 +48,7 @@ func GetStorageInstance(ip, port string, remove bool) *Storage {
// Init initializes storage.
func (storage *Storage) Init(ip, port string, remove bool) {
dbFileName := "/.hmy/db-metrics-" + ip + "-" + port
dbFileName := "/tmp/db_metrics_" + ip + "_" + port
var err error
if remove {
var err = os.RemoveAll(dbFileName)
@ -67,13 +68,11 @@ func (storage *Storage) GetDB() *ethdb.LDBDatabase {
// Dump data into lvdb by value and prefix.
func (storage *Storage) Dump(value interface{}, prefix string) error {
currentTime := time.Now().Unix()
currentTime := time.Now().UnixNano()
utils.Logger().Info().Msgf("Store %s %v at time %d", prefix, value, currentTime)
if storage.db == nil {
}
batch := storage.db.NewBatch()
// Update database.
if err := batch.Put([]byte(GetKey(prefix, currentTime)), []byte(fmt.Sprintf("%v", value))); err != nil {
if err := batch.Put([]byte(GetKey(prefix, currentTime)), []byte(fmt.Sprintf("%v", value.(interface{})))); err != nil {
utils.Logger().Warn().Err(err).Msgf("Cannot batch %s.", prefix)
return err
}

@ -119,9 +119,10 @@ var (
// Disable view change.
disableViewChange = flag.Bool("disable_view_change", false, "Do not propose view change (testing only)")
// pushgateway ip and port
pushgatewayIP = flag.String("pushgateway_ip", "grafana.harmony.one", "metrics view ip")
pushgatewayPort = flag.String("pushgateway_port", "9091", "metrics view port")
// metrics flag to collect metrics or not, pushgateway ip and port for metrics
metricsFlag = flag.Bool("metrics", false, "Collect and upload node metrics")
pushgatewayIP = flag.String("pushgateway_ip", "grafana.harmony.one", "Metrics view ip")
pushgatewayPort = flag.String("pushgateway_port", "9091", "Metrics view port")
)
func initSetup() {
@ -244,6 +245,7 @@ func createGlobalConfig() *nodeconfig.ConfigType {
nodeConfig.SetPushgatewayIP(*pushgatewayIP)
nodeConfig.SetPushgatewayPort(*pushgatewayPort)
nodeConfig.SetMetricsFlag(*metricsFlag)
// P2p private key is used for secure message transfer between p2p nodes.
nodeConfig.P2pPriKey, _, err = utils.LoadKeyFromFile(*keyFile)
@ -307,6 +309,7 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// Set up prometheus pushgateway for metrics monitoring service.
currentNode.NodeConfig.SetPushgatewayIP(nodeConfig.PushgatewayIP)
currentNode.NodeConfig.SetPushgatewayPort(nodeConfig.PushgatewayPort)
currentNode.NodeConfig.SetMetricsFlag(nodeConfig.MetricsFlag)
if *isExplorer {
currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)

@ -463,6 +463,11 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
pool.promoteExecutables(nil)
}
// GetTxPoolSize returns the combined number of entries held in the pool's
// pending and queue collections.
// NOTE(review): this is len() of each collection; if they are keyed by
// account rather than by transaction, the result counts accounts. Confirm
// against the TxPool definition, and whether a lock must be held here.
func (pool *TxPool) GetTxPoolSize() uint64 {
	pendingCount := uint64(len(pool.pending))
	queuedCount := uint64(len(pool.queue))
	return pendingCount + queuedCount
}
// Stop terminates the transaction pool.
func (pool *TxPool) Stop() {
// Unsubscribe all subscriptions registered from txpool

@ -76,6 +76,7 @@ type ConfigType struct {
Port string // Port of the node.
IP string // IP of the node.
MetricsFlag bool // collect and upload metrics flag
PushgatewayIP string // metrics pushgateway prometheus ip
PushgatewayPort string // metrics pushgateway prometheus port
StringRole string
@ -176,6 +177,16 @@ func (conf *ConfigType) SetPushgatewayPort(port string) {
conf.PushgatewayPort = port
}
// SetMetricsFlag sets the metrics flag, which records whether the node
// should collect and upload metrics.
func (conf *ConfigType) SetMetricsFlag(flag bool) {
conf.MetricsFlag = flag
}
// GetMetricsFlag returns the metrics flag: true when the node is configured
// to collect and upload metrics.
func (conf *ConfigType) GetMetricsFlag() bool {
return conf.MetricsFlag
}
// GetPushgatewayIP get the pushgateway ip
func (conf *ConfigType) GetPushgatewayIP() string {
return conf.PushgatewayIP

@ -387,8 +387,10 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
// FIXME (leo): we use beacon client topic as the global topic for now
go node.ReceiveGlobalMessage()
// start the goroutine to collect metrics
go node.CollectMetrics()
// if metrics flag is set start the goroutine to collect metrics
if node.NodeConfig.MetricsFlag {
go node.CollectMetrics()
}
// Setup initial state of syncing.
node.peerRegistrationRecord = make(map[string]*syncConfig)

@ -36,6 +36,12 @@ func (node *Node) UpdateConnectionsNumberForMetrics(prevNumPeers int) int {
return curNumPeers
}
// UpdateTxPoolSizeForMetrics logs the given tx pool size at info level and
// forwards it to the metrics service via metrics.UpdateTxPoolSize.
func (node *Node) UpdateTxPoolSizeForMetrics(txPoolSize uint64) {
utils.Logger().Info().Msgf("Updating metrics tx pool size %d", txPoolSize)
metrics.UpdateTxPoolSize(txPoolSize)
}
// UpdateBalanceForMetrics updates node balance for metrics service.
func (node *Node) UpdateBalanceForMetrics(prevBalance *big.Int) *big.Int {
curBalance, err := node.GetBalanceOfAddress(node.Consensus.SelfAddress)
@ -58,6 +64,17 @@ func (node *Node) UpdateLastConsensusTimeForMetrics(prevLastConsensusTime int64)
return lastConsensusTime
}
// UpdateIsLeaderForMetrics reports to the metrics service whether this node
// is currently the leader. The node counts as leader when the hex form of the
// consensus leader public key equals the hex form of its own public key; the
// outcome is logged at info level before being forwarded.
func (node *Node) UpdateIsLeaderForMetrics() {
	leaderKey := node.Consensus.LeaderPubKey.SerializeToHexStr()
	selfKey := node.Consensus.PubKey.SerializeToHexStr()
	isLeader := leaderKey == selfKey
	if isLeader {
		utils.Logger().Info().Msgf("Node %s is a leader now", selfKey)
	} else {
		utils.Logger().Info().Msgf("Node %s is not a leader now", selfKey)
	}
	metrics.UpdateIsLeader(isLeader)
}
// CollectMetrics collects metrics: block height, connections number, node balance, block reward, last consensus, accepted blocks, tx pool size, leader status.
func (node *Node) CollectMetrics() {
utils.Logger().Info().Msg("[Metrics Service] Update metrics")
@ -65,10 +82,12 @@ func (node *Node) CollectMetrics() {
prevBlockHeight := uint64(0)
prevBalance := big.NewInt(0)
prevLastConsensusTime := int64(0)
for range time.Tick(1000 * time.Millisecond) {
for range time.Tick(100 * time.Millisecond) {
prevBlockHeight = node.UpdateBlockHeightForMetrics(prevBlockHeight)
prevNumPeers = node.UpdateConnectionsNumberForMetrics(prevNumPeers)
prevBalance = node.UpdateBalanceForMetrics(prevBalance)
prevLastConsensusTime = node.UpdateLastConsensusTimeForMetrics(prevLastConsensusTime)
node.UpdateTxPoolSizeForMetrics(node.TxPool.GetTxPoolSize())
node.UpdateIsLeaderForMetrics()
}
}

@ -30,7 +30,9 @@ func (node *Node) setupForValidator() {
// Register client support service.
node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port))
// Register new metrics service
node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs))
if node.NodeConfig.GetMetricsFlag() {
node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs))
}
// Register randomness service
// TODO: Disable drand. Currently drand isn't functioning but we want to completely turn it off for full protection.
@ -51,8 +53,9 @@ func (node *Node) setupForNewNode() {
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetBeaconGroupID(), chanPeer, nil))
// Register new metrics service
node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs))
if node.NodeConfig.GetMetricsFlag() {
node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs))
}
// TODO: how to restart networkinfo and discovery service after receiving shard id info from beacon chain?
}
@ -64,7 +67,9 @@ func (node *Node) setupForClientNode() {
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil))
// Register new metrics service
node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs))
if node.NodeConfig.GetMetricsFlag() {
node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs))
}
}
func (node *Node) setupForExplorerNode() {

@ -102,6 +102,7 @@ usage: ${progname} [-1ch] [-k KEYFILE]
-S run the ${progname} as non-root user (default: run as root)
-p passfile use the given BLS passphrase file
-D do not download Harmony binaries (default: download when start)
-m collect and upload node metrics to harmony prometheus + grafana
example:
@ -121,6 +122,7 @@ start_clean=false
loop=true
run_as_root=true
do_not_download=false
metrics=false
${BLSKEYFILE=}
unset OPTIND OPTARG opt
@ -138,6 +140,7 @@ do
S) run_as_root=false ;;
p) blspass="${OPTARG}";;
D) do_not_download=true;;
m) metrics=true;;
*) err 70 "unhandled option -${OPTARG}";; # EX_SOFTWARE
esac
done
@ -217,6 +220,9 @@ download_binaries || err 69 "initial node software update failed"
NODE_PORT=9000
PUB_IP=
METRICS=
PUSHGATEWAY_IP=
PUSHGATEWAY_PORT=
if [ "$OS" == "Linux" ]; then
if ${run_as_root}; then
@ -361,15 +367,15 @@ do
if [ "$OS" == "Linux" ]; then
# Run Harmony Node
if [ -z "${blspass}" ]; then
echo -n "${passphrase}" | LD_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass stdin
echo -n "${passphrase}" | LD_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass stdin -metrics $METRICS -pushgateway_ip $PUSHGATEWAY_IP -pushgateway_port $PUSHGATEWAY_PORT
else
LD_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass file:${blspass}
LD_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass file:${blspass} -metrics $METRICS -pushgateway_ip $PUSHGATEWAY_IP -pushgateway_port $PUSHGATEWAY_PORT
fi
else
if [ -z "${blspass}" ]; then
echo -n "${passphrase}" | DYLD_FALLBACK_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass stdin
echo -n "${passphrase}" | DYLD_FALLBACK_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass stdin -metrics $METRICS -pushgateway_ip $PUSHGATEWAY_IP -pushgateway_port $PUSHGATEWAY_PORT
else
DYLD_FALLBACK_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass file:${blspass}
DYLD_FALLBACK_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass file:${blspass} -metrics $METRICS -pushgateway_ip $PUSHGATEWAY_IP -pushgateway_port $PUSHGATEWAY_PORT
fi
fi || msg "node process finished with status $?"
${loop} || break

Loading…
Cancel
Save