rpc prommetrics

pull/2398/head
withchao 1 year ago
parent e3ee24df31
commit 0a9b53bbe3

@ -74,7 +74,7 @@ func Start(ctx context.Context, index int, config *Config) error {
srv := http.NewServeMux() srv := http.NewServeMux()
srv.Handle(prommetrics.ApiPath, prommetrics.ApiHandler()) srv.Handle(prommetrics.ApiPath, prommetrics.ApiHandler())
if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed { if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort)) netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort))
netDone <- struct{}{} netDone <- struct{}{}
} }
}() }()

@ -39,8 +39,12 @@ func HttpCall(path string, method string, status int) {
httpCounter.With(prometheus.Labels{"path": path, "method": method, "status": strconv.Itoa(status)}).Inc() httpCounter.With(prometheus.Labels{"path": path, "method": method, "status": strconv.Itoa(status)}).Inc()
} }
//func ApiHandler() http.Handler {
// return promhttp.InstrumentMetricHandler(
// apiRegistry, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}),
// )
//}
func ApiHandler() http.Handler { func ApiHandler() http.Handler {
return promhttp.InstrumentMetricHandler( return promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{})
apiRegistry, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}),
)
} }

@ -1,57 +0,0 @@
package prommetrics
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"testing"
"time"
)
var (
apiCallCounter1 = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "api_calls_total",
Help: "Total number of API calls",
},
[]string{"endpoint", "status", "code", "error"},
)
registerer *prometheus.Registry
)
func init() {
registerer = prometheus.NewRegistry()
registerer.MustRegister(apiCallCounter1)
}
func recordAPICall(endpoint string, status string) {
apiCallCounter1.With(prometheus.Labels{"endpoint": endpoint, "status": status, "code": "200", "error": "ArgsError"}).Inc()
}
func TestName(t *testing.T) {
go func() {
for i := 0; ; i++ {
recordAPICall("/api/test", "success")
time.Sleep(time.Second)
}
}()
go func() {
for i := 0; ; i++ {
recordAPICall("/api/test", "failed")
time.Sleep(time.Second * 3)
}
}()
promhttp.Handler()
http.Handle("/metrics", promhttp.HandlerFor(registerer, promhttp.HandlerOpts{}))
if err := http.ListenAndServe(":2112", nil); err != nil {
panic(err)
}
}
func TestName2(t *testing.T) {
var d time.Duration
d = time.Second / 900
fmt.Println(durationRange(d))
}

@ -1,30 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prommetrics
//import ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
/*
labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc().
*/
//var (
// ApiCustomCnt = &ginprom.Metric{
// Name: "custom_total",
// Description: "Custom counter events.",
// Type: "counter_vec",
// Args: []string{"label_one", "label_two"},
// }
//)

@ -0,0 +1,33 @@
package prommetrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"strconv"
)
const RpcPath = "/metrics"
var (
rpcRegistry = prometheus.NewRegistry()
rpcCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_count",
Help: "Total number of RPC calls",
},
[]string{"name", "path", "code"},
)
)
func init() {
rpcRegistry.MustRegister(rpcCounter)
}
func RPCCall(name string, path string, code int) {
rpcCounter.With(prometheus.Labels{"name": name, "path": path, "code": strconv.Itoa(code)}).Inc()
}
func RPCHandler() http.Handler {
return promhttp.HandlerFor(rpcRegistry, promhttp.HandlerOpts{})
}

@ -19,7 +19,7 @@ import (
"fmt" "fmt"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc/status"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -29,7 +29,6 @@ import (
"syscall" "syscall"
"time" "time"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
@ -38,7 +37,6 @@ import (
"github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/network" "github.com/openimsdk/tools/utils/network"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
) )
@ -77,13 +75,14 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC
return err return err
} }
var reg *prometheus.Registry //var reg *prometheus.Registry
var metric *grpcprometheus.ServerMetrics //var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable { if prometheusConfig.Enable {
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) //cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) //reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), //options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) // grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
options = append(options, mw.GrpcServer(), prommetricsInterceptor(rpcRegisterName))
} else { } else {
options = append(options, mw.GrpcServer()) options = append(options, mw.GrpcServer())
} }
@ -122,13 +121,19 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC
netDone <- struct{}{} netDone <- struct{}{}
return return
} }
metric.InitializeMetrics(srv) srv := http.NewServeMux()
// Create a HTTP server for prometheus. srv.Handle(prommetrics.RpcPath, prommetrics.RPCHandler())
httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed {
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
netDone <- struct{}{} netDone <- struct{}{}
} }
//metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus.
//httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
//if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
// netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
// netDone <- struct{}{}
//}
}() }()
} }
@ -175,3 +180,21 @@ func gracefulStopWithCtx(ctx context.Context, f func()) error {
return nil return nil
} }
} }
func prommetricsInterceptor(rpcRegisterName string) grpc.ServerOption {
getCode := func(err error) int {
if err == nil {
return 0
}
rpcErr, ok := err.(interface{ GRPCStatus() *status.Status })
if !ok {
return -1
}
return int(rpcErr.GRPCStatus().Code())
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
resp, err := handler(ctx, req)
prommetrics.RPCCall(rpcRegisterName, info.FullMethod, getCode(err))
return resp, err
})
}

Loading…
Cancel
Save