pull/351/head
wangchuxiao 2 years ago
commit 5657f617e7

@ -2,7 +2,7 @@
# The class cannot be named by Pascal or camel case.
# If it is not used, the corresponding structure will not be set,
# and it will not be read naturally.
serverversion: 2.3.0
serverversion: 2.3.1
#---------------Infrastructure configuration---------------------#
etcd:
etcdSchema: openim #默认即可
@ -102,6 +102,9 @@ cmsapi:
sdk:
openImSdkWsPort: [ 10003 ] #jssdk服务端口默认即可项目中使用jssdk才需开放此端口或做nginx转发
dataDir: [ ../db/sdk/ ]
openImWsAddress: ws://127.0.0.1:10001
openImApiAddress: http://127.0.0.1:10002
#对象存储服务以下配置二选一目前支持两种腾讯云和minio二者配置好其中一种即可如果使用minio参考https://doc.rentsoft.cn/#/qa/minio搭建minio服务器
credential: #腾讯cos发送图片、视频、文件时需要请自行申请后替换必须修改
tencent:

@ -126,7 +126,7 @@ services:
# STORE_PORT: 3306
open_im_server:
image: openim/open_im_server:v2.3.0
image: openim/open_im_server:v2.3.1
container_name: open_im_server
volumes:
- ./logs:/Open-IM-Server/logs

@ -16,10 +16,11 @@ import (
"context"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
"math/rand"
"strings"
"time"
"github.com/golang/protobuf/proto"
)
type OnboardingProcessReq struct {
@ -75,6 +76,7 @@ func onboardingProcess(operationID, userID, userName, faceURL, phoneNumber, emai
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
}
log.Debug(operationID, utils.GetSelfFuncName(), "getjoinGroupIDListdepartmentID", groupIDList)
joinGroups(operationID, userID, userName, faceURL, groupIDList)
log.NewInfo(operationID, utils.GetSelfFuncName(), "fineshed")
}

@ -66,6 +66,7 @@ func (r *RPCServer) run() {
err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName, 10)
if err != nil {
log.Error("", "register push message rpc to etcd err", "", "err", err.Error(), r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName)
panic(utils.Wrap(err, "register msg_gataway module rpc to etcd err"))
}
r.target = getcdv3.GetTarget(r.etcdSchema, rpcRegisterIP, r.rpcPort, r.rpcRegisterName)
err = srv.Serve(listener)

@ -282,7 +282,7 @@ func (ws *WServer) sendKickMsg(oldConn, newConn *UserConn) {
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, operationID string) {
rwLock.Lock()
defer rwLock.Unlock()
log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token)
log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token, "ip: ", conn.RemoteAddr().String())
callbackResp := callbackUserOnline(operationID, uid, platformID, token)
if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp)

@ -55,6 +55,7 @@ func (r *RPCServer) run() {
err = getcdv3.RegisterEtcd(r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName, 10)
if err != nil {
log.Error("", "register push module rpc to etcd err", err.Error(), r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName)
panic(utils.Wrap(err, "register push module rpc to etcd err"))
}
err = srv.Serve(listener)
if err != nil {

@ -71,7 +71,7 @@ func (s *adminCMSServer) Run() {
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error())
return
panic(utils.Wrap(err, "register admin module rpc to etcd err"))
}
err = srv.Serve(listener)
if err != nil {

@ -156,7 +156,8 @@ func (rpc *rpcAuth) Run() {
if err != nil {
log.NewError(operationID, "RegisterEtcd failed ", err.Error(),
rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName)
return
panic(utils.Wrap(err, "register auth module rpc to etcd err"))
}
log.NewInfo(operationID, "RegisterAuthServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName)
err = srv.Serve(listener)

@ -65,7 +65,7 @@ func (s *cacheServer) Run() {
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error())
return
panic(utils.Wrap(err, "register cache module rpc to etcd err"))
}
err = srv.Serve(listener)
if err != nil {

@ -84,6 +84,10 @@ func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbCo
for _, v := range utils.DifferenceString(haveUserID, req.UserIDList) {
conversation.OwnerUserID = v
conversation.UpdateUnreadCountTime = utils.GetCurrentTimestampByMill()
err = rocksCache.DelUserConversationIDListFromCache(v)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
}
err := imdb.SetOneConversation(conversation)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
@ -194,7 +198,7 @@ func (rpc *rpcConversation) Run() {
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(),
rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName)
return
panic(utils.Wrap(err, "register conversation module rpc to etcd err"))
}
log.NewInfo("0", "RegisterConversationServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName)
err = srv.Serve(listener)

@ -75,7 +75,7 @@ func (s *friendServer) Run() {
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(), s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName)
return
panic(utils.Wrap(err, "register friend module rpc to etcd err"))
}
err = srv.Serve(listener)
if err != nil {

@ -83,7 +83,8 @@ func (s *groupServer) Run() {
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("", "RegisterEtcd failed ", err.Error())
return
panic(utils.Wrap(err, "register group module rpc to etcd err"))
}
log.Info("", "RegisterEtcd ", s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName)
err = srv.Serve(listener)
@ -483,11 +484,9 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
}
}
go func() {
for _, v := range req.InvitedUserIDList {
chat.SuperGroupNotification(req.OperationID, v, v)
}
}()
for _, v := range req.InvitedUserIDList {
chat.SuperGroupNotification(req.OperationID, v, v)
}
}
log.NewInfo(req.OperationID, "InviteUserToGroup rpc return ", resp)

@ -0,0 +1,182 @@
package messageCMS
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
errors "Open_IM/pkg/common/http"
"context"
"strconv"
"Open_IM/pkg/common/log"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbMessageCMS "Open_IM/pkg/proto/message_cms"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"net"
"strings"
"google.golang.org/grpc"
)
type messageCMSServer struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
}
func NewMessageCMSServer(port int) *messageCMSServer {
log.NewPrivateLog(constant.LogFileName)
return &messageCMSServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImMessageCMSName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
}
}
func (s *messageCMSServer) Run() {
log.NewInfo("0", "messageCMS rpc start ")
listenIP := ""
if config.Config.ListenIP == "" {
listenIP = "0.0.0.0"
} else {
listenIP = config.Config.ListenIP
}
address := listenIP + ":" + strconv.Itoa(s.rpcPort)
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
defer listener.Close()
//grpc server
srv := grpc.NewServer()
defer srv.GracefulStop()
//Service registers with etcd
pbMessageCMS.RegisterMessageCMSServer(srv, s)
rpcRegisterIP := config.Config.RpcRegisterIP
if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP()
if err != nil {
log.Error("", "GetLocalIP failed ", err.Error())
}
}
log.NewInfo("", "rpcRegisterIP", rpcRegisterIP)
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error())
panic(utils.Wrap(err, "register message_cms module rpc to etcd err"))
}
err = srv.Serve(listener)
if err != nil {
log.NewError("0", "Serve failed ", err.Error())
return
}
log.NewInfo("0", "message cms rpc success")
}
func (s *messageCMSServer) BoradcastMessage(_ context.Context, req *pbMessageCMS.BoradcastMessageReq) (*pbMessageCMS.BoradcastMessageResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "BoradcastMessage", req.String())
resp := &pbMessageCMS.BoradcastMessageResp{}
return resp, errors.WrapError(constant.ErrDB)
}
func (s *messageCMSServer) GetChatLogs(_ context.Context, req *pbMessageCMS.GetChatLogsReq) (*pbMessageCMS.GetChatLogsResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "GetChatLogs", req.String())
resp := &pbMessageCMS.GetChatLogsResp{}
time, err := utils.TimeStringToTime(req.Date)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "time string parse error", err.Error())
}
chatLog := db.ChatLog{
Content: req.Content,
SendTime: time,
ContentType: req.ContentType,
SessionType: req.SessionType,
}
switch chatLog.SessionType {
case constant.SingleChatType:
chatLog.SendID = req.UserId
case constant.GroupChatType:
chatLog.RecvID = req.GroupId
chatLog.SendID = req.UserId
}
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "chat_log: ", chatLog)
nums, err := imdb.GetChatLogCount(chatLog)
resp.ChatLogsNum = int32(nums)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetChatLogCount", err.Error())
}
chatLogs, err := imdb.GetChatLog(chatLog, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetChatLog", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
for _, chatLog := range chatLogs {
pbChatLog := &pbMessageCMS.ChatLogs{
SessionType: chatLog.SessionType,
ContentType: chatLog.ContentType,
SearchContent: req.Content,
WholeContent: chatLog.Content,
Date: chatLog.CreateTime.String(),
SenderNickName: chatLog.SenderNickname,
SenderId: chatLog.SendID,
}
if chatLog.SenderNickname == "" {
sendUser, err := imdb.GetUserByUserID(chatLog.SendID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserByUserID failed", err.Error())
continue
}
pbChatLog.SenderNickName = sendUser.Nickname
}
switch chatLog.SessionType {
case constant.SingleChatType:
recvUser, err := imdb.GetUserByUserID(chatLog.RecvID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserByUserID failed", err.Error())
continue
}
pbChatLog.ReciverId = recvUser.UserID
pbChatLog.ReciverNickName = recvUser.Nickname
case constant.GroupChatType:
group, err := imdb.GetGroupInfoByGroupID(chatLog.RecvID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupById failed")
continue
}
pbChatLog.GroupId = group.GroupID
pbChatLog.GroupName = group.GroupName
}
resp.ChatLogs = append(resp.ChatLogs, pbChatLog)
}
resp.Pagination = &open_im_sdk.ResponsePagination{
CurrentPage: req.Pagination.PageNumber,
ShowNumber: req.Pagination.ShowNumber,
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp output: ", resp.String())
return resp, nil
}
func (s *messageCMSServer) MassSendMessage(_ context.Context, req *pbMessageCMS.MassSendMessageReq) (*pbMessageCMS.MassSendMessageResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "MassSendMessage", req.String())
resp := &pbMessageCMS.MassSendMessageResp{}
return resp, nil
}
func (s *messageCMSServer) WithdrawMessage(_ context.Context, req *pbMessageCMS.WithdrawMessageReq) (*pbMessageCMS.WithdrawMessageResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "WithdrawMessage", req.String())
resp := &pbMessageCMS.WithdrawMessageResp{}
return resp, nil
}

@ -29,6 +29,7 @@ func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbApi.CommonCallbackReq
SenderFaceURL: msg.MsgData.SenderFaceURL,
Content: callback.GetContent(msg.MsgData),
Seq: msg.MsgData.Seq,
Ex: msg.MsgData.Ex,
}
return req
}
@ -137,11 +138,11 @@ func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp
}
func callbackWordFilter(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackWordFilter.Enable || msg.MsgData.ContentType != constant.Text {
return callbackResp
}
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
commonCallbackReq := copyCallbackCommonReqStruct(msg)
commonCallbackReq.CallbackCommand = constant.CallbackWordFilterCommand
req := cbApi.CallbackWordFilterReq{

@ -1,12 +1,14 @@
package msg
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
utils2 "Open_IM/pkg/common/utils"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
)
@ -29,7 +31,7 @@ func OrganizationNotificationToAll(opUserID string, operationID string) {
for _, v := range userIDList {
log.Debug(operationID, "OrganizationNotification", opUserID, v, constant.OrganizationChangedNotification, &tips, operationID)
OrganizationNotification(opUserID, v, constant.OrganizationChangedNotification, &tips, operationID)
OrganizationNotification(config.Config.Manager.AppManagerUid[0], v, constant.OrganizationChangedNotification, &tips, operationID)
}
}

@ -75,7 +75,7 @@ func (rpc *rpcChat) Run() {
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName, 10)
if err != nil {
log.Error("", "register rpcChat to etcd failed ", err.Error())
return
panic(utils.Wrap(err, "register chat module rpc to etcd err"))
}
go rpc.runCh()
err = srv.Serve(listener)

@ -226,6 +226,7 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
replay := pbChat.SendMsgResp{}
newTime := db.GetCurrentTimestampByMill()
t1 := time.Now()
log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String())
flag, errCode, errMsg := isMessageHasReadEnabled(pb)
log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag)
@ -233,15 +234,18 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
flag, errCode, errMsg, _ = messageVerification(pb)
log.Info(pb.OperationID, "userRelationshipVerification ", flag)
log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1))
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
t1 = time.Now()
rpc.encapsulateMsgData(pb.MsgData)
log.Info(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1))
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
// callback
t1 = time.Now()
callbackResp := callbackWordFilter(pb)
log.Info(pb.OperationID, "callbackWordFilter ", callbackResp)
log.Info(pb.OperationID, "callbackWordFilter ", callbackResp, "cost time: ", time.Since(t1))
if callbackResp.ErrCode != 0 {
log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp)
}
@ -256,7 +260,9 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
switch pb.MsgData.SessionType {
case constant.SingleChatType:
// callback
t1 = time.Now()
callbackResp := callbackBeforeSendSingleMsg(pb)
log.Info(pb.OperationID, "callbackBeforeSendSingleMsg ", " cost time: ", time.Since(t1))
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp)
}
@ -267,28 +273,37 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
}
t1 = time.Now()
isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
log.Info(pb.OperationID, "modifyMessageByUserMessageReceiveOpt ", " cost time: ", time.Since(t1))
if isSend {
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
t1 = time.Now()
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1))
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
t1 = time.Now()
err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus)
log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1))
if err2 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
// callback
t1 = time.Now()
callbackResp = callbackAfterSendSingleMsg(pb)
log.Info(pb.OperationID, "callbackAfterSendSingleMsg ", " cost time: ", time.Since(t1))
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp)
}
log.Debug(pb.OperationID, "send msg cost time all: ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
case constant.GroupChatType:
// callback

@ -82,7 +82,7 @@ func (s *officeServer) Run() {
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error())
return
panic(utils.Wrap(err, "register office module rpc to etcd err"))
}
go s.sendTagMsgRoutine()
err = srv.Serve(listener)

@ -73,7 +73,7 @@ func (s *organizationServer) Run() {
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("", "RegisterEtcd failed ", err.Error())
return
panic(utils.Wrap(err, "register organization module rpc to etcd err"))
}
log.NewInfo("", "organization rpc RegisterEtcd success", rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
err = srv.Serve(listener)

@ -0,0 +1,399 @@
package statistics
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"context"
"strconv"
"sync"
"time"
//"Open_IM/pkg/common/constant"
//"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
//cp "Open_IM/pkg/common/utils"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbStatistics "Open_IM/pkg/proto/statistics"
//open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
//"context"
errors "Open_IM/pkg/common/http"
"net"
"strings"
"google.golang.org/grpc"
)
type statisticsServer struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
}
func NewStatisticsServer(port int) *statisticsServer {
log.NewPrivateLog(constant.LogFileName)
return &statisticsServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImStatisticsName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
}
}
func (s *statisticsServer) Run() {
log.NewInfo("0", "Statistics rpc start ")
listenIP := ""
if config.Config.ListenIP == "" {
listenIP = "0.0.0.0"
} else {
listenIP = config.Config.ListenIP
}
address := listenIP + ":" + strconv.Itoa(s.rpcPort)
//listener network
listener, err := net.Listen("tcp", address)
if err != nil {
panic("listening err:" + err.Error() + s.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
defer listener.Close()
//grpc server
srv := grpc.NewServer()
defer srv.GracefulStop()
//Service registers with etcd
pbStatistics.RegisterUserServer(srv, s)
rpcRegisterIP := config.Config.RpcRegisterIP
if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP()
if err != nil {
log.Error("", "GetLocalIP failed ", err.Error())
}
}
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error())
panic(utils.Wrap(err, "register statistics module rpc to etcd err"))
}
err = srv.Serve(listener)
if err != nil {
log.NewError("0", "Serve failed ", err.Error())
return
}
log.NewInfo("0", "statistics rpc success")
}
func (s *statisticsServer) GetActiveGroup(_ context.Context, req *pbStatistics.GetActiveGroupReq) (*pbStatistics.GetActiveGroupResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req", req.String())
resp := &pbStatistics.GetActiveGroupResp{}
fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "ParseTimeFromTo failed", err.Error())
return resp, errors.WrapError(constant.ErrArgs)
}
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "time: ", fromTime, toTime)
activeGroups, err := imdb.GetActiveGroups(fromTime, toTime, 12)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetActiveGroups failed", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
for _, activeGroup := range activeGroups {
resp.Groups = append(resp.Groups,
&pbStatistics.GroupResp{
GroupName: activeGroup.Name,
GroupId: activeGroup.Id,
MessageNum: int32(activeGroup.MessageNum),
})
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp.String())
return resp, nil
}
func (s *statisticsServer) GetActiveUser(_ context.Context, req *pbStatistics.GetActiveUserReq) (*pbStatistics.GetActiveUserResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String())
resp := &pbStatistics.GetActiveUserResp{}
fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "ParseTimeFromTo failed", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "time: ", fromTime, toTime)
activeUsers, err := imdb.GetActiveUsers(fromTime, toTime, 12)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetActiveUsers failed", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
for _, activeUser := range activeUsers {
resp.Users = append(resp.Users,
&pbStatistics.UserResp{
UserId: activeUser.Id,
NickName: activeUser.Name,
MessageNum: int32(activeUser.MessageNum),
},
)
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp.String())
return resp, nil
}
func ParseTimeFromTo(from, to string) (time.Time, time.Time, error) {
var fromTime time.Time
var toTime time.Time
fromTime, err := utils.TimeStringToTime(from)
if err != nil {
return fromTime, toTime, err
}
toTime, err = utils.TimeStringToTime(to)
if err != nil {
return fromTime, toTime, err
}
return fromTime, toTime, nil
}
func isInOneMonth(from, to time.Time) bool {
return from.Month() == to.Month() && from.Year() == to.Year()
}
func GetRangeDate(from, to time.Time) [][2]time.Time {
interval := to.Sub(from)
var times [][2]time.Time
switch {
// today
case interval == 0:
times = append(times, [2]time.Time{
from, from.Add(time.Hour * 24),
})
// days
case isInOneMonth(from, to):
for i := 0; ; i++ {
fromTime := from.Add(time.Hour * 24 * time.Duration(i))
toTime := from.Add(time.Hour * 24 * time.Duration(i+1))
if toTime.After(to.Add(time.Hour * 24)) {
break
}
times = append(times, [2]time.Time{
fromTime, toTime,
})
}
// month
case !isInOneMonth(from, to):
if to.Sub(from) < time.Hour*24*30 {
for i := 0; ; i++ {
fromTime := from.Add(time.Hour * 24 * time.Duration(i))
toTime := from.Add(time.Hour * 24 * time.Duration(i+1))
if toTime.After(to.Add(time.Hour * 24)) {
break
}
times = append(times, [2]time.Time{
fromTime, toTime,
})
}
} else {
for i := 0; ; i++ {
if i == 0 {
fromTime := from
toTime := getFirstDateOfNextNMonth(fromTime, 1)
times = append(times, [2]time.Time{
fromTime, toTime,
})
} else {
fromTime := getFirstDateOfNextNMonth(from, i)
toTime := getFirstDateOfNextNMonth(fromTime, 1)
if toTime.After(to) {
toTime = to
times = append(times, [2]time.Time{
fromTime, toTime,
})
break
}
times = append(times, [2]time.Time{
fromTime, toTime,
})
}
}
}
}
return times
}
func getFirstDateOfNextNMonth(currentTime time.Time, n int) time.Time {
lastOfMonth := time.Date(currentTime.Year(), currentTime.Month(), 1, 0, 0, 0, 0, currentTime.Location()).AddDate(0, n, 0)
return lastOfMonth
}
func (s *statisticsServer) GetGroupStatistics(_ context.Context, req *pbStatistics.GetGroupStatisticsReq) (*pbStatistics.GetGroupStatisticsResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String())
resp := &pbStatistics.GetGroupStatisticsResp{}
fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupStatistics failed", err.Error())
return resp, errors.WrapError(constant.ErrArgs)
}
increaseGroupNum, err := imdb.GetIncreaseGroupNum(fromTime, toTime.Add(time.Hour*24))
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum failed", err.Error(), fromTime, toTime)
return resp, errors.WrapError(constant.ErrDB)
}
totalGroupNum, err := imdb.GetTotalGroupNum()
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
resp.IncreaseGroupNum = increaseGroupNum
resp.TotalGroupNum = totalGroupNum
times := GetRangeDate(fromTime, toTime)
log.NewDebug(req.OperationID, "times:", times)
wg := &sync.WaitGroup{}
resp.IncreaseGroupNumList = make([]*pbStatistics.DateNumList, len(times), len(times))
resp.TotalGroupNumList = make([]*pbStatistics.DateNumList, len(times), len(times))
wg.Add(len(times))
for i, v := range times {
go func(wg *sync.WaitGroup, index int, v [2]time.Time) {
defer wg.Done()
num, err := imdb.GetIncreaseGroupNum(v[0], v[1])
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error())
}
resp.IncreaseGroupNumList[index] = &pbStatistics.DateNumList{
Date: v[0].String(),
Num: num,
}
num, err = imdb.GetGroupNum(v[1])
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error())
}
resp.TotalGroupNumList[index] = &pbStatistics.DateNumList{
Date: v[0].String(),
Num: num,
}
}(wg, i, v)
}
wg.Wait()
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp)
return resp, nil
}
func (s *statisticsServer) GetMessageStatistics(_ context.Context, req *pbStatistics.GetMessageStatisticsReq) (*pbStatistics.GetMessageStatisticsResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String())
resp := &pbStatistics.GetMessageStatisticsResp{}
fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To)
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "times: ", fromTime, toTime)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "ParseTimeFromTo failed", err.Error())
return resp, errors.WrapError(constant.ErrArgs)
}
privateMessageNum, err := imdb.GetPrivateMessageNum(fromTime, toTime.Add(time.Hour*24))
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetPrivateMessageNum failed", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
groupMessageNum, err := imdb.GetGroupMessageNum(fromTime, toTime.Add(time.Hour*24))
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupMessageNum failed", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), privateMessageNum, groupMessageNum)
resp.PrivateMessageNum = privateMessageNum
resp.GroupMessageNum = groupMessageNum
times := GetRangeDate(fromTime, toTime)
resp.GroupMessageNumList = make([]*pbStatistics.DateNumList, len(times), len(times))
resp.PrivateMessageNumList = make([]*pbStatistics.DateNumList, len(times), len(times))
wg := &sync.WaitGroup{}
wg.Add(len(times))
for i, v := range times {
go func(wg *sync.WaitGroup, index int, v [2]time.Time) {
defer wg.Done()
num, err := imdb.GetPrivateMessageNum(v[0], v[1])
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error())
}
resp.PrivateMessageNumList[index] = &pbStatistics.DateNumList{
Date: v[0].String(),
Num: num,
}
num, err = imdb.GetGroupMessageNum(v[0], v[1])
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error())
}
resp.GroupMessageNumList[index] = &pbStatistics.DateNumList{
Date: v[0].String(),
Num: num,
}
}(wg, i, v)
}
wg.Wait()
return resp, nil
}
func (s *statisticsServer) GetUserStatistics(_ context.Context, req *pbStatistics.GetUserStatisticsReq) (*pbStatistics.GetUserStatisticsResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbStatistics.GetUserStatisticsResp{}
fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "ParseTimeFromTo failed", err.Error())
return resp, errors.WrapError(constant.ErrArgs)
}
activeUserNum, err := imdb.GetActiveUserNum(fromTime, toTime.Add(time.Hour*24))
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetActiveUserNum failed", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
increaseUserNum, err := imdb.GetIncreaseUserNum(fromTime, toTime.Add(time.Hour*24))
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseUserNum failed", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
totalUserNum, err := imdb.GetTotalUserNum()
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetTotalUserNum failed", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
resp.ActiveUserNum = activeUserNum
resp.TotalUserNum = totalUserNum
resp.IncreaseUserNum = increaseUserNum
times := GetRangeDate(fromTime, toTime)
resp.TotalUserNumList = make([]*pbStatistics.DateNumList, len(times), len(times))
resp.ActiveUserNumList = make([]*pbStatistics.DateNumList, len(times), len(times))
resp.IncreaseUserNumList = make([]*pbStatistics.DateNumList, len(times), len(times))
wg := &sync.WaitGroup{}
wg.Add(len(times))
for i, v := range times {
go func(wg *sync.WaitGroup, index int, v [2]time.Time) {
defer wg.Done()
num, err := imdb.GetActiveUserNum(v[0], v[1])
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error())
}
resp.ActiveUserNumList[index] = &pbStatistics.DateNumList{
Date: v[0].String(),
Num: num,
}
num, err = imdb.GetTotalUserNumByDate(v[1])
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetTotalUserNumByDate", v, err.Error())
}
resp.TotalUserNumList[index] = &pbStatistics.DateNumList{
Date: v[0].String(),
Num: num,
}
num, err = imdb.GetIncreaseUserNum(v[0], v[1])
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseUserNum", v, err.Error())
}
resp.IncreaseUserNumList[index] = &pbStatistics.DateNumList{
Date: v[0].String(),
Num: num,
}
}(wg, i, v)
}
wg.Wait()
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp)
return resp, nil
}

@ -76,7 +76,7 @@ func (s *userServer) Run() {
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10)
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(), s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName)
return
panic(utils.Wrap(err, "register user module rpc to etcd err"))
}
err = srv.Serve(listener)
if err != nil {
@ -316,13 +316,18 @@ func (s *userServer) SetRecvMsgOpt(ctx context.Context, req *pbUser.SetRecvMsgOp
conversation.ConversationType = constant.GroupChatType
}
}
err := imdb.SetRecvMsgOpt(conversation)
isUpdate, err := imdb.SetRecvMsgOpt(conversation)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
if err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID); err != nil {
if isUpdate {
err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID)
} else {
err = rocksCache.DelUserConversationIDListFromCache(conversation.OwnerUserID)
}
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), conversation.ConversationID, err.Error())
}
chat.ConversationChangeNotification(req.OperationID, req.OwnerUserID)

@ -45,7 +45,7 @@ type Conversation struct {
IsPrivateChat bool `json:"isPrivateChat"`
GroupAtType int32 `json:"groupAtType"`
IsNotInGroup bool `json:"isNotInGroup"`
UpdateUnreadCountTime int64 ` json:"updateUnreadCountTime"`
UpdateUnreadCountTime int64 `json:"updateUnreadCountTime"`
AttachedInfo string `json:"attachedInfo"`
Ex string `json:"ex"`
}

@ -17,6 +17,7 @@ type CommonCallbackReq struct {
Seq uint32 `json:"seq"`
AtUserIDList []string `json:"atUserList"`
SenderFaceURL string `json:"faceURL"`
Ex string `json:"ex"`
}
type CommonCallbackResp struct {

@ -41,16 +41,18 @@ func PeerUserSetConversation(conversation db.Conversation) error {
}
func SetRecvMsgOpt(conversation db.Conversation) error {
func SetRecvMsgOpt(conversation db.Conversation) (bool, error) {
var isUpdate bool
newConversation := conversation
if db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Find(&newConversation).RowsAffected == 0 {
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "not exist in db, create")
return db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error
return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error
// if exist, then update record
} else {
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "exist in db, update")
//force update
return db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID).
isUpdate = true
return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID).
Updates(map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt}).Error
}
}

@ -418,9 +418,12 @@ func GetJoinedSuperGroupListFromCache(userID string) ([]string, error) {
return string(bytes), nil
}
joinedSuperGroupListStr, err := db.DB.Rc.Fetch(joinedSuperGroupListCache+userID, time.Second*30*60, getJoinedSuperGroupIDList)
if err != nil {
return nil, err
}
var joinedSuperGroupList []string
err = json.Unmarshal([]byte(joinedSuperGroupListStr), &joinedSuperGroupList)
return joinedSuperGroupList, err
return joinedSuperGroupList, utils.Wrap(err, "")
}
func DelJoinedSuperGroupIDListFromCache(userID string) error {
@ -493,7 +496,7 @@ func GetUserConversationIDListFromCache(userID string) ([]string, error) {
}
func DelUserConversationIDListFromCache(userID string) error {
return db.DB.Rc.TagAsDeleted(conversationIDListCache + userID)
return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationIDListCache+userID), "DelUserConversationIDListFromCache err")
}
func GetConversationFromCache(ownerUserID, conversationID string) (*db.Conversation, error) {
@ -550,5 +553,5 @@ func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) {
}
func DelConversationFromCache(ownerUserID, conversationID string) error {
return db.DB.Rc.TagAsDeleted(conversationCache + ownerUserID + ":" + conversationID)
return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err")
}

@ -4,17 +4,13 @@ source ./style_info.cfg
source ./path_info.cfg
source ./function.sh
ulimit -n 200000
list1=$(cat $config_path | grep openImApiPort | awk -F '[:]' '{print $NF}')
list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}')
ws_address=$(cat $config_path | grep openImWsAddress | awk -F '[ ]' '{print $NF}')
api_address=$(cat $config_path | grep openImApiAddress | awk -F '[ ]' '{print $NF}')
list3=$(cat $config_path | grep openImSdkWsPort | awk -F '[:]' '{print $NF}')
logLevel=$(cat $config_path | grep remainLogLevel | awk -F '[:]' '{print $NF}')
list_to_string $list1
api_ports=($ports_array)
list_to_string $list2
ws_ports=($ports_array)
list_to_string $list3
sdk_ws_ports=($ports_array)
list_to_string $list4
@ -28,7 +24,7 @@ fi
#Waiting port recycling
sleep 1
cd ${sdk_server_binary_root}
nohup ./${sdk_server_name} -openIM_api_port ${api_ports[0]} -openIM_ws_port ${ws_ports[0]} -sdk_ws_port ${sdk_ws_ports[0]} -openIM_log_level ${logLevel} >>../logs/openIM.log 2>&1 &
nohup ./${sdk_server_name} -openIM_ws_address ${ws_address} -sdk_ws_port ${sdk_ws_ports[0]} -openIM_api_address ${api_address} -openIM_log_level ${logLevel} >>../logs/openIM.log 2>&1 &
#Check launched service process
sleep 3

Loading…
Cancel
Save