diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index 4edf037c4..3ed7657de 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -30,6 +30,7 @@ import ( "github.com/gin-gonic/gin" //"syscall" "Open_IM/pkg/common/constant" + promePkg "Open_IM/pkg/common/prometheus" ) // @title open-IM-Server API @@ -49,6 +50,7 @@ func main() { r.Use(utils.CorsHandler()) log.Info("load config: ", config.Config) r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) + r.GET("/metrics", promePkg.PrometheusHandler()) // user routing group, which handles user registration and login services userRouterGroup := r.Group("/user") { diff --git a/cmd/open_im_demo/main.go b/cmd/open_im_demo/main.go index 988f04809..1125c7827 100644 --- a/cmd/open_im_demo/main.go +++ b/cmd/open_im_demo/main.go @@ -13,6 +13,8 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" + "github.com/gin-gonic/gin" ) @@ -21,10 +23,11 @@ func main() { gin.SetMode(gin.ReleaseMode) f, _ := os.Create("../logs/api.log") gin.DefaultWriter = io.MultiWriter(f) - r := gin.Default() r.Use(utils.CorsHandler()) - + if config.Config.Prometheus.Enable { + r.GET("/metrics", promePkg.PrometheusHandler()) + } authRouterGroup := r.Group("/demo") { authRouterGroup.POST("/code", register.SendVerificationCode) diff --git a/cmd/open_im_msg_gateway/main.go b/cmd/open_im_msg_gateway/main.go index 5996a4ac5..9429c0ad1 100644 --- a/cmd/open_im_msg_gateway/main.go +++ b/cmd/open_im_msg_gateway/main.go @@ -14,13 +14,15 @@ func main() { log.NewPrivateLog(constant.LogFileName) defaultRpcPorts := config.Config.RpcPort.OpenImMessageGatewayPort defaultWsPorts := config.Config.LongConnSvr.WebsocketPort + defaultPromePorts := config.Config.Prometheus.MessageGatewayPrometheusPort rpcPort := flag.Int("rpc_port", defaultRpcPorts[0], "rpc listening port") wsPort := flag.Int("ws_port", defaultWsPorts[0], "ws listening port") + prometheusPort := flag.Int("prometheus_port", defaultPromePorts[0], "PushrometheusPort default listen port") flag.Parse() var wg sync.WaitGroup wg.Add(1) fmt.Println("start rpc/msg_gateway server, port: ", *rpcPort, *wsPort) gate.Init(*rpcPort, *wsPort) - gate.Run() + gate.Run(*prometheusPort) wg.Wait() } diff --git a/cmd/open_im_msg_transfer/main.go b/cmd/open_im_msg_transfer/main.go index a43745c6c..129e21f31 100644 --- a/cmd/open_im_msg_transfer/main.go +++ b/cmd/open_im_msg_transfer/main.go @@ -13,10 +13,10 @@ import ( func main() { var wg sync.WaitGroup wg.Add(1) - rpcPort := flag.Int("port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "MessageTransferPrometheusPort default listen port") + prometheusPort := flag.Int("port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "MessageTransferPrometheusPort default listen port") log.NewPrivateLog(constant.LogFileName) logic.Init() fmt.Println("start msg_transfer server") - logic.Run(*rpcPort) + logic.Run(*prometheusPort) wg.Wait() } diff --git a/cmd/open_im_push/main.go b/cmd/open_im_push/main.go index 63b2aaa12..806515524 100644 --- a/cmd/open_im_push/main.go +++ b/cmd/open_im_push/main.go @@ -13,12 +13,13 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImPushPort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "PushrometheusPort default listen port") flag.Parse() var wg sync.WaitGroup wg.Add(1) log.NewPrivateLog(constant.LogFileName) fmt.Println("start push rpc server, port: ", *rpcPort) logic.Init(*rpcPort) - logic.Run() + logic.Run(*prometheusPort) wg.Wait() } diff --git a/cmd/rpc/open_im_admin_cms/main.go b/cmd/rpc/open_im_admin_cms/main.go index eeb4ca588..83c269e65 100644 --- a/cmd/rpc/open_im_admin_cms/main.go +++ b/cmd/rpc/open_im_admin_cms/main.go @@ -3,6 +3,7 @@ package main import ( rpcMessageCMS "Open_IM/internal/rpc/admin_cms" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImAdminCmsPort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.AdminCmsPrometheusPort[0], "adminCMSPrometheusPort default listen port") flag.Parse() fmt.Println("start cms rpc server, port: ", *rpcPort) rpcServer := rpcMessageCMS.NewAdminCMSServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_auth/main.go b/cmd/rpc/open_im_auth/main.go index 939c7d6f2..e5a253bfb 100644 --- a/cmd/rpc/open_im_auth/main.go +++ b/cmd/rpc/open_im_auth/main.go @@ -3,6 +3,7 @@ package main import ( rpcAuth "Open_IM/internal/rpc/auth" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,9 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImAuthPort rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.AuthPrometheusPort[0], "authPrometheusPort default listen port") flag.Parse() fmt.Println("start auth rpc server, port: ", *rpcPort) rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() - } diff --git a/cmd/rpc/open_im_cache/main.go b/cmd/rpc/open_im_cache/main.go index d0aacfbc9..2200c7ca9 100644 --- a/cmd/rpc/open_im_cache/main.go +++ b/cmd/rpc/open_im_cache/main.go @@ -3,6 +3,8 @@ package main import ( rpcCache "Open_IM/internal/rpc/cache" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" + "flag" "fmt" ) @@ -10,9 +12,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImCachePort rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.CachePrometheusPort[0], "cachePrometheusPort default listen port") flag.Parse() - fmt.Println("start auth rpc server, port: ", *rpcPort) + fmt.Println("start cache rpc server, port: ", *rpcPort) rpcServer := rpcCache.NewCacheServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() - } diff --git a/cmd/rpc/open_im_conversation/main.go b/cmd/rpc/open_im_conversation/main.go index 53769c4ac..f53b7315f 100644 --- a/cmd/rpc/open_im_conversation/main.go +++ b/cmd/rpc/open_im_conversation/main.go @@ -3,6 +3,7 @@ package main import ( rpcConversation "Open_IM/internal/rpc/conversation" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,9 +11,16 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImConversationPort rpcPort := flag.Int("port", defaultPorts[0], "RpcConversation default listen port 11300") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.ConversationPrometheusPort[0], "conversationPrometheusPort default listen port") flag.Parse() fmt.Println("start conversation rpc server, port: ", *rpcPort) rpcServer := rpcConversation.NewRpcConversationServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_friend/main.go b/cmd/rpc/open_im_friend/main.go index b6a650e63..46db5c448 100644 --- a/cmd/rpc/open_im_friend/main.go +++ b/cmd/rpc/open_im_friend/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/friend" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImFriendPort rpcPort := flag.Int("port", defaultPorts[0], "get RpcFriendPort from cmd,default 12000 as port") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.FriendPrometheusPort[0], "friendPrometheusPort default listen port") flag.Parse() fmt.Println("start friend rpc server, port: ", *rpcPort) rpcServer := friend.NewFriendServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_group/main.go b/cmd/rpc/open_im_group/main.go index 14cb56722..b747fa589 100644 --- a/cmd/rpc/open_im_group/main.go +++ b/cmd/rpc/open_im_group/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/group" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImGroupPort rpcPort := flag.Int("port", defaultPorts[0], "get RpcGroupPort from cmd,default 16000 as port") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.GroupPrometheusPort[0], "groupPrometheusPort default listen port") flag.Parse() fmt.Println("start group rpc server, port: ", *rpcPort) rpcServer := group.NewGroupServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_msg/main.go b/cmd/rpc/open_im_msg/main.go index 212d00472..2f766750d 100644 --- a/cmd/rpc/open_im_msg/main.go +++ b/cmd/rpc/open_im_msg/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImMessagePort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.MessagePrometheusPort[0], "msgPrometheusPort default listen port") flag.Parse() fmt.Println("start msg rpc server, port: ", *rpcPort) rpcServer := msg.NewRpcChatServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_office/main.go b/cmd/rpc/open_im_office/main.go index 51ec154f0..8c250b1bb 100644 --- a/cmd/rpc/open_im_office/main.go +++ b/cmd/rpc/open_im_office/main.go @@ -3,6 +3,7 @@ package main import ( rpc "Open_IM/internal/rpc/office" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImOfficePort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.OfficePrometheusPort[0], "officePrometheusPort default listen port") flag.Parse() fmt.Println("start office rpc server, port: ", *rpcPort) rpcServer := rpc.NewOfficeServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_organization/main.go b/cmd/rpc/open_im_organization/main.go index 2e8b94bb1..9494fd4bd 100644 --- a/cmd/rpc/open_im_organization/main.go +++ b/cmd/rpc/open_im_organization/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/organization" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImOrganizationPort rpcPort := flag.Int("port", defaultPorts[0], "get RpcOrganizationPort from cmd,default 11200 as port") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.OrganizationPrometheusPort[0], "organizationPrometheusPort default listen port") flag.Parse() fmt.Println("start organization rpc server, port: ", *rpcPort) rpcServer := organization.NewServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/cmd/rpc/open_im_user/main.go b/cmd/rpc/open_im_user/main.go index e50ebef35..08a42c428 100644 --- a/cmd/rpc/open_im_user/main.go +++ b/cmd/rpc/open_im_user/main.go @@ -3,6 +3,7 @@ package main import ( "Open_IM/internal/rpc/user" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" "flag" "fmt" ) @@ -10,8 +11,15 @@ import ( func main() { defaultPorts := config.Config.RpcPort.OpenImUserPort rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") + prometheusPort := flag.Int("promethus-port", config.Config.Prometheus.UserPrometheusPort[0], "userPrometheusPort default listen port") flag.Parse() fmt.Println("start user rpc server, port: ", *rpcPort) rpcServer := user.NewUserServer(*rpcPort) + go func() { + err := promePkg.StartPromeSrv(*prometheusPort) + if err != nil { + panic(err) + } + }() rpcServer.Run() } diff --git a/config/config.yaml b/config/config.yaml index 246312aa1..d277b6442 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -752,6 +752,7 @@ demo: rtc: signalTimeout: 35 +# prometheus每个服务监听的端口数量需要和rpc port保持一致 prometheus: enable: false userPrometheusPort: [ 20110 ] @@ -767,4 +768,4 @@ prometheus: conversationPrometheusPort: [ 20230 ] cachePrometheusPort: [ 20240 ] realTimeCommPrometheusPort: [ 21300 ] - messageTransferPrometheusPort: [ 21400 ] \ No newline at end of file + messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ] # 端口数量和 script/path_info.cfg msg_transfer_service_num保持一致 \ No newline at end of file diff --git a/docker-compose_cfg/prometheus-compose.yml b/docker-compose_cfg/prometheus-compose.yml index 8a2d6c3ab..3782c5e56 100644 --- a/docker-compose_cfg/prometheus-compose.yml +++ b/docker-compose_cfg/prometheus-compose.yml @@ -13,13 +13,75 @@ scrape_configs: - job_name: 'openIM-server' metrics_path: /metrics static_configs: + - targets: ['localhost:10002'] + labels: + group: 'api' + - targets: ['localhost:10006'] labels: group: 'cms-api' - - targets: ['localhost:21400'] + + + - targets: ['localhost:20110'] + labels: + group: 'user' + + - targets: ['localhost:20120'] + labels: + group: 'friend' + + - targets: ['localhost:20130'] + labels: + group: 'message' + + - targets: ['localhost:20140'] + labels: + group: 'msg-gateway' + + - targets: ['localhost:20150'] + labels: + group: 'group' + + - targets: ['localhost:20160'] + labels: + group: 'auth' + + - targets: ['localhost:20170'] + labels: + group: 'push' + + - targets: ['localhost:20120'] + labels: + group: 'friend' + + - targets: ['localhost:20200'] + labels: + group: 'admin-cms' + + - targets: ['localhost:20120'] + labels: + group: 'office' + + - targets: ['localhost:20220'] + labels: + group: 'organization' + + - targets: ['localhost:20230'] + labels: + group: 'conversation' + + - targets: ['localhost:20240'] + labels: + group: 'cache' + + - targets: ['localhost:21400', 'localhost:21401', 'localhost:21402', 'localhost:21403'] labels: group: 'msg-transfer' + + + + - job_name: 'node' scrape_interval: 8s diff --git a/internal/api/auth/auth.go b/internal/api/auth/auth.go index 73a071548..b50e9e2ae 100644 --- a/internal/api/auth/auth.go +++ b/internal/api/auth/auth.go @@ -107,7 +107,7 @@ func UserToken(c *gin.Context) { params := api.UserTokenReq{} if err := c.BindJSON(¶ms); err != nil { errMsg := " BindJSON failed " + err.Error() - log.NewError("0", errMsg) + log.NewError(params.OperationID, errMsg) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": errMsg}) return } diff --git a/internal/cms_api/router.go b/internal/cms_api/router.go index 1811fc2a7..69e3357d0 100644 --- a/internal/cms_api/router.go +++ b/internal/cms_api/router.go @@ -11,6 +11,8 @@ import ( "Open_IM/internal/demo/register" "Open_IM/pkg/common/config" + promePkg "Open_IM/pkg/common/prometheus" + "github.com/gin-gonic/gin" ) @@ -18,7 +20,7 @@ func NewGinRouter() *gin.Engine { gin.SetMode(gin.ReleaseMode) baseRouter := gin.Default() if config.Config.Prometheus.Enable { - baseRouter.GET("/metrics", prometheusHandler()) + baseRouter.GET("/metrics", promePkg.PrometheusHandler()) } router := baseRouter.Group("/cms") router.Use(middleware.CorsHandler()) diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index 60c97f1de..8269a03b0 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -6,8 +6,11 @@ import ( "Open_IM/pkg/statistics" "fmt" - "github.com/go-playground/validator/v10" "sync" + + promePkg "Open_IM/pkg/common/prometheus" + + "github.com/go-playground/validator/v10" ) var ( @@ -34,7 +37,13 @@ func Init(rpcPort, wsPort int) { rpcSvr.onInit(rpcPort) } -func Run() { +func Run(promethuesPort int) { go ws.run() go rpcSvr.run() + go func() { + err := promePkg.StartPromeSrv(promethuesPort) + if err != nil { + panic(err) + } + }() } diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 0a80d155c..b98f0e8a4 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -4,13 +4,10 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/statistics" "fmt" - "net/http" - "strconv" "sync" - - "github.com/prometheus/client_golang/prometheus/promhttp" ) const OnlineTopicBusy = 1 @@ -60,10 +57,12 @@ func Run(promethuesPort int) { go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH) //go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) - if config.Config.Prometheus.Enable { - http.Handle("/metrics", promhttp.Handler()) - http.ListenAndServe(":"+strconv.Itoa(promethuesPort), nil) - } + go func() { + err := promePkg.StartPromeSrv(promethuesPort) + if err != nil { + panic(err) + } + }() } func SetOnlineTopicStatus(status int) { w.Lock() diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 7b45e9234..dec72181b 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -23,7 +23,8 @@ import ( ) var ( - msgInsertMysqlProcessed prometheus.Counter + msgInsertMysqlCounter prometheus.Counter + msgInsertFailedMysqlCounter prometheus.Counter ) type PersistentConsumerHandler struct { @@ -38,13 +39,21 @@ func (pc *PersistentConsumerHandler) Init() { OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) if config.Config.Prometheus.Enable { - msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{ - Name: "insert_mysql_msg_total", - Help: "The total number of msg insert mysql events", - }) + pc.initPrometheus() } } +func (pc *PersistentConsumerHandler) initPrometheus() { + msgInsertMysqlCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "insert_mysql_msg_total", + Help: "The total number of msg insert mysql events", + }) + msgInsertFailedMysqlCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "insert_mysql_failed_msg_total", + Help: "The total number of msg insert mysql events", + }) +} + func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) { msg := cMsg.Value log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey) @@ -76,13 +85,11 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg)) if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil { log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String()) + msgInsertFailedMysqlCounter.Inc() return } - msgInsertMysqlProcessed.Inc() - msgInsertMysqlProcessed.Add(1) if config.Config.Prometheus.Enable { - log.NewDebug(msgFromMQ.OperationID, utils.GetSelfFuncName(), "inc msgInsertMysqlProcessed", msgInsertMysqlProcessed.Desc()) - msgInsertMysqlProcessed.Inc() + msgInsertMysqlCounter.Inc() } } diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 22d03603f..ce996d3c5 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -14,6 +14,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/statistics" "fmt" ) @@ -46,7 +47,13 @@ func init() { } } -func Run() { +func Run(promethuesPort int) { go rpcServer.run() go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) + go func() { + err := promePkg.StartPromeSrv(promethuesPort) + if err != nil { + panic(err) + } + }() } diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index 8be927433..8fb99dbfb 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -5,13 +5,14 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" - "Open_IM/pkg/proto/push" + pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" "context" - "google.golang.org/grpc" "net" "strconv" "strings" + + "google.golang.org/grpc" ) type RPCServer struct { diff --git a/internal/rpc/admin_cms/admin_cms.go b/internal/rpc/admin_cms/admin_cms.go index 5d483f753..6c477af16 100644 --- a/internal/rpc/admin_cms/admin_cms.go +++ b/internal/rpc/admin_cms/admin_cms.go @@ -108,9 +108,9 @@ func (s *adminCMSServer) AdminLogin(_ context.Context, req *pbAdminCMS.AdminLogi } admin, err := imdb.GetUserByUserID(req.AdminID) if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "failed", req.AdminID) + log.NewError(req.OperationID, utils.GetSelfFuncName(), "failed", req.AdminID, err.Error()) resp.CommonResp.ErrCode = constant.ErrTokenUnknown.ErrCode - resp.CommonResp.ErrMsg = constant.ErrTokenMalformed.ErrMsg + resp.CommonResp.ErrMsg = err.Error() return resp, nil } resp.UserName = admin.Nickname diff --git a/pkg/common/prometheus/prometheus.go b/pkg/common/prometheus/prometheus.go new file mode 100644 index 000000000..5d7e3d901 --- /dev/null +++ b/pkg/common/prometheus/prometheus.go @@ -0,0 +1,26 @@ +package prometheus + +import ( + "Open_IM/pkg/common/config" + "net/http" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func StartPromeSrv(promethuesPort int) error { + if config.Config.Prometheus.Enable { + http.Handle("/metrics", promhttp.Handler()) + err := http.ListenAndServe(":"+strconv.Itoa(promethuesPort), nil) + return err + } + return nil +} + +func PrometheusHandler() gin.HandlerFunc { + h := promhttp.Handler() + return func(c *gin.Context) { + h.ServeHTTP(c.Writer, c.Request) + } +} diff --git a/script/msg_gateway_start.sh b/script/msg_gateway_start.sh old mode 100644 new mode 100755 index d61edf692..ea33ea93b --- a/script/msg_gateway_start.sh +++ b/script/msg_gateway_start.sh @@ -7,13 +7,16 @@ ulimit -n 200000 list1=$(cat $config_path | grep openImMessageGatewayPort | awk -F '[:]' '{print $NF}') list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}') +list3=$(cat $config_path | grep messageGatewayPrometheusPort | awk -F '[:]' '{print $NF}') list_to_string $list1 rpc_ports=($ports_array) list_to_string $list2 ws_ports=($ports_array) -if [ ${#rpc_ports[@]} -ne ${#ws_ports[@]} ]; then +list_to_string $list3 +prome_ports=($ports_array) +if [ ${#rpc_ports[@]} -ne ${#ws_ports[@]} -ne ${#prome_ports[@]} ]; then - echo -e ${RED_PREFIX}"ws_ports does not match push_rpc_ports in quantity!!!"${COLOR_SUFFIX} + echo -e ${RED_PREFIX}"ws_ports does not match push_rpc_ports or prome_ports in quantity!!!"${COLOR_SUFFIX} exit -1 fi @@ -28,7 +31,7 @@ fi sleep 1 cd ${msg_gateway_binary_root} for ((i = 0; i < ${#ws_ports[@]}; i++)); do - nohup ./${msg_gateway_name} -rpc_port ${rpc_ports[$i]} -ws_port ${ws_ports[$i]} >>../logs/openIM.log 2>&1 & + nohup ./${msg_gateway_name} -rpc_port ${rpc_ports[$i]} -ws_port ${ws_ports[$i]} -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 & done #Check launched service process diff --git a/script/msg_transfer_start.sh b/script/msg_transfer_start.sh old mode 100644 new mode 100755 index 4d7fb96cd..45c4385c0 --- a/script/msg_transfer_start.sh +++ b/script/msg_transfer_start.sh @@ -3,6 +3,8 @@ source ./style_info.cfg source ./path_info.cfg +list1=$(cat $config_path | grep messageTransferPrometheusPort | awk -F '[:]' '{print $NF}') +prome_ports=($ports_array) #Check if the service exists @@ -18,7 +20,7 @@ sleep 1 cd ${msg_transfer_binary_root} for ((i = 0; i < ${msg_transfer_service_num}; i++)); do - nohup ./${msg_transfer_name} >>../logs/openIM.log 2>&1 & + nohup ./${msg_transfer_name} -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 & done #Check launched service process diff --git a/script/push_start.sh b/script/push_start.sh index 8b2a47896..cbaffe7be 100644 --- a/script/push_start.sh +++ b/script/push_start.sh @@ -7,8 +7,11 @@ source ./function.sh list1=$(cat $config_path | grep openImPushPort | awk -F '[:]' '{print $NF}') +list2=$(cat $config_path | grep pushPrometheusPort | awk -F '[:]' '{print $NF}') list_to_string $list1 rpc_ports=($ports_array) +list_to_string $list2 +prome_ports=($ports_array) #Check if the service exists #If it is exists,kill this process @@ -22,7 +25,7 @@ sleep 1 cd ${push_binary_root} for ((i = 0; i < ${#rpc_ports[@]}; i++)); do - nohup ./${push_name} -port ${rpc_ports[$i]} >>../logs/openIM.log 2>&1 & + nohup ./${push_name} -port ${rpc_ports[$i]} -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 & done sleep 3 diff --git a/script/start_rpc_service.sh b/script/start_rpc_service.sh index 1f9a7b30b..eb3d21214 100644 --- a/script/start_rpc_service.sh +++ b/script/start_rpc_service.sh @@ -40,6 +40,23 @@ service_port_name=( openImCachePort ) +service_prometheus_port_name=( + #api port name + openImApiPort + openImCmsApiPort + #rpc port name + userPrometheusPort + friendPrometheusPort + groupPrometheusPort + authPrometheusPort + adminCmsPrometheusPort + messagePrometheusPort + officePrometheusPort + organizationPrometheusPort + conversationPrometheusPort + cachePrometheusPort +) + for ((i = 0; i < ${#service_filename[*]}; i++)); do #Check whether the service exists service_name="ps -aux |grep -w ${service_filename[$i]} |grep -v grep" @@ -57,11 +74,16 @@ for ((i = 0; i < ${#service_filename[*]}; i++)); do #Get the rpc port in the configuration file portList=$(cat $config_path | grep ${service_port_name[$i]} | awk -F '[:]' '{print $NF}') list_to_string ${portList} + service_ports=($ports_array) + + portList2=$(cat $config_path | grep pushPrometheusPort | awk -F '[:]' '{print $NF}') + list_to_string $portList2 + prome_ports=($ports_array) #Start related rpc services based on the number of ports - for j in ${ports_array}; do + for j in ${service_ports}; do #Start the service in the background # ./${service_filename[$i]} -port $j & - nohup ./${service_filename[$i]} -port $j >>../logs/openIM.log 2>&1 & + nohup ./${service_filename[$i]} -port $j -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 & sleep 1 pid="netstat -ntlp|grep $j |awk '{printf \$7}'|cut -d/ -f1" echo -e "${GREEN_PREFIX}${service_filename[$i]} start success,port number:$j pid:$(eval $pid)$COLOR_SUFFIX"