Merge branch 'v2.3.0release'

pull/284/head
wangchuxiao 2 years ago
commit eb30e54dc8

@ -30,6 +30,7 @@ import (
"github.com/gin-gonic/gin"
//"syscall"
"Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus"
)
// @title open-IM-Server API
@ -49,6 +50,9 @@ func main() {
r.Use(utils.CorsHandler())
log.Info("load config: ", config.Config)
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
if config.Config.Prometheus.Enable {
r.GET("/metrics", promePkg.PrometheusHandler())
}
// user routing group, which handles user registration and login services
userRouterGroup := r.Group("/user")
{

@ -13,6 +13,8 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"github.com/gin-gonic/gin"
)
@ -21,10 +23,11 @@ func main() {
gin.SetMode(gin.ReleaseMode)
f, _ := os.Create("../logs/api.log")
gin.DefaultWriter = io.MultiWriter(f)
r := gin.Default()
r.Use(utils.CorsHandler())
if config.Config.Prometheus.Enable {
r.GET("/metrics", promePkg.PrometheusHandler())
}
authRouterGroup := r.Group("/demo")
{
authRouterGroup.POST("/code", register.SendVerificationCode)

@ -14,13 +14,15 @@ func main() {
log.NewPrivateLog(constant.LogFileName)
defaultRpcPorts := config.Config.RpcPort.OpenImMessageGatewayPort
defaultWsPorts := config.Config.LongConnSvr.WebsocketPort
defaultPromePorts := config.Config.Prometheus.MessageGatewayPrometheusPort
rpcPort := flag.Int("rpc_port", defaultRpcPorts[0], "rpc listening port")
wsPort := flag.Int("ws_port", defaultWsPorts[0], "ws listening port")
prometheusPort := flag.Int("prometheus_port", defaultPromePorts[0], "PushrometheusPort default listen port")
flag.Parse()
var wg sync.WaitGroup
wg.Add(1)
fmt.Println("start rpc/msg_gateway server, port: ", *rpcPort, *wsPort)
fmt.Println("start rpc/msg_gateway server, port: ", *rpcPort, *wsPort, *prometheusPort)
gate.Init(*rpcPort, *wsPort)
gate.Run()
gate.Run(*prometheusPort)
wg.Wait()
}

@ -13,10 +13,11 @@ import (
func main() {
var wg sync.WaitGroup
wg.Add(1)
rpcPort := flag.Int("port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "MessageTransferPrometheusPort default listen port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "MessageTransferPrometheusPort default listen port")
flag.Parse()
log.NewPrivateLog(constant.LogFileName)
logic.Init()
fmt.Println("start msg_transfer server")
logic.Run(*rpcPort)
logic.Run(*prometheusPort)
wg.Wait()
}

@ -13,12 +13,13 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImPushPort
rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "PushrometheusPort default listen port")
flag.Parse()
var wg sync.WaitGroup
wg.Add(1)
log.NewPrivateLog(constant.LogFileName)
fmt.Println("start push rpc server, port: ", *rpcPort)
logic.Init(*rpcPort)
logic.Run()
logic.Run(*prometheusPort)
wg.Wait()
}

@ -3,6 +3,7 @@ package main
import (
rpcMessageCMS "Open_IM/internal/rpc/admin_cms"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,8 +11,15 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImAdminCmsPort
rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.AdminCmsPrometheusPort[0], "adminCMSPrometheusPort default listen port")
flag.Parse()
fmt.Println("start cms rpc server, port: ", *rpcPort)
rpcServer := rpcMessageCMS.NewAdminCMSServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -3,6 +3,7 @@ package main
import (
rpcAuth "Open_IM/internal/rpc/auth"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,9 +11,15 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImAuthPort
rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.AuthPrometheusPort[0], "authPrometheusPort default listen port")
flag.Parse()
fmt.Println("start auth rpc server, port: ", *rpcPort)
rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -3,6 +3,8 @@ package main
import (
rpcCache "Open_IM/internal/rpc/cache"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,9 +12,15 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImCachePort
rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.CachePrometheusPort[0], "cachePrometheusPort default listen port")
flag.Parse()
fmt.Println("start auth rpc server, port: ", *rpcPort)
fmt.Println("start cache rpc server, port: ", *rpcPort)
rpcServer := rpcCache.NewCacheServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -3,6 +3,7 @@ package main
import (
rpcConversation "Open_IM/internal/rpc/conversation"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,9 +11,16 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImConversationPort
rpcPort := flag.Int("port", defaultPorts[0], "RpcConversation default listen port 11300")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.ConversationPrometheusPort[0], "conversationPrometheusPort default listen port")
flag.Parse()
fmt.Println("start conversation rpc server, port: ", *rpcPort)
rpcServer := rpcConversation.NewRpcConversationServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -3,6 +3,7 @@ package main
import (
"Open_IM/internal/rpc/friend"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,8 +11,15 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImFriendPort
rpcPort := flag.Int("port", defaultPorts[0], "get RpcFriendPort from cmd,default 12000 as port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.FriendPrometheusPort[0], "friendPrometheusPort default listen port")
flag.Parse()
fmt.Println("start friend rpc server, port: ", *rpcPort)
rpcServer := friend.NewFriendServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -3,6 +3,7 @@ package main
import (
"Open_IM/internal/rpc/group"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,8 +11,15 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImGroupPort
rpcPort := flag.Int("port", defaultPorts[0], "get RpcGroupPort from cmd,default 16000 as port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.GroupPrometheusPort[0], "groupPrometheusPort default listen port")
flag.Parse()
fmt.Println("start group rpc server, port: ", *rpcPort)
rpcServer := group.NewGroupServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -3,6 +3,7 @@ package main
import (
"Open_IM/internal/rpc/msg"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,8 +11,15 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImMessagePort
rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessagePrometheusPort[0], "msgPrometheusPort default listen port")
flag.Parse()
fmt.Println("start msg rpc server, port: ", *rpcPort)
rpcServer := msg.NewRpcChatServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -3,6 +3,7 @@ package main
import (
rpc "Open_IM/internal/rpc/office"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,8 +11,15 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImOfficePort
rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.OfficePrometheusPort[0], "officePrometheusPort default listen port")
flag.Parse()
fmt.Println("start office rpc server, port: ", *rpcPort)
rpcServer := rpc.NewOfficeServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -3,6 +3,7 @@ package main
import (
"Open_IM/internal/rpc/organization"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,8 +11,15 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImOrganizationPort
rpcPort := flag.Int("port", defaultPorts[0], "get RpcOrganizationPort from cmd,default 11200 as port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.OrganizationPrometheusPort[0], "organizationPrometheusPort default listen port")
flag.Parse()
fmt.Println("start organization rpc server, port: ", *rpcPort)
rpcServer := organization.NewServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -3,6 +3,7 @@ package main
import (
"Open_IM/internal/rpc/user"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
@ -10,8 +11,15 @@ import (
func main() {
defaultPorts := config.Config.RpcPort.OpenImUserPort
rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.UserPrometheusPort[0], "userPrometheusPort default listen port")
flag.Parse()
fmt.Println("start user rpc server, port: ", *rpcPort)
rpcServer := user.NewUserServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
}

@ -752,6 +752,7 @@ demo:
rtc:
signalTimeout: 35
# prometheus每个服务监听的端口数量需要和rpc port保持一致
prometheus:
enable: false
userPrometheusPort: [ 20110 ]
@ -767,4 +768,4 @@ prometheus:
conversationPrometheusPort: [ 20230 ]
cachePrometheusPort: [ 20240 ]
realTimeCommPrometheusPort: [ 21300 ]
messageTransferPrometheusPort: [ 21400 ]
messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ] # 端口数量和 script/path_info.cfg msg_transfer_service_num保持一致

@ -10,8 +10,6 @@ service=(
group
auth
admin-cms
message-cms
statistics
office
organization
conversation

@ -1,34 +0,0 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: message-cms-deployment
spec:
selector:
matchLabels:
app: message-cms # 选择这个指定标签执行
replicas: 1 # 运行pod数量
template:
metadata:
labels:
app: message-cms # 标签
spec:
containers:
- name: message-cms
image: openim/message_cms:v2.3.0release
# imagePullPolicy: Always
ports:
- containerPort: 10190
volumeMounts:
- name: config
mountPath: /Open-IM-Server/config
readOnly: true
env:
- name: CONFIG_NAME
value: "/Open-IM-Server"
volumes:
- name: config
configMap:
name: openim-config
strategy: #更新策略
type: RollingUpdate # 滚动更新

@ -1,19 +0,0 @@
FROM ubuntu
# 设置固定的项目路径
ENV WORKDIR /Open-IM-Server
ENV CMDDIR $WORKDIR/cmd
ENV CONFIG_NAME $WORKDIR/config/config.yaml
# 将可执行文件复制到目标目录
ADD ./open_im_message_cms $WORKDIR/cmd/main
# 创建用于挂载的几个目录,添加可执行权限
RUN mkdir $WORKDIR/logs $WORKDIR/config $WORKDIR/script && \
chmod +x $WORKDIR/cmd/main
VOLUME ["/Open-IM-Server/logs","/Open-IM-Server/config","/Open-IM-Server/script"]
WORKDIR $CMDDIR
CMD ./main

@ -9,8 +9,6 @@ service=(
group
auth
admin_cms
message_cms
statistics
office
organization
conversation

@ -1,34 +0,0 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: statistics-deployment
spec:
selector:
matchLabels:
app: statistics # 选择这个指定标签执行
replicas: 1 # 运行pod数量
template:
metadata:
labels:
app: statistics # 标签
spec:
containers:
- name: statistics
image: openim/statistics:v2.3.0release
# imagePullPolicy: Always
ports:
- containerPort: 10180
volumeMounts:
- name: config
mountPath: /Open-IM-Server/config
readOnly: true
env:
- name: CONFIG_NAME
value: "/Open-IM-Server"
volumes:
- name: config
configMap:
name: openim-config
strategy: #更新策略
type: RollingUpdate # 滚动更新

@ -1,19 +0,0 @@
FROM ubuntu
# 设置固定的项目路径
ENV WORKDIR /Open-IM-Server
ENV CMDDIR $WORKDIR/cmd
ENV CONFIG_NAME $WORKDIR/config/config.yaml
# 将可执行文件复制到目标目录
ADD ./open_im_statistics $WORKDIR/cmd/main
# 创建用于挂载的几个目录,添加可执行权限
RUN mkdir $WORKDIR/logs $WORKDIR/config $WORKDIR/script && \
chmod +x $WORKDIR/cmd/main
VOLUME ["/Open-IM-Server/logs","/Open-IM-Server/config","/Open-IM-Server/script"]
WORKDIR $CMDDIR
CMD ./main

@ -13,13 +13,75 @@ scrape_configs:
- job_name: 'openIM-server'
metrics_path: /metrics
static_configs:
- targets: ['localhost:10002']
labels:
group: 'api'
- targets: ['localhost:10006']
labels:
group: 'cms-api'
- targets: ['localhost:21400']
- targets: ['localhost:20110']
labels:
group: 'user'
- targets: ['localhost:20120']
labels:
group: 'friend'
- targets: ['localhost:20130']
labels:
group: 'message'
- targets: ['localhost:20140']
labels:
group: 'msg-gateway'
- targets: ['localhost:20150']
labels:
group: 'group'
- targets: ['localhost:20160']
labels:
group: 'auth'
- targets: ['localhost:20170']
labels:
group: 'push'
- targets: ['localhost:20120']
labels:
group: 'friend'
- targets: ['localhost:20200']
labels:
group: 'admin-cms'
- targets: ['localhost:20120']
labels:
group: 'office'
- targets: ['localhost:20220']
labels:
group: 'organization'
- targets: ['localhost:20230']
labels:
group: 'conversation'
- targets: ['localhost:20240']
labels:
group: 'cache'
- targets: ['localhost:21400', 'localhost:21401', 'localhost:21402', 'localhost:21403']
labels:
group: 'msg-transfer'
- job_name: 'node'
scrape_interval: 8s

@ -107,7 +107,7 @@ func UserToken(c *gin.Context) {
params := api.UserTokenReq{}
if err := c.BindJSON(&params); err != nil {
errMsg := " BindJSON failed " + err.Error()
log.NewError("0", errMsg)
log.NewError(params.OperationID, errMsg)
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": errMsg})
return
}

@ -11,6 +11,8 @@ import (
"Open_IM/internal/demo/register"
"Open_IM/pkg/common/config"
promePkg "Open_IM/pkg/common/prometheus"
"github.com/gin-gonic/gin"
)
@ -18,7 +20,7 @@ func NewGinRouter() *gin.Engine {
gin.SetMode(gin.ReleaseMode)
baseRouter := gin.Default()
if config.Config.Prometheus.Enable {
baseRouter.GET("/metrics", prometheusHandler())
baseRouter.GET("/metrics", promePkg.PrometheusHandler())
}
router := baseRouter.Group("/cms")
router.Use(middleware.CorsHandler())

@ -6,8 +6,11 @@ import (
"Open_IM/pkg/statistics"
"fmt"
"github.com/go-playground/validator/v10"
"sync"
promePkg "Open_IM/pkg/common/prometheus"
"github.com/go-playground/validator/v10"
)
var (
@ -34,7 +37,13 @@ func Init(rpcPort, wsPort int) {
rpcSvr.onInit(rpcPort)
}
func Run() {
func Run(promethuesPort int) {
go ws.run()
go rpcSvr.run()
go func() {
err := promePkg.StartPromeSrv(promethuesPort)
if err != nil {
panic(err)
}
}()
}

@ -4,13 +4,10 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/statistics"
"fmt"
"net/http"
"strconv"
"sync"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const OnlineTopicBusy = 1
@ -60,10 +57,12 @@ func Run(promethuesPort int) {
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
if config.Config.Prometheus.Enable {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(":"+strconv.Itoa(promethuesPort), nil)
}
go func() {
err := promePkg.StartPromeSrv(promethuesPort)
if err != nil {
panic(err)
}
}()
}
func SetOnlineTopicStatus(status int) {
w.Lock()

@ -23,7 +23,8 @@ import (
)
var (
msgInsertMysqlProcessed prometheus.Counter
msgInsertMysqlCounter prometheus.Counter
msgInsertFailedMysqlCounter prometheus.Counter
)
type PersistentConsumerHandler struct {
@ -38,13 +39,21 @@ func (pc *PersistentConsumerHandler) Init() {
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
if config.Config.Prometheus.Enable {
msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "insert_mysql_msg_total",
Help: "The total number of msg insert mysql events",
})
pc.initPrometheus()
}
}
func (pc *PersistentConsumerHandler) initPrometheus() {
msgInsertMysqlCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "insert_mysql_msg_total",
Help: "The total number of msg insert mysql events",
})
msgInsertFailedMysqlCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "insert_mysql_failed_msg_total",
Help: "The total number of msg insert mysql events",
})
}
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
msg := cMsg.Value
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey)
@ -76,13 +85,11 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes
log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg))
if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil {
log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
msgInsertFailedMysqlCounter.Inc()
return
}
msgInsertMysqlProcessed.Inc()
msgInsertMysqlProcessed.Add(1)
if config.Config.Prometheus.Enable {
log.NewDebug(msgFromMQ.OperationID, utils.GetSelfFuncName(), "inc msgInsertMysqlProcessed", msgInsertMysqlProcessed.Desc())
msgInsertMysqlProcessed.Inc()
msgInsertMysqlCounter.Inc()
}
}

@ -14,6 +14,7 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/statistics"
"fmt"
)
@ -46,7 +47,13 @@ func init() {
}
}
func Run() {
func Run(promethuesPort int) {
go rpcServer.run()
go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh)
go func() {
err := promePkg.StartPromeSrv(promethuesPort)
if err != nil {
panic(err)
}
}()
}

@ -5,13 +5,14 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/push"
pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils"
"context"
"google.golang.org/grpc"
"net"
"strconv"
"strings"
"google.golang.org/grpc"
)
type RPCServer struct {

@ -108,9 +108,9 @@ func (s *adminCMSServer) AdminLogin(_ context.Context, req *pbAdminCMS.AdminLogi
}
admin, err := imdb.GetUserByUserID(req.AdminID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "failed", req.AdminID)
log.NewError(req.OperationID, utils.GetSelfFuncName(), "failed", req.AdminID, err.Error())
resp.CommonResp.ErrCode = constant.ErrTokenUnknown.ErrCode
resp.CommonResp.ErrMsg = constant.ErrTokenMalformed.ErrMsg
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
resp.UserName = admin.Nickname

@ -116,7 +116,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
utils.CopyStructFields(&groupInfo, req.GroupInfo)
groupInfo.CreatorUserID = req.OpUserID
groupInfo.GroupID = groupId
groupInfo.CreateTime = time.Now()
if groupInfo.NotificationUpdateTime.Unix() < 0 {
groupInfo.NotificationUpdateTime = utils.UnixSecondToTime(0)
}
@ -1373,9 +1373,10 @@ func (s *groupServer) GetGroups(_ context.Context, req *pbGroup.GetGroupsReq) (*
return resp, nil
}
groupInfo.MemberCount = uint32(memberNum)
groupInfo.CreateTime = uint32(groupInfoDB.CreateTime.Unix())
resp.CMSGroups = append(resp.CMSGroups, &pbGroup.CMSGroup{GroupInfo: groupInfo, GroupOwnerUserName: groupMember.Nickname, GroupOwnerUserID: groupMember.UserID})
} else {
groups, err := imdb.GetGroupsByName(req.GroupName, req.Pagination.PageNumber, req.Pagination.ShowNumber)
groups, count, err := imdb.GetGroupsByName(req.GroupName, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupsByName error", req.String(), req.GroupName, req.Pagination.PageNumber, req.Pagination.ShowNumber)
}
@ -1392,13 +1393,7 @@ func (s *groupServer) GetGroups(_ context.Context, req *pbGroup.GetGroupsReq) (*
group.GroupOwnerUserName = groupMember.Nickname
resp.CMSGroups = append(resp.CMSGroups, group)
}
resp.GroupNum, err = imdb.GetGroupsCountNum(db.Group{GroupName: req.GroupName})
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupsCountNum error", err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
resp.GroupNum = int32(count)
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "GetGroups resp", resp.String())
return resp, nil

@ -53,11 +53,16 @@ type GroupWithNum struct {
MemberCount int `gorm:"column:num"`
}
func GetGroupsByName(groupName string, pageNumber, showNumber int32) ([]GroupWithNum, error) {
func GetGroupsByName(groupName string, pageNumber, showNumber int32) ([]GroupWithNum, int64, error) {
var groups []GroupWithNum
err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Select("groups.*, (select count(*) from group_members where group_members.group_id=groups.group_id) as num").
Where(" name like ? and status != ?", fmt.Sprintf("%%%s%%", groupName), constant.GroupStatusDismissed).Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1))).Find(&groups).Error
return groups, err
var count int64
sql := db.DB.MysqlDB.DefaultGormDB().Table("groups").Select("groups.*, (select count(*) from group_members where group_members.group_id=groups.group_id) as num").
Where(" name like ? and status != ?", fmt.Sprintf("%%%s%%", groupName), constant.GroupStatusDismissed)
if err := sql.Count(&count).Error; err != nil {
return nil, 0, err
}
err := sql.Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1))).Find(&groups).Error
return groups, count, err
}
func GetGroups(pageNumber, showNumber int) ([]GroupWithNum, error) {

@ -0,0 +1,26 @@
package prometheus
import (
"Open_IM/pkg/common/config"
"net/http"
"strconv"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func StartPromeSrv(promethuesPort int) error {
if config.Config.Prometheus.Enable {
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(":"+strconv.Itoa(promethuesPort), nil)
return err
}
return nil
}
func PrometheusHandler() gin.HandlerFunc {
h := promhttp.Handler()
return func(c *gin.Context) {
h.ServeHTTP(c.Writer, c.Request)
}
}

@ -7,13 +7,16 @@ ulimit -n 200000
list1=$(cat $config_path | grep openImMessageGatewayPort | awk -F '[:]' '{print $NF}')
list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}')
list3=$(cat $config_path | grep messageGatewayPrometheusPort | awk -F '[:]' '{print $NF}')
list_to_string $list1
rpc_ports=($ports_array)
list_to_string $list2
ws_ports=($ports_array)
list_to_string $list3
prome_ports=($ports_array)
if [ ${#rpc_ports[@]} -ne ${#ws_ports[@]} ]; then
echo -e ${RED_PREFIX}"ws_ports does not match push_rpc_ports in quantity!!!"${COLOR_SUFFIX}
echo -e ${RED_PREFIX}"ws_ports does not match push_rpc_ports or prome_ports in quantity!!!"${COLOR_SUFFIX}
exit -1
fi
@ -28,7 +31,7 @@ fi
sleep 1
cd ${msg_gateway_binary_root}
for ((i = 0; i < ${#ws_ports[@]}; i++)); do
nohup ./${msg_gateway_name} -rpc_port ${rpc_ports[$i]} -ws_port ${ws_ports[$i]} >>../logs/openIM.log 2>&1 &
nohup ./${msg_gateway_name} -rpc_port ${rpc_ports[$i]} -ws_port ${ws_ports[$i]} -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 &
done
#Check launched service process

@ -2,7 +2,11 @@
#Include shell font styles and some basic information
source ./style_info.cfg
source ./path_info.cfg
source ./function.sh
list1=$(cat $config_path | grep messageTransferPrometheusPort | awk -F '[:]' '{print $NF}')
list_to_string $list1
prome_ports=($ports_array)
#Check if the service exists
@ -18,7 +22,7 @@ sleep 1
cd ${msg_transfer_binary_root}
for ((i = 0; i < ${msg_transfer_service_num}; i++)); do
nohup ./${msg_transfer_name} >>../logs/openIM.log 2>&1 &
nohup ./${msg_transfer_name} -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 &
done
#Check launched service process

@ -7,8 +7,11 @@ source ./function.sh
list1=$(cat $config_path | grep openImPushPort | awk -F '[:]' '{print $NF}')
list2=$(cat $config_path | grep pushPrometheusPort | awk -F '[:]' '{print $NF}')
list_to_string $list1
rpc_ports=($ports_array)
list_to_string $list2
prome_ports=($ports_array)
#Check if the service exists
#If it is exists,kill this process
@ -22,7 +25,7 @@ sleep 1
cd ${push_binary_root}
for ((i = 0; i < ${#rpc_ports[@]}; i++)); do
nohup ./${push_name} -port ${rpc_ports[$i]} >>../logs/openIM.log 2>&1 &
nohup ./${push_name} -port ${rpc_ports[$i]} -prometheus_port ${prome_ports[$i]} >>../logs/openIM.log 2>&1 &
done
sleep 3

@ -40,6 +40,23 @@ service_port_name=(
openImCachePort
)
service_prometheus_port_name=(
#api port name
openImApiPort
openImCmsApiPort
#rpc port name
userPrometheusPort
friendPrometheusPort
groupPrometheusPort
authPrometheusPort
adminCmsPrometheusPort
messagePrometheusPort
officePrometheusPort
organizationPrometheusPort
conversationPrometheusPort
cachePrometheusPort
)
for ((i = 0; i < ${#service_filename[*]}; i++)); do
#Check whether the service exists
service_name="ps -aux |grep -w ${service_filename[$i]} |grep -v grep"
@ -57,13 +74,24 @@ for ((i = 0; i < ${#service_filename[*]}; i++)); do
#Get the rpc port in the configuration file
portList=$(cat $config_path | grep ${service_port_name[$i]} | awk -F '[:]' '{print $NF}')
list_to_string ${portList}
service_ports=($ports_array)
portList2=$(cat $config_path | grep ${service_prometheus_port_name[$i]} | awk -F '[:]' '{print $NF}')
list_to_string $portList2
prome_ports=($ports_array)
#Start related rpc services based on the number of ports
for j in ${ports_array}; do
# for j in ${service_ports}; do
for ((j = 0; j < ${#service_ports[*]}; j++)); do
#Start the service in the background
# ./${service_filename[$i]} -port $j &
nohup ./${service_filename[$i]} -port $j >>../logs/openIM.log 2>&1 &
cmd="./${service_filename[$i]} -port ${service_ports[$j]} -prometheus_port ${prome_ports[$j]}"
if [ $i -eq 0 -o $i -eq 1 ]; then
cmd="./${service_filename[$i]} -port ${service_ports[$j]}"
fi
echo $cmd
nohup $cmd >>../logs/openIM.log 2>&1 &
sleep 1
pid="netstat -ntlp|grep $j |awk '{printf \$7}'|cut -d/ -f1"
echo -e "${GREEN_PREFIX}${service_filename[$i]} start success,port number:$j pid:$(eval $pid)$COLOR_SUFFIX"
echo -e "${GREEN_PREFIX}${service_filename[$i]} start success,port number:${service_ports[$j]} pid:$(eval $pid)$COLOR_SUFFIX"
done
done

Loading…
Cancel
Save