diff --git a/internal/api/init.go b/internal/api/init.go index e6fe46d22..679d901f4 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -74,7 +74,7 @@ func Start(ctx context.Context, index int, config *Config) error { srv := http.NewServeMux() srv.Handle(prommetrics.ApiPath, prommetrics.ApiHandler()) if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort)) + netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort)) netDone <- struct{}{} } }() diff --git a/pkg/common/prommetrics/api.go b/pkg/common/prommetrics/api.go index b80b8df85..806198977 100644 --- a/pkg/common/prommetrics/api.go +++ b/pkg/common/prommetrics/api.go @@ -39,8 +39,12 @@ func HttpCall(path string, method string, status int) { httpCounter.With(prometheus.Labels{"path": path, "method": method, "status": strconv.Itoa(status)}).Inc() } +//func ApiHandler() http.Handler { +// return promhttp.InstrumentMetricHandler( +// apiRegistry, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), +// ) +//} + func ApiHandler() http.Handler { - return promhttp.InstrumentMetricHandler( - apiRegistry, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), - ) + return promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}) } diff --git a/pkg/common/prommetrics/api_test.go b/pkg/common/prommetrics/api_test.go deleted file mode 100644 index b0e0f7b20..000000000 --- a/pkg/common/prommetrics/api_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package prommetrics - -import ( - "fmt" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "net/http" - "testing" - "time" -) - -var ( - apiCallCounter1 = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "api_calls_total", - Help: "Total number of API calls", - }, - []string{"endpoint", "status", "code", "error"}, - ) - registerer *prometheus.Registry -) - -func init() { - registerer = prometheus.NewRegistry() - registerer.MustRegister(apiCallCounter1) -} - -func recordAPICall(endpoint string, status string) { - apiCallCounter1.With(prometheus.Labels{"endpoint": endpoint, "status": status, "code": "200", "error": "ArgsError"}).Inc() -} - -func TestName(t *testing.T) { - go func() { - for i := 0; ; i++ { - recordAPICall("/api/test", "success") - time.Sleep(time.Second) - } - }() - - go func() { - for i := 0; ; i++ { - recordAPICall("/api/test", "failed") - time.Sleep(time.Second * 3) - } - }() - promhttp.Handler() - http.Handle("/metrics", promhttp.HandlerFor(registerer, promhttp.HandlerOpts{})) - if err := http.ListenAndServe(":2112", nil); err != nil { - panic(err) - } -} - -func TestName2(t *testing.T) { - var d time.Duration - d = time.Second / 900 - fmt.Println(durationRange(d)) -} diff --git a/pkg/common/prommetrics/gin_api.go b/pkg/common/prommetrics/gin_api.go deleted file mode 100644 index 47b043f64..000000000 --- a/pkg/common/prommetrics/gin_api.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prommetrics - -//import ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" - -/* -labels := prometheus.Labels{"label_one": "any", "label_two": "value"} -ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc(). -*/ -//var ( -// ApiCustomCnt = &ginprom.Metric{ -// Name: "custom_total", -// Description: "Custom counter events.", -// Type: "counter_vec", -// Args: []string{"label_one", "label_two"}, -// } -//) diff --git a/pkg/common/prommetrics/rpc.go b/pkg/common/prommetrics/rpc.go new file mode 100644 index 000000000..834f76a9f --- /dev/null +++ b/pkg/common/prommetrics/rpc.go @@ -0,0 +1,33 @@ +package prommetrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "net/http" + "strconv" +) + +const RpcPath = "/metrics" + +var ( + rpcRegistry = prometheus.NewRegistry() + rpcCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "rpc_count", + Help: "Total number of RPC calls", + }, + []string{"name", "path", "code"}, + ) +) + +func init() { + rpcRegistry.MustRegister(rpcCounter) +} + +func RPCCall(name string, path string, code int) { + rpcCounter.With(prometheus.Labels{"name": name, "path": path, "code": strconv.Itoa(code)}).Inc() +} + +func RPCHandler() http.Handler { + return promhttp.HandlerFor(rpcRegistry, promhttp.HandlerOpts{}) +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index b531daa47..a88797793 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -19,7 +19,7 @@ import ( "fmt" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/utils/datautil" - "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc/status" "net" "net/http" "os" @@ -29,7 +29,6 @@ import ( "syscall" "time" - grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" @@ -38,7 +37,6 @@ import ( "github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/network" - "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -77,13 +75,14 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC return err } - var reg *prometheus.Registry - var metric *grpcprometheus.ServerMetrics + //var reg *prometheus.Registry + //var metric *grpcprometheus.ServerMetrics if prometheusConfig.Enable { - cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) - reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) - options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), - grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) + //cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) + //reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) + //options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), + // grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) + options = append(options, mw.GrpcServer(), prommetricsInterceptor(rpcRegisterName)) } else { options = append(options, mw.GrpcServer()) } @@ -122,13 +121,19 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC netDone <- struct{}{} return } - metric.InitializeMetrics(srv) - // Create a HTTP server for prometheus. - httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} - if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) + srv := http.NewServeMux() + srv.Handle(prommetrics.RpcPath, prommetrics.RPCHandler()) + if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed { + netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) netDone <- struct{}{} } + //metric.InitializeMetrics(srv) + // Create a HTTP server for prometheus. + //httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} + //if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + // netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) + // netDone <- struct{}{} + //} }() } @@ -175,3 +180,21 @@ func gracefulStopWithCtx(ctx context.Context, f func()) error { return nil } } + +func prommetricsInterceptor(rpcRegisterName string) grpc.ServerOption { + getCode := func(err error) int { + if err == nil { + return 0 + } + rpcErr, ok := err.(interface{ GRPCStatus() *status.Status }) + if !ok { + return -1 + } + return int(rpcErr.GRPCStatus().Code()) + } + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + resp, err := handler(ctx, req) + prommetrics.RPCCall(rpcRegisterName, info.FullMethod, getCode(err)) + return resp, err + }) +}