diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index 4edf037c4..6a78121dd 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -30,6 +30,7 @@ import ( "github.com/gin-gonic/gin" //"syscall" "Open_IM/pkg/common/constant" + promePkg "Open_IM/pkg/common/prometheus" ) // @title open-IM-Server API @@ -49,6 +50,9 @@ func main() { r.Use(utils.CorsHandler()) log.Info("load config: ", config.Config) r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) + if config.Config.Prometheus.Enable { + r.GET("/metrics", promePkg.PrometheusHandler()) + } // user routing group, which handles user registration and login services userRouterGroup := r.Group("/user") { diff --git a/cmd/open_im_demo/main.go b/cmd/open_im_demo/main.go index 988f04809..1125c7827 100644 --- a/cmd/open_im_demo/main.go +++ b/cmd/open_im_demo/main.go @@ -13,6 +13,8 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" + "github.com/gin-gonic/gin" ) @@ -21,10 +23,11 @@ func main() { gin.SetMode(gin.ReleaseMode) f, _ := os.Create("../logs/api.log") gin.DefaultWriter = io.MultiWriter(f) - r := gin.Default() r.Use(utils.CorsHandler()) - + if config.Config.Prometheus.Enable { + r.GET("/metrics", promePkg.PrometheusHandler()) + } authRouterGroup := r.Group("/demo") { authRouterGroup.POST("/code", register.SendVerificationCode) diff --git a/cmd/open_im_msg_gateway/main.go b/cmd/open_im_msg_gateway/main.go index 5996a4ac5..32d6df69f 100644 --- a/cmd/open_im_msg_gateway/main.go +++ b/cmd/open_im_msg_gateway/main.go @@ -14,13 +14,15 @@ func main() { log.NewPrivateLog(constant.LogFileName) defaultRpcPorts := config.Config.RpcPort.OpenImMessageGatewayPort defaultWsPorts := config.Config.LongConnSvr.WebsocketPort + defaultPromePorts := config.Config.Prometheus.MessageGatewayPrometheusPort rpcPort := flag.Int("rpc_port", defaultRpcPorts[0], "rpc listening port") wsPort := flag.Int("ws_port", defaultWsPorts[0], "ws listening port") + prometheusPort := flag.Int("prometheus_port", defaultPromePorts[0], "PushrometheusPort default listen port") flag.Parse() var wg sync.WaitGroup wg.Add(1) - fmt.Println("start rpc/msg_gateway server, port: ", *rpcPort, *wsPort) + fmt.Println("start rpc/msg_gateway server, port: ", *rpcPort, *wsPort, *prometheusPort) gate.Init(*rpcPort, *wsPort) - gate.Run() + gate.Run(*prometheusPort) wg.Wait() } diff --git a/cmd/open_im_msg_transfer/main.go b/cmd/open_im_msg_transfer/main.go index a43745c6c..c112c56ba 100644 --- a/cmd/open_im_msg_transfer/main.go +++ b/cmd/open_im_msg_transfer/main.go @@ -13,10 +13,11 @@ import ( func main() { var wg sync.WaitGroup wg.Add(1) - rpcPort := flag.Int("port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "MessageTransferPrometheusPort default listen port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "MessageTransferPrometheusPort default listen port") + flag.Parse() log.NewPrivateLog(constant.LogFileName) logic.Init() fmt.Println("start msg_transfer server") - logic.Run(*rpcPort) + logic.Run(*prometheusPort) wg.Wait() } diff --git a/cmd/open_im_push/main.go b/cmd/open_im_push/main.go index 63b2aaa12..806515524 100644 --- a/cmd/open_im_push/main.go +++ b/cmd/open_im_push/main.go @@ -13,12 +13,13 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImPushPort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "PushrometheusPort default listen port") flag.Parse() var wg sync.WaitGroup wg.Add(1) log.NewPrivateLog(constant.LogFileName) fmt.Println("start push rpc server, port: ", *rpcPort) logic.Init(*rpcPort) - logic.Run() + logic.Run(*prometheusPort) wg.Wait() } diff --git a/cmd/rpc/open_im_admin_cms/main.go b/cmd/rpc/open_im_admin_cms/main.go index eeb4ca588..57c73713b 100644 --- a/cmd/rpc/open_im_admin_cms/main.go +++ b/cmd/rpc/open_im_admin_cms/main.go @@ -3,6 +3,7 @@ package main import ( rpcMessageCMS "Open_IM/internal/rpc/admin_cms" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImAdminCmsPort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.AdminCmsPrometheusPort[0], "adminCMSPrometheusPort default listen port") flag.Parse() fmt.Println("start cms rpc server, port: ", *rpcPort) rpcServer := rpcMessageCMS.NewAdminCMSServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_auth/main.go b/cmd/rpc/open_im_auth/main.go index 939c7d6f2..113f1c646 100644 --- a/cmd/rpc/open_im_auth/main.go +++ b/cmd/rpc/open_im_auth/main.go @@ -3,6 +3,7 @@ package main import ( rpcAuth "Open_IM/internal/rpc/auth" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,9 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImAuthPort rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.AuthPrometheusPort[0], "authPrometheusPort default listen port") flag.Parse() fmt.Println("start auth rpc server, port: ", *rpcPort) rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() - } diff --git a/cmd/rpc/open_im_cache/main.go b/cmd/rpc/open_im_cache/main.go index d0aacfbc9..002d71bdf 100644 --- a/cmd/rpc/open_im_cache/main.go +++ b/cmd/rpc/open_im_cache/main.go @@ -3,6 +3,8 @@ package main import ( rpcCache "Open_IM/internal/rpc/cache" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" + "flag" "fmt" ) @@ -10,9 +12,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImCachePort rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.CachePrometheusPort[0], "cachePrometheusPort default listen port") flag.Parse() - fmt.Println("start auth rpc server, port: ", *rpcPort) + fmt.Println("start cache rpc server, port: ", *rpcPort) rpcServer := rpcCache.NewCacheServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() - } diff --git a/cmd/rpc/open_im_conversation/main.go b/cmd/rpc/open_im_conversation/main.go index 53769c4ac..0980c7a96 100644 --- a/cmd/rpc/open_im_conversation/main.go +++ b/cmd/rpc/open_im_conversation/main.go @@ -3,6 +3,7 @@ package main import ( rpcConversation "Open_IM/internal/rpc/conversation" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,9 +11,16 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImConversationPort rpcPort := flag.Int("port", defaultPorts[0], "RpcConversation default listen port 11300") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.ConversationPrometheusPort[0], "conversationPrometheusPort default listen port") flag.Parse() fmt.Println("start conversation rpc server, port: ", *rpcPort) rpcServer := rpcConversation.NewRpcConversationServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_friend/main.go b/cmd/rpc/open_im_friend/main.go index b6a650e63..b8ad67aba 100644 --- a/cmd/rpc/open_im_friend/main.go +++ b/cmd/rpc/open_im_friend/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/friend" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImFriendPort rpcPort := flag.Int("port", defaultPorts[0], "get RpcFriendPort from cmd,default 12000 as port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.FriendPrometheusPort[0], "friendPrometheusPort default listen port") flag.Parse() fmt.Println("start friend rpc server, port: ", *rpcPort) rpcServer := friend.NewFriendServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_group/main.go b/cmd/rpc/open_im_group/main.go index 14cb56722..3961b93d0 100644 --- a/cmd/rpc/open_im_group/main.go +++ b/cmd/rpc/open_im_group/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/group" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImGroupPort rpcPort := flag.Int("port", defaultPorts[0], "get RpcGroupPort from cmd,default 16000 as port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.GroupPrometheusPort[0], "groupPrometheusPort default listen port") flag.Parse() fmt.Println("start group rpc server, port: ", *rpcPort) rpcServer := group.NewGroupServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_msg/main.go b/cmd/rpc/open_im_msg/main.go index 212d00472..e198adac3 100644 --- a/cmd/rpc/open_im_msg/main.go +++ b/cmd/rpc/open_im_msg/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImMessagePort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessagePrometheusPort[0], "msgPrometheusPort default listen port") flag.Parse() fmt.Println("start msg rpc server, port: ", *rpcPort) rpcServer := msg.NewRpcChatServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_office/main.go b/cmd/rpc/open_im_office/main.go index 51ec154f0..0597a53de 100644 --- a/cmd/rpc/open_im_office/main.go +++ b/cmd/rpc/open_im_office/main.go @@ -3,6 +3,7 @@ package main import ( rpc "Open_IM/internal/rpc/office" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImOfficePort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.OfficePrometheusPort[0], "officePrometheusPort default listen port") flag.Parse() fmt.Println("start office rpc server, port: ", *rpcPort) rpcServer := rpc.NewOfficeServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_organization/main.go b/cmd/rpc/open_im_organization/main.go index 2e8b94bb1..66d817eac 100644 --- a/cmd/rpc/open_im_organization/main.go +++ b/cmd/rpc/open_im_organization/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/organization" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImOrganizationPort rpcPort := flag.Int("port", defaultPorts[0], "get RpcOrganizationPort from cmd,default 11200 as port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.OrganizationPrometheusPort[0], "organizationPrometheusPort default listen port") flag.Parse() fmt.Println("start organization rpc server, port: ", *rpcPort) rpcServer := organization.NewServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_user/main.go b/cmd/rpc/open_im_user/main.go index e50ebef35..7895894dc 100644 --- a/cmd/rpc/open_im_user/main.go +++ b/cmd/rpc/open_im_user/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/user" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImUserPort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.UserPrometheusPort[0], "userPrometheusPort default listen port") flag.Parse() fmt.Println("start user rpc server, port: ", *rpcPort) rpcServer := user.NewUserServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/config/config.yaml b/config/config.yaml index 246312aa1..d277b6442 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -752,6 +752,7 @@ demo: rtc: signalTimeout: 35 +# prometheus每个服务监听的端口数量需要和rpc port保持一致 prometheus: enable: false userPrometheusPort: [ 20110 ] @@ -767,4 +768,4 @@ prometheus: conversationPrometheusPort: [ 20230 ] cachePrometheusPort: [ 20240 ] realTimeCommPrometheusPort: [ 21300 ] - messageTransferPrometheusPort: [ 21400 ] \ No newline at end of file + messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ] # 端口数量和 script/path_info.cfg msg_transfer_service_num保持一致 \ No newline at end of file diff --git a/deploy_k8s/kubectl_stop_all.sh b/deploy_k8s/kubectl_stop_all.sh index 47f87eeb6..b433ee3cb 100755 --- a/deploy_k8s/kubectl_stop_all.sh +++ b/deploy_k8s/kubectl_stop_all.sh @@ -10,8 +10,6 @@ service=( group auth admin-cms - message-cms - statistics office organization conversation diff --git a/deploy_k8s/message_cms/deployment.yaml b/deploy_k8s/message_cms/deployment.yaml deleted file mode 100644 index 131342ecf..000000000 --- a/deploy_k8s/message_cms/deployment.yaml +++ /dev/null @@ -1,34 +0,0 @@ ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: message-cms-deployment -spec: - selector: - matchLabels: - app: message-cms # 选择这个指定标签执行 - replicas: 1 # 运行pod数量 - template: - metadata: - labels: - app: message-cms # 标签 - spec: - containers: - - name: message-cms - image: openim/message_cms:v2.3.0release - # imagePullPolicy: Always - ports: - - containerPort: 10190 - volumeMounts: - - name: config - mountPath: /Open-IM-Server/config - readOnly: true - env: - - name: CONFIG_NAME - value: "/Open-IM-Server" - volumes: - - name: config - configMap: - name: openim-config - strategy: #更新策略 - type: RollingUpdate # 滚动更新 \ No newline at end of file diff --git a/deploy_k8s/message_cms/message_cms.Dockerfile b/deploy_k8s/message_cms/message_cms.Dockerfile deleted file mode 100644 index 24d08ece9..000000000 --- a/deploy_k8s/message_cms/message_cms.Dockerfile +++ /dev/null @@ -1,19 +0,0 @@ -FROM ubuntu - -# 设置固定的项目路径 -ENV WORKDIR /Open-IM-Server -ENV CMDDIR $WORKDIR/cmd -ENV CONFIG_NAME $WORKDIR/config/config.yaml - -# 将可执行文件复制到目标目录 -ADD ./open_im_message_cms $WORKDIR/cmd/main - -# 创建用于挂载的几个目录,添加可执行权限 -RUN mkdir $WORKDIR/logs $WORKDIR/config $WORKDIR/script && \ - chmod +x $WORKDIR/cmd/main - -VOLUME ["/Open-IM-Server/logs","/Open-IM-Server/config","/Open-IM-Server/script"] - - -WORKDIR $CMDDIR -CMD ./main \ No newline at end of file diff --git a/deploy_k8s/path_info.cfg b/deploy_k8s/path_info.cfg index 6839e97bf..193ffd320 100644 --- a/deploy_k8s/path_info.cfg +++ b/deploy_k8s/path_info.cfg @@ -9,8 +9,6 @@ service=( group auth admin_cms - message_cms - statistics office organization conversation diff --git a/deploy_k8s/statistics/deployment.yaml b/deploy_k8s/statistics/deployment.yaml deleted file mode 100644 index 5fe485ee4..000000000 --- a/deploy_k8s/statistics/deployment.yaml +++ /dev/null @@ -1,34 +0,0 @@ ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: statistics-deployment -spec: - selector: - matchLabels: - app: statistics # 选择这个指定标签执行 - replicas: 1 # 运行pod数量 - template: - metadata: - labels: - app: statistics # 标签 - spec: - containers: - - name: statistics - image: openim/statistics:v2.3.0release - # imagePullPolicy: Always - ports: - - containerPort: 10180 - volumeMounts: - - name: config - mountPath: /Open-IM-Server/config - readOnly: true - env: - - name: CONFIG_NAME - value: "/Open-IM-Server" - volumes: - - name: config - configMap: - name: openim-config - strategy: #更新策略 - type: RollingUpdate # 滚动更新 diff --git a/deploy_k8s/statistics/statistics.Dockerfile b/deploy_k8s/statistics/statistics.Dockerfile deleted file mode 100644 index e271aca29..000000000 --- a/deploy_k8s/statistics/statistics.Dockerfile +++ /dev/null @@ -1,19 +0,0 @@ -FROM ubuntu - -# 设置固定的项目路径 -ENV WORKDIR /Open-IM-Server -ENV CMDDIR $WORKDIR/cmd -ENV CONFIG_NAME $WORKDIR/config/config.yaml - -# 将可执行文件复制到目标目录 -ADD ./open_im_statistics $WORKDIR/cmd/main - -# 创建用于挂载的几个目录,添加可执行权限 -RUN mkdir $WORKDIR/logs $WORKDIR/config $WORKDIR/script && \ - chmod +x $WORKDIR/cmd/main - -VOLUME ["/Open-IM-Server/logs","/Open-IM-Server/config","/Open-IM-Server/script"] - - -WORKDIR $CMDDIR -CMD ./main \ No newline at end of file diff --git a/docker-compose_cfg/prometheus-compose.yml b/docker-compose_cfg/prometheus-compose.yml index 8a2d6c3ab..cf2c01bf4 100644 --- a/docker-compose_cfg/prometheus-compose.yml +++ b/docker-compose_cfg/prometheus-compose.yml @@ -13,13 +13,75 @@ scrape_configs: - job_name: 'openIM-server' metrics_path: /metrics static_configs: + - targets: ['localhost:10002'] + labels: + group: 'api' + - targets: ['localhost:10006'] labels: group: 'cms-api' - - targets: ['localhost:21400'] + + + - targets: ['localhost:20110'] + labels: + group: 'user' + + - targets: ['localhost:20120'] + labels: + group: 'friend' + + - targets: ['localhost:20130'] + labels: + group: 'message' + + - targets: ['localhost:20140'] + labels: + group: 'msg-gateway' + + - targets: ['localhost:20150'] + labels: + group: 'group' + + - targets: ['localhost:20160'] + labels: + group: 'auth' + + - targets: ['localhost:20170'] + labels: + group: 'push' + + - targets: ['localhost:20120'] + labels: + group: 'friend' + + - targets: ['localhost:20200'] + labels: + group: 'admin-cms' + + - targets: ['localhost:20120'] + labels: + group: 'office' + + - targets: ['localhost:20220'] + labels: + group: 'organization' + + - targets: ['localhost:20230'] + labels: + group: 'conversation' + + - targets: ['localhost:20240'] + labels: + group: 'cache' + + - targets: ['localhost:21400', 'localhost:21401', 'localhost:21402', 'localhost:21403'] labels: group: 'msg-transfer' + + + + - job_name: 'node' scrape_interval: 8s diff --git a/internal/api/auth/auth.go b/internal/api/auth/auth.go index 73a071548..b50e9e2ae 100644 --- a/internal/api/auth/auth.go +++ b/internal/api/auth/auth.go @@ -107,7 +107,7 @@ func UserToken(c *gin.Context) { params := api.UserTokenReq{} if err := c.BindJSON(¶ms); err != nil { errMsg := " BindJSON failed " + err.Error() - log.NewError("0", errMsg) + log.NewError(params.OperationID, errMsg) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": errMsg}) return } diff --git a/internal/cms_api/router.go b/internal/cms_api/router.go index 1811fc2a7..69e3357d0 100644 --- a/internal/cms_api/router.go +++ b/internal/cms_api/router.go @@ -11,6 +11,8 @@ import ( "Open_IM/internal/demo/register" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" + "github.com/gin-gonic/gin" ) @@ -18,7 +20,7 @@ func NewGinRouter() *gin.Engine { gin.SetMode(gin.ReleaseMode) baseRouter := gin.Default() if config.Config.Prometheus.Enable { - baseRouter.GET("/metrics", prometheusHandler()) + baseRouter.GET("/metrics", promePkg.PrometheusHandler()) } router := baseRouter.Group("/cms") router.Use(middleware.CorsHandler()) diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index 60c97f1de..8269a03b0 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -6,8 +6,11 @@ import ( "Open_IM/pkg/statistics" "fmt" - "github.com/go-playground/validator/v10" "sync" + + promePkg "Open_IM/pkg/common/prometheus" + + "github.com/go-playground/validator/v10" ) var ( @@ -34,7 +37,13 @@ func Init(rpcPort, wsPort int) { rpcSvr.onInit(rpcPort) } -func Run() { +func Run(promethuesPort int) { go ws.run() go rpcSvr.run() + go func() { + err := promePkg.StartPromeSrv(promethuesPort) + if err != nil { + panic(err) + } + }() } diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 0a80d155c..b98f0e8a4 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -4,13 +4,10 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/statistics" "fmt" - "net/http" - "strconv" "sync" - - "github.com/prometheus/client_golang/prometheus/promhttp" ) const OnlineTopicBusy = 1 @@ -60,10 +57,12 @@ func Run(promethuesPort int) { go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH) //go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) - if config.Config.Prometheus.Enable { - http.Handle("/metrics", promhttp.Handler()) - http.ListenAndServe(":"+strconv.Itoa(promethuesPort), nil) - } + go func() { + err := promePkg.StartPromeSrv(promethuesPort) + if err != nil { + panic(err) + } + }() } func SetOnlineTopicStatus(status int) { w.Lock() diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 7b45e9234..dec72181b 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -23,7 +23,8 @@ import ( ) var ( - msgInsertMysqlProcessed prometheus.Counter + msgInsertMysqlCounter prometheus.Counter + msgInsertFailedMysqlCounter prometheus.Counter ) type PersistentConsumerHandler struct { @@ -38,13 +39,21 @@ func (pc *PersistentConsumerHandler) Init() { OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) if config.Config.Prometheus.Enable { - msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{ - Name: "insert_mysql_msg_total", - Help: "The total number of msg insert mysql events", - }) + pc.initPrometheus() } } +func (pc *PersistentConsumerHandler) initPrometheus() { + msgInsertMysqlCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "insert_mysql_msg_total", + Help: "The total number of msg insert mysql events", + }) + msgInsertFailedMysqlCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "insert_mysql_failed_msg_total", + Help: "The total number of msg insert mysql events", + }) +} + func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) { msg := cMsg.Value log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey) @@ -76,13 +85,11 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg)) if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil { log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String()) + msgInsertFailedMysqlCounter.Inc() return } - msgInsertMysqlProcessed.Inc() - msgInsertMysqlProcessed.Add(1) if config.Config.Prometheus.Enable { - log.NewDebug(msgFromMQ.OperationID, utils.GetSelfFuncName(), "inc msgInsertMysqlProcessed", msgInsertMysqlProcessed.Desc()) - msgInsertMysqlProcessed.Inc() + msgInsertMysqlCounter.Inc() } } diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 22d03603f..ce996d3c5 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -14,6 +14,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/statistics" "fmt" ) @@ -46,7 +47,13 @@ func init() { } } -func Run() { +func Run(promethuesPort int) { go rpcServer.run() go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) + go func() { + err := promePkg.StartPromeSrv(promethuesPort) + if err != nil { + panic(err) + } + }() } diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index 8be927433..8fb99dbfb 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -5,13 +5,14 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" - "Open_IM/pkg/proto/push" + pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" "context" - "google.golang.org/grpc" "net" "strconv" "strings" + + "google.golang.org/grpc" ) type RPCServer struct { diff --git a/internal/rpc/admin_cms/admin_cms.go b/internal/rpc/admin_cms/admin_cms.go index 5d483f753..6c477af16 100644 --- a/internal/rpc/admin_cms/admin_cms.go +++ b/internal/rpc/admin_cms/admin_cms.go @@ -108,9 +108,9 @@ func (s *adminCMSServer) AdminLogin(_ context.Context, req *pbAdminCMS.AdminLogi } admin, err := imdb.GetUserByUserID(req.AdminID) if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "failed", req.AdminID) + log.NewError(req.OperationID, utils.GetSelfFuncName(), "failed", req.AdminID, err.Error()) resp.CommonResp.ErrCode = constant.ErrTokenUnknown.ErrCode - resp.CommonResp.ErrMsg = constant.ErrTokenMalformed.ErrMsg + resp.CommonResp.ErrMsg = err.Error() return resp, nil } resp.UserName = admin.Nickname diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index aa7bb2b7e..9c0eb6bc8 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -116,7 +116,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR utils.CopyStructFields(&groupInfo, req.GroupInfo) groupInfo.CreatorUserID = req.OpUserID groupInfo.GroupID = groupId - + groupInfo.CreateTime = time.Now() if groupInfo.NotificationUpdateTime.Unix() < 0 { groupInfo.NotificationUpdateTime = utils.UnixSecondToTime(0) } @@ -1373,9 +1373,10 @@ func (s *groupServer) GetGroups(_ context.Context, req *pbGroup.GetGroupsReq) (* return resp, nil } groupInfo.MemberCount = uint32(memberNum) + groupInfo.CreateTime = uint32(groupInfoDB.CreateTime.Unix()) resp.CMSGroups = append(resp.CMSGroups, &pbGroup.CMSGroup{GroupInfo: groupInfo, GroupOwnerUserName: groupMember.Nickname, GroupOwnerUserID: groupMember.UserID}) } else { - groups, err := imdb.GetGroupsByName(req.GroupName, req.Pagination.PageNumber, req.Pagination.ShowNumber) + groups, count, err := imdb.GetGroupsByName(req.GroupName, req.Pagination.PageNumber, req.Pagination.ShowNumber) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupsByName error", req.String(), req.GroupName, req.Pagination.PageNumber, req.Pagination.ShowNumber) } @@ -1392,13 +1393,7 @@ func (s *groupServer) GetGroups(_ context.Context, req *pbGroup.GetGroupsReq) (* group.GroupOwnerUserName = groupMember.Nickname resp.CMSGroups = append(resp.CMSGroups, group) } - resp.GroupNum, err = imdb.GetGroupsCountNum(db.Group{GroupName: req.GroupName}) - if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupsCountNum error", err.Error()) - resp.CommonResp.ErrCode = constant.ErrDB.ErrCode - resp.CommonResp.ErrMsg = err.Error() - return resp, nil - } + resp.GroupNum = int32(count) } log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "GetGroups resp", resp.String()) return resp, nil diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_model.go index 9f9b10d3c..6f3b230f4 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_model.go @@ -53,11 +53,16 @@ type GroupWithNum struct { MemberCount int `gorm:"column:num"` } -func GetGroupsByName(groupName string, pageNumber, showNumber int32) ([]GroupWithNum, error) { +func GetGroupsByName(groupName string, pageNumber, showNumber int32) ([]GroupWithNum, int64, error) { var groups []GroupWithNum - err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Select("groups.*, (select count(*) from group_members where group_members.group_id=groups.group_id) as num"). - Where(" name like ? and status != ?", fmt.Sprintf("%%%s%%", groupName), constant.GroupStatusDismissed).Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1))).Find(&groups).Error - return groups, err + var count int64 + sql := db.DB.MysqlDB.DefaultGormDB().Table("groups").Select("groups.*, (select count(*) from group_members where group_members.group_id=groups.group_id) as num"). + Where(" name like ? and status != ?", fmt.Sprintf("%%%s%%", groupName), constant.GroupStatusDismissed) + if err := sql.Count(&count).Error; err != nil { + return nil, 0, err + } + err := sql.Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1))).Find(&groups).Error + return groups, count, err } func GetGroups(pageNumber, showNumber int) ([]GroupWithNum, error) { diff --git a/pkg/common/prometheus/prometheus.go b/pkg/common/prometheus/prometheus.go new file mode 100644 index 000000000..5d7e3d901 --- /dev/null +++ b/pkg/common/prometheus/prometheus.go @@ -0,0 +1,26 @@ +package prometheus + +import ( + "Open_IM/pkg/common/config" + "net/http" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func StartPromeSrv(promethuesPort int) error { + if config.Config.Prometheus.Enable { + http.Handle("/metrics", promhttp.Handler()) + err := http.ListenAndServe(":"+strconv.Itoa(promethuesPort), nil) + return err + } + return nil +} + +func PrometheusHandler() gin.HandlerFunc { + h := promhttp.Handler() + return func(c *gin.Context) { + h.ServeHTTP(c.Writer, c.Request) + } +} diff --git a/script/msg_gateway_start.sh b/script/msg_gateway_start.sh old mode 100644 new mode 100755 index d61edf692..46e43b194 --- a/script/msg_gateway_start.sh +++ b/script/msg_gateway_start.sh @@ -7,13 +7,16 @@ ulimit -n 200000 list1=$(cat $config_path | grep openImMessageGatewayPort | awk -F '[:]' '{print $NF}') list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}') +list3=$(cat $config_path | grep messageGatewayPrometheusPort | awk -F '[:]' '{print $NF}') list_to_string $list1 rpc_ports=($ports_array) list_to_string $list2 ws_ports=($ports_array) +list_to_string $list3 +prome_ports=($ports_array) if [ ${#rpc_ports[@]} -ne ${#ws_ports[@]} ]; then - echo -e ${RED_PREFIX}"ws_ports does not match push_rpc_ports in quantity!!!"${COLOR_SUFFIX} + echo -e ${RED_PREFIX}"ws_ports does not match push_rpc_ports or prome_ports in quantity!!!"${COLOR_SUFFIX} exit -1 fi @@ -28,7 +31,7 @@ fi sleep 1 cd ${msg_gateway_binary_root} for ((i = 0; i < ${#ws_ports[@]}; i++)); do - nohup ./${msg_gateway_name} -rpc_port ${rpc_ports[$i]} -ws_port ${ws_ports[$i]} >>../logs/openIM.log 2>&1 & + nohup ./${msg_gateway_name} -rpc_port ${rpc_ports[$i]} -ws_port ${ws_ports[$i]} -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 & done #Check launched service process diff --git a/script/msg_transfer_start.sh b/script/msg_transfer_start.sh old mode 100644 new mode 100755 index 4d7fb96cd..b54513999 --- a/script/msg_transfer_start.sh +++ b/script/msg_transfer_start.sh @@ -2,7 +2,11 @@ #Include shell font styles and some basic information source ./style_info.cfg source ./path_info.cfg +source ./function.sh +list1=$(cat $config_path | grep messageTransferPrometheusPort | awk -F '[:]' '{print $NF}') +list_to_string $list1 +prome_ports=($ports_array) #Check if the service exists @@ -18,7 +22,7 @@ sleep 1 cd ${msg_transfer_binary_root} for ((i = 0; i < ${msg_transfer_service_num}; i++)); do - nohup ./${msg_transfer_name} >>../logs/openIM.log 2>&1 & + nohup ./${msg_transfer_name} -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 & done #Check launched service process diff --git a/script/push_start.sh b/script/push_start.sh index 8b2a47896..cbaffe7be 100644 --- a/script/push_start.sh +++ b/script/push_start.sh @@ -7,8 +7,11 @@ source ./function.sh list1=$(cat $config_path | grep openImPushPort | awk -F '[:]' '{print $NF}') +list2=$(cat $config_path | grep pushPrometheusPort | awk -F '[:]' '{print $NF}') list_to_string $list1 rpc_ports=($ports_array) +list_to_string $list2 +prome_ports=($ports_array) #Check if the service exists #If it is exists,kill this process @@ -22,7 +25,7 @@ sleep 1 cd ${push_binary_root} for ((i = 0; i < ${#rpc_ports[@]}; i++)); do - nohup ./${push_name} -port ${rpc_ports[$i]} >>../logs/openIM.log 2>&1 & + nohup ./${push_name} -port ${rpc_ports[$i]} -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 & done sleep 3 diff --git a/script/start_rpc_service.sh b/script/start_rpc_service.sh index 1f9a7b30b..bb53a85b6 100644 --- a/script/start_rpc_service.sh +++ b/script/start_rpc_service.sh @@ -40,6 +40,23 @@ service_port_name=( openImCachePort ) +service_prometheus_port_name=( + #api port name + openImApiPort + openImCmsApiPort + #rpc port name + userPrometheusPort + friendPrometheusPort + groupPrometheusPort + authPrometheusPort + adminCmsPrometheusPort + messagePrometheusPort + officePrometheusPort + organizationPrometheusPort + conversationPrometheusPort + cachePrometheusPort +) + for ((i = 0; i < ${#service_filename[*]}; i++)); do #Check whether the service exists service_name="ps -aux |grep -w ${service_filename[$i]} |grep -v grep" @@ -57,13 +74,24 @@ for ((i = 0; i < ${#service_filename[*]}; i++)); do #Get the rpc port in the configuration file portList=$(cat $config_path | grep ${service_port_name[$i]} | awk -F '[:]' '{print $NF}') list_to_string ${portList} + service_ports=($ports_array) + + portList2=$(cat $config_path | grep ${service_prometheus_port_name[$i]} | awk -F '[:]' '{print $NF}') + list_to_string $portList2 + prome_ports=($ports_array) #Start related rpc services based on the number of ports - for j in ${ports_array}; do + # for j in ${service_ports}; do + for ((j = 0; j < ${#service_ports[*]}; j++)); do #Start the service in the background # ./${service_filename[$i]} -port $j & - nohup ./${service_filename[$i]} -port $j >>../logs/openIM.log 2>&1 & + cmd="./${service_filename[$i]} -port ${service_ports[$j]} -prometheus_port ${prome_ports[$j]}" + if [ $i -eq 0 -o $i -eq 1 ]; then + cmd="./${service_filename[$i]} -port ${service_ports[$j]}" + fi + echo $cmd + nohup $cmd >>../logs/openIM.log 2>&1 & sleep 1 pid="netstat -ntlp|grep $j |awk '{printf \$7}'|cut -d/ -f1" - echo -e "${GREEN_PREFIX}${service_filename[$i]} start success,port number:$j pid:$(eval $pid)$COLOR_SUFFIX" + echo -e "${GREEN_PREFIX}${service_filename[$i]} start success,port number:${service_ports[$j]} pid:$(eval $pid)$COLOR_SUFFIX" done done