diff --git a/go.sum b/go.sum index 246315957..577154a1b 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.27 h1:0Ctpu9VBXVCkKno6vVNBgUTyo9W9bG7SZuAhQr/4H8Y= -github.com/openimsdk/protocol v0.0.69-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.24 h1:TYcNJeWOTuE40UQ54eNPdDdy0KTOh9rAOgax8lCyhDc= +github.com/openimsdk/protocol v0.0.69-alpha.24/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.45 h1:XIzCoef4myybOiIlGuRY9FTtGBisZFC4Uy4PhG0ZWQ0= github.com/openimsdk/tools v0.0.49-alpha.45/go.mod h1:HtSRjPTL8PsuZ+PhR5noqzrYBF0sdwW3/O/sWVucWg8= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/api/init.go b/internal/api/init.go index 23866c4a0..679d901f4 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -29,7 +29,6 @@ import ( "time" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" - ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -72,10 +71,10 @@ func Start(ctx context.Context, index int, config *Config) error { netDone <- struct{}{} return } - p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) - p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort)) - if err = p.Use(router); err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort)) + srv := http.NewServeMux() + srv.Handle(prommetrics.ApiPath, prommetrics.ApiHandler()) + if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed { + netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort)) netDone <- struct{}{} } }() diff --git a/internal/api/router.go b/internal/api/router.go index 0f46f26ba..0a42d82bd 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -2,12 +2,10 @@ package api import ( "fmt" - "net/http" - "strings" - "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" @@ -17,8 +15,25 @@ import ( "github.com/openimsdk/tools/mw" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "net/http" + "strings" ) +func prommetricsGin() gin.HandlerFunc { + return func(c *gin.Context) { + c.Next() + path := c.FullPath() + if c.Writer.Status() == http.StatusNotFound { + prommetrics.HttpCall("<404>", c.Request.Method, c.Writer.Status()) + } else { + prommetrics.HttpCall(path, c.Request.Method, c.Writer.Status()) + } + if resp := apiresp.GetGinApiResponse(c); resp != nil { + prommetrics.APICall(path, c.Request.Method, resp.ErrCode) + } + } +} + func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine { disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) @@ -37,7 +52,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL) - r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc)) + r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc)) u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID) userRouterGroup := r.Group("/user") diff --git a/pkg/common/ginprometheus/ginprometheus.go b/pkg/common/ginprometheus/ginprometheus.go index c2e6bdcca..64f8a0d8a 100644 --- a/pkg/common/ginprometheus/ginprometheus.go +++ b/pkg/common/ginprometheus/ginprometheus.go @@ -14,430 +14,431 @@ package ginprometheus -import ( - "bytes" - "fmt" - "io" - "net/http" - "os" - "strconv" - "time" - - "github.com/gin-gonic/gin" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -var defaultMetricPath = "/metrics" - -// counter, counter_vec, gauge, gauge_vec, -// histogram, histogram_vec, summary, summary_vec. -var ( - reqCounter = &Metric{ - ID: "reqCnt", - Name: "requests_total", - Description: "How many HTTP requests processed, partitioned by status code and HTTP method.", - Type: "counter_vec", - Args: []string{"code", "method", "handler", "host", "url"}} - - reqDuration = &Metric{ - ID: "reqDur", - Name: "request_duration_seconds", - Description: "The HTTP request latencies in seconds.", - Type: "histogram_vec", - Args: []string{"code", "method", "url"}, - } - - resSize = &Metric{ - ID: "resSz", - Name: "response_size_bytes", - Description: "The HTTP response sizes in bytes.", - Type: "summary"} - - reqSize = &Metric{ - ID: "reqSz", - Name: "request_size_bytes", - Description: "The HTTP request sizes in bytes.", - Type: "summary"} - - standardMetrics = []*Metric{ - reqCounter, - reqDuration, - resSize, - reqSize, - } -) - -/* -RequestCounterURLLabelMappingFn is a function which can be supplied to the middleware to control -the cardinality of the request counter's "url" label, which might be required in some contexts. -For instance, if for a "/customer/:name" route you don't want to generate a time series for every -possible customer name, you could use this function: - - func(c *gin.Context) string { - url := c.Request.URL.Path - for _, p := range c.Params { - if p.Key == "name" { - url = strings.Replace(url, p.Value, ":name", 1) - break - } - } - return url - } - -which would map "/customer/alice" and "/customer/bob" to their template "/customer/:name". -*/ -type RequestCounterURLLabelMappingFn func(c *gin.Context) string - -// Metric is a definition for the name, description, type, ID, and -// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric. -type Metric struct { - MetricCollector prometheus.Collector - ID string - Name string - Description string - Type string - Args []string -} - -// Prometheus contains the metrics gathered by the instance and its path. -type Prometheus struct { - reqCnt *prometheus.CounterVec - reqDur *prometheus.HistogramVec - reqSz, resSz prometheus.Summary - router *gin.Engine - listenAddress string - Ppg PrometheusPushGateway - - MetricsList []*Metric - MetricsPath string - - ReqCntURLLabelMappingFn RequestCounterURLLabelMappingFn - - // gin.Context string to use as a prometheus URL label - URLLabelFromContext string -} - -// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional). -type PrometheusPushGateway struct { - - // Push interval in seconds - PushIntervalSeconds time.Duration - - // Push Gateway URL in format http://domain:port - // where JOBNAME can be any string of your choice - PushGatewayURL string - - // Local metrics URL where metrics are fetched from, this could be omitted in the future - // if implemented using prometheus common/expfmt instead - MetricsURL string - - // pushgateway job name, defaults to "gin" - Job string -} - -// NewPrometheus generates a new set of metrics with a certain subsystem name. -func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus { - if subsystem == "" { - subsystem = "app" - } - - var metricsList []*Metric - - if len(customMetricsList) > 1 { - panic("Too many args. NewPrometheus( string, ).") - } else if len(customMetricsList) == 1 { - metricsList = customMetricsList[0] - } - metricsList = append(metricsList, standardMetrics...) - - p := &Prometheus{ - MetricsList: metricsList, - MetricsPath: defaultMetricPath, - ReqCntURLLabelMappingFn: func(c *gin.Context) string { - return c.FullPath() // e.g. /user/:id , /user/:id/info - }, - } - - p.registerMetrics(subsystem) - - return p -} - -// SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL -// every pushIntervalSeconds. Metrics are fetched from metricsURL. -func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) { - p.Ppg.PushGatewayURL = pushGatewayURL - p.Ppg.MetricsURL = metricsURL - p.Ppg.PushIntervalSeconds = pushIntervalSeconds - p.startPushTicker() -} - -// SetPushGatewayJob job name, defaults to "gin". -func (p *Prometheus) SetPushGatewayJob(j string) { - p.Ppg.Job = j -} - -// SetListenAddress for exposing metrics on address. If not set, it will be exposed at the -// same address of the gin engine that is being used. -func (p *Prometheus) SetListenAddress(address string) { - p.listenAddress = address - if p.listenAddress != "" { - p.router = gin.Default() - } -} - -// SetListenAddressWithRouter for using a separate router to expose metrics. (this keeps things like GET /metrics out of -// your content's access log). -func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Engine) { - p.listenAddress = listenAddress - if len(p.listenAddress) > 0 { - p.router = r - } -} - -// SetMetricsPath set metrics paths. -func (p *Prometheus) SetMetricsPath(e *gin.Engine) error { - - if p.listenAddress != "" { - p.router.GET(p.MetricsPath, prometheusHandler()) - return p.runServer() - } else { - e.GET(p.MetricsPath, prometheusHandler()) - return nil - } -} - -// SetMetricsPathWithAuth set metrics paths with authentication. -func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) error { - - if p.listenAddress != "" { - p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) - return p.runServer() - } else { - e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) - return nil - } - -} - -func (p *Prometheus) runServer() error { - return p.router.Run(p.listenAddress) -} - -func (p *Prometheus) getMetrics() []byte { - response, err := http.Get(p.Ppg.MetricsURL) - if err != nil { - return nil - } - - defer response.Body.Close() - - body, _ := io.ReadAll(response.Body) - return body -} - -var hostname, _ = os.Hostname() - -func (p *Prometheus) getPushGatewayURL() string { - if p.Ppg.Job == "" { - p.Ppg.Job = "gin" - } - return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + hostname -} - -func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) { - req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics)) - if err != nil { - return - } - - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - fmt.Println("Error sending to push gateway error:", err.Error()) - } - - resp.Body.Close() -} - -func (p *Prometheus) startPushTicker() { - ticker := time.NewTicker(time.Second * p.Ppg.PushIntervalSeconds) - go func() { - for range ticker.C { - p.sendMetricsToPushGateway(p.getMetrics()) - } - }() -} - -// NewMetric associates prometheus.Collector based on Metric.Type. -func NewMetric(m *Metric, subsystem string) prometheus.Collector { - var metric prometheus.Collector - switch m.Type { - case "counter_vec": - metric = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Subsystem: subsystem, - Name: m.Name, - Help: m.Description, - }, - m.Args, - ) - case "counter": - metric = prometheus.NewCounter( - prometheus.CounterOpts{ - Subsystem: subsystem, - Name: m.Name, - Help: m.Description, - }, - ) - case "gauge_vec": - metric = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: subsystem, - Name: m.Name, - Help: m.Description, - }, - m.Args, - ) - case "gauge": - metric = prometheus.NewGauge( - prometheus.GaugeOpts{ - Subsystem: subsystem, - Name: m.Name, - Help: m.Description, - }, - ) - case "histogram_vec": - metric = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Subsystem: subsystem, - Name: m.Name, - Help: m.Description, - }, - m.Args, - ) - case "histogram": - metric = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Subsystem: subsystem, - Name: m.Name, - Help: m.Description, - }, - ) - case "summary_vec": - metric = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Subsystem: subsystem, - Name: m.Name, - Help: m.Description, - }, - m.Args, - ) - case "summary": - metric = prometheus.NewSummary( - prometheus.SummaryOpts{ - Subsystem: subsystem, - Name: m.Name, - Help: m.Description, - }, - ) - } - return metric -} - -func (p *Prometheus) registerMetrics(subsystem string) { - for _, metricDef := range p.MetricsList { - metric := NewMetric(metricDef, subsystem) - if err := prometheus.Register(metric); err != nil { - fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error()) - } - - switch metricDef { - case reqCounter: - p.reqCnt = metric.(*prometheus.CounterVec) - case reqDuration: - p.reqDur = metric.(*prometheus.HistogramVec) - case resSize: - p.resSz = metric.(prometheus.Summary) - case reqSize: - p.reqSz = metric.(prometheus.Summary) - } - metricDef.MetricCollector = metric - } -} - -// Use adds the middleware to a gin engine. -func (p *Prometheus) Use(e *gin.Engine) error { - e.Use(p.HandlerFunc()) - return p.SetMetricsPath(e) -} - -// UseWithAuth adds the middleware to a gin engine with BasicAuth. -func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) error { - e.Use(p.HandlerFunc()) - return p.SetMetricsPathWithAuth(e, accounts) -} - -// HandlerFunc defines handler function for middleware. -func (p *Prometheus) HandlerFunc() gin.HandlerFunc { - return func(c *gin.Context) { - if c.Request.URL.Path == p.MetricsPath { - c.Next() - return - } - - start := time.Now() - reqSz := computeApproximateRequestSize(c.Request) - - c.Next() - - status := strconv.Itoa(c.Writer.Status()) - elapsed := float64(time.Since(start)) / float64(time.Second) - resSz := float64(c.Writer.Size()) - - url := p.ReqCntURLLabelMappingFn(c) - if len(p.URLLabelFromContext) > 0 { - u, found := c.Get(p.URLLabelFromContext) - if !found { - u = "unknown" - } - url = u.(string) - } - p.reqDur.WithLabelValues(status, c.Request.Method, url).Observe(elapsed) - p.reqCnt.WithLabelValues(status, c.Request.Method, c.HandlerName(), c.Request.Host, url).Inc() - p.reqSz.Observe(float64(reqSz)) - p.resSz.Observe(resSz) - } -} - -func prometheusHandler() gin.HandlerFunc { - h := promhttp.Handler() - return func(c *gin.Context) { - h.ServeHTTP(c.Writer, c.Request) - } -} - -func computeApproximateRequestSize(r *http.Request) int { - var s int - if r.URL != nil { - s = len(r.URL.Path) - } - - s += len(r.Method) - s += len(r.Proto) - for name, values := range r.Header { - s += len(name) - for _, value := range values { - s += len(value) - } - } - s += len(r.Host) - - // r.FormData and r.MultipartForm are assumed to be included in r.URL. - - if r.ContentLength != -1 { - s += int(r.ContentLength) - } - return s -} +// +//import ( +// "bytes" +// "fmt" +// "io" +// "net/http" +// "os" +// "strconv" +// "time" +// +// "github.com/gin-gonic/gin" +// "github.com/prometheus/client_golang/prometheus" +// "github.com/prometheus/client_golang/prometheus/promhttp" +//) +// +//var defaultMetricPath = "/metrics" +// +//// counter, counter_vec, gauge, gauge_vec, +//// histogram, histogram_vec, summary, summary_vec. +//var ( +// reqCounter = &Metric{ +// ID: "reqCnt", +// Name: "requests_total", +// Description: "How many HTTP requests processed, partitioned by status code and HTTP method.", +// Type: "counter_vec", +// Args: []string{"code", "method", "handler", "host", "url"}} +// +// reqDuration = &Metric{ +// ID: "reqDur", +// Name: "request_duration_seconds", +// Description: "The HTTP request latencies in seconds.", +// Type: "histogram_vec", +// Args: []string{"code", "method", "url"}, +// } +// +// resSize = &Metric{ +// ID: "resSz", +// Name: "response_size_bytes", +// Description: "The HTTP response sizes in bytes.", +// Type: "summary"} +// +// reqSize = &Metric{ +// ID: "reqSz", +// Name: "request_size_bytes", +// Description: "The HTTP request sizes in bytes.", +// Type: "summary"} +// +// standardMetrics = []*Metric{ +// reqCounter, +// reqDuration, +// resSize, +// reqSize, +// } +//) +// +///* +//RequestCounterURLLabelMappingFn is a function which can be supplied to the middleware to control +//the cardinality of the request counter's "url" label, which might be required in some contexts. +//For instance, if for a "/customer/:name" route you don't want to generate a time series for every +//possible customer name, you could use this function: +// +// func(c *gin.Context) string { +// url := c.Request.URL.Path +// for _, p := range c.Params { +// if p.Key == "name" { +// url = strings.Replace(url, p.Value, ":name", 1) +// break +// } +// } +// return url +// } +// +//which would map "/customer/alice" and "/customer/bob" to their template "/customer/:name". +//*/ +//type RequestCounterURLLabelMappingFn func(c *gin.Context) string +// +//// Metric is a definition for the name, description, type, ID, and +//// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric. +//type Metric struct { +// MetricCollector prometheus.Collector +// ID string +// Name string +// Description string +// Type string +// Args []string +//} +// +//// Prometheus contains the metrics gathered by the instance and its path. +//type Prometheus struct { +// reqCnt *prometheus.CounterVec +// reqDur *prometheus.HistogramVec +// reqSz, resSz prometheus.Summary +// router *gin.Engine +// listenAddress string +// Ppg PrometheusPushGateway +// +// MetricsList []*Metric +// MetricsPath string +// +// ReqCntURLLabelMappingFn RequestCounterURLLabelMappingFn +// +// // gin.Context string to use as a prometheus URL label +// URLLabelFromContext string +//} +// +//// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional). +//type PrometheusPushGateway struct { +// +// // Push interval in seconds +// PushIntervalSeconds time.Duration +// +// // Push Gateway URL in format http://domain:port +// // where JOBNAME can be any string of your choice +// PushGatewayURL string +// +// // Local metrics URL where metrics are fetched from, this could be omitted in the future +// // if implemented using prometheus common/expfmt instead +// MetricsURL string +// +// // pushgateway job name, defaults to "gin" +// Job string +//} +// +//// NewPrometheus generates a new set of metrics with a certain subsystem name. +//func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus { +// if subsystem == "" { +// subsystem = "app" +// } +// +// var metricsList []*Metric +// +// if len(customMetricsList) > 1 { +// panic("Too many args. NewPrometheus( string, ).") +// } else if len(customMetricsList) == 1 { +// metricsList = customMetricsList[0] +// } +// metricsList = append(metricsList, standardMetrics...) +// +// p := &Prometheus{ +// MetricsList: metricsList, +// MetricsPath: defaultMetricPath, +// ReqCntURLLabelMappingFn: func(c *gin.Context) string { +// return c.FullPath() // e.g. /user/:id , /user/:id/info +// }, +// } +// +// p.registerMetrics(subsystem) +// +// return p +//} +// +//// SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL +//// every pushIntervalSeconds. Metrics are fetched from metricsURL. +//func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) { +// p.Ppg.PushGatewayURL = pushGatewayURL +// p.Ppg.MetricsURL = metricsURL +// p.Ppg.PushIntervalSeconds = pushIntervalSeconds +// p.startPushTicker() +//} +// +//// SetPushGatewayJob job name, defaults to "gin". +//func (p *Prometheus) SetPushGatewayJob(j string) { +// p.Ppg.Job = j +//} +// +//// SetListenAddress for exposing metrics on address. If not set, it will be exposed at the +//// same address of the gin engine that is being used. +//func (p *Prometheus) SetListenAddress(address string) { +// p.listenAddress = address +// if p.listenAddress != "" { +// p.router = gin.Default() +// } +//} +// +//// SetListenAddressWithRouter for using a separate router to expose metrics. (this keeps things like GET /metrics out of +//// your content's access log). +//func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Engine) { +// p.listenAddress = listenAddress +// if len(p.listenAddress) > 0 { +// p.router = r +// } +//} +// +//// SetMetricsPath set metrics paths. +//func (p *Prometheus) SetMetricsPath(e *gin.Engine) error { +// +// if p.listenAddress != "" { +// p.router.GET(p.MetricsPath, prometheusHandler()) +// return p.runServer() +// } else { +// e.GET(p.MetricsPath, prometheusHandler()) +// return nil +// } +//} +// +//// SetMetricsPathWithAuth set metrics paths with authentication. +//func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) error { +// +// if p.listenAddress != "" { +// p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) +// return p.runServer() +// } else { +// e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) +// return nil +// } +// +//} +// +//func (p *Prometheus) runServer() error { +// return p.router.Run(p.listenAddress) +//} +// +//func (p *Prometheus) getMetrics() []byte { +// response, err := http.Get(p.Ppg.MetricsURL) +// if err != nil { +// return nil +// } +// +// defer response.Body.Close() +// +// body, _ := io.ReadAll(response.Body) +// return body +//} +// +//var hostname, _ = os.Hostname() +// +//func (p *Prometheus) getPushGatewayURL() string { +// if p.Ppg.Job == "" { +// p.Ppg.Job = "gin" +// } +// return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + hostname +//} +// +//func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) { +// req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics)) +// if err != nil { +// return +// } +// +// client := &http.Client{} +// resp, err := client.Do(req) +// if err != nil { +// fmt.Println("Error sending to push gateway error:", err.Error()) +// } +// +// resp.Body.Close() +//} +// +//func (p *Prometheus) startPushTicker() { +// ticker := time.NewTicker(time.Second * p.Ppg.PushIntervalSeconds) +// go func() { +// for range ticker.C { +// p.sendMetricsToPushGateway(p.getMetrics()) +// } +// }() +//} +// +//// NewMetric associates prometheus.Collector based on Metric.Type. +//func NewMetric(m *Metric, subsystem string) prometheus.Collector { +// var metric prometheus.Collector +// switch m.Type { +// case "counter_vec": +// metric = prometheus.NewCounterVec( +// prometheus.CounterOpts{ +// Subsystem: subsystem, +// Name: m.Name, +// Help: m.Description, +// }, +// m.Args, +// ) +// case "counter": +// metric = prometheus.NewCounter( +// prometheus.CounterOpts{ +// Subsystem: subsystem, +// Name: m.Name, +// Help: m.Description, +// }, +// ) +// case "gauge_vec": +// metric = prometheus.NewGaugeVec( +// prometheus.GaugeOpts{ +// Subsystem: subsystem, +// Name: m.Name, +// Help: m.Description, +// }, +// m.Args, +// ) +// case "gauge": +// metric = prometheus.NewGauge( +// prometheus.GaugeOpts{ +// Subsystem: subsystem, +// Name: m.Name, +// Help: m.Description, +// }, +// ) +// case "histogram_vec": +// metric = prometheus.NewHistogramVec( +// prometheus.HistogramOpts{ +// Subsystem: subsystem, +// Name: m.Name, +// Help: m.Description, +// }, +// m.Args, +// ) +// case "histogram": +// metric = prometheus.NewHistogram( +// prometheus.HistogramOpts{ +// Subsystem: subsystem, +// Name: m.Name, +// Help: m.Description, +// }, +// ) +// case "summary_vec": +// metric = prometheus.NewSummaryVec( +// prometheus.SummaryOpts{ +// Subsystem: subsystem, +// Name: m.Name, +// Help: m.Description, +// }, +// m.Args, +// ) +// case "summary": +// metric = prometheus.NewSummary( +// prometheus.SummaryOpts{ +// Subsystem: subsystem, +// Name: m.Name, +// Help: m.Description, +// }, +// ) +// } +// return metric +//} +// +//func (p *Prometheus) registerMetrics(subsystem string) { +// for _, metricDef := range p.MetricsList { +// metric := NewMetric(metricDef, subsystem) +// if err := prometheus.Register(metric); err != nil { +// fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error()) +// } +// +// switch metricDef { +// case reqCounter: +// p.reqCnt = metric.(*prometheus.CounterVec) +// case reqDuration: +// p.reqDur = metric.(*prometheus.HistogramVec) +// case resSize: +// p.resSz = metric.(prometheus.Summary) +// case reqSize: +// p.reqSz = metric.(prometheus.Summary) +// } +// metricDef.MetricCollector = metric +// } +//} +// +//// Use adds the middleware to a gin engine. +//func (p *Prometheus) Use(e *gin.Engine) error { +// e.Use(p.HandlerFunc()) +// return p.SetMetricsPath(e) +//} +// +//// UseWithAuth adds the middleware to a gin engine with BasicAuth. +//func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) error { +// e.Use(p.HandlerFunc()) +// return p.SetMetricsPathWithAuth(e, accounts) +//} +// +//// HandlerFunc defines handler function for middleware. +//func (p *Prometheus) HandlerFunc() gin.HandlerFunc { +// return func(c *gin.Context) { +// if c.Request.URL.Path == p.MetricsPath { +// c.Next() +// return +// } +// +// start := time.Now() +// reqSz := computeApproximateRequestSize(c.Request) +// +// c.Next() +// +// status := strconv.Itoa(c.Writer.Status()) +// elapsed := float64(time.Since(start)) / float64(time.Second) +// resSz := float64(c.Writer.Size()) +// +// url := p.ReqCntURLLabelMappingFn(c) +// if len(p.URLLabelFromContext) > 0 { +// u, found := c.Get(p.URLLabelFromContext) +// if !found { +// u = "unknown" +// } +// url = u.(string) +// } +// p.reqDur.WithLabelValues(status, c.Request.Method, url).Observe(elapsed) +// p.reqCnt.WithLabelValues(status, c.Request.Method, c.HandlerName(), c.Request.Host, url).Inc() +// p.reqSz.Observe(float64(reqSz)) +// p.resSz.Observe(resSz) +// } +//} +// +//func prometheusHandler() gin.HandlerFunc { +// h := promhttp.Handler() +// return func(c *gin.Context) { +// h.ServeHTTP(c.Writer, c.Request) +// } +//} +// +//func computeApproximateRequestSize(r *http.Request) int { +// var s int +// if r.URL != nil { +// s = len(r.URL.Path) +// } +// +// s += len(r.Method) +// s += len(r.Proto) +// for name, values := range r.Header { +// s += len(name) +// for _, value := range values { +// s += len(value) +// } +// } +// s += len(r.Host) +// +// // r.FormData and r.MultipartForm are assumed to be included in r.URL. +// +// if r.ContentLength != -1 { +// s += int(r.ContentLength) +// } +// return s +//} diff --git a/pkg/common/prommetrics/api.go b/pkg/common/prommetrics/api.go new file mode 100644 index 000000000..806198977 --- /dev/null +++ b/pkg/common/prommetrics/api.go @@ -0,0 +1,50 @@ +package prommetrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "net/http" + "strconv" +) + +const ApiPath = "/metrics" + +var ( + apiRegistry = prometheus.NewRegistry() + apiCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "api_count", + Help: "Total number of API calls", + }, + []string{"path", "method", "code"}, + ) + httpCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "http_count", + Help: "Total number of HTTP calls", + }, + []string{"path", "method", "status"}, + ) +) + +func init() { + apiRegistry.MustRegister(apiCounter, httpCounter) +} + +func APICall(path string, method string, apiCode int) { + apiCounter.With(prometheus.Labels{"path": path, "method": method, "code": strconv.Itoa(apiCode)}).Inc() +} + +func HttpCall(path string, method string, status int) { + httpCounter.With(prometheus.Labels{"path": path, "method": method, "status": strconv.Itoa(status)}).Inc() +} + +//func ApiHandler() http.Handler { +// return promhttp.InstrumentMetricHandler( +// apiRegistry, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), +// ) +//} + +func ApiHandler() http.Handler { + return promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}) +} diff --git a/pkg/common/prommetrics/gin_api.go b/pkg/common/prommetrics/gin_api.go deleted file mode 100644 index 9f2e4c99d..000000000 --- a/pkg/common/prommetrics/gin_api.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prommetrics - -import ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" - -/* -labels := prometheus.Labels{"label_one": "any", "label_two": "value"} -ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc(). -*/ -var ( - ApiCustomCnt = &ginprom.Metric{ - Name: "custom_total", - Description: "Custom counter events.", - Type: "counter_vec", - Args: []string{"label_one", "label_two"}, - } -) diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 47e5d02b8..3955d8ea5 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -17,7 +17,6 @@ package prommetrics import ( gp "github.com/grpc-ecosystem/go-grpc-prometheus" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" ) @@ -48,11 +47,11 @@ func GetGrpcCusMetrics(registerName string, share *config2.Share) []prometheus.C } } -func GetGinCusMetrics(name string) []*ginprometheus.Metric { - switch name { - case "Api": - return []*ginprometheus.Metric{ApiCustomCnt} - default: - return []*ginprometheus.Metric{ApiCustomCnt} - } -} +//func GetGinCusMetrics(name string) []*ginprometheus.Metric { +// switch name { +// case "Api": +// return []*ginprometheus.Metric{ApiCustomCnt} +// default: +// return []*ginprometheus.Metric{ApiCustomCnt} +// } +//} diff --git a/pkg/common/prommetrics/rpc.go b/pkg/common/prommetrics/rpc.go new file mode 100644 index 000000000..834f76a9f --- /dev/null +++ b/pkg/common/prommetrics/rpc.go @@ -0,0 +1,33 @@ +package prommetrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "net/http" + "strconv" +) + +const RpcPath = "/metrics" + +var ( + rpcRegistry = prometheus.NewRegistry() + rpcCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "rpc_count", + Help: "Total number of RPC calls", + }, + []string{"name", "path", "code"}, + ) +) + +func init() { + rpcRegistry.MustRegister(rpcCounter) +} + +func RPCCall(name string, path string, code int) { + rpcCounter.With(prometheus.Labels{"name": name, "path": path, "code": strconv.Itoa(code)}).Inc() +} + +func RPCHandler() http.Handler { + return promhttp.HandlerFor(rpcRegistry, promhttp.HandlerOpts{}) +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index b531daa47..a88797793 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -19,7 +19,7 @@ import ( "fmt" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/utils/datautil" - "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc/status" "net" "net/http" "os" @@ -29,7 +29,6 @@ import ( "syscall" "time" - grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" @@ -38,7 +37,6 @@ import ( "github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/network" - "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -77,13 +75,14 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC return err } - var reg *prometheus.Registry - var metric *grpcprometheus.ServerMetrics + //var reg *prometheus.Registry + //var metric *grpcprometheus.ServerMetrics if prometheusConfig.Enable { - cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) - reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) - options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), - grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) + //cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) + //reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) + //options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), + // grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) + options = append(options, mw.GrpcServer(), prommetricsInterceptor(rpcRegisterName)) } else { options = append(options, mw.GrpcServer()) } @@ -122,13 +121,19 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC netDone <- struct{}{} return } - metric.InitializeMetrics(srv) - // Create a HTTP server for prometheus. - httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} - if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) + srv := http.NewServeMux() + srv.Handle(prommetrics.RpcPath, prommetrics.RPCHandler()) + if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed { + netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) netDone <- struct{}{} } + //metric.InitializeMetrics(srv) + // Create a HTTP server for prometheus. + //httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} + //if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + // netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) + // netDone <- struct{}{} + //} }() } @@ -175,3 +180,21 @@ func gracefulStopWithCtx(ctx context.Context, f func()) error { return nil } } + +func prommetricsInterceptor(rpcRegisterName string) grpc.ServerOption { + getCode := func(err error) int { + if err == nil { + return 0 + } + rpcErr, ok := err.(interface{ GRPCStatus() *status.Status }) + if !ok { + return -1 + } + return int(rpcErr.GRPCStatus().Code()) + } + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + resp, err := handler(ctx, req) + prommetrics.RPCCall(rpcRegisterName, info.FullMethod, getCode(err)) + return resp, err + }) +}