The core protocol of WoopChain
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
woop/api/service/prometheus/service.go

205 lines
5.7 KiB

// Package prometheus defines a service which is used for metrics collection
// and health of a node in Woop.
package prometheus
import (
"context"
"fmt"
"net/http"
"runtime/debug"
"runtime/pprof"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/metrics"
eth_prometheus "github.com/ethereum/go-ethereum/metrics/prometheus"
"github.com/woop-chain/woop/internal/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/client_golang/prometheus/push"
)
// Config is the config for the prometheus service
type Config struct {
Enabled bool
IP string
Port int
EnablePush bool // enable pushgateway support
Gateway string // address of the pushgateway
Network string // network type, used as job prefix
Legacy bool // legacy or not, legacy is woop internal node
NodeType string // node type, validator or exlorer node
Shard uint32 // shard id, used as job suffix
Instance string // identifier of the instance in prometheus metrics
TikvRole string // use for tikv explorer node
}
func (p Config) String() string {
return fmt.Sprintf("%v, %v:%v, %v/%v, %v/%v/%v/%v:%v", p.Enabled, p.IP, p.Port, p.EnablePush, p.Gateway, p.Network, p.Legacy, p.NodeType, p.Shard, p.Instance)
}
func (p Config) IsUsedTiKV() bool {
return p.TikvRole != ""
}
// Service provides Prometheus metrics via the /metrics route. This route will
// show all the metrics registered with the Prometheus DefaultRegisterer.
type Service struct {
server *http.Server
registry *prometheus.Registry
pusher *push.Pusher
failStatus error
config Config
registryOnce sync.Once
}
// Handler represents a path and handler func to serve on the same port as /metrics, /healthz, /goroutinez, etc.
type Handler struct {
Path string
Handler func(http.ResponseWriter, *http.Request)
}
var (
initOnce sync.Once
svc = &Service{}
)
func (s *Service) getJobName() string {
var node string
if s.config.IsUsedTiKV() { // tikv node must be explorer node, eg: te_reader0, te_writer0
node = "te_" + strings.ToLower(s.config.TikvRole)
} else if s.config.Legacy { // legacy nodes are woop nodes: s0,s1,s2,s3
node = "s"
} else {
if s.config.NodeType == "validator" {
// regular validator nodes are: v0,v1,v2,v3
node = "v"
} else {
// explorer nodes are: e0,e1,e2,e3
node = "e"
}
}
return fmt.Sprintf("%s/%s%d", s.config.Network, node, s.config.Shard)
}
// NewService sets up a new instance for a given address host:port.
// Anq empty host will match with any IP so an address like ":19000" is perfectly acceptable.
func NewService(cfg Config, additionalHandlers ...Handler) *Service {
initOnce.Do(func() {
svc = newService(cfg, additionalHandlers...)
})
return svc
}
func newService(cfg Config, additionalHandlers ...Handler) *Service {
if !cfg.Enabled {
utils.Logger().Info().Msg("Prometheus http server disabled...")
return nil
}
utils.Logger().Debug().Str("cfg", cfg.String()).Msg("Prometheus")
svc.config = cfg
if svc.registry == nil {
svc.registry = prometheus.NewRegistry()
}
handler := promhttp.InstrumentMetricHandler(
svc.registry,
promhttp.HandlerFor(svc.registry, promhttp.HandlerOpts{}),
)
mux := http.NewServeMux()
mux.Handle("/metrics", handler)
mux.Handle("/metrics/eth", eth_prometheus.Handler(metrics.DefaultRegistry))
mux.HandleFunc("/goroutinez", svc.goroutinezHandler)
// Register additional handlers.
for _, h := range additionalHandlers {
mux.HandleFunc(h.Path, h.Handler)
}
utils.Logger().Debug().Int("port", svc.config.Port).
Str("ip", svc.config.IP).
Msg("Starting Prometheus server")
endpoint := fmt.Sprintf("%s:%d", svc.config.IP, svc.config.Port)
svc.server = &http.Server{Addr: endpoint, Handler: mux}
return svc
}
// Start start the prometheus service
func (s *Service) Start() error {
go func() {
utils.Logger().Info().Str("address", s.server.Addr).Msg("Starting prometheus service")
err := s.server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
utils.Logger().Error().Msgf("Could not listen to host:port :%s: %v", s.server.Addr, err)
s.failStatus = err
}
}()
if s.config.EnablePush {
job := s.getJobName()
utils.Logger().Info().Str("Job", job).Msg("Prometheus enabled pushgateway support ...")
svc.pusher = push.New(s.config.Gateway, job).
Gatherer(svc.registry).
Grouping("instance", s.config.Instance)
// start pusher to push metrics to prometheus pushgateway
// every minute
go func(s *Service) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := s.pusher.Add(); err != nil {
utils.Logger().Warn().Err(err).Msg("Pushgateway Error")
}
}
}
}(s)
}
return nil
}
// Stop stop the Prometheus service
func (s *Service) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
return s.server.Shutdown(ctx)
}
func (s *Service) goroutinezHandler(w http.ResponseWriter, _ *http.Request) {
stack := debug.Stack()
if _, err := w.Write(stack); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to write goroutines stack")
}
if err := pprof.Lookup("goroutine").WriteTo(w, 2); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to write pprof goroutines")
}
}
// Status checks for any service failure conditions.
func (s *Service) Status() error {
if s.failStatus != nil {
return s.failStatus
}
return nil
}
func (s *Service) getRegistry() *prometheus.Registry {
s.registryOnce.Do(func() {
if svc.registry == nil {
svc.registry = prometheus.NewRegistry()
}
})
return s.registry
}
// PromRegistry return the registry of prometheus service
func PromRegistry() *prometheus.Registry {
return svc.getRegistry()
}