From 477e1f2b6d5d5395d36a25b2dcd6b6e2a74a55d5 Mon Sep 17 00:00:00 2001 From: "lin.huang" Date: Mon, 30 Oct 2023 03:31:26 +0800 Subject: [PATCH] Add monitoring function and struct for Prometheus on gin and GRPC --- pkg/common/ginPrometheus/ginPrometheus.go | 429 +++++++++++++++++++++ pkg/common/prom-metrics/grpc-msggateway.go | 23 ++ pkg/common/prom-metrics/grpcOjbFun.go | 28 ++ pkg/common/prom-metrics/http-api.go | 24 ++ 4 files changed, 504 insertions(+) create mode 100644 pkg/common/ginPrometheus/ginPrometheus.go create mode 100644 pkg/common/prom-metrics/grpc-msggateway.go create mode 100644 pkg/common/prom-metrics/grpcOjbFun.go create mode 100644 pkg/common/prom-metrics/http-api.go diff --git a/pkg/common/ginPrometheus/ginPrometheus.go b/pkg/common/ginPrometheus/ginPrometheus.go new file mode 100644 index 000000000..c14650370 --- /dev/null +++ b/pkg/common/ginPrometheus/ginPrometheus.go @@ -0,0 +1,429 @@ +package ginPrometheus + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "os" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var defaultMetricPath = "/metrics" + +type CusMetrics struct { + MetricsMap map[string]*Metric +} + +func (m *CusMetrics) MetricList() []*Metric { + var ret []*Metric + for _, v := range m.MetricsMap { + ret = append(ret, v) + } + return ret +} + +// counter, counter_vec, gauge, gauge_vec, +// histogram, histogram_vec, summary, summary_vec +var reqCnt = &Metric{ + ID: "reqCnt", + Name: "requests_total", + Description: "How many HTTP requests processed, partitioned by status code and HTTP method.", + Type: "counter_vec", + Args: []string{"code", "method", "handler", "host", "url"}} + +var reqDur = &Metric{ + ID: "reqDur", + Name: "request_duration_seconds", + Description: "The HTTP request latencies in seconds.", + Type: "histogram_vec", + Args: []string{"code", "method", "url"}, +} + +var resSz = &Metric{ + ID: "resSz", + Name: "response_size_bytes", + Description: "The HTTP response sizes in bytes.", + Type: "summary"} + +var reqSz = &Metric{ + ID: "reqSz", + Name: "request_size_bytes", + Description: "The HTTP request sizes in bytes.", + Type: "summary"} + +var standardMetrics = []*Metric{ + reqCnt, + reqDur, + resSz, + reqSz, +} + +/* +RequestCounterURLLabelMappingFn is a function which can be supplied to the middleware to control +the cardinality of the request counter's "url" label, which might be required in some contexts. +For instance, if for a "/customer/:name" route you don't want to generate a time series for every +possible customer name, you could use this function: + + func(c *gin.Context) string { + url := c.Request.URL.Path + for _, p := range c.Params { + if p.Key == "name" { + url = strings.Replace(url, p.Value, ":name", 1) + break + } + } + return url + } + +which would map "/customer/alice" and "/customer/bob" to their template "/customer/:name". +*/ +type RequestCounterURLLabelMappingFn func(c *gin.Context) string + +// Metric is a definition for the name, description, type, ID, and +// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric +type Metric struct { + MetricCollector prometheus.Collector + ID string + Name string + Description string + Type string + Args []string +} + +// Prometheus contains the metrics gathered by the instance and its path +type Prometheus struct { + reqCnt *prometheus.CounterVec + reqDur *prometheus.HistogramVec + reqSz, resSz prometheus.Summary + router *gin.Engine + listenAddress string + Ppg PrometheusPushGateway + + MetricsList []*Metric + MetricsPath string + + ReqCntURLLabelMappingFn RequestCounterURLLabelMappingFn + + // gin.Context string to use as a prometheus URL label + URLLabelFromContext string +} + +// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional) +type PrometheusPushGateway struct { + + // Push interval in seconds + PushIntervalSeconds time.Duration + + // Push Gateway URL in format http://domain:port + // where JOBNAME can be any string of your choice + PushGatewayURL string + + // Local metrics URL where metrics are fetched from, this could be ommited in the future + // if implemented using prometheus common/expfmt instead + MetricsURL string + + // pushgateway job name, defaults to "gin" + Job string +} + +// NewPrometheus generates a new set of metrics with a certain subsystem name +func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus { + subsystem = "app" + + var metricsList []*Metric + + if len(customMetricsList) > 1 { + panic("Too many args. NewPrometheus( string, ).") + } else if len(customMetricsList) == 1 { + metricsList = customMetricsList[0] + } + + for _, metric := range standardMetrics { + metricsList = append(metricsList, metric) + } + + p := &Prometheus{ + MetricsList: metricsList, + MetricsPath: defaultMetricPath, + ReqCntURLLabelMappingFn: func(c *gin.Context) string { + return c.Request.URL.Path + }, + } + + p.registerMetrics(subsystem) + + return p +} + +// SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL +// every pushIntervalSeconds. Metrics are fetched from metricsURL +func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) { + p.Ppg.PushGatewayURL = pushGatewayURL + p.Ppg.MetricsURL = metricsURL + p.Ppg.PushIntervalSeconds = pushIntervalSeconds + p.startPushTicker() +} + +// SetPushGatewayJob job name, defaults to "gin" +func (p *Prometheus) SetPushGatewayJob(j string) { + p.Ppg.Job = j +} + +// SetListenAddress for exposing metrics on address. If not set, it will be exposed at the +// same address of the gin engine that is being used +func (p *Prometheus) SetListenAddress(address string) { + p.listenAddress = address + if p.listenAddress != "" { + p.router = gin.Default() + } +} + +// SetListenAddressWithRouter for using a separate router to expose metrics. (this keeps things like GET /metrics out of +// your content's access log). +func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Engine) { + p.listenAddress = listenAddress + if len(p.listenAddress) > 0 { + p.router = r + } +} + +// SetMetricsPath set metrics paths +func (p *Prometheus) SetMetricsPath(e *gin.Engine) { + + if p.listenAddress != "" { + p.router.GET(p.MetricsPath, prometheusHandler()) + p.runServer() + } else { + e.GET(p.MetricsPath, prometheusHandler()) + } +} + +// SetMetricsPathWithAuth set metrics paths with authentication +func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) { + + if p.listenAddress != "" { + p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) + p.runServer() + } else { + e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) + } + +} + +func (p *Prometheus) runServer() { + if p.listenAddress != "" { + go p.router.Run(p.listenAddress) + } +} + +func (p *Prometheus) getMetrics() []byte { + response, _ := http.Get(p.Ppg.MetricsURL) + + defer response.Body.Close() + body, _ := ioutil.ReadAll(response.Body) + + return body +} + +func (p *Prometheus) getPushGatewayURL() string { + h, _ := os.Hostname() + if p.Ppg.Job == "" { + p.Ppg.Job = "gin" + } + return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + h +} + +func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) { + req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics)) + client := &http.Client{} + if _, err = client.Do(req); err != nil { + fmt.Println("Error sending to push gateway error:", err.Error()) + } +} + +func (p *Prometheus) startPushTicker() { + ticker := time.NewTicker(time.Second * p.Ppg.PushIntervalSeconds) + go func() { + for range ticker.C { + p.sendMetricsToPushGateway(p.getMetrics()) + } + }() +} + +// NewMetric associates prometheus.Collector based on Metric.Type +func NewMetric(m *Metric, subsystem string) prometheus.Collector { + var metric prometheus.Collector + switch m.Type { + case "counter_vec": + metric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + m.Args, + ) + case "counter": + metric = prometheus.NewCounter( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + ) + case "gauge_vec": + metric = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + m.Args, + ) + case "gauge": + metric = prometheus.NewGauge( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + ) + case "histogram_vec": + metric = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + m.Args, + ) + case "histogram": + metric = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + ) + case "summary_vec": + metric = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + m.Args, + ) + case "summary": + metric = prometheus.NewSummary( + prometheus.SummaryOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + ) + } + return metric +} + +func (p *Prometheus) registerMetrics(subsystem string) { + + for _, metricDef := range p.MetricsList { + metric := NewMetric(metricDef, subsystem) + if err := prometheus.Register(metric); err != nil { + fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error()) + } + switch metricDef { + case reqCnt: + p.reqCnt = metric.(*prometheus.CounterVec) + case reqDur: + p.reqDur = metric.(*prometheus.HistogramVec) + case resSz: + p.resSz = metric.(prometheus.Summary) + case reqSz: + p.reqSz = metric.(prometheus.Summary) + } + metricDef.MetricCollector = metric + } +} + +// Use adds the middleware to a gin engine. +func (p *Prometheus) Use(e *gin.Engine) { + e.Use(p.HandlerFunc()) + p.SetMetricsPath(e) +} + +// UseWithAuth adds the middleware to a gin engine with BasicAuth. +func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) { + e.Use(p.HandlerFunc()) + p.SetMetricsPathWithAuth(e, accounts) +} + +// HandlerFunc defines handler function for middleware +func (p *Prometheus) HandlerFunc() gin.HandlerFunc { + return func(c *gin.Context) { + if c.Request.URL.Path == p.MetricsPath { + c.Next() + return + } + + start := time.Now() + reqSz := computeApproximateRequestSize(c.Request) + + c.Next() + + status := strconv.Itoa(c.Writer.Status()) + elapsed := float64(time.Since(start)) / float64(time.Second) + resSz := float64(c.Writer.Size()) + + url := p.ReqCntURLLabelMappingFn(c) + if len(p.URLLabelFromContext) > 0 { + u, found := c.Get(p.URLLabelFromContext) + if !found { + u = "unknown" + } + url = u.(string) + } + p.reqDur.WithLabelValues(status, c.Request.Method, url).Observe(elapsed) + p.reqCnt.WithLabelValues(status, c.Request.Method, c.HandlerName(), c.Request.Host, url).Inc() + p.reqSz.Observe(float64(reqSz)) + p.resSz.Observe(resSz) + } +} + +func prometheusHandler() gin.HandlerFunc { + h := promhttp.Handler() + return func(c *gin.Context) { + h.ServeHTTP(c.Writer, c.Request) + } +} + +func computeApproximateRequestSize(r *http.Request) int { + s := 0 + if r.URL != nil { + s = len(r.URL.Path) + } + + s += len(r.Method) + s += len(r.Proto) + for name, values := range r.Header { + s += len(name) + for _, value := range values { + s += len(value) + } + } + s += len(r.Host) + + // r.Form and r.MultipartForm are assumed to be included in r.URL. + + if r.ContentLength != -1 { + s += int(r.ContentLength) + } + return s +} diff --git a/pkg/common/prom-metrics/grpc-msggateway.go b/pkg/common/prom-metrics/grpc-msggateway.go new file mode 100644 index 000000000..13398e77f --- /dev/null +++ b/pkg/common/prom-metrics/grpc-msggateway.go @@ -0,0 +1,23 @@ +package prom_metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +/* +labels := prometheus.Labels{"label_one": "any", "label_two": "value"} +G_grpc_msggateway_metrics.MetricsMap["demo_server_say_hello_method_handle_count"].(*prometheus.CounterVec).With(labels).Inc() +*/ +var ( + G_grpc_msggateway_metrics *GrpcCusMetricsMap +) + +func init() { + customizedCounterMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "demo_server_say_hello_method_handle_count", + Help: "Total number of RPCs handled on the server.", + }, []string{"name"}) + tMetrics := make(map[string]prometheus.Collector) + tMetrics["demo_server_say_hello_method_handle_count"] = customizedCounterMetric + G_grpc_msggateway_metrics = &GrpcCusMetricsMap{MetricsMap: tMetrics} +} diff --git a/pkg/common/prom-metrics/grpcOjbFun.go b/pkg/common/prom-metrics/grpcOjbFun.go new file mode 100644 index 000000000..cefcc3fba --- /dev/null +++ b/pkg/common/prom-metrics/grpcOjbFun.go @@ -0,0 +1,28 @@ +package prom_metrics + +import ( + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/prometheus/client_golang/prometheus" +) + +type GrpcCusMetricsMap struct { + MetricsMap map[string]prometheus.Collector +} + +func (m *GrpcCusMetricsMap) MetricList() []prometheus.Collector { + var ret []prometheus.Collector + for _, v := range m.MetricsMap { + ret = append(ret, v) + } + return ret +} + +func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *grpc_prometheus.ServerMetrics, error) { + //////////////////////////////////////////////////////// + reg := prometheus.NewRegistry() + grpcMetrics := grpc_prometheus.NewServerMetrics() + grpcMetrics.EnableHandlingTimeHistogram() + cusMetrics = append(cusMetrics, grpcMetrics, prometheus.NewGoCollector()) + reg.MustRegister(cusMetrics...) + return reg, grpcMetrics, nil +} diff --git a/pkg/common/prom-metrics/http-api.go b/pkg/common/prom-metrics/http-api.go new file mode 100644 index 000000000..ebd3cdb9f --- /dev/null +++ b/pkg/common/prom-metrics/http-api.go @@ -0,0 +1,24 @@ +package prom_metrics + +import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus" + +/* +labels := prometheus.Labels{"label_one": "any", "label_two": "value"} +G_api_metrics.MetricsMap["custom_total"].MetricCollector.(*prometheus.CounterVec).With(labels).Inc() +*/ +var ( + G_api_metrics *ginProm.CusMetrics +) + +func init() { + + CustomCnt := &ginProm.Metric{ + Name: "custom_total", + Description: "Custom counter events.", + Type: "counter_vec", + Args: []string{"label_one", "label_two"}, + } + tMetrics := make(map[string]*ginProm.Metric) + tMetrics["custom_total"] = CustomCnt + G_api_metrics = &ginProm.CusMetrics{MetricsMap: tMetrics} +}