Merge remote-tracking branch 'origin/count' into allmerge

# Conflicts:
#	go.mod
#	go.sum
pull/2409/head
withchao 1 year ago
commit 98b3846d38

@ -262,8 +262,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.27 h1:0Ctpu9VBXVCkKno6vVNBgUTyo9W9bG7SZuAhQr/4H8Y=
github.com/openimsdk/protocol v0.0.69-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.69-alpha.24 h1:TYcNJeWOTuE40UQ54eNPdDdy0KTOh9rAOgax8lCyhDc=
github.com/openimsdk/protocol v0.0.69-alpha.24/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.45 h1:XIzCoef4myybOiIlGuRY9FTtGBisZFC4Uy4PhG0ZWQ0=
github.com/openimsdk/tools v0.0.49-alpha.45/go.mod h1:HtSRjPTL8PsuZ+PhR5noqzrYBF0sdwW3/O/sWVucWg8=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -29,7 +29,6 @@ import (
"time"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
@ -72,10 +71,10 @@ func Start(ctx context.Context, index int, config *Config) error {
netDone <- struct{}{}
return
}
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort))
if err = p.Use(router); err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort))
srv := http.NewServeMux()
srv.Handle(prommetrics.ApiPath, prommetrics.ApiHandler())
if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort))
netDone <- struct{}{}
}
}()

@ -2,12 +2,10 @@ package api
import (
"fmt"
"net/http"
"strings"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
@ -17,8 +15,25 @@ import (
"github.com/openimsdk/tools/mw"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"net/http"
"strings"
)
func prommetricsGin() gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
path := c.FullPath()
if c.Writer.Status() == http.StatusNotFound {
prommetrics.HttpCall("<404>", c.Request.Method, c.Writer.Status())
} else {
prommetrics.HttpCall(path, c.Request.Method, c.Writer.Status())
}
if resp := apiresp.GetGinApiResponse(c); resp != nil {
prommetrics.APICall(path, c.Request.Method, resp.ErrCode)
}
}
}
func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine {
disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
@ -37,7 +52,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth)
thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL)
r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
u := NewUserApi(*userRpc)
m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
userRouterGroup := r.Group("/user")

@ -14,430 +14,431 @@
package ginprometheus
import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var defaultMetricPath = "/metrics"
// counter, counter_vec, gauge, gauge_vec,
// histogram, histogram_vec, summary, summary_vec.
var (
reqCounter = &Metric{
ID: "reqCnt",
Name: "requests_total",
Description: "How many HTTP requests processed, partitioned by status code and HTTP method.",
Type: "counter_vec",
Args: []string{"code", "method", "handler", "host", "url"}}
reqDuration = &Metric{
ID: "reqDur",
Name: "request_duration_seconds",
Description: "The HTTP request latencies in seconds.",
Type: "histogram_vec",
Args: []string{"code", "method", "url"},
}
resSize = &Metric{
ID: "resSz",
Name: "response_size_bytes",
Description: "The HTTP response sizes in bytes.",
Type: "summary"}
reqSize = &Metric{
ID: "reqSz",
Name: "request_size_bytes",
Description: "The HTTP request sizes in bytes.",
Type: "summary"}
standardMetrics = []*Metric{
reqCounter,
reqDuration,
resSize,
reqSize,
}
)
/*
RequestCounterURLLabelMappingFn is a function which can be supplied to the middleware to control
the cardinality of the request counter's "url" label, which might be required in some contexts.
For instance, if for a "/customer/:name" route you don't want to generate a time series for every
possible customer name, you could use this function:
func(c *gin.Context) string {
url := c.Request.URL.Path
for _, p := range c.Params {
if p.Key == "name" {
url = strings.Replace(url, p.Value, ":name", 1)
break
}
}
return url
}
which would map "/customer/alice" and "/customer/bob" to their template "/customer/:name".
*/
type RequestCounterURLLabelMappingFn func(c *gin.Context) string
// Metric is a definition for the name, description, type, ID, and
// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric.
type Metric struct {
MetricCollector prometheus.Collector
ID string
Name string
Description string
Type string
Args []string
}
// Prometheus contains the metrics gathered by the instance and its path.
type Prometheus struct {
reqCnt *prometheus.CounterVec
reqDur *prometheus.HistogramVec
reqSz, resSz prometheus.Summary
router *gin.Engine
listenAddress string
Ppg PrometheusPushGateway
MetricsList []*Metric
MetricsPath string
ReqCntURLLabelMappingFn RequestCounterURLLabelMappingFn
// gin.Context string to use as a prometheus URL label
URLLabelFromContext string
}
// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional).
type PrometheusPushGateway struct {
// Push interval in seconds
PushIntervalSeconds time.Duration
// Push Gateway URL in format http://domain:port
// where JOBNAME can be any string of your choice
PushGatewayURL string
// Local metrics URL where metrics are fetched from, this could be omitted in the future
// if implemented using prometheus common/expfmt instead
MetricsURL string
// pushgateway job name, defaults to "gin"
Job string
}
// NewPrometheus generates a new set of metrics with a certain subsystem name.
func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus {
if subsystem == "" {
subsystem = "app"
}
var metricsList []*Metric
if len(customMetricsList) > 1 {
panic("Too many args. NewPrometheus( string, <optional []*Metric> ).")
} else if len(customMetricsList) == 1 {
metricsList = customMetricsList[0]
}
metricsList = append(metricsList, standardMetrics...)
p := &Prometheus{
MetricsList: metricsList,
MetricsPath: defaultMetricPath,
ReqCntURLLabelMappingFn: func(c *gin.Context) string {
return c.FullPath() // e.g. /user/:id , /user/:id/info
},
}
p.registerMetrics(subsystem)
return p
}
// SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL
// every pushIntervalSeconds. Metrics are fetched from metricsURL.
func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) {
p.Ppg.PushGatewayURL = pushGatewayURL
p.Ppg.MetricsURL = metricsURL
p.Ppg.PushIntervalSeconds = pushIntervalSeconds
p.startPushTicker()
}
// SetPushGatewayJob job name, defaults to "gin".
func (p *Prometheus) SetPushGatewayJob(j string) {
p.Ppg.Job = j
}
// SetListenAddress for exposing metrics on address. If not set, it will be exposed at the
// same address of the gin engine that is being used.
func (p *Prometheus) SetListenAddress(address string) {
p.listenAddress = address
if p.listenAddress != "" {
p.router = gin.Default()
}
}
// SetListenAddressWithRouter for using a separate router to expose metrics. (this keeps things like GET /metrics out of
// your content's access log).
func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Engine) {
p.listenAddress = listenAddress
if len(p.listenAddress) > 0 {
p.router = r
}
}
// SetMetricsPath set metrics paths.
func (p *Prometheus) SetMetricsPath(e *gin.Engine) error {
if p.listenAddress != "" {
p.router.GET(p.MetricsPath, prometheusHandler())
return p.runServer()
} else {
e.GET(p.MetricsPath, prometheusHandler())
return nil
}
}
// SetMetricsPathWithAuth set metrics paths with authentication.
func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) error {
if p.listenAddress != "" {
p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler())
return p.runServer()
} else {
e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler())
return nil
}
}
func (p *Prometheus) runServer() error {
return p.router.Run(p.listenAddress)
}
func (p *Prometheus) getMetrics() []byte {
response, err := http.Get(p.Ppg.MetricsURL)
if err != nil {
return nil
}
defer response.Body.Close()
body, _ := io.ReadAll(response.Body)
return body
}
var hostname, _ = os.Hostname()
func (p *Prometheus) getPushGatewayURL() string {
if p.Ppg.Job == "" {
p.Ppg.Job = "gin"
}
return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + hostname
}
func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) {
req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics))
if err != nil {
return
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Println("Error sending to push gateway error:", err.Error())
}
resp.Body.Close()
}
func (p *Prometheus) startPushTicker() {
ticker := time.NewTicker(time.Second * p.Ppg.PushIntervalSeconds)
go func() {
for range ticker.C {
p.sendMetricsToPushGateway(p.getMetrics())
}
}()
}
// NewMetric associates prometheus.Collector based on Metric.Type.
func NewMetric(m *Metric, subsystem string) prometheus.Collector {
var metric prometheus.Collector
switch m.Type {
case "counter_vec":
metric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
m.Args,
)
case "counter":
metric = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
)
case "gauge_vec":
metric = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
m.Args,
)
case "gauge":
metric = prometheus.NewGauge(
prometheus.GaugeOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
)
case "histogram_vec":
metric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
m.Args,
)
case "histogram":
metric = prometheus.NewHistogram(
prometheus.HistogramOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
)
case "summary_vec":
metric = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
m.Args,
)
case "summary":
metric = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
)
}
return metric
}
func (p *Prometheus) registerMetrics(subsystem string) {
for _, metricDef := range p.MetricsList {
metric := NewMetric(metricDef, subsystem)
if err := prometheus.Register(metric); err != nil {
fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error())
}
switch metricDef {
case reqCounter:
p.reqCnt = metric.(*prometheus.CounterVec)
case reqDuration:
p.reqDur = metric.(*prometheus.HistogramVec)
case resSize:
p.resSz = metric.(prometheus.Summary)
case reqSize:
p.reqSz = metric.(prometheus.Summary)
}
metricDef.MetricCollector = metric
}
}
// Use adds the middleware to a gin engine.
func (p *Prometheus) Use(e *gin.Engine) error {
e.Use(p.HandlerFunc())
return p.SetMetricsPath(e)
}
// UseWithAuth adds the middleware to a gin engine with BasicAuth.
func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) error {
e.Use(p.HandlerFunc())
return p.SetMetricsPathWithAuth(e, accounts)
}
// HandlerFunc defines handler function for middleware.
func (p *Prometheus) HandlerFunc() gin.HandlerFunc {
return func(c *gin.Context) {
if c.Request.URL.Path == p.MetricsPath {
c.Next()
return
}
start := time.Now()
reqSz := computeApproximateRequestSize(c.Request)
c.Next()
status := strconv.Itoa(c.Writer.Status())
elapsed := float64(time.Since(start)) / float64(time.Second)
resSz := float64(c.Writer.Size())
url := p.ReqCntURLLabelMappingFn(c)
if len(p.URLLabelFromContext) > 0 {
u, found := c.Get(p.URLLabelFromContext)
if !found {
u = "unknown"
}
url = u.(string)
}
p.reqDur.WithLabelValues(status, c.Request.Method, url).Observe(elapsed)
p.reqCnt.WithLabelValues(status, c.Request.Method, c.HandlerName(), c.Request.Host, url).Inc()
p.reqSz.Observe(float64(reqSz))
p.resSz.Observe(resSz)
}
}
func prometheusHandler() gin.HandlerFunc {
h := promhttp.Handler()
return func(c *gin.Context) {
h.ServeHTTP(c.Writer, c.Request)
}
}
func computeApproximateRequestSize(r *http.Request) int {
var s int
if r.URL != nil {
s = len(r.URL.Path)
}
s += len(r.Method)
s += len(r.Proto)
for name, values := range r.Header {
s += len(name)
for _, value := range values {
s += len(value)
}
}
s += len(r.Host)
// r.FormData and r.MultipartForm are assumed to be included in r.URL.
if r.ContentLength != -1 {
s += int(r.ContentLength)
}
return s
}
//
//import (
// "bytes"
// "fmt"
// "io"
// "net/http"
// "os"
// "strconv"
// "time"
//
// "github.com/gin-gonic/gin"
// "github.com/prometheus/client_golang/prometheus"
// "github.com/prometheus/client_golang/prometheus/promhttp"
//)
//
//var defaultMetricPath = "/metrics"
//
//// counter, counter_vec, gauge, gauge_vec,
//// histogram, histogram_vec, summary, summary_vec.
//var (
// reqCounter = &Metric{
// ID: "reqCnt",
// Name: "requests_total",
// Description: "How many HTTP requests processed, partitioned by status code and HTTP method.",
// Type: "counter_vec",
// Args: []string{"code", "method", "handler", "host", "url"}}
//
// reqDuration = &Metric{
// ID: "reqDur",
// Name: "request_duration_seconds",
// Description: "The HTTP request latencies in seconds.",
// Type: "histogram_vec",
// Args: []string{"code", "method", "url"},
// }
//
// resSize = &Metric{
// ID: "resSz",
// Name: "response_size_bytes",
// Description: "The HTTP response sizes in bytes.",
// Type: "summary"}
//
// reqSize = &Metric{
// ID: "reqSz",
// Name: "request_size_bytes",
// Description: "The HTTP request sizes in bytes.",
// Type: "summary"}
//
// standardMetrics = []*Metric{
// reqCounter,
// reqDuration,
// resSize,
// reqSize,
// }
//)
//
///*
//RequestCounterURLLabelMappingFn is a function which can be supplied to the middleware to control
//the cardinality of the request counter's "url" label, which might be required in some contexts.
//For instance, if for a "/customer/:name" route you don't want to generate a time series for every
//possible customer name, you could use this function:
//
// func(c *gin.Context) string {
// url := c.Request.URL.Path
// for _, p := range c.Params {
// if p.Key == "name" {
// url = strings.Replace(url, p.Value, ":name", 1)
// break
// }
// }
// return url
// }
//
//which would map "/customer/alice" and "/customer/bob" to their template "/customer/:name".
//*/
//type RequestCounterURLLabelMappingFn func(c *gin.Context) string
//
//// Metric is a definition for the name, description, type, ID, and
//// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric.
//type Metric struct {
// MetricCollector prometheus.Collector
// ID string
// Name string
// Description string
// Type string
// Args []string
//}
//
//// Prometheus contains the metrics gathered by the instance and its path.
//type Prometheus struct {
// reqCnt *prometheus.CounterVec
// reqDur *prometheus.HistogramVec
// reqSz, resSz prometheus.Summary
// router *gin.Engine
// listenAddress string
// Ppg PrometheusPushGateway
//
// MetricsList []*Metric
// MetricsPath string
//
// ReqCntURLLabelMappingFn RequestCounterURLLabelMappingFn
//
// // gin.Context string to use as a prometheus URL label
// URLLabelFromContext string
//}
//
//// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional).
//type PrometheusPushGateway struct {
//
// // Push interval in seconds
// PushIntervalSeconds time.Duration
//
// // Push Gateway URL in format http://domain:port
// // where JOBNAME can be any string of your choice
// PushGatewayURL string
//
// // Local metrics URL where metrics are fetched from, this could be omitted in the future
// // if implemented using prometheus common/expfmt instead
// MetricsURL string
//
// // pushgateway job name, defaults to "gin"
// Job string
//}
//
//// NewPrometheus generates a new set of metrics with a certain subsystem name.
//func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus {
// if subsystem == "" {
// subsystem = "app"
// }
//
// var metricsList []*Metric
//
// if len(customMetricsList) > 1 {
// panic("Too many args. NewPrometheus( string, <optional []*Metric> ).")
// } else if len(customMetricsList) == 1 {
// metricsList = customMetricsList[0]
// }
// metricsList = append(metricsList, standardMetrics...)
//
// p := &Prometheus{
// MetricsList: metricsList,
// MetricsPath: defaultMetricPath,
// ReqCntURLLabelMappingFn: func(c *gin.Context) string {
// return c.FullPath() // e.g. /user/:id , /user/:id/info
// },
// }
//
// p.registerMetrics(subsystem)
//
// return p
//}
//
//// SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL
//// every pushIntervalSeconds. Metrics are fetched from metricsURL.
//func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) {
// p.Ppg.PushGatewayURL = pushGatewayURL
// p.Ppg.MetricsURL = metricsURL
// p.Ppg.PushIntervalSeconds = pushIntervalSeconds
// p.startPushTicker()
//}
//
//// SetPushGatewayJob job name, defaults to "gin".
//func (p *Prometheus) SetPushGatewayJob(j string) {
// p.Ppg.Job = j
//}
//
//// SetListenAddress for exposing metrics on address. If not set, it will be exposed at the
//// same address of the gin engine that is being used.
//func (p *Prometheus) SetListenAddress(address string) {
// p.listenAddress = address
// if p.listenAddress != "" {
// p.router = gin.Default()
// }
//}
//
//// SetListenAddressWithRouter for using a separate router to expose metrics. (this keeps things like GET /metrics out of
//// your content's access log).
//func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Engine) {
// p.listenAddress = listenAddress
// if len(p.listenAddress) > 0 {
// p.router = r
// }
//}
//
//// SetMetricsPath set metrics paths.
//func (p *Prometheus) SetMetricsPath(e *gin.Engine) error {
//
// if p.listenAddress != "" {
// p.router.GET(p.MetricsPath, prometheusHandler())
// return p.runServer()
// } else {
// e.GET(p.MetricsPath, prometheusHandler())
// return nil
// }
//}
//
//// SetMetricsPathWithAuth set metrics paths with authentication.
//func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) error {
//
// if p.listenAddress != "" {
// p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler())
// return p.runServer()
// } else {
// e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler())
// return nil
// }
//
//}
//
//func (p *Prometheus) runServer() error {
// return p.router.Run(p.listenAddress)
//}
//
//func (p *Prometheus) getMetrics() []byte {
// response, err := http.Get(p.Ppg.MetricsURL)
// if err != nil {
// return nil
// }
//
// defer response.Body.Close()
//
// body, _ := io.ReadAll(response.Body)
// return body
//}
//
//var hostname, _ = os.Hostname()
//
//func (p *Prometheus) getPushGatewayURL() string {
// if p.Ppg.Job == "" {
// p.Ppg.Job = "gin"
// }
// return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + hostname
//}
//
//func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) {
// req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics))
// if err != nil {
// return
// }
//
// client := &http.Client{}
// resp, err := client.Do(req)
// if err != nil {
// fmt.Println("Error sending to push gateway error:", err.Error())
// }
//
// resp.Body.Close()
//}
//
//func (p *Prometheus) startPushTicker() {
// ticker := time.NewTicker(time.Second * p.Ppg.PushIntervalSeconds)
// go func() {
// for range ticker.C {
// p.sendMetricsToPushGateway(p.getMetrics())
// }
// }()
//}
//
//// NewMetric associates prometheus.Collector based on Metric.Type.
//func NewMetric(m *Metric, subsystem string) prometheus.Collector {
// var metric prometheus.Collector
// switch m.Type {
// case "counter_vec":
// metric = prometheus.NewCounterVec(
// prometheus.CounterOpts{
// Subsystem: subsystem,
// Name: m.Name,
// Help: m.Description,
// },
// m.Args,
// )
// case "counter":
// metric = prometheus.NewCounter(
// prometheus.CounterOpts{
// Subsystem: subsystem,
// Name: m.Name,
// Help: m.Description,
// },
// )
// case "gauge_vec":
// metric = prometheus.NewGaugeVec(
// prometheus.GaugeOpts{
// Subsystem: subsystem,
// Name: m.Name,
// Help: m.Description,
// },
// m.Args,
// )
// case "gauge":
// metric = prometheus.NewGauge(
// prometheus.GaugeOpts{
// Subsystem: subsystem,
// Name: m.Name,
// Help: m.Description,
// },
// )
// case "histogram_vec":
// metric = prometheus.NewHistogramVec(
// prometheus.HistogramOpts{
// Subsystem: subsystem,
// Name: m.Name,
// Help: m.Description,
// },
// m.Args,
// )
// case "histogram":
// metric = prometheus.NewHistogram(
// prometheus.HistogramOpts{
// Subsystem: subsystem,
// Name: m.Name,
// Help: m.Description,
// },
// )
// case "summary_vec":
// metric = prometheus.NewSummaryVec(
// prometheus.SummaryOpts{
// Subsystem: subsystem,
// Name: m.Name,
// Help: m.Description,
// },
// m.Args,
// )
// case "summary":
// metric = prometheus.NewSummary(
// prometheus.SummaryOpts{
// Subsystem: subsystem,
// Name: m.Name,
// Help: m.Description,
// },
// )
// }
// return metric
//}
//
//func (p *Prometheus) registerMetrics(subsystem string) {
// for _, metricDef := range p.MetricsList {
// metric := NewMetric(metricDef, subsystem)
// if err := prometheus.Register(metric); err != nil {
// fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error())
// }
//
// switch metricDef {
// case reqCounter:
// p.reqCnt = metric.(*prometheus.CounterVec)
// case reqDuration:
// p.reqDur = metric.(*prometheus.HistogramVec)
// case resSize:
// p.resSz = metric.(prometheus.Summary)
// case reqSize:
// p.reqSz = metric.(prometheus.Summary)
// }
// metricDef.MetricCollector = metric
// }
//}
//
//// Use adds the middleware to a gin engine.
//func (p *Prometheus) Use(e *gin.Engine) error {
// e.Use(p.HandlerFunc())
// return p.SetMetricsPath(e)
//}
//
//// UseWithAuth adds the middleware to a gin engine with BasicAuth.
//func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) error {
// e.Use(p.HandlerFunc())
// return p.SetMetricsPathWithAuth(e, accounts)
//}
//
//// HandlerFunc defines handler function for middleware.
//func (p *Prometheus) HandlerFunc() gin.HandlerFunc {
// return func(c *gin.Context) {
// if c.Request.URL.Path == p.MetricsPath {
// c.Next()
// return
// }
//
// start := time.Now()
// reqSz := computeApproximateRequestSize(c.Request)
//
// c.Next()
//
// status := strconv.Itoa(c.Writer.Status())
// elapsed := float64(time.Since(start)) / float64(time.Second)
// resSz := float64(c.Writer.Size())
//
// url := p.ReqCntURLLabelMappingFn(c)
// if len(p.URLLabelFromContext) > 0 {
// u, found := c.Get(p.URLLabelFromContext)
// if !found {
// u = "unknown"
// }
// url = u.(string)
// }
// p.reqDur.WithLabelValues(status, c.Request.Method, url).Observe(elapsed)
// p.reqCnt.WithLabelValues(status, c.Request.Method, c.HandlerName(), c.Request.Host, url).Inc()
// p.reqSz.Observe(float64(reqSz))
// p.resSz.Observe(resSz)
// }
//}
//
//func prometheusHandler() gin.HandlerFunc {
// h := promhttp.Handler()
// return func(c *gin.Context) {
// h.ServeHTTP(c.Writer, c.Request)
// }
//}
//
//func computeApproximateRequestSize(r *http.Request) int {
// var s int
// if r.URL != nil {
// s = len(r.URL.Path)
// }
//
// s += len(r.Method)
// s += len(r.Proto)
// for name, values := range r.Header {
// s += len(name)
// for _, value := range values {
// s += len(value)
// }
// }
// s += len(r.Host)
//
// // r.FormData and r.MultipartForm are assumed to be included in r.URL.
//
// if r.ContentLength != -1 {
// s += int(r.ContentLength)
// }
// return s
//}

@ -0,0 +1,50 @@
package prommetrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"strconv"
)
const ApiPath = "/metrics"
var (
apiRegistry = prometheus.NewRegistry()
apiCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "api_count",
Help: "Total number of API calls",
},
[]string{"path", "method", "code"},
)
httpCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_count",
Help: "Total number of HTTP calls",
},
[]string{"path", "method", "status"},
)
)
func init() {
apiRegistry.MustRegister(apiCounter, httpCounter)
}
func APICall(path string, method string, apiCode int) {
apiCounter.With(prometheus.Labels{"path": path, "method": method, "code": strconv.Itoa(apiCode)}).Inc()
}
func HttpCall(path string, method string, status int) {
httpCounter.With(prometheus.Labels{"path": path, "method": method, "status": strconv.Itoa(status)}).Inc()
}
//func ApiHandler() http.Handler {
// return promhttp.InstrumentMetricHandler(
// apiRegistry, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}),
// )
//}
func ApiHandler() http.Handler {
return promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{})
}

@ -1,30 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prommetrics
import ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
/*
labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc().
*/
var (
ApiCustomCnt = &ginprom.Metric{
Name: "custom_total",
Description: "Custom counter events.",
Type: "counter_vec",
Args: []string{"label_one", "label_two"},
}
)

@ -17,7 +17,6 @@ package prommetrics
import (
gp "github.com/grpc-ecosystem/go-grpc-prometheus"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
)
@ -48,11 +47,11 @@ func GetGrpcCusMetrics(registerName string, share *config2.Share) []prometheus.C
}
}
func GetGinCusMetrics(name string) []*ginprometheus.Metric {
switch name {
case "Api":
return []*ginprometheus.Metric{ApiCustomCnt}
default:
return []*ginprometheus.Metric{ApiCustomCnt}
}
}
//func GetGinCusMetrics(name string) []*ginprometheus.Metric {
// switch name {
// case "Api":
// return []*ginprometheus.Metric{ApiCustomCnt}
// default:
// return []*ginprometheus.Metric{ApiCustomCnt}
// }
//}

@ -0,0 +1,33 @@
package prommetrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"strconv"
)
const RpcPath = "/metrics"
var (
rpcRegistry = prometheus.NewRegistry()
rpcCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_count",
Help: "Total number of RPC calls",
},
[]string{"name", "path", "code"},
)
)
func init() {
rpcRegistry.MustRegister(rpcCounter)
}
func RPCCall(name string, path string, code int) {
rpcCounter.With(prometheus.Labels{"name": name, "path": path, "code": strconv.Itoa(code)}).Inc()
}
func RPCHandler() http.Handler {
return promhttp.HandlerFor(rpcRegistry, promhttp.HandlerOpts{})
}

@ -19,7 +19,7 @@ import (
"fmt"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/datautil"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/status"
"net"
"net/http"
"os"
@ -29,7 +29,6 @@ import (
"syscall"
"time"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
@ -38,7 +37,6 @@ import (
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/network"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
@ -77,13 +75,14 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC
return err
}
var reg *prometheus.Registry
var metric *grpcprometheus.ServerMetrics
//var reg *prometheus.Registry
//var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable {
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
//cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
//reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
//options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
// grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
options = append(options, mw.GrpcServer(), prommetricsInterceptor(rpcRegisterName))
} else {
options = append(options, mw.GrpcServer())
}
@ -122,13 +121,19 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC
netDone <- struct{}{}
return
}
metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus.
httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
srv := http.NewServeMux()
srv.Handle(prommetrics.RpcPath, prommetrics.RPCHandler())
if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
netDone <- struct{}{}
}
//metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus.
//httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
//if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
// netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
// netDone <- struct{}{}
//}
}()
}
@ -175,3 +180,21 @@ func gracefulStopWithCtx(ctx context.Context, f func()) error {
return nil
}
}
func prommetricsInterceptor(rpcRegisterName string) grpc.ServerOption {
getCode := func(err error) int {
if err == nil {
return 0
}
rpcErr, ok := err.(interface{ GRPCStatus() *status.Status })
if !ok {
return -1
}
return int(rpcErr.GRPCStatus().Code())
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
resp, err := handler(ctx, req)
prommetrics.RPCCall(rpcRegisterName, info.FullMethod, getCode(err))
return resp, err
})
}

Loading…
Cancel
Save