Add Prometheus monitoring function (#1337)

* k8s code adaptation: service discovery and registration adaptation, configuration adaptation

* Initial submission of the helm charts script for openim API

* change the helm charts script

* change the helm charts script

* change helm chart codes

* change dockerfiles script

* change chart script:add configmap mounts

* change chart script:change repository

* change chart script:msggateway add one service

* change config.yaml

* roll back some config values

* change chart script:change Ingress rule with a rewrite annotation

* add mysql charts script

* change chart script:add mysql.config.yaml

* add nfs provisioner charts

* change chart script:add nfs.config.yaml

* add ingress-nginx charts

* change chart script:add ingress-nginx.config.yaml

* add redis &mongodb charts

* add kafka&minio charts

* change chart script:change redis.values.yaml

* change chart script:add redis.config.yaml

* change chart script:change redis.config.yaml

* change chart script:change mongodb.value.yaml

* change chart script:change mongodb.value.yaml

* change chart script:add mongodb.config.yaml

* change chart script:change minio.values.yaml

* change chart script:add minio.config.yaml

* change chart script:change kafka.values.yaml

* change chart script:add kafka.config.yaml

* change chart script:change services.config.yaml

* bug fix:delete websocket's port restrictions

* bug fix:change port value

* change chart script:Submit a stable version script

* fix bug:Implement option interface

* fix bug:change K8sDR.Register

* change config.yaml

* change charts script:minio service add ingress

* change charts script:minio service add ingress

* change charts script:kafka.replicaCount=3 & change minio.api ingress

* delete charts script changes

* change config.yaml

* change openim.yaml

* merge go.sum

* Add monitoring function and struct for Prometheus on gin and GRPC

* Add GRPC and gin server monitoring logic

* Add GRPC and gin server monitoring logic2

* Add GRPC and gin server monitoring logic3

* Add GRPC and gin server monitoring logic4

* Add GRPC and gin server monitoring logic5

* Add GRPC and gin server monitoring logic6

* Add GRPC and gin server monitoring logic7

* delete:old monitoring code

* add for test

* fix bug:change package name

* fix bug:delete getPromPort function

* fix bug:delete getPromPort function

* fix bug:change logs

* fix bug:change registerName logic in GetGrpcCusMetrics function

* add getPrometheus url api

* fix:config path logic

* fix:prometheus enable function

* fix:prometheus enable function

* fix:transfer multi-process monitoring logic

* del:delete unused manifests

* fix:openim-msgtransfer.sh

* fix:openim-msgtransfer.sh

---------

Co-authored-by: lin.huang <lin.huang@apulis.com>
Co-authored-by: Xinwei Xiong <3293172751@qq.com>

@ -17,6 +17,8 @@ package main
import (
"context"
"fmt"
ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"net"
_ "net/http/pprof"
"strconv"
@ -43,11 +45,11 @@ func main() {
}
}
func run(port int) error {
log.ZInfo(context.Background(), "Openim api port:", "port", port)
func run(port int, proPort int) error {
log.ZInfo(context.Background(), "Openim api port:", "port", port, "proPort", proPort)
if port == 0 {
err := "port is empty"
if port == 0 || proPort == 0 {
err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort)
log.ZError(context.Background(), err, nil)
return fmt.Errorf(err)
@ -82,6 +84,13 @@ func run(port int) error {
}
log.ZInfo(context.Background(), "api register public config to discov success")
router := api.NewGinRouter(client, rdb)
if config.Config.Prometheus.Enable {
p := ginProm.NewPrometheus("app", prom_metrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", proPort))
p.Use(router)
}
log.ZInfo(context.Background(), "api init router success")
var address string
if config.Config.Api.ListenIP != "" {

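Reviewer note: for reference, the ginPrometheus exporter wired in above can be used standalone. A minimal sketch, assuming the middleware API added by this PR (the business port 10002 and metrics port 20100 are illustrative):

package main

import (
	"fmt"

	"github.com/gin-gonic/gin"
	ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
	"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
)

func main() {
	router := gin.New()
	// Attach the request/latency/size middleware plus the custom "Api" counters.
	p := ginProm.NewPrometheus("app", prom_metrics.GetGinCusMetrics("Api"))
	// Expose /metrics on its own port so scrapes stay out of the business router.
	p.SetListenAddress(fmt.Sprintf(":%d", 20100))
	p.Use(router)
	router.Run(":10002") // business traffic; Prometheus scrapes :20100/metrics
}
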
@ -21,6 +21,7 @@ import (
func main() {
msgTransferCmd := cmd.NewMsgTransferCmd()
msgTransferCmd.AddPrometheusPortFlag()
msgTransferCmd.AddTransferProgressFlag()
if err := msgTransferCmd.Exec(); err != nil {
panic(err.Error())
}

@ -382,7 +382,9 @@ callback:
# The number of Prometheus ports per service needs to correspond to rpcPort
# The number of ports needs to be consistent with msg_transfer_service_num in script/path_info.sh
prometheus:
enable: false
enable: true
prometheusUrl: "https://openim.prometheus"
apiPrometheusPort: [20100]
userPrometheusPort: [ 20110 ]
friendPrometheusPort: [ 20120 ]
messagePrometheusPort: [ 20130 ]

@ -383,6 +383,8 @@ callback:
# The number of ports needs to be consistent with msg_transfer_service_num in script/path_info.sh
prometheus:
enable: ${PROMETHEUS_ENABLE}
prometheusUrl: ${PROMETHEUS_URL}
apiPrometheusPort: [${API_PROM_PORT}]
userPrometheusPort: [ ${USER_PROM_PORT} ]
friendPrometheusPort: [ ${FRIEND_PROM_PORT} ]
messagePrometheusPort: [ ${MESSAGE_PROM_PORT} ]

@ -37,7 +37,7 @@ require github.com/google/uuid v1.3.1
require (
github.com/IBM/sarama v1.41.3
github.com/OpenIMSDK/protocol v0.0.30
github.com/OpenIMSDK/protocol v0.0.31
github.com/OpenIMSDK/tools v0.0.16
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible

@ -18,8 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/protocol v0.0.30 h1:MiHO6PyQMR9ojBHNnSFxCHLmsoE2xZqaiYj975JiZnM=
github.com/OpenIMSDK/protocol v0.0.30/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/protocol v0.0.31 h1:ax43x9aqA6EKNXNukS5MT5BSTqkUmwO4uTvbJLtzCgE=
github.com/OpenIMSDK/protocol v0.0.31/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.16 h1:te/GIq2imCMsrRPgU9OObYKbzZ3rT08Lih/o+3QFIz0=
github.com/OpenIMSDK/tools v0.0.16/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=

@ -39,7 +39,6 @@ import (
"github.com/OpenIMSDK/tools/mw"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prome"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
@ -63,13 +62,6 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
u := NewUserApi(*userRpc)
m := NewMessageApi(messageRpc, userRpc)
if config.Config.Prometheus.Enable {
prome.NewApiRequestCounter()
prome.NewApiRequestFailedCounter()
prome.NewApiRequestSuccessCounter()
r.Use(prome.PrometheusMiddleware)
r.GET("/metrics", prome.PrometheusHandler())
}
ParseToken := GinParseToken(rdb)
userRouterGroup := r.Group("/user")
{
@ -151,6 +143,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
// Third service
thirdGroup := r.Group("/third", ParseToken)
{
thirdGroup.GET("/prometheus", GetPrometheus)
t := NewThirdApi(*thirdRpc)
thirdGroup.POST("/fcm_update_token", t.FcmUpdateToken)
thirdGroup.POST("/set_app_badge", t.SetAppBadge)

@ -15,6 +15,7 @@
package api
import (
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"math/rand"
"net/http"
"strconv"
@ -118,3 +119,7 @@ func (o *ThirdApi) DeleteLogs(c *gin.Context) {
func (o *ThirdApi) SearchLogs(c *gin.Context) {
a2r.Call(third.ThirdClient.SearchLogs, o.Client, c)
}
func GetPrometheus(c *gin.Context) {
c.Redirect(http.StatusFound, config2.Config.Prometheus.PrometheusUrl)
}

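Reviewer note: the new /third/prometheus route simply answers with a 302 to the configured prometheusUrl. A hedged sketch of probing it (the listen address and token value are illustrative; the route sits behind ParseToken):

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Stop at the redirect so the Location header can be inspected directly.
	client := &http.Client{
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
	req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:10002/third/prometheus", nil)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("token", "<token>") // ParseToken requires a valid token here
	resp, err := client.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.StatusCode, resp.Header.Get("Location")) // expect 302 and prometheusUrl
}
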
@ -33,7 +33,6 @@ import (
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prome"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
)
@ -69,9 +68,10 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
s.LongConnServer = LongConnServer
}
func NewServer(rpcPort int, longConnServer LongConnServer) *Server {
func NewServer(rpcPort int, proPort int, longConnServer LongConnServer) *Server {
return &Server{
rpcPort: rpcPort,
prometheusPort: proPort,
LongConnServer: longConnServer,
pushTerminal: []int{constant.IOSPlatformID, constant.AndroidPlatformID},
}
@ -158,7 +158,6 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(
} else {
if utils.IsContainInt(client.PlatformID, s.pushTerminal) {
tempT.OnlinePush = true
prome.Inc(prome.MsgOnlinePushSuccessCounter)
resp = append(resp, temp)
}
}

@ -41,7 +41,7 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
if err != nil {
return err
}
hubServer := NewServer(rpcPort, longServer)
hubServer := NewServer(rpcPort, prometheusPort, longServer)
go func() {
err := hubServer.Start()
if err != nil {

@ -17,6 +17,7 @@ package msggateway
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"net/http"
"strconv"
"sync"
@ -220,6 +221,7 @@ func (ws *WsServer) registerClient(client *Client) {
if !userOK {
ws.clients.Set(client.UserID, client)
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
prom_metrics.OnlineUserGauge.Add(1)
ws.onlineUserNum.Add(1)
ws.onlineUserConnNum.Add(1)
} else {
@ -364,6 +366,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr())
if isDeleteUser {
ws.onlineUserNum.Add(-1)
prom_metrics.OnlineUserGauge.Dec()
}
ws.onlineUserConnNum.Add(-1)
ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)

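Reviewer note: the gauge bookkeeping above only moves on a user's first connection (register) and on their last connection closing (unregister), so online_user_num tracks distinct online users rather than raw connections. A minimal sketch of the same pattern, assuming a plain prometheus.Gauge (registration happens elsewhere, via GetGrpcCusMetrics):

package main

import "github.com/prometheus/client_golang/prometheus"

var onlineUserGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "online_user_num",
	Help: "The number of online user num",
})

func main() {
	userAlreadyOnline := false // first websocket connection for this user
	if !userAlreadyOnline {
		onlineUserGauge.Add(1) // mirrors registerClient
	}
	lastConnectionClosed := true // the user's final connection went away
	if lastConnectionClosed {
		onlineUserGauge.Dec() // mirrors unregisterClient
	}
}
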
@ -15,13 +15,18 @@
package msgtransfer
import (
"errors"
"fmt"
"sync"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"log"
"net/http"
"sync"
"github.com/OpenIMSDK/tools/mw"
@ -31,7 +36,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/relation"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/prome"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
@ -81,7 +85,6 @@ func StartTransfer(prometheusPort int) error {
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgTransfer := NewMsgTransfer(chatLogDatabase, msgDatabase, &conversationRpcClient, &groupRpcClient)
msgTransfer.initPrometheus()
return msgTransfer.Start(prometheusPort)
}
@ -95,21 +98,13 @@ func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
}
}
func (m *MsgTransfer) initPrometheus() {
prome.NewSeqGetSuccessCounter()
prome.NewSeqGetFailedCounter()
prome.NewSeqSetSuccessCounter()
prome.NewSeqSetFailedCounter()
prome.NewMsgInsertRedisSuccessCounter()
prome.NewMsgInsertRedisFailedCounter()
prome.NewMsgInsertMongoSuccessCounter()
prome.NewMsgInsertMongoFailedCounter()
}
func (m *MsgTransfer) Start(prometheusPort int) error {
var wg sync.WaitGroup
wg.Add(1)
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
if prometheusPort <= 0 {
return errors.New("prometheusPort not correct")
}
if config.Config.ChatPersistenceMysql {
// go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
} else {
@ -118,10 +113,21 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
// go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
err := prome.StartPrometheusSrv(prometheusPort)
/*err := prome.StartPrometheusSrv(prometheusPort)
if err != nil {
return err
}*/
if config.Config.Prometheus.Enable {
reg := prometheus.NewRegistry()
reg.MustRegister(
collectors.NewGoCollector(),
)
reg.MustRegister(prom_metrics.GetGrpcCusMetrics("Transfer")...)
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil))
}
wg.Wait()
return nil
}

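Reviewer note: instead of the old prome.StartPrometheusSrv helper, the transfer process now builds its own registry and serves it with promhttp. A condensed, standalone sketch of that exporter (port 21400 is illustrative; http.ListenAndServe blocks the calling goroutine):

package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
)

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(collectors.NewGoCollector()) // Go runtime metrics
	// Redis/Mongo insert counters and the seq-set failure counter for "Transfer".
	reg.MustRegister(prom_metrics.GetGrpcCusMetrics("Transfer")...)
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", 21400), nil))
}
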
@ -16,6 +16,7 @@ package msgtransfer
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/IBM/sarama"
"google.golang.org/protobuf/proto"
@ -74,6 +75,9 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(
"conversationID",
msgFromMQ.ConversationID,
)
prom_metrics.MsgInsertMongoFailedCounter.Inc()
} else {
prom_metrics.MsgInsertMongoSuccessCounter.Inc()
}
var seqs []int64
for _, msg := range msgFromMQ.MsgData {

@ -14,10 +14,6 @@
package push
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/prome"
)
type Consumer struct {
pushCh ConsumerHandler
successCount uint64
@ -29,11 +25,6 @@ func NewConsumer(pusher *Pusher) *Consumer {
}
}
func (c *Consumer) initPrometheus() {
prome.NewMsgOfflinePushSuccessCounter()
prome.NewMsgOfflinePushFailedCounter()
}
func (c *Consumer) Start() {
// statistics.NewStatistics(&c.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to
// msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)

@ -67,7 +67,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
go func() {
defer wg.Done()
consumer := NewConsumer(pusher)
consumer.initPrometheus()
consumer.Start()
}()
wg.Wait()

@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy"
"github.com/OpenIMSDK/protocol/conversation"
@ -40,7 +41,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prome"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
@ -288,10 +288,9 @@ func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg
}
err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
if err != nil {
prome.Inc(prome.MsgOfflinePushFailedCounter)
prom_metrics.MsgOfflinePushFailedCounter.Inc()
return err
}
prome.Inc(prome.MsgOfflinePushSuccessCounter)
return nil
}

@ -16,6 +16,7 @@ package auth
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -73,6 +74,7 @@ func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (*
if err != nil {
return nil, err
}
prom_metrics.UserLoginCounter.Inc()
resp.Token = token
resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60
return &resp, nil

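Reviewer note: prom_metrics counters such as UserLoginCounter are built with prometheus.NewCounter, not promauto, so increments are only visible once the counter is registered; in the server this happens through GetGrpcCusMetrics at startup. A minimal sketch of the required pairing:

package main

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
)

func main() {
	reg := prometheus.NewRegistry()
	// Registration is what makes the counter visible at /metrics;
	// in the auth service this is done via GetGrpcCusMetrics.
	reg.MustRegister(prom_metrics.UserLoginCounter)
	prom_metrics.UserLoginCounter.Inc() // exported as user_login_total
}
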
@ -16,6 +16,7 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@ -28,8 +29,6 @@ import (
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
promepkg "github.com/openimsdk/open-im-server/v3/pkg/common/prome"
)
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, error error) {
@ -59,9 +58,8 @@ func (m *msgServer) sendMsgSuperGroupChat(
ctx context.Context,
req *pbmsg.SendMsgReq,
) (resp *pbmsg.SendMsgResp, err error) {
promepkg.Inc(promepkg.WorkSuperGroupChatMsgRecvSuccessCounter)
if err = m.messageVerification(ctx, req); err != nil {
promepkg.Inc(promepkg.WorkSuperGroupChatMsgProcessFailedCounter)
prom_metrics.GroupChatMsgProcessFailedCounter.Inc()
return nil, err
}
if err = callbackBeforeSendGroupMsg(ctx, req); err != nil {
@ -80,7 +78,7 @@ func (m *msgServer) sendMsgSuperGroupChat(
if err = callbackAfterSendGroupMsg(ctx, req); err != nil {
log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err)
}
promepkg.Inc(promepkg.WorkSuperGroupChatMsgProcessSuccessCounter)
prom_metrics.GroupChatMsgProcessSuccessCounter.Inc()
resp = &pbmsg.SendMsgResp{}
resp.SendTime = req.MsgData.SendTime
resp.ServerMsgID = req.MsgData.ServerMsgID
@ -133,9 +131,7 @@ func (m *msgServer) sendMsgNotification(
ctx context.Context,
req *pbmsg.SendMsgReq,
) (resp *pbmsg.SendMsgResp, err error) {
promepkg.Inc(promepkg.SingleChatMsgRecvSuccessCounter)
if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
promepkg.Inc(promepkg.SingleChatMsgProcessFailedCounter)
return nil, err
}
resp = &pbmsg.SendMsgResp{
@ -147,7 +143,6 @@ func (m *msgServer) sendMsgNotification(
}
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
promepkg.Inc(promepkg.SingleChatMsgRecvSuccessCounter)
if err := m.messageVerification(ctx, req); err != nil {
return nil, err
}
@ -166,7 +161,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
}
}
if !isSend {
promepkg.Inc(promepkg.SingleChatMsgProcessFailedCounter)
prom_metrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, nil
} else {
if err = callbackBeforeSendSingleMsg(ctx, req); err != nil {
@ -176,7 +171,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
return nil, err
}
if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
promepkg.Inc(promepkg.SingleChatMsgProcessFailedCounter)
prom_metrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, err
}
err = callbackAfterSendSingleMsg(ctx, req)
@ -188,7 +183,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
ClientMsgID: req.MsgData.ClientMsgID,
SendTime: req.MsgData.SendTime,
}
promepkg.Inc(promepkg.SingleChatMsgProcessSuccessCounter)
prom_metrics.SingleChatMsgProcessSuccessCounter.Inc()
return resp, nil
}
}

@ -28,7 +28,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/prome"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
@ -94,27 +93,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
}
s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg))
s.addInterceptorHandler(MessageHasReadEnabled)
s.initPrometheus()
msg.RegisterMsgServer(server, s)
return nil
}
func (m *msgServer) initPrometheus() {
prome.NewMsgPullFromRedisSuccessCounter()
prome.NewMsgPullFromRedisFailedCounter()
prome.NewMsgPullFromMongoSuccessCounter()
prome.NewMsgPullFromMongoFailedCounter()
prome.NewSingleChatMsgRecvSuccessCounter()
prome.NewGroupChatMsgRecvSuccessCounter()
prome.NewWorkSuperGroupChatMsgRecvSuccessCounter()
prome.NewSingleChatMsgProcessSuccessCounter()
prome.NewSingleChatMsgProcessFailedCounter()
prome.NewGroupChatMsgProcessSuccessCounter()
prome.NewGroupChatMsgProcessFailedCounter()
prome.NewWorkSuperGroupChatMsgProcessSuccessCounter()
prome.NewWorkSuperGroupChatMsgProcessFailedCounter()
}
func (m *msgServer) conversationAndGetRecvID(conversation *conversation.Conversation, userID string) (recvID string) {
if conversation.ConversationType == constant.SingleChatType ||
conversation.ConversationType == constant.NotificationChatType {

@ -34,9 +34,9 @@ func NewApiCmd() *ApiCmd {
return ret
}
func (a *ApiCmd) AddApi(f func(port int) error) {
func (a *ApiCmd) AddApi(f func(port int, promPort int) error) {
a.Command.RunE = func(cmd *cobra.Command, args []string) error {
return f(a.getPortFlag(cmd))
return f(a.getPortFlag(cmd), a.getPrometheusPortFlag(cmd))
}
}
@ -44,8 +44,8 @@ func (a *ApiCmd) GetPortFromConfig(portType string) int {
fmt.Println("GetPortFromConfig:", portType)
if portType == constant.FlagPort {
return config2.Config.Api.OpenImApiPort[0]
} else {
return 0
} else if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.ApiPrometheusPort[0]
}
return 0
}

@ -66,7 +66,7 @@ func (m *MsgGatewayCmd) GetPortFromConfig(portType string) int {
} else if portType == constant.FlagPort {
return v3config.Config.LongConnSvr.OpenImMessageGatewayPort[0]
} else if portType == constant.FlagPrometheusPort {
return 0
return v3config.Config.Prometheus.MessageGatewayPrometheusPort[0]
} else {
return 0
}

@ -15,6 +15,9 @@
package cmd
import (
"fmt"
"github.com/OpenIMSDK/protocol/constant"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
@ -40,3 +43,25 @@ func (m *MsgTransferCmd) Exec() error {
m.addRunE()
return m.Execute()
}
func (m *MsgTransferCmd) GetPortFromConfig(portType string) int {
fmt.Println("GetPortFromConfig:", portType)
if portType == constant.FlagPort {
return 0
} else if portType == constant.FlagPrometheusPort {
n := m.getTransferProgressFlagValue()
return config2.Config.Prometheus.MessageTransferPrometheusPort[n]
}
return 0
}
func (m *MsgTransferCmd) AddTransferProgressFlag() {
m.Command.Flags().IntP(constant.FlagTransferProgressIndex, "n", 0, "transfer progress index")
}
func (m *MsgTransferCmd) getTransferProgressFlagValue() int {
nindex, err := m.Command.Flags().GetInt(constant.FlagTransferProgressIndex)
if err != nil {
fmt.Println("get transfercmd error,make sure it is k8s env or not")
return 0
}
return nindex
}

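Reviewer note: each msgtransfer instance is launched with -n <index> and picks its metrics port from messageTransferPrometheusPort by that index, so several transfer processes on one host export on distinct ports. A hedged sketch of the selection (the bounds check is an addition in this sketch, not in the PR; the port values are illustrative):

package main

import "fmt"

// transferPromPort maps a transfer instance index to its Prometheus port.
func transferPromPort(ports []int, n int) (int, error) {
	if n < 0 || n >= len(ports) {
		return 0, fmt.Errorf("transfer index %d out of range for %d configured ports", n, len(ports))
	}
	return ports[n], nil
}

func main() {
	ports := []int{21400, 21401, 21402, 21403} // messageTransferPrometheusPort (illustrative)
	p, _ := transferPromPort(ports, 2)
	fmt.Println(p) // 21402 for the instance started with -n 2
}
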
@ -61,34 +61,58 @@ func (a *RpcCmd) GetPortFromConfig(portType string) int {
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImPushPort[0]
}
if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.PushPrometheusPort[0]
}
case RpcAuthServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImAuthPort[0]
}
if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.AuthPrometheusPort[0]
}
case RpcConversationServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImConversationPort[0]
}
if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.ConversationPrometheusPort[0]
}
case RpcFriendServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImFriendPort[0]
}
if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.FriendPrometheusPort[0]
}
case RpcGroupServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImGroupPort[0]
}
if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.GroupPrometheusPort[0]
}
case RpcMsgServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImMessagePort[0]
}
if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.MessagePrometheusPort[0]
}
case RpcThirdServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImThirdPort[0]
}
if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.ThirdPrometheusPort[0]
}
case RpcUserServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImUserPort[0]
}
if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.UserPrometheusPort[0]
}
}
return 0
}

@ -262,18 +262,20 @@ type configStruct struct {
} `yaml:"callback"`
Prometheus struct {
Enable bool `yaml:"enable"`
UserPrometheusPort []int `yaml:"userPrometheusPort"`
FriendPrometheusPort []int `yaml:"friendPrometheusPort"`
MessagePrometheusPort []int `yaml:"messagePrometheusPort"`
MessageGatewayPrometheusPort []int `yaml:"messageGatewayPrometheusPort"`
GroupPrometheusPort []int `yaml:"groupPrometheusPort"`
AuthPrometheusPort []int `yaml:"authPrometheusPort"`
PushPrometheusPort []int `yaml:"pushPrometheusPort"`
ConversationPrometheusPort []int `yaml:"conversationPrometheusPort"`
RtcPrometheusPort []int `yaml:"rtcPrometheusPort"`
MessageTransferPrometheusPort []int `yaml:"messageTransferPrometheusPort"`
ThirdPrometheusPort []int `yaml:"thirdPrometheusPort"`
Enable bool `yaml:"enable"`
PrometheusUrl string `yaml:"prometheusUrl"`
ApiPrometheusPort []int `yaml:"apiPrometheusPort"`
UserPrometheusPort []int `yaml:"userPrometheusPort"`
FriendPrometheusPort []int `yaml:"friendPrometheusPort"`
MessagePrometheusPort []int `yaml:"messagePrometheusPort"`
MessageGatewayPrometheusPort []int `yaml:"messageGatewayPrometheusPort"`
GroupPrometheusPort []int `yaml:"groupPrometheusPort"`
AuthPrometheusPort []int `yaml:"authPrometheusPort"`
PushPrometheusPort []int `yaml:"pushPrometheusPort"`
ConversationPrometheusPort []int `yaml:"conversationPrometheusPort"`
RtcPrometheusPort []int `yaml:"rtcPrometheusPort"`
MessageTransferPrometheusPort []int `yaml:"messageTransferPrometheusPort"`
ThirdPrometheusPort []int `yaml:"thirdPrometheusPort"`
} `yaml:"prometheus"`
Notification notification `yaml:"notification"`
}

@ -35,6 +35,16 @@ const (
DefaultFolderPath = "../config/"
)
// GetDefaultConfigPath returns the absolute path of ../config/ relative to the binary; this is the k8s container config path
func GetDefaultConfigPath() string {
b, err := filepath.Abs(os.Args[0])
if err != nil {
fmt.Println("filepath.Abs error,err=", err)
return ""
}
return filepath.Join(filepath.Dir(b), "../config/")
}
// getProjectRoot returns the absolute path of the project root directory
func GetProjectRoot() string {
b, _ := filepath.Abs(os.Args[0])
@ -65,9 +75,11 @@ func initConfig(config interface{}, configName, configFolderPath string) error {
_, err := os.Stat(configFolderPath)
if err != nil {
if !os.IsNotExist(err) {
fmt.Println("stat config path error:", err.Error())
return fmt.Errorf("stat config path error: %w", err)
}
configFolderPath = filepath.Join(GetProjectRoot(), "config", configName)
fmt.Println("flag's path,enviment's path,default path all is not exist,using project path:", configFolderPath)
}
data, err := os.ReadFile(configFolderPath)
if err != nil {
@ -86,7 +98,7 @@ func InitConfig(configFolderPath string) error {
if envConfigPath != "" {
configFolderPath = envConfigPath
} else {
configFolderPath = DefaultFolderPath
configFolderPath = GetDefaultConfigPath()
}
}

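Reviewer note: taken together, the config lookup order is now: the path passed on the command line, then the environment-provided path, then GetDefaultConfigPath() (../config/ next to the binary, the k8s container layout), and finally <project root>/config if none of those exist. A sketch of that chain, assuming the surrounding package (the helper name resolveConfigDir is hypothetical):

// Sketch only; mirrors InitConfig/initConfig above.
func resolveConfigDir(flagPath, envPath string) string {
	if flagPath != "" {
		return flagPath // explicit flag path wins
	}
	if envPath != "" {
		return envPath // environment override comes next
	}
	return GetDefaultConfigPath() // ../config/ relative to the binary (k8s layout)
}
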
@ -17,6 +17,7 @@ package controller
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"time"
"github.com/redis/go-redis/v9"
@ -30,8 +31,6 @@ import (
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/common/prome"
"go.mongodb.org/mongo-driver/mongo"
pbmsg "github.com/OpenIMSDK/protocol/msg"
@ -355,10 +354,9 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil {
prome.Inc(prome.SeqGetFailedCounter)
log.ZError(ctx, "db.cache.GetMaxSeq", err)
return 0, false, err
}
prome.Inc(prome.SeqGetSuccessCounter)
lenList := len(msgs)
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
return 0, false, errors.New("too large")
@ -378,23 +376,20 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
}
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs)
if err != nil {
prome.Add(prome.MsgInsertRedisFailedCounter, failedNum)
prom_metrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
} else {
prome.Inc(prome.MsgInsertRedisSuccessCounter)
prom_metrics.MsgInsertRedisSuccessCounter.Inc()
}
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq)
if err != nil {
prome.Inc(prome.SeqSetFailedCounter)
} else {
prome.Inc(prome.SeqSetSuccessCounter)
log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID)
prom_metrics.SeqSetFailedCounter.Inc()
}
err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil {
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
prome.Inc(prome.SeqSetFailedCounter)
} else {
prome.Inc(prome.SeqSetSuccessCounter)
prom_metrics.SeqSetFailedCounter.Inc()
}
return lastMaxSeq, isNew, utils.Wrap(err, "")
}
@ -493,7 +488,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil {
if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs)
}
}
@ -530,7 +525,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
cachedMsgs, failedSeqs2, err := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache)
if err != nil {
if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs2))
log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", reGetSeqsCache)
}
}
@ -543,14 +538,14 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs)
}
// get from cache or db
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end)
if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return 0, 0, nil, err
}
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
successMsgs = append(successMsgs, mongoMsgs...)
}
@ -582,7 +577,6 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs)
if err != nil {
if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
}
}
@ -602,14 +596,14 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
"conversationID",
conversationID,
)
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs)
if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return 0, 0, nil, err
}
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
successMsgs = append(successMsgs, mongoMsgs...)
}
return minSeq, maxSeq, successMsgs, nil

@ -0,0 +1,417 @@
package ginPrometheus
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var defaultMetricPath = "/metrics"
// counter, counter_vec, gauge, gauge_vec,
// histogram, histogram_vec, summary, summary_vec
var reqCnt = &Metric{
ID: "reqCnt",
Name: "requests_total",
Description: "How many HTTP requests processed, partitioned by status code and HTTP method.",
Type: "counter_vec",
Args: []string{"code", "method", "handler", "host", "url"}}
var reqDur = &Metric{
ID: "reqDur",
Name: "request_duration_seconds",
Description: "The HTTP request latencies in seconds.",
Type: "histogram_vec",
Args: []string{"code", "method", "url"},
}
var resSz = &Metric{
ID: "resSz",
Name: "response_size_bytes",
Description: "The HTTP response sizes in bytes.",
Type: "summary"}
var reqSz = &Metric{
ID: "reqSz",
Name: "request_size_bytes",
Description: "The HTTP request sizes in bytes.",
Type: "summary"}
var standardMetrics = []*Metric{
reqCnt,
reqDur,
resSz,
reqSz,
}
/*
RequestCounterURLLabelMappingFn is a function which can be supplied to the middleware to control
the cardinality of the request counter's "url" label, which might be required in some contexts.
For instance, if for a "/customer/:name" route you don't want to generate a time series for every
possible customer name, you could use this function:
func(c *gin.Context) string {
url := c.Request.URL.Path
for _, p := range c.Params {
if p.Key == "name" {
url = strings.Replace(url, p.Value, ":name", 1)
break
}
}
return url
}
which would map "/customer/alice" and "/customer/bob" to their template "/customer/:name".
*/
type RequestCounterURLLabelMappingFn func(c *gin.Context) string
// Metric is a definition for the name, description, type, ID, and
// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric
type Metric struct {
MetricCollector prometheus.Collector
ID string
Name string
Description string
Type string
Args []string
}
// Prometheus contains the metrics gathered by the instance and its path
type Prometheus struct {
reqCnt *prometheus.CounterVec
reqDur *prometheus.HistogramVec
reqSz, resSz prometheus.Summary
router *gin.Engine
listenAddress string
Ppg PrometheusPushGateway
MetricsList []*Metric
MetricsPath string
ReqCntURLLabelMappingFn RequestCounterURLLabelMappingFn
// gin.Context string to use as a prometheus URL label
URLLabelFromContext string
}
// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional)
type PrometheusPushGateway struct {
// Push interval in seconds
PushIntervalSeconds time.Duration
// Push Gateway URL in format http://domain:port
// where JOBNAME can be any string of your choice
PushGatewayURL string
// Local metrics URL where metrics are fetched from; this could be omitted in the future
// if implemented using prometheus common/expfmt instead
MetricsURL string
// pushgateway job name, defaults to "gin"
Job string
}
// NewPrometheus generates a new set of metrics with a certain subsystem name
func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus {
subsystem = "app"
var metricsList []*Metric
if len(customMetricsList) > 1 {
panic("Too many args. NewPrometheus( string, <optional []*Metric> ).")
} else if len(customMetricsList) == 1 {
metricsList = customMetricsList[0]
}
metricsList = append(metricsList, standardMetrics...)
p := &Prometheus{
MetricsList: metricsList,
MetricsPath: defaultMetricPath,
ReqCntURLLabelMappingFn: func(c *gin.Context) string {
return c.Request.URL.Path
},
}
p.registerMetrics(subsystem)
return p
}
// SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL
// every pushIntervalSeconds. Metrics are fetched from metricsURL
func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) {
p.Ppg.PushGatewayURL = pushGatewayURL
p.Ppg.MetricsURL = metricsURL
p.Ppg.PushIntervalSeconds = pushIntervalSeconds
p.startPushTicker()
}
// SetPushGatewayJob job name, defaults to "gin"
func (p *Prometheus) SetPushGatewayJob(j string) {
p.Ppg.Job = j
}
// SetListenAddress for exposing metrics on address. If not set, it will be exposed at the
// same address of the gin engine that is being used
func (p *Prometheus) SetListenAddress(address string) {
p.listenAddress = address
if p.listenAddress != "" {
p.router = gin.Default()
}
}
// SetListenAddressWithRouter for using a separate router to expose metrics. (this keeps things like GET /metrics out of
// your content's access log).
func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Engine) {
p.listenAddress = listenAddress
if len(p.listenAddress) > 0 {
p.router = r
}
}
// SetMetricsPath set metrics paths
func (p *Prometheus) SetMetricsPath(e *gin.Engine) {
if p.listenAddress != "" {
p.router.GET(p.MetricsPath, prometheusHandler())
p.runServer()
} else {
e.GET(p.MetricsPath, prometheusHandler())
}
}
// SetMetricsPathWithAuth set metrics paths with authentication
func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) {
if p.listenAddress != "" {
p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler())
p.runServer()
} else {
e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler())
}
}
func (p *Prometheus) runServer() {
if p.listenAddress != "" {
go p.router.Run(p.listenAddress)
}
}
func (p *Prometheus) getMetrics() []byte {
response, err := http.Get(p.Ppg.MetricsURL)
if err != nil {
fmt.Println("Error fetching local metrics:", err.Error())
return nil
}
defer response.Body.Close()
body, _ := ioutil.ReadAll(response.Body)
return body
}
func (p *Prometheus) getPushGatewayURL() string {
h, _ := os.Hostname()
if p.Ppg.Job == "" {
p.Ppg.Job = "gin"
}
return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + h
}
func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) {
req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics))
if err != nil {
fmt.Println("Error building push gateway request:", err.Error())
return
}
client := &http.Client{}
if _, err = client.Do(req); err != nil {
fmt.Println("Error sending to push gateway:", err.Error())
}
}
func (p *Prometheus) startPushTicker() {
ticker := time.NewTicker(time.Second * p.Ppg.PushIntervalSeconds)
go func() {
for range ticker.C {
p.sendMetricsToPushGateway(p.getMetrics())
}
}()
}
// NewMetric associates prometheus.Collector based on Metric.Type
func NewMetric(m *Metric, subsystem string) prometheus.Collector {
var metric prometheus.Collector
switch m.Type {
case "counter_vec":
metric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
m.Args,
)
case "counter":
metric = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
)
case "gauge_vec":
metric = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
m.Args,
)
case "gauge":
metric = prometheus.NewGauge(
prometheus.GaugeOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
)
case "histogram_vec":
metric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
m.Args,
)
case "histogram":
metric = prometheus.NewHistogram(
prometheus.HistogramOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
)
case "summary_vec":
metric = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
m.Args,
)
case "summary":
metric = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: subsystem,
Name: m.Name,
Help: m.Description,
},
)
}
return metric
}
func (p *Prometheus) registerMetrics(subsystem string) {
for _, metricDef := range p.MetricsList {
metric := NewMetric(metricDef, subsystem)
if err := prometheus.Register(metric); err != nil {
fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error())
}
switch metricDef {
case reqCnt:
p.reqCnt = metric.(*prometheus.CounterVec)
case reqDur:
p.reqDur = metric.(*prometheus.HistogramVec)
case resSz:
p.resSz = metric.(prometheus.Summary)
case reqSz:
p.reqSz = metric.(prometheus.Summary)
}
metricDef.MetricCollector = metric
}
}
// Use adds the middleware to a gin engine.
func (p *Prometheus) Use(e *gin.Engine) {
e.Use(p.HandlerFunc())
p.SetMetricsPath(e)
}
// UseWithAuth adds the middleware to a gin engine with BasicAuth.
func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) {
e.Use(p.HandlerFunc())
p.SetMetricsPathWithAuth(e, accounts)
}
// HandlerFunc defines handler function for middleware
func (p *Prometheus) HandlerFunc() gin.HandlerFunc {
return func(c *gin.Context) {
if c.Request.URL.Path == p.MetricsPath {
c.Next()
return
}
start := time.Now()
reqSz := computeApproximateRequestSize(c.Request)
c.Next()
status := strconv.Itoa(c.Writer.Status())
elapsed := float64(time.Since(start)) / float64(time.Second)
resSz := float64(c.Writer.Size())
url := p.ReqCntURLLabelMappingFn(c)
if len(p.URLLabelFromContext) > 0 {
u, found := c.Get(p.URLLabelFromContext)
if !found {
u = "unknown"
}
url = u.(string)
}
p.reqDur.WithLabelValues(status, c.Request.Method, url).Observe(elapsed)
p.reqCnt.WithLabelValues(status, c.Request.Method, c.HandlerName(), c.Request.Host, url).Inc()
p.reqSz.Observe(float64(reqSz))
p.resSz.Observe(resSz)
}
}
func prometheusHandler() gin.HandlerFunc {
h := promhttp.Handler()
return func(c *gin.Context) {
h.ServeHTTP(c.Writer, c.Request)
}
}
func computeApproximateRequestSize(r *http.Request) int {
s := 0
if r.URL != nil {
s = len(r.URL.Path)
}
s += len(r.Method)
s += len(r.Proto)
for name, values := range r.Header {
s += len(name)
for _, value := range values {
s += len(value)
}
}
s += len(r.Host)
// r.Form and r.MultipartForm are assumed to be included in r.URL.
if r.ContentLength != -1 {
s += int(r.ContentLength)
}
return s
}

@ -28,8 +28,6 @@ import (
"github.com/IBM/sarama"
"google.golang.org/protobuf/proto"
prome "github.com/openimsdk/open-im-server/v3/pkg/common/prome"
)
const (
@ -131,8 +129,8 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
kMsg.Headers = header
partition, offset, err := p.producer.SendMessage(kMsg)
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length())
if err == nil {
prome.Inc(prome.SendMsgCounter)
if err != nil {
log.ZWarn(ctx, "p.producer.SendMessage error", err)
}
return partition, offset, utils.Wrap(err, "")
}

@ -0,0 +1,45 @@
package prom_metrics
import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
)
func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *grpc_prometheus.ServerMetrics, error) {
reg := prometheus.NewRegistry()
grpcMetrics := grpc_prometheus.NewServerMetrics()
grpcMetrics.EnableHandlingTimeHistogram()
cusMetrics = append(cusMetrics, grpcMetrics, collectors.NewGoCollector())
reg.MustRegister(cusMetrics...)
return reg, grpcMetrics, nil
}
func GetGrpcCusMetrics(registerName string) []prometheus.Collector {
switch registerName {
case config2.Config.RpcRegisterName.OpenImMessageGatewayName:
return []prometheus.Collector{OnlineUserGauge}
case config2.Config.RpcRegisterName.OpenImMsgName:
return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter}
case "Transfer":
return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter}
case config2.Config.RpcRegisterName.OpenImPushName:
return []prometheus.Collector{MsgOfflinePushFailedCounter}
case config2.Config.RpcRegisterName.OpenImAuthName:
return []prometheus.Collector{UserLoginCounter}
default:
return nil
}
}
func GetGinCusMetrics(name string) []*ginPrometheus.Metric {
switch name {
case "Api":
return []*ginPrometheus.Metric{ApiCustomCnt}
default:
return []*ginPrometheus.Metric{ApiCustomCnt}
}
}

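Reviewer note: NewGrpcPromObj hands back both the registry and the go-grpc-prometheus ServerMetrics; callers still have to attach the interceptors and serve the registry. A hedged sketch of that wiring (the port and the "Transfer" key are illustrative):

package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"google.golang.org/grpc"

	"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
)

func main() {
	cus := prom_metrics.GetGrpcCusMetrics("Transfer") // per-service custom collectors
	reg, grpcMetrics, err := prom_metrics.NewGrpcPromObj(cus)
	if err != nil {
		log.Fatal(err)
	}
	// Count and time every RPC via the server interceptors.
	srv := grpc.NewServer(
		grpc.StreamInterceptor(grpcMetrics.StreamServerInterceptor()),
		grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
	)
	// Register services on srv here, then pre-create per-method series at zero.
	grpcMetrics.InitializeMetrics(srv)
	// Serve the dedicated registry for the scraper (blocks).
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", 21001), nil))
}
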
@ -0,0 +1,16 @@
package prom_metrics
import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
/*
labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc()
*/
var (
ApiCustomCnt = &ginProm.Metric{
Name: "custom_total",
Description: "Custom counter events.",
Type: "counter_vec",
Args: []string{"label_one", "label_two"},
}
)

@ -0,0 +1,12 @@
package prom_metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
UserLoginCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "user_login_total",
Help: "The number of user login",
})
)

@ -0,0 +1,24 @@
package prom_metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
SingleChatMsgProcessSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "single_chat_msg_process_success_total",
Help: "The number of single chat msg successful processed",
})
SingleChatMsgProcessFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "single_chat_msg_process_failed_total",
Help: "The number of single chat msg failed processed",
})
GroupChatMsgProcessSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "group_chat_msg_process_success_total",
Help: "The number of group chat msg successful processed",
})
GroupChatMsgProcessFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "group_chat_msg_process_failed_total",
Help: "The number of group chat msg failed processed",
})
)

@ -0,0 +1,12 @@
package prom_metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
OnlineUserGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "online_user_num",
Help: "The number of online user num",
})
)

@ -0,0 +1,12 @@
package prom_metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
MsgOfflinePushFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "msg_offline_push_failed_total",
Help: "The number of msg failed offline pushed",
})
)

@ -0,0 +1,28 @@
package prom_metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
MsgInsertRedisSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "msg_insert_redis_success_total",
Help: "The number of successful insert msg to redis",
})
MsgInsertRedisFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "msg_insert_redis_failed_total",
Help: "The number of failed insert msg to redis",
})
MsgInsertMongoSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "msg_insert_mongo_success_total",
Help: "The number of successful insert msg to mongo",
})
MsgInsertMongoFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "msg_insert_mongo_failed_total",
Help: "The number of failed insert msg to mongo",
})
SeqSetFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "seq_set_failed_total",
Help: "The number of failed set seq",
})
)

@ -1,15 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prome // import "github.com/openimsdk/open-im-server/v3/pkg/common/prome"

@ -1,470 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prome
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// auth rpc.
UserLoginCounter prometheus.Counter
UserRegisterCounter prometheus.Counter
// seg.
SeqGetSuccessCounter prometheus.Counter
SeqGetFailedCounter prometheus.Counter
SeqSetSuccessCounter prometheus.Counter
SeqSetFailedCounter prometheus.Counter
// msg-db.
MsgInsertRedisSuccessCounter prometheus.Counter
MsgInsertRedisFailedCounter prometheus.Counter
MsgInsertMongoSuccessCounter prometheus.Counter
MsgInsertMongoFailedCounter prometheus.Counter
MsgPullFromRedisSuccessCounter prometheus.Counter
MsgPullFromRedisFailedCounter prometheus.Counter
MsgPullFromMongoSuccessCounter prometheus.Counter
MsgPullFromMongoFailedCounter prometheus.Counter
// msg-ws.
MsgRecvTotalCounter prometheus.Counter
GetNewestSeqTotalCounter prometheus.Counter
PullMsgBySeqListTotalCounter prometheus.Counter
SingleChatMsgRecvSuccessCounter prometheus.Counter
GroupChatMsgRecvSuccessCounter prometheus.Counter
WorkSuperGroupChatMsgRecvSuccessCounter prometheus.Counter
OnlineUserGauge prometheus.Gauge
// msg-msg.
SingleChatMsgProcessSuccessCounter prometheus.Counter
SingleChatMsgProcessFailedCounter prometheus.Counter
GroupChatMsgProcessSuccessCounter prometheus.Counter
GroupChatMsgProcessFailedCounter prometheus.Counter
WorkSuperGroupChatMsgProcessSuccessCounter prometheus.Counter
WorkSuperGroupChatMsgProcessFailedCounter prometheus.Counter
// msg-push.
MsgOnlinePushSuccessCounter prometheus.Counter
MsgOfflinePushSuccessCounter prometheus.Counter
MsgOfflinePushFailedCounter prometheus.Counter
// api.
ApiRequestCounter prometheus.Counter
ApiRequestSuccessCounter prometheus.Counter
ApiRequestFailedCounter prometheus.Counter
// grpc.
GrpcRequestCounter prometheus.Counter
GrpcRequestSuccessCounter prometheus.Counter
GrpcRequestFailedCounter prometheus.Counter
SendMsgCounter prometheus.Counter
// conversation.
ConversationCreateSuccessCounter prometheus.Counter
ConversationCreateFailedCounter prometheus.Counter
)
func NewUserLoginCounter() {
if UserLoginCounter != nil {
return
}
UserLoginCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "user_login",
Help: "The number of user login",
})
}
func NewUserRegisterCounter() {
if UserRegisterCounter != nil {
return
}
UserRegisterCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "user_register",
Help: "The number of user register",
})
}
func NewSeqGetSuccessCounter() {
if SeqGetSuccessCounter != nil {
return
}
SeqGetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "seq_get_success",
Help: "The number of successful get seq",
})
}
func NewSeqGetFailedCounter() {
if SeqGetFailedCounter != nil {
return
}
SeqGetFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "seq_get_failed",
Help: "The number of failed get seq",
})
}
func NewSeqSetSuccessCounter() {
if SeqSetSuccessCounter != nil {
return
}
SeqSetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "seq_set_success",
Help: "The number of successful set seq",
})
}
func NewSeqSetFailedCounter() {
if SeqSetFailedCounter != nil {
return
}
SeqSetFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "seq_set_failed",
Help: "The number of failed set seq",
})
}
func NewApiRequestCounter() {
if ApiRequestCounter != nil {
return
}
ApiRequestCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "api_request",
Help: "The number of api request",
})
}
func NewApiRequestSuccessCounter() {
if ApiRequestSuccessCounter != nil {
return
}
ApiRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "api_request_success",
Help: "The number of api request success",
})
}
func NewApiRequestFailedCounter() {
if ApiRequestFailedCounter != nil {
return
}
ApiRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "api_request_failed",
Help: "The number of api request failed",
})
}
func NewGrpcRequestCounter() {
if GrpcRequestCounter != nil {
return
}
GrpcRequestCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "grpc_request",
Help: "The number of api request",
})
}
func NewGrpcRequestSuccessCounter() {
if GrpcRequestSuccessCounter != nil {
return
}
GrpcRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "grpc_request_success",
Help: "The number of grpc request success",
})
}
func NewGrpcRequestFailedCounter() {
if GrpcRequestFailedCounter != nil {
return
}
GrpcRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "grpc_request_failed",
Help: "The number of grpc request failed",
})
}
func NewSendMsgCount() {
if SendMsgCounter != nil {
return
}
SendMsgCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "send_msg",
Help: "The number of send msg",
})
}
func NewMsgInsertRedisSuccessCounter() {
if MsgInsertRedisSuccessCounter != nil {
return
}
MsgInsertRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_insert_redis_success",
Help: "The number of successful insert msg to redis",
})
}
func NewMsgInsertRedisFailedCounter() {
if MsgInsertRedisFailedCounter != nil {
return
}
MsgInsertRedisFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_insert_redis_failed",
Help: "The number of failed insert msg to redis",
})
}
func NewMsgInsertMongoSuccessCounter() {
if MsgInsertMongoSuccessCounter != nil {
return
}
MsgInsertMongoSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_insert_mongo_success",
Help: "The number of successful insert msg to mongo",
})
}
func NewMsgInsertMongoFailedCounter() {
if MsgInsertMongoFailedCounter != nil {
return
}
MsgInsertMongoFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_insert_mongo_failed",
Help: "The number of failed insert msg to mongo",
})
}
func NewMsgPullFromRedisSuccessCounter() {
if MsgPullFromRedisSuccessCounter != nil {
return
}
MsgPullFromRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_pull_from_redis_success",
Help: "The number of successful pull msg from redis",
})
}
func NewMsgPullFromRedisFailedCounter() {
if MsgPullFromRedisFailedCounter != nil {
return
}
MsgPullFromRedisFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_pull_from_redis_failed",
Help: "The number of failed pull msg from redis",
})
}
func NewMsgPullFromMongoSuccessCounter() {
if MsgPullFromMongoSuccessCounter != nil {
return
}
MsgPullFromMongoSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_pull_from_mongo_success",
Help: "The number of successful pull msg from mongo",
})
}
func NewMsgPullFromMongoFailedCounter() {
if MsgPullFromMongoFailedCounter != nil {
return
}
MsgPullFromMongoFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_pull_from_mongo_failed",
Help: "The number of failed pull msg from mongo",
})
}
func NewMsgRecvTotalCounter() {
if MsgRecvTotalCounter != nil {
return
}
MsgRecvTotalCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_recv_total",
Help: "The number of msg received",
})
}
func NewGetNewestSeqTotalCounter() {
if GetNewestSeqTotalCounter != nil {
return
}
GetNewestSeqTotalCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "get_newest_seq_total",
Help: "the number of get newest seq",
})
}
func NewPullMsgBySeqListTotalCounter() {
if PullMsgBySeqListTotalCounter != nil {
return
}
PullMsgBySeqListTotalCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "pull_msg_by_seq_list_total",
Help: "The number of pull msg by seq list",
})
}
func NewSingleChatMsgRecvSuccessCounter() {
if SingleChatMsgRecvSuccessCounter != nil {
return
}
SingleChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "single_chat_msg_recv_success",
Help: "The number of single chat msg successful received ",
})
}
func NewGroupChatMsgRecvSuccessCounter() {
if GroupChatMsgRecvSuccessCounter != nil {
return
}
GroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "group_chat_msg_recv_success",
Help: "The number of group chat msg successful received",
})
}
func NewWorkSuperGroupChatMsgRecvSuccessCounter() {
if WorkSuperGroupChatMsgRecvSuccessCounter != nil {
return
}
WorkSuperGroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "work_super_group_chat_msg_recv_success",
Help: "The number of work/super group chat msg successful received",
})
}
func NewOnlineUserGauges() {
if OnlineUserGauge != nil {
return
}
OnlineUserGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "online_user_num",
Help: "The number of online user num",
})
}
func NewSingleChatMsgProcessSuccessCounter() {
if SingleChatMsgProcessSuccessCounter != nil {
return
}
SingleChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "single_chat_msg_process_success",
Help: "The number of single chat msg successful processed",
})
}
func NewSingleChatMsgProcessFailedCounter() {
if SingleChatMsgProcessFailedCounter != nil {
return
}
SingleChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "single_chat_msg_process_failed",
Help: "The number of single chat msg failed processed",
})
}
func NewGroupChatMsgProcessSuccessCounter() {
if GroupChatMsgProcessSuccessCounter != nil {
return
}
GroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "group_chat_msg_process_success",
Help: "The number of group chat msg successful processed",
})
}
func NewGroupChatMsgProcessFailedCounter() {
if GroupChatMsgProcessFailedCounter != nil {
return
}
GroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "group_chat_msg_process_failed",
Help: "The number of group chat msg failed processed",
})
}
func NewWorkSuperGroupChatMsgProcessSuccessCounter() {
if WorkSuperGroupChatMsgProcessSuccessCounter != nil {
return
}
WorkSuperGroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "work_super_group_chat_msg_process_success",
Help: "The number of work/super group chat msg successful processed",
})
}
func NewWorkSuperGroupChatMsgProcessFailedCounter() {
if WorkSuperGroupChatMsgProcessFailedCounter != nil {
return
}
WorkSuperGroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "work_super_group_chat_msg_process_failed",
Help: "The number of work/super group chat msg failed processed",
})
}
func NewMsgOnlinePushSuccessCounter() {
if MsgOnlinePushSuccessCounter != nil {
return
}
MsgOnlinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_online_push_success",
Help: "The number of msg successful online pushed",
})
}
func NewMsgOfflinePushSuccessCounter() {
if MsgOfflinePushSuccessCounter != nil {
return
}
MsgOfflinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_offline_push_success",
Help: "The number of msg successful offline pushed",
})
}
func NewMsgOfflinePushFailedCounter() {
if MsgOfflinePushFailedCounter != nil {
return
}
MsgOfflinePushFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_offline_push_failed",
Help: "The number of msg failed offline pushed",
})
}
func NewConversationCreateSuccessCounter() {
if ConversationCreateSuccessCounter != nil {
return
}
ConversationCreateSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "conversation_push_success",
Help: "The number of conversation successful pushed",
})
}
func NewConversationCreateFailedCounter() {
if ConversationCreateFailedCounter != nil {
return
}
ConversationCreateFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "conversation_push_failed",
Help: "The number of conversation failed pushed",
})
}
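For context on how these guarded constructors are meant to be consumed, here is a minimal usage sketch (assuming the constructors live in the prom_metrics package imported elsewhere in this PR). The nil check makes initialization idempotent, so each process registers only the counters it needs. insertToRedis is a hypothetical stand-in for the real storage call, not project code.

```go
package main

import (
    "errors"

    "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
)

// insertToRedis is a hypothetical stand-in for the real storage call.
func insertToRedis() error { return errors.New("redis down") }

func main() {
    // Idempotent init: safe to call even if another component already did.
    prom_metrics.NewMsgInsertRedisFailedCounter()

    if err := insertToRedis(); err != nil {
        // Count each failed write; Prometheus scrapes the running total.
        prom_metrics.MsgInsertRedisFailedCounter.Inc()
    }
}
```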

@ -1,97 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prome
import (
"bytes"
"net/http"
"strconv"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func StartPrometheusSrv(prometheusPort int) error {
if config.Config.Prometheus.Enable {
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil)
return err
}
return nil
}
func PrometheusHandler() gin.HandlerFunc {
h := promhttp.Handler()
return func(c *gin.Context) {
h.ServeHTTP(c.Writer, c.Request)
}
}
type responseBodyWriter struct {
gin.ResponseWriter
body *bytes.Buffer
}
func (r responseBodyWriter) Write(b []byte) (int, error) {
r.body.Write(b)
return r.ResponseWriter.Write(b)
}
func PrometheusMiddleware(c *gin.Context) {
Inc(ApiRequestCounter)
w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer}
c.Writer = w
c.Next()
if c.Writer.Status() == http.StatusOK {
Inc(ApiRequestSuccessCounter)
} else {
Inc(ApiRequestFailedCounter)
}
}
func Inc(counter prometheus.Counter) {
if config.Config.Prometheus.Enable {
if counter != nil {
counter.Inc()
}
}
}
func Add(counter prometheus.Counter, add int) {
if config.Config.Prometheus.Enable {
if counter != nil {
counter.Add(float64(add))
}
}
}
func GaugeInc(gauges prometheus.Gauge) {
if config.Config.Prometheus.Enable {
if gauges != nil {
gauges.Inc()
}
}
}
func GaugeDec(gauges prometheus.Gauge) {
if config.Config.Prometheus.Enable {
if gauges != nil {
gauges.Dec()
}
}
}
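The file above is deleted because its helpers hung counters off the process-global default registry and gated every Inc behind a config check. A registry-scoped gin middleware avoids both problems. The sketch below shows one plausible shape; it is an illustration, not the replacement code added by this PR, and the metric name, helper name, and port are assumptions.

```go
package main

import (
    "strconv"

    "github.com/gin-gonic/gin"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// requestCounter returns a gin middleware that counts requests by HTTP
// status code against the supplied registry, avoiding package-global state.
func requestCounter(reg *prometheus.Registry) gin.HandlerFunc {
    counter := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "api_request_total",
        Help: "API requests partitioned by HTTP status code",
    }, []string{"code"})
    reg.MustRegister(counter)
    return func(c *gin.Context) {
        c.Next() // run the handler chain first, then observe the outcome
        counter.WithLabelValues(strconv.Itoa(c.Writer.Status())).Inc()
    }
}

func main() {
    reg := prometheus.NewRegistry()
    r := gin.New()
    r.Use(requestCounter(reg))
    // Expose only this registry's collectors, not the global default set.
    r.GET("/metrics", gin.WrapH(promhttp.HandlerFor(reg, promhttp.HandlerOpts{})))
    _ = r.Run(":20100") // example port
}
```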

@ -16,7 +16,12 @@ package startrpc
import (
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"log"
"net"
"net/http"
"strconv"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -29,7 +34,6 @@ import (
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/mw"
"github.com/OpenIMSDK/tools/network"
"github.com/OpenIMSDK/tools/prome"
"github.com/OpenIMSDK/tools/utils"
)
@ -41,7 +45,7 @@ func Start(
rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption,
) error {
fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s",
fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s\n",
rpcRegisterName, rpcPort, prometheusPort, config.Version)
listener, err := net.Listen(
"tcp",
@ -61,16 +65,15 @@ func Start(
if err != nil {
return err
}
var reg *prometheus.Registry
var metric *grpcprometheus.ServerMetrics
// ctx middleware
if config.Config.Prometheus.Enable {
prome.NewGrpcRequestCounter()
prome.NewGrpcRequestFailedCounter()
prome.NewGrpcRequestSuccessCounter()
unaryInterceptor := mw.InterceptChain(grpcprometheus.UnaryServerInterceptor, mw.RpcServerInterceptor)
options = append(options, []grpc.ServerOption{
grpc.StreamInterceptor(grpcprometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(unaryInterceptor),
}...)
cusMetrics := prom_metrics.GetGrpcCusMetrics(rpcRegisterName)
reg, metric, err = prom_metrics.NewGrpcPromObj(cusMetrics)
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
} else {
options = append(options, mw.GrpcServer())
}
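NewGrpcPromObj itself is not shown in this diff. The sketch below is one plausible shape for it, under the assumption that it bundles a fresh registry with grpc-prometheus server metrics plus the service-specific collectors returned by GetGrpcCusMetrics; treat it as illustrative, not the PR's actual implementation.

```go
package prom_metrics

import (
    grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
    "github.com/prometheus/client_golang/prometheus"
)

// NewGrpcPromObj (sketch) pairs a private registry with gRPC server metrics
// so each service exposes exactly its own collectors.
func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *grpcprometheus.ServerMetrics, error) {
    reg := prometheus.NewRegistry()
    metrics := grpcprometheus.NewServerMetrics()
    reg.MustRegister(metrics)
    for _, m := range cusMetrics {
        if err := reg.Register(m); err != nil {
            return nil, nil, err
        }
    }
    return reg, metrics, nil
}
```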
@ -91,8 +94,11 @@ func Start(
}
go func() {
if config.Config.Prometheus.Enable && prometheusPort != 0 {
if err := prome.StartPrometheusSrv(prometheusPort); err != nil {
panic(err.Error())
metric.InitializeMetrics(srv)
// Create an HTTP server for Prometheus.
httpServer := &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
if err := httpServer.ListenAndServe(); err != nil {
log.Fatal("Unable to start a http server.")
}
}
}()
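Once a service is up, its metrics endpoint can be smoke-tested. A sketch follows, assuming the User service default port of 20110 from the script config below; note that the server above mounts the handler at the root, so any path returns the exposition, and /metrics is used here only by convention.

```go
package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    resp, err := http.Get("http://localhost:20110/metrics")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        panic(err)
    }
    fmt.Println(resp.Status)
    if len(body) > 200 {
        body = body[:200]
    }
    fmt.Println(string(body)) // first lines of the Prometheus text exposition
}
```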

@ -341,6 +341,9 @@ def "IOS_PRODUCTION" "false" # IOS生产
###################### Prometheus configuration ######################
def "PROMETHEUS_ENABLE" "false" # Whether to enable Prometheus
def "PROMETHEUS_URL" "/prometheus"
# Prometheus port for the API service
readonly API_PROM_PORT=${API_PROM_PORT:-'20100'}
# Prometheus port for the User service
readonly USER_PROM_PORT=${USER_PROM_PORT:-'20110'}
# Prometheus port for the Friend service
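The readonly defaults above follow the shell idiom ${VAR:-default}, so a deployment can override any port through the environment without editing the script. The same resolution order, expressed as a Go sketch for clarity (promPort is an illustrative helper, not project code):

```go
package main

import (
    "fmt"
    "os"
    "strconv"
)

// promPort mirrors the shell pattern ${VAR:-default}: an environment
// override wins, otherwise the fixed default applies.
func promPort(envKey string, def int) int {
    if v := os.Getenv(envKey); v != "" {
        if p, err := strconv.Atoi(v); err == nil {
            return p
        }
    }
    return def
}

func main() {
    fmt.Println(promPort("API_PROM_PORT", 20100)) // 20100 unless overridden
}
```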

@ -49,13 +49,13 @@ function openim::msgtransfer::start()
openim::log::error_exit "OPENIM_MSGGATEWAY_NUM must be equal to the number of MSG_TRANSFER_PROM_PORTS"
fi
for (( i=1; i<=$OPENIM_MSGGATEWAY_NUM; i++ )) do
for (( i=0; i<$OPENIM_MSGGATEWAY_NUM; i++ )) do
openim::log::info "prometheus port: ${MSG_TRANSFER_PROM_PORTS[$i]}"
PROMETHEUS_PORT_OPTION=""
if [[ -n "${OPENIM_PROMETHEUS_PORTS[$i]}" ]]; then
PROMETHEUS_PORT_OPTION="--prometheus_port ${OPENIM_PROMETHEUS_PORTS[$i]}"
fi
nohup ${OPENIM_MSGTRANSFER_BINARY} ${PROMETHEUS_PORT_OPTION} -c ${OPENIM_MSGTRANSFER_CONFIG} >> ${LOG_FILE} 2>&1 &
nohup ${OPENIM_MSGTRANSFER_BINARY} ${PROMETHEUS_PORT_OPTION} -c ${OPENIM_MSGTRANSFER_CONFIG} -n ${i} >> ${LOG_FILE} 2>&1 &
done
openim::util::check_process_names "${OPENIM_OUTPUT_HOSTBIN}/${SERVER_NAME}"
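The loop now passes a zero-based instance index via -n alongside the per-instance Prometheus port. On the Go side, parsing could look like the sketch below; the flag names are taken from the script above, while the real binary's -c config flag is omitted for brevity. Go's flag package accepts both -name and --name, so --prometheus_port and -n parse the same way.

```go
package main

import (
    "flag"
    "fmt"
)

func main() {
    // Per-instance settings supplied by the start script.
    prometheusPort := flag.Int("prometheus_port", 0, "Prometheus listen port for this instance")
    index := flag.Int("n", 0, "zero-based index of this msgtransfer instance")
    flag.Parse()
    fmt.Printf("instance %d exposes metrics on :%d\n", *index, *prometheusPort)
}
```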
