diff --git a/api/service/config.go b/api/service/config.go index a31934bbf..95fedae15 100644 --- a/api/service/config.go +++ b/api/service/config.go @@ -20,6 +20,7 @@ type NodeConfig struct { Actions map[p2p.GroupID]p2p.ActionType // actions on the groups PushgatewayIP string // prometheus pushgateway ip PushgatewayPort string // prometheus pushgateway port + MetricsFlag bool // flag to collect metrics or not } // GroupIDShards is a map of ShardGroupID ID diff --git a/api/service/metrics/service.go b/api/service/metrics/service.go index 4337365d0..46cd2ac8d 100644 --- a/api/service/metrics/service.go +++ b/api/service/metrics/service.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "math" "math/big" "net" "net/http" @@ -26,6 +27,8 @@ const ( NodeBalancePush int = 2 LastConsensusPush int = 3 BlockRewardPush int = 4 + TxPoolPush int = 5 + IsLeaderPush int = 6 metricsServicePortDifference = 2000 ) @@ -44,10 +47,12 @@ type Service struct { // init vars for prometheus var ( + curTxPoolSize = uint64(0) curBlockHeight = uint64(0) curBlocks = uint64(0) curBalance = big.NewInt(0) curConnectionsNumber = 0 + curIsLeader = false lastBlockReward = big.NewInt(0) lastConsensusTime = int64(0) metricsPush = make(chan int) @@ -55,6 +60,14 @@ var ( Name: "block_height", Help: "Get current block height.", }) + txPoolGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tx_pool_size", + Help: "Get current tx pool size.", + }) + isLeaderGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "is_leader", + Help: "Is node a leader now.", + }) blocksAcceptedGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "blocks_accepted", Help: "Get accepted blocks.", @@ -129,7 +142,7 @@ func (s *Service) Run() { addr := net.JoinHostPort("", GetMetricsServicePort(s.Port)) registry := prometheus.NewRegistry() - registry.MustRegister(blockHeightCounter, connectionsNumberGauge, nodeBalanceCounter, lastConsensusGauge, blockRewardGauge, blocksAcceptedGauge) + registry.MustRegister(blockHeightCounter, connectionsNumberGauge, nodeBalanceCounter, lastConsensusGauge, blockRewardGauge, blocksAcceptedGauge, txPoolGauge, isLeaderGauge) s.pusher = push.New("http://"+s.PushgatewayIP+":"+s.PushgatewayPort, "node_metrics").Gatherer(registry).Grouping("instance", s.IP+":"+s.Port).Grouping("bls_key", s.BlsPublicKey) go s.PushMetrics() @@ -146,19 +159,9 @@ func (s *Service) Run() { // FormatBalance formats big.Int balance with precision. func FormatBalance(balance *big.Int) float64 { - stringBalance := balance.String() - if len(stringBalance) < BalanceScale { - return 0.0 - } - if len(stringBalance) == BalanceScale { - stringBalance = "0." + stringBalance[len(stringBalance)-BalanceScale:len(stringBalance)-BalancePrecision] - } else { - stringBalance = stringBalance[:len(stringBalance)-BalanceScale] + "." + stringBalance[len(stringBalance)-BalanceScale:len(stringBalance)-BalancePrecision] - } - if res, err := strconv.ParseFloat(stringBalance, 64); err == nil { - return res - } - return 0.0 + scaledBalance := new(big.Float).Quo(new(big.Float).SetInt(balance), new(big.Float).SetFloat64(math.Pow10(BalanceScale))) + floatBalance, _ := scaledBalance.Float64() + return floatBalance } // UpdateBlockHeight updates block height. @@ -176,6 +179,13 @@ func UpdateNodeBalance(balance *big.Int) { metricsPush <- NodeBalancePush } +// UpdateTxPoolSize updates tx pool size. +func UpdateTxPoolSize(txPoolSize uint64) { + txPoolGauge.Set(float64(txPoolSize)) + curTxPoolSize = txPoolSize + metricsPush <- TxPoolPush +} + // UpdateBlockReward updates block reward. func UpdateBlockReward(blockReward *big.Int) { blockRewardGauge.Set(FormatBalance(blockReward)) @@ -197,6 +207,17 @@ func UpdateConnectionsNumber(connectionsNumber int) { metricsPush <- ConnectionsNumberPush } +// UpdateIsLeader updates if node is a leader. +func UpdateIsLeader(isLeader bool) { + if isLeader { + isLeaderGauge.Set(1.0) + } else { + isLeaderGauge.Set(0.0) + } + curIsLeader = isLeader + metricsPush <- IsLeaderPush +} + // PushMetrics pushes metrics updates to prometheus pushgateway. func (s *Service) PushMetrics() { for metricType := range metricsPush { @@ -205,21 +226,25 @@ func (s *Service) PushMetrics() { } if err := s.pusher.Add(); err != nil { utils.Logger().Error().Err(err).Msg("Could not push to a prometheus pushgateway.") + // Dump metrics to db if couldn't push to prometheus + switch metricType { + case ConnectionsNumberPush: + s.storage.Dump(curConnectionsNumber, ConnectionsNumberPrefix) + case BlockHeightPush: + s.storage.Dump(curBlockHeight, BlockHeightPrefix) + s.storage.Dump(curBlocks, BlocksPrefix) + case BlockRewardPush: + s.storage.Dump(lastBlockReward, BlockHeightPrefix) + case NodeBalancePush: + s.storage.Dump(curBalance, BalancePrefix) + case LastConsensusPush: + s.storage.Dump(lastConsensusTime, ConsensusTimePrefix) + case TxPoolPush: + s.storage.Dump(curTxPoolSize, TxPoolPrefix) + case IsLeaderPush: + s.storage.Dump(curIsLeader, IsLeaderPrefix) + } } - /*switch metricType { - case ConnectionsNumberPush: - s.storage.Dump(curConnectionsNumber, ConnectionsNumberPrefix) - case BlockHeightPush: - fmt.Println("LOL") - s.storage.Dump(curBlockHeight, BlockHeightPrefix) - s.storage.Dump(curBlocks, BlocksPrefix) - case BlockRewardPush: - s.storage.Dump(lastBlockReward, BlockHeightPrefix) - case NodeBalancePush: - s.storage.Dump(curBalance, BalancePrefix) - case LastConsensusPush: - s.storage.Dump(lastConsensusTime, ConsensusTimePrefix) - }*/ } return } diff --git a/api/service/metrics/storage.go b/api/service/metrics/storage.go index bf57939bb..a379d83e8 100644 --- a/api/service/metrics/storage.go +++ b/api/service/metrics/storage.go @@ -19,7 +19,8 @@ const ( BlockRewardPrefix = "brp" ConnectionsNumberPrefix = "cnp" ConsensusTimePrefix = "ltp" - TransactionsPrefix = "tsp" + IsLeaderPrefix = "ilp" + TxPoolPrefix = "tpp" ) // GetKey returns key by prefix and pushed time momemnt. @@ -47,7 +48,7 @@ func GetStorageInstance(ip, port string, remove bool) *Storage { // Init initializes storage. func (storage *Storage) Init(ip, port string, remove bool) { - dbFileName := "/.hmy/db-metrics-" + ip + "-" + port + dbFileName := "/tmp/db_metrics_" + ip + "_" + port var err error if remove { var err = os.RemoveAll(dbFileName) @@ -67,13 +68,11 @@ func (storage *Storage) GetDB() *ethdb.LDBDatabase { // Dump data into lvdb by value and prefix. func (storage *Storage) Dump(value interface{}, prefix string) error { - currentTime := time.Now().Unix() + currentTime := time.Now().UnixNano() utils.Logger().Info().Msgf("Store %s %v at time %d", prefix, value, currentTime) - if storage.db == nil { - } batch := storage.db.NewBatch() // Update database. - if err := batch.Put([]byte(GetKey(prefix, currentTime)), []byte(fmt.Sprintf("%v", value))); err != nil { + if err := batch.Put([]byte(GetKey(prefix, currentTime)), []byte(fmt.Sprintf("%v", value.(interface{})))); err != nil { utils.Logger().Warn().Err(err).Msgf("Cannot batch %s.", prefix) return err } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 8fb0697c6..ae24dda3f 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -119,9 +119,10 @@ var ( // Disable view change. disableViewChange = flag.Bool("disable_view_change", false, "Do not propose view change (testing only)") - // pushgateway ip and port - pushgatewayIP = flag.String("pushgateway_ip", "grafana.harmony.one", "metrics view ip") - pushgatewayPort = flag.String("pushgateway_port", "9091", "metrics view port") + // metrics flag to collct meetrics or not, pushgateway ip and port for metrics + metricsFlag = flag.Bool("metrics", false, "Collect and upload node metrics") + pushgatewayIP = flag.String("pushgateway_ip", "grafana.harmony.one", "Metrics view ip") + pushgatewayPort = flag.String("pushgateway_port", "9091", "Metrics view port") ) func initSetup() { @@ -244,6 +245,7 @@ func createGlobalConfig() *nodeconfig.ConfigType { nodeConfig.SetPushgatewayIP(*pushgatewayIP) nodeConfig.SetPushgatewayPort(*pushgatewayPort) + nodeConfig.SetMetricsFlag(*metricsFlag) // P2p private key is used for secure message transfer between p2p nodes. nodeConfig.P2pPriKey, _, err = utils.LoadKeyFromFile(*keyFile) @@ -307,6 +309,7 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { // Set up prometheus pushgateway for metrics monitoring serivce. currentNode.NodeConfig.SetPushgatewayIP(nodeConfig.PushgatewayIP) currentNode.NodeConfig.SetPushgatewayPort(nodeConfig.PushgatewayPort) + currentNode.NodeConfig.SetMetricsFlag(nodeConfig.MetricsFlag) if *isExplorer { currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode) diff --git a/core/tx_pool.go b/core/tx_pool.go index ef10cb834..8042a4856 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -463,6 +463,11 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { pool.promoteExecutables(nil) } +// GetTxPoolSize returns tx pool size. +func (pool *TxPool) GetTxPoolSize() uint64 { + return uint64(len(pool.pending)) + uint64(len(pool.queue)) +} + // Stop terminates the transaction pool. func (pool *TxPool) Stop() { // Unsubscribe all subscriptions registered from txpool diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index 7de262343..1a2b3278c 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -76,6 +76,7 @@ type ConfigType struct { Port string // Port of the node. IP string // IP of the node. + MetricsFlag bool // collect and upload metrics flag PushgatewayIP string // metrics pushgateway prometheus ip PushgatewayPort string // metrics pushgateway prometheus port StringRole string @@ -176,6 +177,16 @@ func (conf *ConfigType) SetPushgatewayPort(port string) { conf.PushgatewayPort = port } +// SetMetricsFlag set the metrics flag +func (conf *ConfigType) SetMetricsFlag(flag bool) { + conf.MetricsFlag = flag +} + +// GetMetricsFlag get the metrics flag +func (conf *ConfigType) GetMetricsFlag() bool { + return conf.MetricsFlag +} + // GetPushgatewayIP get the pushgateway ip func (conf *ConfigType) GetPushgatewayIP() string { return conf.PushgatewayIP diff --git a/node/node.go b/node/node.go index 6bd08307b..bac68b4d7 100644 --- a/node/node.go +++ b/node/node.go @@ -387,8 +387,10 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc // FIXME (leo): we use beacon client topic as the global topic for now go node.ReceiveGlobalMessage() - // start the goroutine to collect metrics - go node.CollectMetrics() + // if metrics flag is set start the goroutine to collect metrics + if node.NodeConfig.MetricsFlag { + go node.CollectMetrics() + } // Setup initial state of syncing. node.peerRegistrationRecord = make(map[string]*syncConfig) diff --git a/node/node_metrics.go b/node/node_metrics.go index 704e4fcc1..fd55d13c2 100644 --- a/node/node_metrics.go +++ b/node/node_metrics.go @@ -36,6 +36,12 @@ func (node *Node) UpdateConnectionsNumberForMetrics(prevNumPeers int) int { return curNumPeers } +// UpdateTxPoolSizeForMetrics updates tx pool size for metrics service. +func (node *Node) UpdateTxPoolSizeForMetrics(txPoolSize uint64) { + utils.Logger().Info().Msgf("Updating metrics tx pool size %d", txPoolSize) + metrics.UpdateTxPoolSize(txPoolSize) +} + // UpdateBalanceForMetrics uppdates node balance for metrics service. func (node *Node) UpdateBalanceForMetrics(prevBalance *big.Int) *big.Int { curBalance, err := node.GetBalanceOfAddress(node.Consensus.SelfAddress) @@ -58,6 +64,17 @@ func (node *Node) UpdateLastConsensusTimeForMetrics(prevLastConsensusTime int64) return lastConsensusTime } +// UpdateIsLeaderForMetrics updates if node is a leader now for metrics serivce. +func (node *Node) UpdateIsLeaderForMetrics() { + if node.Consensus.LeaderPubKey.SerializeToHexStr() == node.Consensus.PubKey.SerializeToHexStr() { + utils.Logger().Info().Msgf("Node %s is a leader now", node.Consensus.PubKey.SerializeToHexStr()) + metrics.UpdateIsLeader(true) + } else { + utils.Logger().Info().Msgf("Node %s is not a leader now", node.Consensus.PubKey.SerializeToHexStr()) + metrics.UpdateIsLeader(false) + } +} + // CollectMetrics collects metrics: block height, connections number, node balance, block reward, last consensus, accepted blocks. func (node *Node) CollectMetrics() { utils.Logger().Info().Msg("[Metrics Service] Update metrics") @@ -65,10 +82,12 @@ func (node *Node) CollectMetrics() { prevBlockHeight := uint64(0) prevBalance := big.NewInt(0) prevLastConsensusTime := int64(0) - for range time.Tick(1000 * time.Millisecond) { + for range time.Tick(100 * time.Millisecond) { prevBlockHeight = node.UpdateBlockHeightForMetrics(prevBlockHeight) prevNumPeers = node.UpdateConnectionsNumberForMetrics(prevNumPeers) prevBalance = node.UpdateBalanceForMetrics(prevBalance) prevLastConsensusTime = node.UpdateLastConsensusTimeForMetrics(prevLastConsensusTime) + node.UpdateTxPoolSizeForMetrics(node.TxPool.GetTxPoolSize()) + node.UpdateIsLeaderForMetrics() } } diff --git a/node/service_setup.go b/node/service_setup.go index 0f5c078bc..6b11dccb8 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -30,7 +30,9 @@ func (node *Node) setupForValidator() { // Register client support service. node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) // Register new metrics service - node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs)) + if node.NodeConfig.GetMetricsFlag() { + node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs)) + } // Register randomness service // TODO: Disable drand. Currently drand isn't functioning but we want to compeletely turn it off for full protection. @@ -51,8 +53,9 @@ func (node *Node) setupForNewNode() { // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetBeaconGroupID(), chanPeer, nil)) // Register new metrics service - node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs)) - + if node.NodeConfig.GetMetricsFlag() { + node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs)) + } // TODO: how to restart networkinfo and discovery service after receiving shard id info from beacon chain? } @@ -64,7 +67,9 @@ func (node *Node) setupForClientNode() { // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) // Register new metrics service - node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs)) + if node.NodeConfig.GetMetricsFlag() { + node.serviceManager.RegisterService(service.Metrics, metrics.New(&node.SelfPeer, node.NodeConfig.ConsensusPubKey.SerializeToHexStr(), node.NodeConfig.GetPushgatewayIP(), node.NodeConfig.GetPushgatewayPort(), node.Consensus.GetNodeIDs)) + } } func (node *Node) setupForExplorerNode() { diff --git a/scripts/node.sh b/scripts/node.sh index dae8282a5..789d01a1b 100755 --- a/scripts/node.sh +++ b/scripts/node.sh @@ -102,6 +102,7 @@ usage: ${progname} [-1ch] [-k KEYFILE] -S run the ${progname} as non-root user (default: run as root) -p passfile use the given BLS passphrase file -D do not download Harmony binaries (default: download when start) + -m collect and upload node metrics to harmony prometheus + grafana example: @@ -121,6 +122,7 @@ start_clean=false loop=true run_as_root=true do_not_download=false +metrics=false ${BLSKEYFILE=} unset OPTIND OPTARG opt @@ -138,6 +140,7 @@ do S) run_as_root=false ;; p) blspass="${OPTARG}";; D) do_not_download=true;; + m) metrics=true;; *) err 70 "unhandled option -${OPTARG}";; # EX_SOFTWARE esac done @@ -217,6 +220,9 @@ download_binaries || err 69 "initial node software update failed" NODE_PORT=9000 PUB_IP= +METRICS= +PUSHGATEWAY_IP= +PUSHGATEWAY_PORT= if [ "$OS" == "Linux" ]; then if ${run_as_root}; then @@ -361,15 +367,15 @@ do if [ "$OS" == "Linux" ]; then # Run Harmony Node if [ -z "${blspass}" ]; then - echo -n "${passphrase}" | LD_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass stdin + echo -n "${passphrase}" | LD_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass stdin -metrics $METRICS -pushgateway_ip $PUSHGATEWAY_IP -pushgateway_port $PUSHGATEWAY_PORT else - LD_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass file:${blspass} + LD_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass file:${blspass} -metrics $METRICS -pushgateway_ip $PUSHGATEWAY_IP -pushgateway_port $PUSHGATEWAY_PORT fi else if [ -z "${blspass}" ]; then - echo -n "${passphrase}" | DYLD_FALLBACK_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass stdin + echo -n "${passphrase}" | DYLD_FALLBACK_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass stdin -metrics $METRICS -pushgateway_ip $PUSHGATEWAY_IP -pushgateway_port $PUSHGATEWAY_PORT else - DYLD_FALLBACK_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass file:${blspass} + DYLD_FALLBACK_LIBRARY_PATH=$(pwd) ./harmony -bootnodes $BN_MA -ip $PUB_IP -port $NODE_PORT -is_genesis -blskey_file "${BLSKEYFILE}" -blspass file:${blspass} -metrics $METRICS -pushgateway_ip $PUSHGATEWAY_IP -pushgateway_port $PUSHGATEWAY_PORT fi fi || msg "node process finished with status $?" ${loop} || break