diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index 174300dc7..0f50c621f 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -17,6 +17,8 @@ package main import ( "context" "fmt" + ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "net" _ "net/http/pprof" "strconv" @@ -43,11 +45,11 @@ func main() { } } -func run(port int) error { - log.ZInfo(context.Background(), "Openim api port:", "port", port) +func run(port int, proPort int) error { + log.ZInfo(context.Background(), "Openim api port:", "port", port, "proPort", proPort) - if port == 0 { - err := "port is empty" + if port == 0 || proPort == 0 { + err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort) log.ZError(context.Background(), err, nil) return fmt.Errorf(err) @@ -82,6 +84,13 @@ func run(port int) error { } log.ZInfo(context.Background(), "api register public config to discov success") router := api.NewGinRouter(client, rdb) + ////////////////////////////// + if config.Config.Prometheus.Enable { + p := ginProm.NewPrometheus("app", prom_metrics.GetGinCusMetrics("Api")) + p.SetListenAddress(fmt.Sprintf(":%d", proPort)) + p.Use(router) + } + ///////////////////////////////// log.ZInfo(context.Background(), "api init router success") var address string if config.Config.Api.ListenIP != "" { diff --git a/cmd/openim-msgtransfer/main.go b/cmd/openim-msgtransfer/main.go index 722bf5960..6895bcecc 100644 --- a/cmd/openim-msgtransfer/main.go +++ b/cmd/openim-msgtransfer/main.go @@ -21,6 +21,7 @@ import ( func main() { msgTransferCmd := cmd.NewMsgTransferCmd() msgTransferCmd.AddPrometheusPortFlag() + msgTransferCmd.AddTransferProgressFlag() if err := msgTransferCmd.Exec(); err != nil { panic(err.Error()) } diff --git a/config/config.yaml b/config/config.yaml index 81da293cb..e375118f7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -382,7 +382,9 @@ callback: # The number of Prometheus ports per service needs to correspond to rpcPort # The number of ports needs to be consistent with msg_transfer_service_num in script/path_info.sh prometheus: - enable: false + enable: true + prometheusUrl: "https://openim.prometheus" + apiPrometheusPort: [20100] userPrometheusPort: [ 20110 ] friendPrometheusPort: [ 20120 ] messagePrometheusPort: [ 20130 ] diff --git a/deployments/templates/openim.yaml b/deployments/templates/openim.yaml index 92eb1cd45..44a28adff 100644 --- a/deployments/templates/openim.yaml +++ b/deployments/templates/openim.yaml @@ -383,6 +383,8 @@ callback: # The number of ports needs to be consistent with msg_transfer_service_num in script/path_info.sh prometheus: enable: ${PROMETHEUS_ENABLE} + prometheusUrl: ${PROMETHEUS_URL} + apiPrometheusPort: [${API_PROM_PORT}] userPrometheusPort: [ ${USER_PROM_PORT} ] friendPrometheusPort: [ ${FRIEND_PROM_PORT} ] messagePrometheusPort: [ ${MESSAGE_PROM_PORT} ] diff --git a/go.mod b/go.mod index 64a4db405..31f067693 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require github.com/google/uuid v1.3.1 require ( github.com/IBM/sarama v1.41.3 - github.com/OpenIMSDK/protocol v0.0.30 + github.com/OpenIMSDK/protocol v0.0.31 github.com/OpenIMSDK/tools v0.0.16 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index 4116d5428..2fa9b872c 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= -github.com/OpenIMSDK/protocol v0.0.30 h1:MiHO6PyQMR9ojBHNnSFxCHLmsoE2xZqaiYj975JiZnM= -github.com/OpenIMSDK/protocol v0.0.30/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.31 h1:ax43x9aqA6EKNXNukS5MT5BSTqkUmwO4uTvbJLtzCgE= +github.com/OpenIMSDK/protocol v0.0.31/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.16 h1:te/GIq2imCMsrRPgU9OObYKbzZ3rT08Lih/o+3QFIz0= github.com/OpenIMSDK/tools v0.0.16/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= diff --git a/internal/api/route.go b/internal/api/route.go index d714270b4..7a331d643 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -39,7 +39,6 @@ import ( "github.com/OpenIMSDK/tools/mw" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/prome" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -63,13 +62,6 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc) - if config.Config.Prometheus.Enable { - prome.NewApiRequestCounter() - prome.NewApiRequestFailedCounter() - prome.NewApiRequestSuccessCounter() - r.Use(prome.PrometheusMiddleware) - r.GET("/metrics", prome.PrometheusHandler()) - } ParseToken := GinParseToken(rdb) userRouterGroup := r.Group("/user") { @@ -151,6 +143,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive // Third service thirdGroup := r.Group("/third", ParseToken) { + thirdGroup.GET("/prometheus", GetPrometheus) t := NewThirdApi(*thirdRpc) thirdGroup.POST("/fcm_update_token", t.FcmUpdateToken) thirdGroup.POST("/set_app_badge", t.SetAppBadge) diff --git a/internal/api/third.go b/internal/api/third.go index cdb059cc0..fca133ea9 100644 --- a/internal/api/third.go +++ b/internal/api/third.go @@ -15,6 +15,7 @@ package api import ( + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "math/rand" "net/http" "strconv" @@ -118,3 +119,7 @@ func (o *ThirdApi) DeleteLogs(c *gin.Context) { func (o *ThirdApi) SearchLogs(c *gin.Context) { a2r.Call(third.ThirdClient.SearchLogs, o.Client, c) } + +func GetPrometheus(c *gin.Context) { + c.Redirect(http.StatusFound, config2.Config.Prometheus.PrometheusUrl) +} diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index ae12c04a3..670757850 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -33,7 +33,6 @@ import ( "github.com/OpenIMSDK/tools/utils" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/prome" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" ) @@ -69,9 +68,10 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { s.LongConnServer = LongConnServer } -func NewServer(rpcPort int, longConnServer LongConnServer) *Server { +func NewServer(rpcPort int, proPort int, longConnServer LongConnServer) *Server { return &Server{ rpcPort: rpcPort, + prometheusPort: proPort, LongConnServer: longConnServer, pushTerminal: []int{constant.IOSPlatformID, constant.AndroidPlatformID}, } @@ -158,7 +158,6 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg( } else { if utils.IsContainInt(client.PlatformID, s.pushTerminal) { tempT.OnlinePush = true - prome.Inc(prome.MsgOnlinePushSuccessCounter) resp = append(resp, temp) } } diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index ce63fb21a..94f1b2011 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -41,7 +41,7 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error { if err != nil { return err } - hubServer := NewServer(rpcPort, longServer) + hubServer := NewServer(rpcPort, prometheusPort, longServer) go func() { err := hubServer.Start() if err != nil { diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 83e297502..ad56c1373 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -17,6 +17,7 @@ package msggateway import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "net/http" "strconv" "sync" @@ -220,6 +221,7 @@ func (ws *WsServer) registerClient(client *Client) { if !userOK { ws.clients.Set(client.UserID, client) log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID) + prom_metrics.OnlineUserGauge.Add(1) ws.onlineUserNum.Add(1) ws.onlineUserConnNum.Add(1) } else { @@ -364,6 +366,7 @@ func (ws *WsServer) unregisterClient(client *Client) { isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr()) if isDeleteUser { ws.onlineUserNum.Add(-1) + prom_metrics.OnlineUserGauge.Dec() } ws.onlineUserConnNum.Add(-1) ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 7efc35794..c18186fa8 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -15,13 +15,18 @@ package msgtransfer import ( + "errors" "fmt" - "sync" - + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + "log" + "net/http" + "sync" "github.com/OpenIMSDK/tools/mw" @@ -31,7 +36,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/relation" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" - "github.com/openimsdk/open-im-server/v3/pkg/common/prome" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -81,7 +85,6 @@ func StartTransfer(prometheusPort int) error { conversationRpcClient := rpcclient.NewConversationRpcClient(client) groupRpcClient := rpcclient.NewGroupRpcClient(client) msgTransfer := NewMsgTransfer(chatLogDatabase, msgDatabase, &conversationRpcClient, &groupRpcClient) - msgTransfer.initPrometheus() return msgTransfer.Start(prometheusPort) } @@ -95,21 +98,13 @@ func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase, } } -func (m *MsgTransfer) initPrometheus() { - prome.NewSeqGetSuccessCounter() - prome.NewSeqGetFailedCounter() - prome.NewSeqSetSuccessCounter() - prome.NewSeqSetFailedCounter() - prome.NewMsgInsertRedisSuccessCounter() - prome.NewMsgInsertRedisFailedCounter() - prome.NewMsgInsertMongoSuccessCounter() - prome.NewMsgInsertMongoFailedCounter() -} - func (m *MsgTransfer) Start(prometheusPort int) error { var wg sync.WaitGroup wg.Add(1) fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) + if prometheusPort <= 0 { + return errors.New("prometheusPort not correct") + } if config.Config.ChatPersistenceMysql { // go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH) } else { @@ -118,10 +113,21 @@ func (m *MsgTransfer) Start(prometheusPort int) error { go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH) go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH) // go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH) - err := prome.StartPrometheusSrv(prometheusPort) + /*err := prome.StartPrometheusSrv(prometheusPort) if err != nil { return err + }*/ + //////////////////////////// + if config.Config.Prometheus.Enable { + reg := prometheus.NewRegistry() + reg.MustRegister( + collectors.NewGoCollector(), + ) + reg.MustRegister(prom_metrics.GetGrpcCusMetrics("Transfer")...) + http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)) } + //////////////////////////////////////// wg.Wait() return nil } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index bfea6c433..88fd256d1 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -16,6 +16,7 @@ package msgtransfer import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "github.com/IBM/sarama" "google.golang.org/protobuf/proto" @@ -74,6 +75,9 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo( "conversationID", msgFromMQ.ConversationID, ) + prom_metrics.MsgInsertMongoFailedCounter.Inc() + } else { + prom_metrics.MsgInsertMongoSuccessCounter.Inc() } var seqs []int64 for _, msg := range msgFromMQ.MsgData { diff --git a/internal/push/consumer_init.go b/internal/push/consumer_init.go index 8595c1656..b72c32bb1 100644 --- a/internal/push/consumer_init.go +++ b/internal/push/consumer_init.go @@ -14,10 +14,6 @@ package push -import ( - "github.com/openimsdk/open-im-server/v3/pkg/common/prome" -) - type Consumer struct { pushCh ConsumerHandler successCount uint64 @@ -29,11 +25,6 @@ func NewConsumer(pusher *Pusher) *Consumer { } } -func (c *Consumer) initPrometheus() { - prome.NewMsgOfflinePushSuccessCounter() - prome.NewMsgOfflinePushFailedCounter() -} - func (c *Consumer) Start() { // statistics.NewStatistics(&c.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to // msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 6e9c56023..0f8f36a49 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -67,7 +67,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e go func() { defer wg.Done() consumer := NewConsumer(pusher) - consumer.initPrometheus() consumer.Start() }() wg.Wait() diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 61d094d27..2f3156c28 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy" "github.com/OpenIMSDK/protocol/conversation" @@ -40,7 +41,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" - "github.com/openimsdk/open-im-server/v3/pkg/common/prome" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -288,10 +288,9 @@ func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg } err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) if err != nil { - prome.Inc(prome.MsgOfflinePushFailedCounter) + prom_metrics.MsgOfflinePushFailedCounter.Inc() return err } - prome.Inc(prome.MsgOfflinePushSuccessCounter) return nil } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 9580ef8db..bcca59152 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -16,6 +16,7 @@ package auth import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -73,6 +74,7 @@ func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (* if err != nil { return nil, err } + prom_metrics.UserLoginCounter.Inc() resp.Token = token resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60 return &resp, nil diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 2add9c0d1..b43bc82be 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" @@ -28,8 +29,6 @@ import ( "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils" - - promepkg "github.com/openimsdk/open-im-server/v3/pkg/common/prome" ) func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, error error) { @@ -59,9 +58,8 @@ func (m *msgServer) sendMsgSuperGroupChat( ctx context.Context, req *pbmsg.SendMsgReq, ) (resp *pbmsg.SendMsgResp, err error) { - promepkg.Inc(promepkg.WorkSuperGroupChatMsgRecvSuccessCounter) if err = m.messageVerification(ctx, req); err != nil { - promepkg.Inc(promepkg.WorkSuperGroupChatMsgProcessFailedCounter) + prom_metrics.GroupChatMsgProcessFailedCounter.Inc() return nil, err } if err = callbackBeforeSendGroupMsg(ctx, req); err != nil { @@ -80,7 +78,7 @@ func (m *msgServer) sendMsgSuperGroupChat( if err = callbackAfterSendGroupMsg(ctx, req); err != nil { log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err) } - promepkg.Inc(promepkg.WorkSuperGroupChatMsgProcessSuccessCounter) + prom_metrics.GroupChatMsgProcessSuccessCounter.Inc() resp = &pbmsg.SendMsgResp{} resp.SendTime = req.MsgData.SendTime resp.ServerMsgID = req.MsgData.ServerMsgID @@ -133,9 +131,7 @@ func (m *msgServer) sendMsgNotification( ctx context.Context, req *pbmsg.SendMsgReq, ) (resp *pbmsg.SendMsgResp, err error) { - promepkg.Inc(promepkg.SingleChatMsgRecvSuccessCounter) if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { - promepkg.Inc(promepkg.SingleChatMsgProcessFailedCounter) return nil, err } resp = &pbmsg.SendMsgResp{ @@ -147,7 +143,6 @@ func (m *msgServer) sendMsgNotification( } func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { - promepkg.Inc(promepkg.SingleChatMsgRecvSuccessCounter) if err := m.messageVerification(ctx, req); err != nil { return nil, err } @@ -166,7 +161,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq } } if !isSend { - promepkg.Inc(promepkg.SingleChatMsgProcessFailedCounter) + prom_metrics.SingleChatMsgProcessFailedCounter.Inc() return nil, nil } else { if err = callbackBeforeSendSingleMsg(ctx, req); err != nil { @@ -176,7 +171,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq return nil, err } if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { - promepkg.Inc(promepkg.SingleChatMsgProcessFailedCounter) + prom_metrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err } err = callbackAfterSendSingleMsg(ctx, req) @@ -188,7 +183,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq ClientMsgID: req.MsgData.ClientMsgID, SendTime: req.MsgData.SendTime, } - promepkg.Inc(promepkg.SingleChatMsgProcessSuccessCounter) + prom_metrics.SingleChatMsgProcessSuccessCounter.Inc() return resp, nil } } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index e8f80914f..88be287fd 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -28,7 +28,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" - "github.com/openimsdk/open-im-server/v3/pkg/common/prome" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -94,27 +93,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e } s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) s.addInterceptorHandler(MessageHasReadEnabled) - s.initPrometheus() msg.RegisterMsgServer(server, s) return nil } -func (m *msgServer) initPrometheus() { - prome.NewMsgPullFromRedisSuccessCounter() - prome.NewMsgPullFromRedisFailedCounter() - prome.NewMsgPullFromMongoSuccessCounter() - prome.NewMsgPullFromMongoFailedCounter() - prome.NewSingleChatMsgRecvSuccessCounter() - prome.NewGroupChatMsgRecvSuccessCounter() - prome.NewWorkSuperGroupChatMsgRecvSuccessCounter() - prome.NewSingleChatMsgProcessSuccessCounter() - prome.NewSingleChatMsgProcessFailedCounter() - prome.NewGroupChatMsgProcessSuccessCounter() - prome.NewGroupChatMsgProcessFailedCounter() - prome.NewWorkSuperGroupChatMsgProcessSuccessCounter() - prome.NewWorkSuperGroupChatMsgProcessFailedCounter() -} - func (m *msgServer) conversationAndGetRecvID(conversation *conversation.Conversation, userID string) (recvID string) { if conversation.ConversationType == constant.SingleChatType || conversation.ConversationType == constant.NotificationChatType { diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 6c74ddad9..00c6cb241 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -34,9 +34,9 @@ func NewApiCmd() *ApiCmd { return ret } -func (a *ApiCmd) AddApi(f func(port int) error) { +func (a *ApiCmd) AddApi(f func(port int, promPort int) error) { a.Command.RunE = func(cmd *cobra.Command, args []string) error { - return f(a.getPortFlag(cmd)) + return f(a.getPortFlag(cmd), a.getPrometheusPortFlag(cmd)) } } @@ -44,8 +44,8 @@ func (a *ApiCmd) GetPortFromConfig(portType string) int { fmt.Println("GetPortFromConfig:", portType) if portType == constant.FlagPort { return config2.Config.Api.OpenImApiPort[0] - } else { - - return 0 + } else if portType == constant.FlagPrometheusPort { + return config2.Config.Prometheus.ApiPrometheusPort[0] } + return 0 } diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 358b79f9b..7f0abb771 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -66,7 +66,7 @@ func (m *MsgGatewayCmd) GetPortFromConfig(portType string) int { } else if portType == constant.FlagPort { return v3config.Config.LongConnSvr.OpenImMessageGatewayPort[0] } else if portType == constant.FlagPrometheusPort { - return 0 + return v3config.Config.Prometheus.MessageGatewayPrometheusPort[0] } else { return 0 } diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 20349ebbb..903d1fb95 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -15,6 +15,9 @@ package cmd import ( + "fmt" + "github.com/OpenIMSDK/protocol/constant" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/spf13/cobra" "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" @@ -40,3 +43,25 @@ func (m *MsgTransferCmd) Exec() error { m.addRunE() return m.Execute() } + +func (m *MsgTransferCmd) GetPortFromConfig(portType string) int { + fmt.Println("GetPortFromConfig:", portType) + if portType == constant.FlagPort { + return 0 + } else if portType == constant.FlagPrometheusPort { + n := m.getTransferProgressFlagValue() + return config2.Config.Prometheus.MessageTransferPrometheusPort[n] + } + return 0 +} +func (m *MsgTransferCmd) AddTransferProgressFlag() { + m.Command.Flags().IntP(constant.FlagTransferProgressIndex, "n", 0, "transfer progress index") +} +func (m *MsgTransferCmd) getTransferProgressFlagValue() int { + nindex, err := m.Command.Flags().GetInt(constant.FlagTransferProgressIndex) + if err != nil { + fmt.Println("get transfercmd error,make sure it is k8s env or not") + return 0 + } + return nindex +} diff --git a/pkg/common/cmd/rpc.go b/pkg/common/cmd/rpc.go index 0ccc37fcb..6266c03b2 100644 --- a/pkg/common/cmd/rpc.go +++ b/pkg/common/cmd/rpc.go @@ -61,34 +61,58 @@ func (a *RpcCmd) GetPortFromConfig(portType string) int { if portType == constant.FlagPort { return config2.Config.RpcPort.OpenImPushPort[0] } + if portType == constant.FlagPrometheusPort { + return config2.Config.Prometheus.PushPrometheusPort[0] + } case RpcAuthServer: if portType == constant.FlagPort { return config2.Config.RpcPort.OpenImAuthPort[0] } + if portType == constant.FlagPrometheusPort { + return config2.Config.Prometheus.AuthPrometheusPort[0] + } case RpcConversationServer: if portType == constant.FlagPort { return config2.Config.RpcPort.OpenImConversationPort[0] } + if portType == constant.FlagPrometheusPort { + return config2.Config.Prometheus.ConversationPrometheusPort[0] + } case RpcFriendServer: if portType == constant.FlagPort { return config2.Config.RpcPort.OpenImFriendPort[0] } + if portType == constant.FlagPrometheusPort { + return config2.Config.Prometheus.FriendPrometheusPort[0] + } case RpcGroupServer: if portType == constant.FlagPort { return config2.Config.RpcPort.OpenImGroupPort[0] } + if portType == constant.FlagPrometheusPort { + return config2.Config.Prometheus.GroupPrometheusPort[0] + } case RpcMsgServer: if portType == constant.FlagPort { return config2.Config.RpcPort.OpenImMessagePort[0] } + if portType == constant.FlagPrometheusPort { + return config2.Config.Prometheus.MessagePrometheusPort[0] + } case RpcThirdServer: if portType == constant.FlagPort { return config2.Config.RpcPort.OpenImThirdPort[0] } + if portType == constant.FlagPrometheusPort { + return config2.Config.Prometheus.ThirdPrometheusPort[0] + } case RpcUserServer: if portType == constant.FlagPort { return config2.Config.RpcPort.OpenImUserPort[0] } + if portType == constant.FlagPrometheusPort { + return config2.Config.Prometheus.UserPrometheusPort[0] + } } return 0 } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 95f4a864e..d7cecc616 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -262,18 +262,20 @@ type configStruct struct { } `yaml:"callback"` Prometheus struct { - Enable bool `yaml:"enable"` - UserPrometheusPort []int `yaml:"userPrometheusPort"` - FriendPrometheusPort []int `yaml:"friendPrometheusPort"` - MessagePrometheusPort []int `yaml:"messagePrometheusPort"` - MessageGatewayPrometheusPort []int `yaml:"messageGatewayPrometheusPort"` - GroupPrometheusPort []int `yaml:"groupPrometheusPort"` - AuthPrometheusPort []int `yaml:"authPrometheusPort"` - PushPrometheusPort []int `yaml:"pushPrometheusPort"` - ConversationPrometheusPort []int `yaml:"conversationPrometheusPort"` - RtcPrometheusPort []int `yaml:"rtcPrometheusPort"` - MessageTransferPrometheusPort []int `yaml:"messageTransferPrometheusPort"` - ThirdPrometheusPort []int `yaml:"thirdPrometheusPort"` + Enable bool `yaml:"enable"` + PrometheusUrl string `yaml:"prometheusUrl"` + ApiPrometheusPort []int `yaml:"apiPrometheusPort"` + UserPrometheusPort []int `yaml:"userPrometheusPort"` + FriendPrometheusPort []int `yaml:"friendPrometheusPort"` + MessagePrometheusPort []int `yaml:"messagePrometheusPort"` + MessageGatewayPrometheusPort []int `yaml:"messageGatewayPrometheusPort"` + GroupPrometheusPort []int `yaml:"groupPrometheusPort"` + AuthPrometheusPort []int `yaml:"authPrometheusPort"` + PushPrometheusPort []int `yaml:"pushPrometheusPort"` + ConversationPrometheusPort []int `yaml:"conversationPrometheusPort"` + RtcPrometheusPort []int `yaml:"rtcPrometheusPort"` + MessageTransferPrometheusPort []int `yaml:"messageTransferPrometheusPort"` + ThirdPrometheusPort []int `yaml:"thirdPrometheusPort"` } `yaml:"prometheus"` Notification notification `yaml:"notification"` } diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index fee5efbe9..2a877d69b 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -35,6 +35,16 @@ const ( DefaultFolderPath = "../config/" ) +// return absolude path join ../config/, this is k8s container config path +func GetDefaultConfigPath() string { + b, err := filepath.Abs(os.Args[0]) + if err != nil { + fmt.Println("filepath.Abs error,err=", err) + return "" + } + return filepath.Join(filepath.Dir(b), "../config/") +} + // getProjectRoot returns the absolute path of the project root directory func GetProjectRoot() string { b, _ := filepath.Abs(os.Args[0]) @@ -65,9 +75,11 @@ func initConfig(config interface{}, configName, configFolderPath string) error { _, err := os.Stat(configFolderPath) if err != nil { if !os.IsNotExist(err) { + fmt.Println("stat config path error:", err.Error()) return fmt.Errorf("stat config path error: %w", err) } configFolderPath = filepath.Join(GetProjectRoot(), "config", configName) + fmt.Println("flag's path,enviment's path,default path all is not exist,using project path:", configFolderPath) } data, err := os.ReadFile(configFolderPath) if err != nil { @@ -86,7 +98,7 @@ func InitConfig(configFolderPath string) error { if envConfigPath != "" { configFolderPath = envConfigPath } else { - configFolderPath = DefaultFolderPath + configFolderPath = GetDefaultConfigPath() } } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index af678f92c..e3b6559dc 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -17,6 +17,7 @@ package controller import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "time" "github.com/redis/go-redis/v9" @@ -30,8 +31,6 @@ import ( unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" - "github.com/openimsdk/open-im-server/v3/pkg/common/prome" - "go.mongodb.org/mongo-driver/mongo" pbmsg "github.com/OpenIMSDK/protocol/msg" @@ -355,10 +354,9 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { - prome.Inc(prome.SeqGetFailedCounter) + log.ZError(ctx, "db.cache.GetMaxSeq", err) return 0, false, err } - prome.Inc(prome.SeqGetSuccessCounter) lenList := len(msgs) if int64(lenList) > db.msg.GetSingleGocMsgNum() { return 0, false, errors.New("too large") @@ -378,23 +376,20 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa } failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs) if err != nil { - prome.Add(prome.MsgInsertRedisFailedCounter, failedNum) + prom_metrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) } else { - prome.Inc(prome.MsgInsertRedisSuccessCounter) + prom_metrics.MsgInsertRedisSuccessCounter.Inc() } err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq) if err != nil { - prome.Inc(prome.SeqSetFailedCounter) - } else { - prome.Inc(prome.SeqSetSuccessCounter) + log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID) + prom_metrics.SeqSetFailedCounter.Inc() } err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap) if err != nil { log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID) - prome.Inc(prome.SeqSetFailedCounter) - } else { - prome.Inc(prome.SeqSetSuccessCounter) + prom_metrics.SeqSetFailedCounter.Inc() } return lastMaxSeq, isNew, utils.Wrap(err, "") } @@ -493,7 +488,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) if err != nil { if err != redis.Nil { - prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) + log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) } } @@ -530,7 +525,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin cachedMsgs, failedSeqs2, err := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache) if err != nil { if err != redis.Nil { - prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs2)) + log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", reGetSeqsCache) } } @@ -543,14 +538,14 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs) } // get from cache or db - prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) + if len(failedSeqs) > 0 { mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end) if err != nil { - prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) + return 0, 0, nil, err } - prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) + successMsgs = append(successMsgs, mongoMsgs...) } @@ -582,7 +577,6 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs) if err != nil { if err != redis.Nil { - prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) } } @@ -602,14 +596,14 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co "conversationID", conversationID, ) - prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) + if len(failedSeqs) > 0 { mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) if err != nil { - prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) + return 0, 0, nil, err } - prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) + successMsgs = append(successMsgs, mongoMsgs...) } return minSeq, maxSeq, successMsgs, nil diff --git a/pkg/common/ginPrometheus/ginPrometheus.go b/pkg/common/ginPrometheus/ginPrometheus.go new file mode 100644 index 000000000..3f7cd65c4 --- /dev/null +++ b/pkg/common/ginPrometheus/ginPrometheus.go @@ -0,0 +1,417 @@ +package ginPrometheus + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "os" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var defaultMetricPath = "/metrics" + +// counter, counter_vec, gauge, gauge_vec, +// histogram, histogram_vec, summary, summary_vec +var reqCnt = &Metric{ + ID: "reqCnt", + Name: "requests_total", + Description: "How many HTTP requests processed, partitioned by status code and HTTP method.", + Type: "counter_vec", + Args: []string{"code", "method", "handler", "host", "url"}} + +var reqDur = &Metric{ + ID: "reqDur", + Name: "request_duration_seconds", + Description: "The HTTP request latencies in seconds.", + Type: "histogram_vec", + Args: []string{"code", "method", "url"}, +} + +var resSz = &Metric{ + ID: "resSz", + Name: "response_size_bytes", + Description: "The HTTP response sizes in bytes.", + Type: "summary"} + +var reqSz = &Metric{ + ID: "reqSz", + Name: "request_size_bytes", + Description: "The HTTP request sizes in bytes.", + Type: "summary"} + +var standardMetrics = []*Metric{ + reqCnt, + reqDur, + resSz, + reqSz, +} + +/* +RequestCounterURLLabelMappingFn is a function which can be supplied to the middleware to control +the cardinality of the request counter's "url" label, which might be required in some contexts. +For instance, if for a "/customer/:name" route you don't want to generate a time series for every +possible customer name, you could use this function: + + func(c *gin.Context) string { + url := c.Request.URL.Path + for _, p := range c.Params { + if p.Key == "name" { + url = strings.Replace(url, p.Value, ":name", 1) + break + } + } + return url + } + +which would map "/customer/alice" and "/customer/bob" to their template "/customer/:name". +*/ +type RequestCounterURLLabelMappingFn func(c *gin.Context) string + +// Metric is a definition for the name, description, type, ID, and +// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric +type Metric struct { + MetricCollector prometheus.Collector + ID string + Name string + Description string + Type string + Args []string +} + +// Prometheus contains the metrics gathered by the instance and its path +type Prometheus struct { + reqCnt *prometheus.CounterVec + reqDur *prometheus.HistogramVec + reqSz, resSz prometheus.Summary + router *gin.Engine + listenAddress string + Ppg PrometheusPushGateway + + MetricsList []*Metric + MetricsPath string + + ReqCntURLLabelMappingFn RequestCounterURLLabelMappingFn + + // gin.Context string to use as a prometheus URL label + URLLabelFromContext string +} + +// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional) +type PrometheusPushGateway struct { + + // Push interval in seconds + PushIntervalSeconds time.Duration + + // Push Gateway URL in format http://domain:port + // where JOBNAME can be any string of your choice + PushGatewayURL string + + // Local metrics URL where metrics are fetched from, this could be ommited in the future + // if implemented using prometheus common/expfmt instead + MetricsURL string + + // pushgateway job name, defaults to "gin" + Job string +} + +// NewPrometheus generates a new set of metrics with a certain subsystem name +func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus { + subsystem = "app" + + var metricsList []*Metric + + if len(customMetricsList) > 1 { + panic("Too many args. NewPrometheus( string, ).") + } else if len(customMetricsList) == 1 { + metricsList = customMetricsList[0] + } + + for _, metric := range standardMetrics { + metricsList = append(metricsList, metric) + } + + p := &Prometheus{ + MetricsList: metricsList, + MetricsPath: defaultMetricPath, + ReqCntURLLabelMappingFn: func(c *gin.Context) string { + return c.Request.URL.Path + }, + } + + p.registerMetrics(subsystem) + + return p +} + +// SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL +// every pushIntervalSeconds. Metrics are fetched from metricsURL +func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) { + p.Ppg.PushGatewayURL = pushGatewayURL + p.Ppg.MetricsURL = metricsURL + p.Ppg.PushIntervalSeconds = pushIntervalSeconds + p.startPushTicker() +} + +// SetPushGatewayJob job name, defaults to "gin" +func (p *Prometheus) SetPushGatewayJob(j string) { + p.Ppg.Job = j +} + +// SetListenAddress for exposing metrics on address. If not set, it will be exposed at the +// same address of the gin engine that is being used +func (p *Prometheus) SetListenAddress(address string) { + p.listenAddress = address + if p.listenAddress != "" { + p.router = gin.Default() + } +} + +// SetListenAddressWithRouter for using a separate router to expose metrics. (this keeps things like GET /metrics out of +// your content's access log). +func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Engine) { + p.listenAddress = listenAddress + if len(p.listenAddress) > 0 { + p.router = r + } +} + +// SetMetricsPath set metrics paths +func (p *Prometheus) SetMetricsPath(e *gin.Engine) { + + if p.listenAddress != "" { + p.router.GET(p.MetricsPath, prometheusHandler()) + p.runServer() + } else { + e.GET(p.MetricsPath, prometheusHandler()) + } +} + +// SetMetricsPathWithAuth set metrics paths with authentication +func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) { + + if p.listenAddress != "" { + p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) + p.runServer() + } else { + e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) + } + +} + +func (p *Prometheus) runServer() { + if p.listenAddress != "" { + go p.router.Run(p.listenAddress) + } +} + +func (p *Prometheus) getMetrics() []byte { + response, _ := http.Get(p.Ppg.MetricsURL) + + defer response.Body.Close() + body, _ := ioutil.ReadAll(response.Body) + + return body +} + +func (p *Prometheus) getPushGatewayURL() string { + h, _ := os.Hostname() + if p.Ppg.Job == "" { + p.Ppg.Job = "gin" + } + return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + h +} + +func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) { + req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics)) + client := &http.Client{} + if _, err = client.Do(req); err != nil { + fmt.Println("Error sending to push gateway error:", err.Error()) + } +} + +func (p *Prometheus) startPushTicker() { + ticker := time.NewTicker(time.Second * p.Ppg.PushIntervalSeconds) + go func() { + for range ticker.C { + p.sendMetricsToPushGateway(p.getMetrics()) + } + }() +} + +// NewMetric associates prometheus.Collector based on Metric.Type +func NewMetric(m *Metric, subsystem string) prometheus.Collector { + var metric prometheus.Collector + switch m.Type { + case "counter_vec": + metric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + m.Args, + ) + case "counter": + metric = prometheus.NewCounter( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + ) + case "gauge_vec": + metric = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + m.Args, + ) + case "gauge": + metric = prometheus.NewGauge( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + ) + case "histogram_vec": + metric = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + m.Args, + ) + case "histogram": + metric = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + ) + case "summary_vec": + metric = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + m.Args, + ) + case "summary": + metric = prometheus.NewSummary( + prometheus.SummaryOpts{ + Subsystem: subsystem, + Name: m.Name, + Help: m.Description, + }, + ) + } + return metric +} + +func (p *Prometheus) registerMetrics(subsystem string) { + + for _, metricDef := range p.MetricsList { + metric := NewMetric(metricDef, subsystem) + if err := prometheus.Register(metric); err != nil { + fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error()) + } + switch metricDef { + case reqCnt: + p.reqCnt = metric.(*prometheus.CounterVec) + case reqDur: + p.reqDur = metric.(*prometheus.HistogramVec) + case resSz: + p.resSz = metric.(prometheus.Summary) + case reqSz: + p.reqSz = metric.(prometheus.Summary) + } + metricDef.MetricCollector = metric + } +} + +// Use adds the middleware to a gin engine. +func (p *Prometheus) Use(e *gin.Engine) { + e.Use(p.HandlerFunc()) + p.SetMetricsPath(e) +} + +// UseWithAuth adds the middleware to a gin engine with BasicAuth. +func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) { + e.Use(p.HandlerFunc()) + p.SetMetricsPathWithAuth(e, accounts) +} + +// HandlerFunc defines handler function for middleware +func (p *Prometheus) HandlerFunc() gin.HandlerFunc { + return func(c *gin.Context) { + if c.Request.URL.Path == p.MetricsPath { + c.Next() + return + } + + start := time.Now() + reqSz := computeApproximateRequestSize(c.Request) + + c.Next() + + status := strconv.Itoa(c.Writer.Status()) + elapsed := float64(time.Since(start)) / float64(time.Second) + resSz := float64(c.Writer.Size()) + + url := p.ReqCntURLLabelMappingFn(c) + if len(p.URLLabelFromContext) > 0 { + u, found := c.Get(p.URLLabelFromContext) + if !found { + u = "unknown" + } + url = u.(string) + } + p.reqDur.WithLabelValues(status, c.Request.Method, url).Observe(elapsed) + p.reqCnt.WithLabelValues(status, c.Request.Method, c.HandlerName(), c.Request.Host, url).Inc() + p.reqSz.Observe(float64(reqSz)) + p.resSz.Observe(resSz) + } +} + +func prometheusHandler() gin.HandlerFunc { + h := promhttp.Handler() + return func(c *gin.Context) { + h.ServeHTTP(c.Writer, c.Request) + } +} + +func computeApproximateRequestSize(r *http.Request) int { + s := 0 + if r.URL != nil { + s = len(r.URL.Path) + } + + s += len(r.Method) + s += len(r.Proto) + for name, values := range r.Header { + s += len(name) + for _, value := range values { + s += len(value) + } + } + s += len(r.Host) + + // r.Form and r.MultipartForm are assumed to be included in r.URL. + + if r.ContentLength != -1 { + s += int(r.ContentLength) + } + return s +} diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 4a52d2bef..1766afa97 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -28,8 +28,6 @@ import ( "github.com/IBM/sarama" "google.golang.org/protobuf/proto" - - prome "github.com/openimsdk/open-im-server/v3/pkg/common/prome" ) const ( @@ -131,8 +129,8 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag kMsg.Headers = header partition, offset, err := p.producer.SendMessage(kMsg) log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length()) - if err == nil { - prome.Inc(prome.SendMsgCounter) + if err != nil { + log.ZWarn(ctx, "p.producer.SendMessage error", err) } return partition, offset, utils.Wrap(err, "") } diff --git a/pkg/common/prom_metrics/func.go b/pkg/common/prom_metrics/func.go new file mode 100644 index 000000000..e451c441b --- /dev/null +++ b/pkg/common/prom_metrics/func.go @@ -0,0 +1,45 @@ +package prom_metrics + +import ( + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" +) + +func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *grpc_prometheus.ServerMetrics, error) { + //////////////////////////////////////////////////////// + reg := prometheus.NewRegistry() + grpcMetrics := grpc_prometheus.NewServerMetrics() + grpcMetrics.EnableHandlingTimeHistogram() + cusMetrics = append(cusMetrics, grpcMetrics, collectors.NewGoCollector()) + reg.MustRegister(cusMetrics...) + return reg, grpcMetrics, nil +} + +func GetGrpcCusMetrics(registerName string) []prometheus.Collector { + switch registerName { + case config2.Config.RpcRegisterName.OpenImMessageGatewayName: + return []prometheus.Collector{OnlineUserGauge} + case config2.Config.RpcRegisterName.OpenImMsgName: + return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter} + case "Transfer": + return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter} + case config2.Config.RpcRegisterName.OpenImPushName: + return []prometheus.Collector{MsgOfflinePushFailedCounter} + case config2.Config.RpcRegisterName.OpenImAuthName: + return []prometheus.Collector{UserLoginCounter} + default: + return nil + } +} + +func GetGinCusMetrics(name string) []*ginPrometheus.Metric { + switch name { + case "Api": + return []*ginPrometheus.Metric{ApiCustomCnt} + default: + return []*ginPrometheus.Metric{ApiCustomCnt} + } +} diff --git a/pkg/common/prom_metrics/gin-api.go b/pkg/common/prom_metrics/gin-api.go new file mode 100644 index 000000000..7aa3f959e --- /dev/null +++ b/pkg/common/prom_metrics/gin-api.go @@ -0,0 +1,16 @@ +package prom_metrics + +import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus" + +/* +labels := prometheus.Labels{"label_one": "any", "label_two": "value"} +ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc() +*/ +var ( + ApiCustomCnt = &ginProm.Metric{ + Name: "custom_total", + Description: "Custom counter events.", + Type: "counter_vec", + Args: []string{"label_one", "label_two"}, + } +) diff --git a/pkg/common/prom_metrics/grpc-auth.go b/pkg/common/prom_metrics/grpc-auth.go new file mode 100644 index 000000000..7ca5f1f49 --- /dev/null +++ b/pkg/common/prom_metrics/grpc-auth.go @@ -0,0 +1,12 @@ +package prom_metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + UserLoginCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "user_login_total", + Help: "The number of user login", + }) +) diff --git a/pkg/common/prom_metrics/grpc-msg.go b/pkg/common/prom_metrics/grpc-msg.go new file mode 100644 index 000000000..14cb4d858 --- /dev/null +++ b/pkg/common/prom_metrics/grpc-msg.go @@ -0,0 +1,24 @@ +package prom_metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + SingleChatMsgProcessSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "single_chat_msg_process_success_total", + Help: "The number of single chat msg successful processed", + }) + SingleChatMsgProcessFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "single_chat_msg_process_failed_total", + Help: "The number of single chat msg failed processed", + }) + GroupChatMsgProcessSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "group_chat_msg_process_success_total", + Help: "The number of group chat msg successful processed", + }) + GroupChatMsgProcessFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "group_chat_msg_process_failed_total", + Help: "The number of group chat msg failed processed", + }) +) diff --git a/pkg/common/prom_metrics/grpc-msggateway.go b/pkg/common/prom_metrics/grpc-msggateway.go new file mode 100644 index 000000000..add72e391 --- /dev/null +++ b/pkg/common/prom_metrics/grpc-msggateway.go @@ -0,0 +1,12 @@ +package prom_metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + OnlineUserGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "online_user_num", + Help: "The number of online user num", + }) +) diff --git a/pkg/common/prom_metrics/grpc_push.go b/pkg/common/prom_metrics/grpc_push.go new file mode 100644 index 000000000..c05dd6180 --- /dev/null +++ b/pkg/common/prom_metrics/grpc_push.go @@ -0,0 +1,12 @@ +package prom_metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + MsgOfflinePushFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "msg_offline_push_failed_total", + Help: "The number of msg failed offline pushed", + }) +) diff --git a/pkg/common/prom_metrics/transfer.go b/pkg/common/prom_metrics/transfer.go new file mode 100644 index 000000000..d3fec47d9 --- /dev/null +++ b/pkg/common/prom_metrics/transfer.go @@ -0,0 +1,28 @@ +package prom_metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + MsgInsertRedisSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "msg_insert_redis_success_total", + Help: "The number of successful insert msg to redis", + }) + MsgInsertRedisFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "msg_insert_redis_failed_total", + Help: "The number of failed insert msg to redis", + }) + MsgInsertMongoSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "msg_insert_mongo_success_total", + Help: "The number of successful insert msg to mongo", + }) + MsgInsertMongoFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "msg_insert_mongo_failed_total", + Help: "The number of failed insert msg to mongo", + }) + SeqSetFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "seq_set_failed_total", + Help: "The number of failed set seq", + }) +) diff --git a/pkg/common/prome/doc.go b/pkg/common/prome/doc.go deleted file mode 100644 index 7058c711c..000000000 --- a/pkg/common/prome/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prome // import "github.com/openimsdk/open-im-server/v3/pkg/common/prome" diff --git a/pkg/common/prome/gather.go b/pkg/common/prome/gather.go deleted file mode 100644 index eb4bc6c3b..000000000 --- a/pkg/common/prome/gather.go +++ /dev/null @@ -1,470 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prome - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -var ( - // auth rpc. - UserLoginCounter prometheus.Counter - UserRegisterCounter prometheus.Counter - - // seg. - SeqGetSuccessCounter prometheus.Counter - SeqGetFailedCounter prometheus.Counter - SeqSetSuccessCounter prometheus.Counter - SeqSetFailedCounter prometheus.Counter - - // msg-db. - MsgInsertRedisSuccessCounter prometheus.Counter - MsgInsertRedisFailedCounter prometheus.Counter - MsgInsertMongoSuccessCounter prometheus.Counter - MsgInsertMongoFailedCounter prometheus.Counter - MsgPullFromRedisSuccessCounter prometheus.Counter - MsgPullFromRedisFailedCounter prometheus.Counter - MsgPullFromMongoSuccessCounter prometheus.Counter - MsgPullFromMongoFailedCounter prometheus.Counter - - // msg-ws. - MsgRecvTotalCounter prometheus.Counter - GetNewestSeqTotalCounter prometheus.Counter - PullMsgBySeqListTotalCounter prometheus.Counter - - SingleChatMsgRecvSuccessCounter prometheus.Counter - GroupChatMsgRecvSuccessCounter prometheus.Counter - WorkSuperGroupChatMsgRecvSuccessCounter prometheus.Counter - OnlineUserGauge prometheus.Gauge - - // msg-msg. - SingleChatMsgProcessSuccessCounter prometheus.Counter - SingleChatMsgProcessFailedCounter prometheus.Counter - GroupChatMsgProcessSuccessCounter prometheus.Counter - GroupChatMsgProcessFailedCounter prometheus.Counter - WorkSuperGroupChatMsgProcessSuccessCounter prometheus.Counter - WorkSuperGroupChatMsgProcessFailedCounter prometheus.Counter - - // msg-push. - MsgOnlinePushSuccessCounter prometheus.Counter - MsgOfflinePushSuccessCounter prometheus.Counter - MsgOfflinePushFailedCounter prometheus.Counter - // api. - ApiRequestCounter prometheus.Counter - ApiRequestSuccessCounter prometheus.Counter - ApiRequestFailedCounter prometheus.Counter - - // grpc. - GrpcRequestCounter prometheus.Counter - GrpcRequestSuccessCounter prometheus.Counter - GrpcRequestFailedCounter prometheus.Counter - - SendMsgCounter prometheus.Counter - - // conversation. - ConversationCreateSuccessCounter prometheus.Counter - ConversationCreateFailedCounter prometheus.Counter -) - -func NewUserLoginCounter() { - if UserLoginCounter != nil { - return - } - UserLoginCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "user_login", - Help: "The number of user login", - }) -} - -func NewUserRegisterCounter() { - if UserRegisterCounter != nil { - return - } - UserRegisterCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "user_register", - Help: "The number of user register", - }) -} - -func NewSeqGetSuccessCounter() { - if SeqGetSuccessCounter != nil { - return - } - SeqGetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "seq_get_success", - Help: "The number of successful get seq", - }) -} - -func NewSeqGetFailedCounter() { - if SeqGetFailedCounter != nil { - return - } - SeqGetFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "seq_get_failed", - Help: "The number of failed get seq", - }) -} - -func NewSeqSetSuccessCounter() { - if SeqSetSuccessCounter != nil { - return - } - SeqSetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "seq_set_success", - Help: "The number of successful set seq", - }) -} - -func NewSeqSetFailedCounter() { - if SeqSetFailedCounter != nil { - return - } - SeqSetFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "seq_set_failed", - Help: "The number of failed set seq", - }) -} - -func NewApiRequestCounter() { - if ApiRequestCounter != nil { - return - } - ApiRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "api_request", - Help: "The number of api request", - }) -} - -func NewApiRequestSuccessCounter() { - if ApiRequestSuccessCounter != nil { - return - } - ApiRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "api_request_success", - Help: "The number of api request success", - }) -} - -func NewApiRequestFailedCounter() { - if ApiRequestFailedCounter != nil { - return - } - ApiRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "api_request_failed", - Help: "The number of api request failed", - }) -} - -func NewGrpcRequestCounter() { - if GrpcRequestCounter != nil { - return - } - GrpcRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "grpc_request", - Help: "The number of api request", - }) -} - -func NewGrpcRequestSuccessCounter() { - if GrpcRequestSuccessCounter != nil { - return - } - GrpcRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "grpc_request_success", - Help: "The number of grpc request success", - }) -} - -func NewGrpcRequestFailedCounter() { - if GrpcRequestFailedCounter != nil { - return - } - GrpcRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "grpc_request_failed", - Help: "The number of grpc request failed", - }) -} - -func NewSendMsgCount() { - if SendMsgCounter != nil { - return - } - SendMsgCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "send_msg", - Help: "The number of send msg", - }) -} - -func NewMsgInsertRedisSuccessCounter() { - if MsgInsertRedisSuccessCounter != nil { - return - } - MsgInsertRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_insert_redis_success", - Help: "The number of successful insert msg to redis", - }) -} - -func NewMsgInsertRedisFailedCounter() { - if MsgInsertRedisFailedCounter != nil { - return - } - MsgInsertRedisFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_insert_redis_failed", - Help: "The number of failed insert msg to redis", - }) -} - -func NewMsgInsertMongoSuccessCounter() { - if MsgInsertMongoSuccessCounter != nil { - return - } - MsgInsertMongoSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_insert_mongo_success", - Help: "The number of successful insert msg to mongo", - }) -} - -func NewMsgInsertMongoFailedCounter() { - if MsgInsertMongoFailedCounter != nil { - return - } - MsgInsertMongoFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_insert_mongo_failed", - Help: "The number of failed insert msg to mongo", - }) -} - -func NewMsgPullFromRedisSuccessCounter() { - if MsgPullFromRedisSuccessCounter != nil { - return - } - MsgPullFromRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_pull_from_redis_success", - Help: "The number of successful pull msg from redis", - }) -} - -func NewMsgPullFromRedisFailedCounter() { - if MsgPullFromRedisFailedCounter != nil { - return - } - MsgPullFromRedisFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_pull_from_redis_failed", - Help: "The number of failed pull msg from redis", - }) -} - -func NewMsgPullFromMongoSuccessCounter() { - if MsgPullFromMongoSuccessCounter != nil { - return - } - MsgPullFromMongoSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_pull_from_mongo_success", - Help: "The number of successful pull msg from mongo", - }) -} - -func NewMsgPullFromMongoFailedCounter() { - if MsgPullFromMongoFailedCounter != nil { - return - } - MsgPullFromMongoFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_pull_from_mongo_failed", - Help: "The number of failed pull msg from mongo", - }) -} - -func NewMsgRecvTotalCounter() { - if MsgRecvTotalCounter != nil { - return - } - MsgRecvTotalCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_recv_total", - Help: "The number of msg received", - }) -} - -func NewGetNewestSeqTotalCounter() { - if GetNewestSeqTotalCounter != nil { - return - } - GetNewestSeqTotalCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "get_newest_seq_total", - Help: "the number of get newest seq", - }) -} - -func NewPullMsgBySeqListTotalCounter() { - if PullMsgBySeqListTotalCounter != nil { - return - } - PullMsgBySeqListTotalCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "pull_msg_by_seq_list_total", - Help: "The number of pull msg by seq list", - }) -} - -func NewSingleChatMsgRecvSuccessCounter() { - if SingleChatMsgRecvSuccessCounter != nil { - return - } - SingleChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "single_chat_msg_recv_success", - Help: "The number of single chat msg successful received ", - }) -} - -func NewGroupChatMsgRecvSuccessCounter() { - if GroupChatMsgRecvSuccessCounter != nil { - return - } - GroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "group_chat_msg_recv_success", - Help: "The number of group chat msg successful received", - }) -} - -func NewWorkSuperGroupChatMsgRecvSuccessCounter() { - if WorkSuperGroupChatMsgRecvSuccessCounter != nil { - return - } - WorkSuperGroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "work_super_group_chat_msg_recv_success", - Help: "The number of work/super group chat msg successful received", - }) -} - -func NewOnlineUserGauges() { - if OnlineUserGauge != nil { - return - } - OnlineUserGauge = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "online_user_num", - Help: "The number of online user num", - }) -} - -func NewSingleChatMsgProcessSuccessCounter() { - if SingleChatMsgProcessSuccessCounter != nil { - return - } - SingleChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "single_chat_msg_process_success", - Help: "The number of single chat msg successful processed", - }) -} - -func NewSingleChatMsgProcessFailedCounter() { - if SingleChatMsgProcessFailedCounter != nil { - return - } - SingleChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "single_chat_msg_process_failed", - Help: "The number of single chat msg failed processed", - }) -} - -func NewGroupChatMsgProcessSuccessCounter() { - if GroupChatMsgProcessSuccessCounter != nil { - return - } - GroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "group_chat_msg_process_success", - Help: "The number of group chat msg successful processed", - }) -} - -func NewGroupChatMsgProcessFailedCounter() { - if GroupChatMsgProcessFailedCounter != nil { - return - } - GroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "group_chat_msg_process_failed", - Help: "The number of group chat msg failed processed", - }) -} - -func NewWorkSuperGroupChatMsgProcessSuccessCounter() { - if WorkSuperGroupChatMsgProcessSuccessCounter != nil { - return - } - WorkSuperGroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "work_super_group_chat_msg_process_success", - Help: "The number of work/super group chat msg successful processed", - }) -} - -func NewWorkSuperGroupChatMsgProcessFailedCounter() { - if WorkSuperGroupChatMsgProcessFailedCounter != nil { - return - } - WorkSuperGroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "work_super_group_chat_msg_process_failed", - Help: "The number of work/super group chat msg failed processed", - }) -} - -func NewMsgOnlinePushSuccessCounter() { - if MsgOnlinePushSuccessCounter != nil { - return - } - MsgOnlinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_online_push_success", - Help: "The number of msg successful online pushed", - }) -} - -func NewMsgOfflinePushSuccessCounter() { - if MsgOfflinePushSuccessCounter != nil { - return - } - MsgOfflinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_offline_push_success", - Help: "The number of msg successful offline pushed", - }) -} - -func NewMsgOfflinePushFailedCounter() { - if MsgOfflinePushFailedCounter != nil { - return - } - MsgOfflinePushFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "msg_offline_push_failed", - Help: "The number of msg failed offline pushed", - }) -} - -func NewConversationCreateSuccessCounter() { - if ConversationCreateSuccessCounter != nil { - return - } - ConversationCreateSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "conversation_push_success", - Help: "The number of conversation successful pushed", - }) -} - -func NewConversationCreateFailedCounter() { - if ConversationCreateFailedCounter != nil { - return - } - ConversationCreateFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "conversation_push_failed", - Help: "The number of conversation failed pushed", - }) -} diff --git a/pkg/common/prome/prometheus.go b/pkg/common/prome/prometheus.go deleted file mode 100644 index 254a6c9ea..000000000 --- a/pkg/common/prome/prometheus.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prome - -import ( - "bytes" - "net/http" - "strconv" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - - "github.com/gin-gonic/gin" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -func StartPrometheusSrv(prometheusPort int) error { - if config.Config.Prometheus.Enable { - http.Handle("/metrics", promhttp.Handler()) - err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil) - return err - } - return nil -} - -func PrometheusHandler() gin.HandlerFunc { - h := promhttp.Handler() - return func(c *gin.Context) { - h.ServeHTTP(c.Writer, c.Request) - } -} - -type responseBodyWriter struct { - gin.ResponseWriter - body *bytes.Buffer -} - -func (r responseBodyWriter) Write(b []byte) (int, error) { - r.body.Write(b) - return r.ResponseWriter.Write(b) -} - -func PrometheusMiddleware(c *gin.Context) { - Inc(ApiRequestCounter) - w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer} - c.Writer = w - c.Next() - if c.Writer.Status() == http.StatusOK { - Inc(ApiRequestSuccessCounter) - } else { - Inc(ApiRequestFailedCounter) - } -} - -func Inc(counter prometheus.Counter) { - if config.Config.Prometheus.Enable { - if counter != nil { - counter.Inc() - } - } -} - -func Add(counter prometheus.Counter, add int) { - if config.Config.Prometheus.Enable { - if counter != nil { - counter.Add(float64(add)) - } - } -} - -func GaugeInc(gauges prometheus.Gauge) { - if config.Config.Prometheus.Enable { - if gauges != nil { - gauges.Inc() - } - } -} - -func GaugeDec(gauges prometheus.Gauge) { - if config.Config.Prometheus.Enable { - if gauges != nil { - gauges.Dec() - } - } -} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index f04ab2508..975d21246 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -16,7 +16,12 @@ package startrpc import ( "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "log" "net" + "net/http" "strconv" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -29,7 +34,6 @@ import ( "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/network" - "github.com/OpenIMSDK/tools/prome" "github.com/OpenIMSDK/tools/utils" ) @@ -41,7 +45,7 @@ func Start( rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption, ) error { - fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s", + fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s\n", rpcRegisterName, rpcPort, prometheusPort, config.Version) listener, err := net.Listen( "tcp", @@ -61,16 +65,15 @@ func Start( if err != nil { return err } + var reg *prometheus.Registry + var metric *grpcprometheus.ServerMetrics // ctx 中间件 if config.Config.Prometheus.Enable { - prome.NewGrpcRequestCounter() - prome.NewGrpcRequestFailedCounter() - prome.NewGrpcRequestSuccessCounter() - unaryInterceptor := mw.InterceptChain(grpcprometheus.UnaryServerInterceptor, mw.RpcServerInterceptor) - options = append(options, []grpc.ServerOption{ - grpc.StreamInterceptor(grpcprometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(unaryInterceptor), - }...) + ////////////////////////// + cusMetrics := prom_metrics.GetGrpcCusMetrics(rpcRegisterName) + reg, metric, err = prom_metrics.NewGrpcPromObj(cusMetrics) + options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), + grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) } else { options = append(options, mw.GrpcServer()) } @@ -91,8 +94,11 @@ func Start( } go func() { if config.Config.Prometheus.Enable && prometheusPort != 0 { - if err := prome.StartPrometheusSrv(prometheusPort); err != nil { - panic(err.Error()) + metric.InitializeMetrics(srv) + // Create a HTTP server for prometheus. + httpServer := &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} + if err := httpServer.ListenAndServe(); err != nil { + log.Fatal("Unable to start a http server.") } } }() diff --git a/scripts/install/environment.sh b/scripts/install/environment.sh index d1ef03197..09fb94e7d 100755 --- a/scripts/install/environment.sh +++ b/scripts/install/environment.sh @@ -341,6 +341,9 @@ def "IOS_PRODUCTION" "false" # IOS生产 ###################### Prometheus 配置信息 ###################### def "PROMETHEUS_ENABLE" "false" # 是否启用 Prometheus +def "PROMETHEUS_URL" "/prometheus" +# Api 服务的 Prometheus 端口 +readonly API_PROM_PORT=${API_PROM_PORT:-'20100'} # User 服务的 Prometheus 端口 readonly USER_PROM_PORT=${USER_PROM_PORT:-'20110'} # Friend 服务的 Prometheus 端口 diff --git a/scripts/install/openim-msgtransfer.sh b/scripts/install/openim-msgtransfer.sh index b28ca7efa..08a7d3ec7 100755 --- a/scripts/install/openim-msgtransfer.sh +++ b/scripts/install/openim-msgtransfer.sh @@ -49,13 +49,13 @@ function openim::msgtransfer::start() openim::log::error_exit "OPENIM_MSGGATEWAY_NUM must be equal to the number of MSG_TRANSFER_PROM_PORTS" fi - for (( i=1; i<=$OPENIM_MSGGATEWAY_NUM; i++ )) do + for (( i=0; i<$OPENIM_MSGGATEWAY_NUM; i++ )) do openim::log::info "prometheus port: ${MSG_TRANSFER_PROM_PORTS[$i]}" PROMETHEUS_PORT_OPTION="" if [[ -n "${OPENIM_PROMETHEUS_PORTS[$i]}" ]]; then PROMETHEUS_PORT_OPTION="--prometheus_port ${OPENIM_PROMETHEUS_PORTS[$i]}" fi - nohup ${OPENIM_MSGTRANSFER_BINARY} ${PROMETHEUS_PORT_OPTION} -c ${OPENIM_MSGTRANSFER_CONFIG} >> ${LOG_FILE} 2>&1 & + nohup ${OPENIM_MSGTRANSFER_BINARY} ${PROMETHEUS_PORT_OPTION} -c ${OPENIM_MSGTRANSFER_CONFIG} -n ${i}>> ${LOG_FILE} 2>&1 & done openim::util::check_process_names "${OPENIM_OUTPUT_HOSTBIN}/${SERVER_NAME}"