diff --git a/config/config.yaml b/config/config.yaml index 5524a1219..5fbfe7492 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -2,7 +2,7 @@ # The class cannot be named by Pascal or camel case. # If it is not used, the corresponding structure will not be set, # and it will not be read naturally. -serverversion: 2.3.0 +serverversion: 2.3.1 #---------------Infrastructure configuration---------------------# etcd: etcdSchema: openim #默认即可 @@ -102,6 +102,9 @@ cmsapi: sdk: openImSdkWsPort: [ 10003 ] #jssdk服务端口,默认即可,项目中使用jssdk才需开放此端口或做nginx转发 dataDir: [ ../db/sdk/ ] + openImWsAddress: ws://127.0.0.1:10001 + openImApiAddress: http://127.0.0.1:10002 + #对象存储服务,以下配置二选一,目前支持两种,腾讯云和minio,二者配置好其中一种即可(如果使用minio参考https://doc.rentsoft.cn/#/qa/minio搭建minio服务器) credential: #腾讯cos,发送图片、视频、文件时需要,请自行申请后替换,必须修改 tencent: diff --git a/docker-compose.yaml b/docker-compose.yaml index aeac41995..115c49f9e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -126,7 +126,7 @@ services: # STORE_PORT: 3306 open_im_server: - image: openim/open_im_server:v2.3.0 + image: openim/open_im_server:v2.3.1 container_name: open_im_server volumes: - ./logs:/Open-IM-Server/logs diff --git a/internal/demo/register/onboarding_process.go b/internal/demo/register/onboarding_process.go index bac5a244c..0978b62b1 100644 --- a/internal/demo/register/onboarding_process.go +++ b/internal/demo/register/onboarding_process.go @@ -16,10 +16,11 @@ import ( "context" "errors" "fmt" - "github.com/golang/protobuf/proto" "math/rand" "strings" "time" + + "github.com/golang/protobuf/proto" ) type OnboardingProcessReq struct { @@ -75,6 +76,7 @@ func onboardingProcess(operationID, userID, userName, faceURL, phoneNumber, emai if err != nil { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) } + log.Debug(operationID, utils.GetSelfFuncName(), "getjoinGroupIDListdepartmentID", groupIDList) joinGroups(operationID, userID, userName, faceURL, groupIDList) log.NewInfo(operationID, utils.GetSelfFuncName(), "fineshed") } diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index b00129d64..406cb991e 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -66,6 +66,7 @@ func (r *RPCServer) run() { err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName, 10) if err != nil { log.Error("", "register push message rpc to etcd err", "", "err", err.Error(), r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName) + panic(utils.Wrap(err, "register msg_gataway module rpc to etcd err")) } r.target = getcdv3.GetTarget(r.etcdSchema, rpcRegisterIP, r.rpcPort, r.rpcRegisterName) err = srv.Serve(listener) diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 2ed590bce..29ec4f8ea 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -282,7 +282,7 @@ func (ws *WServer) sendKickMsg(oldConn, newConn *UserConn) { func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, operationID string) { rwLock.Lock() defer rwLock.Unlock() - log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token) + log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token, "ip: ", conn.RemoteAddr().String()) callbackResp := callbackUserOnline(operationID, uid, platformID, token) if callbackResp.ErrCode != 0 { log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp) diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index 08939ef1a..8be927433 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -55,6 +55,7 @@ func (r *RPCServer) run() { err = getcdv3.RegisterEtcd(r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName, 10) if err != nil { log.Error("", "register push module rpc to etcd err", err.Error(), r.etcdSchema, strings.Join(r.etcdAddr, ","), rpcRegisterIP, r.rpcPort, r.rpcRegisterName) + panic(utils.Wrap(err, "register push module rpc to etcd err")) } err = srv.Serve(listener) if err != nil { diff --git a/internal/rpc/admin_cms/admin_cms.go b/internal/rpc/admin_cms/admin_cms.go index 554259595..2832c54ea 100644 --- a/internal/rpc/admin_cms/admin_cms.go +++ b/internal/rpc/admin_cms/admin_cms.go @@ -71,7 +71,7 @@ func (s *adminCMSServer) Run() { err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) if err != nil { log.NewError("0", "RegisterEtcd failed ", err.Error()) - return + panic(utils.Wrap(err, "register admin module rpc to etcd err")) } err = srv.Serve(listener) if err != nil { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index a460a9033..ccab54c65 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -156,7 +156,8 @@ func (rpc *rpcAuth) Run() { if err != nil { log.NewError(operationID, "RegisterEtcd failed ", err.Error(), rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) - return + panic(utils.Wrap(err, "register auth module rpc to etcd err")) + } log.NewInfo(operationID, "RegisterAuthServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) err = srv.Serve(listener) diff --git a/internal/rpc/cache/cache.go b/internal/rpc/cache/cache.go index 60cc52d32..78bf8c124 100644 --- a/internal/rpc/cache/cache.go +++ b/internal/rpc/cache/cache.go @@ -65,7 +65,7 @@ func (s *cacheServer) Run() { err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) if err != nil { log.NewError("0", "RegisterEtcd failed ", err.Error()) - return + panic(utils.Wrap(err, "register cache module rpc to etcd err")) } err = srv.Serve(listener) if err != nil { diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 023169a6b..87e201aa1 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -84,6 +84,10 @@ func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbCo for _, v := range utils.DifferenceString(haveUserID, req.UserIDList) { conversation.OwnerUserID = v conversation.UpdateUnreadCountTime = utils.GetCurrentTimestampByMill() + err = rocksCache.DelUserConversationIDListFromCache(v) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error()) + } err := imdb.SetOneConversation(conversation) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error()) @@ -194,7 +198,7 @@ func (rpc *rpcConversation) Run() { if err != nil { log.NewError("0", "RegisterEtcd failed ", err.Error(), rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) - return + panic(utils.Wrap(err, "register conversation module rpc to etcd err")) } log.NewInfo("0", "RegisterConversationServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) err = srv.Serve(listener) diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 6a699d278..29ec99eab 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -75,7 +75,7 @@ func (s *friendServer) Run() { err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) if err != nil { log.NewError("0", "RegisterEtcd failed ", err.Error(), s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName) - return + panic(utils.Wrap(err, "register friend module rpc to etcd err")) } err = srv.Serve(listener) if err != nil { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index beff56072..7a821c34e 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -83,7 +83,8 @@ func (s *groupServer) Run() { err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) if err != nil { log.NewError("", "RegisterEtcd failed ", err.Error()) - return + panic(utils.Wrap(err, "register group module rpc to etcd err")) + } log.Info("", "RegisterEtcd ", s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName) err = srv.Serve(listener) @@ -483,11 +484,9 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error()) } } - go func() { - for _, v := range req.InvitedUserIDList { - chat.SuperGroupNotification(req.OperationID, v, v) - } - }() + for _, v := range req.InvitedUserIDList { + chat.SuperGroupNotification(req.OperationID, v, v) + } } log.NewInfo(req.OperationID, "InviteUserToGroup rpc return ", resp) diff --git a/internal/rpc/message_cms/message_cms.go b/internal/rpc/message_cms/message_cms.go new file mode 100644 index 000000000..58a7e7d4e --- /dev/null +++ b/internal/rpc/message_cms/message_cms.go @@ -0,0 +1,182 @@ +package messageCMS + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" + errors "Open_IM/pkg/common/http" + "context" + "strconv" + + "Open_IM/pkg/common/log" + + imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + "Open_IM/pkg/grpc-etcdv3/getcdv3" + pbMessageCMS "Open_IM/pkg/proto/message_cms" + open_im_sdk "Open_IM/pkg/proto/sdk_ws" + + "Open_IM/pkg/utils" + + "net" + "strings" + + "google.golang.org/grpc" +) + +type messageCMSServer struct { + rpcPort int + rpcRegisterName string + etcdSchema string + etcdAddr []string +} + +func NewMessageCMSServer(port int) *messageCMSServer { + log.NewPrivateLog(constant.LogFileName) + return &messageCMSServer{ + rpcPort: port, + rpcRegisterName: config.Config.RpcRegisterName.OpenImMessageCMSName, + etcdSchema: config.Config.Etcd.EtcdSchema, + etcdAddr: config.Config.Etcd.EtcdAddr, + } +} + +func (s *messageCMSServer) Run() { + log.NewInfo("0", "messageCMS rpc start ") + + listenIP := "" + if config.Config.ListenIP == "" { + listenIP = "0.0.0.0" + } else { + listenIP = config.Config.ListenIP + } + address := listenIP + ":" + strconv.Itoa(s.rpcPort) + + //listener network + listener, err := net.Listen("tcp", address) + if err != nil { + panic("listening err:" + err.Error() + s.rpcRegisterName) + } + log.NewInfo("0", "listen network success, ", address, listener) + defer listener.Close() + //grpc server + srv := grpc.NewServer() + defer srv.GracefulStop() + //Service registers with etcd + pbMessageCMS.RegisterMessageCMSServer(srv, s) + rpcRegisterIP := config.Config.RpcRegisterIP + if config.Config.RpcRegisterIP == "" { + rpcRegisterIP, err = utils.GetLocalIP() + if err != nil { + log.Error("", "GetLocalIP failed ", err.Error()) + } + } + log.NewInfo("", "rpcRegisterIP", rpcRegisterIP) + err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) + if err != nil { + log.NewError("0", "RegisterEtcd failed ", err.Error()) + panic(utils.Wrap(err, "register message_cms module rpc to etcd err")) + } + err = srv.Serve(listener) + if err != nil { + log.NewError("0", "Serve failed ", err.Error()) + return + } + log.NewInfo("0", "message cms rpc success") +} + +func (s *messageCMSServer) BoradcastMessage(_ context.Context, req *pbMessageCMS.BoradcastMessageReq) (*pbMessageCMS.BoradcastMessageResp, error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "BoradcastMessage", req.String()) + resp := &pbMessageCMS.BoradcastMessageResp{} + return resp, errors.WrapError(constant.ErrDB) +} + +func (s *messageCMSServer) GetChatLogs(_ context.Context, req *pbMessageCMS.GetChatLogsReq) (*pbMessageCMS.GetChatLogsResp, error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "GetChatLogs", req.String()) + resp := &pbMessageCMS.GetChatLogsResp{} + time, err := utils.TimeStringToTime(req.Date) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "time string parse error", err.Error()) + } + chatLog := db.ChatLog{ + Content: req.Content, + SendTime: time, + ContentType: req.ContentType, + SessionType: req.SessionType, + } + switch chatLog.SessionType { + case constant.SingleChatType: + chatLog.SendID = req.UserId + case constant.GroupChatType: + chatLog.RecvID = req.GroupId + chatLog.SendID = req.UserId + } + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "chat_log: ", chatLog) + nums, err := imdb.GetChatLogCount(chatLog) + resp.ChatLogsNum = int32(nums) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetChatLogCount", err.Error()) + } + chatLogs, err := imdb.GetChatLog(chatLog, req.Pagination.PageNumber, req.Pagination.ShowNumber) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetChatLog", err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + for _, chatLog := range chatLogs { + pbChatLog := &pbMessageCMS.ChatLogs{ + SessionType: chatLog.SessionType, + ContentType: chatLog.ContentType, + SearchContent: req.Content, + WholeContent: chatLog.Content, + Date: chatLog.CreateTime.String(), + SenderNickName: chatLog.SenderNickname, + SenderId: chatLog.SendID, + } + if chatLog.SenderNickname == "" { + sendUser, err := imdb.GetUserByUserID(chatLog.SendID) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserByUserID failed", err.Error()) + continue + } + pbChatLog.SenderNickName = sendUser.Nickname + } + switch chatLog.SessionType { + case constant.SingleChatType: + recvUser, err := imdb.GetUserByUserID(chatLog.RecvID) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserByUserID failed", err.Error()) + continue + } + pbChatLog.ReciverId = recvUser.UserID + pbChatLog.ReciverNickName = recvUser.Nickname + + case constant.GroupChatType: + group, err := imdb.GetGroupInfoByGroupID(chatLog.RecvID) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupById failed") + continue + } + pbChatLog.GroupId = group.GroupID + pbChatLog.GroupName = group.GroupName + } + resp.ChatLogs = append(resp.ChatLogs, pbChatLog) + } + resp.Pagination = &open_im_sdk.ResponsePagination{ + CurrentPage: req.Pagination.PageNumber, + ShowNumber: req.Pagination.ShowNumber, + } + + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp output: ", resp.String()) + return resp, nil +} + +func (s *messageCMSServer) MassSendMessage(_ context.Context, req *pbMessageCMS.MassSendMessageReq) (*pbMessageCMS.MassSendMessageResp, error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "MassSendMessage", req.String()) + resp := &pbMessageCMS.MassSendMessageResp{} + return resp, nil +} + +func (s *messageCMSServer) WithdrawMessage(_ context.Context, req *pbMessageCMS.WithdrawMessageReq) (*pbMessageCMS.WithdrawMessageResp, error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "WithdrawMessage", req.String()) + resp := &pbMessageCMS.WithdrawMessageResp{} + return resp, nil +} diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index e00ee001a..e60a1ff3c 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -29,6 +29,7 @@ func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbApi.CommonCallbackReq SenderFaceURL: msg.MsgData.SenderFaceURL, Content: callback.GetContent(msg.MsgData), Seq: msg.MsgData.Seq, + Ex: msg.MsgData.Ex, } return req } @@ -137,11 +138,11 @@ func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp } func callbackWordFilter(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp { + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackWordFilter.Enable || msg.MsgData.ContentType != constant.Text { return callbackResp } - log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) commonCallbackReq := copyCallbackCommonReqStruct(msg) commonCallbackReq.CallbackCommand = constant.CallbackWordFilterCommand req := cbApi.CallbackWordFilterReq{ diff --git a/internal/rpc/msg/organization_notification.go b/internal/rpc/msg/organization_notification.go index d9dd6fd52..bcdeb13e8 100644 --- a/internal/rpc/msg/organization_notification.go +++ b/internal/rpc/msg/organization_notification.go @@ -1,12 +1,14 @@ package msg import ( + "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" "Open_IM/pkg/common/log" utils2 "Open_IM/pkg/common/utils" open_im_sdk "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" + "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" ) @@ -29,7 +31,7 @@ func OrganizationNotificationToAll(opUserID string, operationID string) { for _, v := range userIDList { log.Debug(operationID, "OrganizationNotification", opUserID, v, constant.OrganizationChangedNotification, &tips, operationID) - OrganizationNotification(opUserID, v, constant.OrganizationChangedNotification, &tips, operationID) + OrganizationNotification(config.Config.Manager.AppManagerUid[0], v, constant.OrganizationChangedNotification, &tips, operationID) } } diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 95f0ceaa5..fde8536be 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -75,7 +75,7 @@ func (rpc *rpcChat) Run() { err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName, 10) if err != nil { log.Error("", "register rpcChat to etcd failed ", err.Error()) - return + panic(utils.Wrap(err, "register chat module rpc to etcd err")) } go rpc.runCh() err = srv.Serve(listener) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 228fc6eea..ab39a3662 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -226,6 +226,7 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { replay := pbChat.SendMsgResp{} newTime := db.GetCurrentTimestampByMill() + t1 := time.Now() log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String()) flag, errCode, errMsg := isMessageHasReadEnabled(pb) log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag) @@ -233,15 +234,18 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, errCode, errMsg, "", 0) } flag, errCode, errMsg, _ = messageVerification(pb) - log.Info(pb.OperationID, "userRelationshipVerification ", flag) + log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } + t1 = time.Now() rpc.encapsulateMsgData(pb.MsgData) + log.Info(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1)) msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} // callback + t1 = time.Now() callbackResp := callbackWordFilter(pb) - log.Info(pb.OperationID, "callbackWordFilter ", callbackResp) + log.Info(pb.OperationID, "callbackWordFilter ", callbackResp, "cost time: ", time.Since(t1)) if callbackResp.ErrCode != 0 { log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp) } @@ -256,7 +260,9 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S switch pb.MsgData.SessionType { case constant.SingleChatType: // callback + t1 = time.Now() callbackResp := callbackBeforeSendSingleMsg(pb) + log.Info(pb.OperationID, "callbackBeforeSendSingleMsg ", " cost time: ", time.Since(t1)) if callbackResp.ErrCode != 0 { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp) } @@ -267,28 +273,37 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } + t1 = time.Now() isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) + log.Info(pb.OperationID, "modifyMessageByUserMessageReceiveOpt ", " cost time: ", time.Since(t1)) if isSend { msgToMQSingle.MsgData = pb.MsgData log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) + t1 = time.Now() err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) + log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1)) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself + t1 = time.Now() err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) + log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1)) if err2 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } // callback + t1 = time.Now() callbackResp = callbackAfterSendSingleMsg(pb) + log.Info(pb.OperationID, "callbackAfterSendSingleMsg ", " cost time: ", time.Since(t1)) if callbackResp.ErrCode != 0 { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp) } + log.Debug(pb.OperationID, "send msg cost time all: ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.GroupChatType: // callback diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 828d8b58c..741938ca3 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -82,7 +82,7 @@ func (s *officeServer) Run() { err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) if err != nil { log.NewError("0", "RegisterEtcd failed ", err.Error()) - return + panic(utils.Wrap(err, "register office module rpc to etcd err")) } go s.sendTagMsgRoutine() err = srv.Serve(listener) diff --git a/internal/rpc/organization/organization.go b/internal/rpc/organization/organization.go index 95f3233ef..89cd15321 100644 --- a/internal/rpc/organization/organization.go +++ b/internal/rpc/organization/organization.go @@ -73,7 +73,7 @@ func (s *organizationServer) Run() { err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) if err != nil { log.NewError("", "RegisterEtcd failed ", err.Error()) - return + panic(utils.Wrap(err, "register organization module rpc to etcd err")) } log.NewInfo("", "organization rpc RegisterEtcd success", rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) err = srv.Serve(listener) diff --git a/internal/rpc/statistics/statistics.go b/internal/rpc/statistics/statistics.go new file mode 100644 index 000000000..8947c6a74 --- /dev/null +++ b/internal/rpc/statistics/statistics.go @@ -0,0 +1,399 @@ +package statistics + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "context" + "strconv" + "sync" + "time" + + //"Open_IM/pkg/common/constant" + //"Open_IM/pkg/common/db" + imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + "Open_IM/pkg/common/log" + + //cp "Open_IM/pkg/common/utils" + "Open_IM/pkg/grpc-etcdv3/getcdv3" + pbStatistics "Open_IM/pkg/proto/statistics" + + //open_im_sdk "Open_IM/pkg/proto/sdk_ws" + "Open_IM/pkg/utils" + //"context" + errors "Open_IM/pkg/common/http" + "net" + "strings" + + "google.golang.org/grpc" +) + +type statisticsServer struct { + rpcPort int + rpcRegisterName string + etcdSchema string + etcdAddr []string +} + +func NewStatisticsServer(port int) *statisticsServer { + log.NewPrivateLog(constant.LogFileName) + return &statisticsServer{ + rpcPort: port, + rpcRegisterName: config.Config.RpcRegisterName.OpenImStatisticsName, + etcdSchema: config.Config.Etcd.EtcdSchema, + etcdAddr: config.Config.Etcd.EtcdAddr, + } +} + +func (s *statisticsServer) Run() { + log.NewInfo("0", "Statistics rpc start ") + + listenIP := "" + if config.Config.ListenIP == "" { + listenIP = "0.0.0.0" + } else { + listenIP = config.Config.ListenIP + } + address := listenIP + ":" + strconv.Itoa(s.rpcPort) + + //listener network + listener, err := net.Listen("tcp", address) + if err != nil { + panic("listening err:" + err.Error() + s.rpcRegisterName) + } + log.NewInfo("0", "listen network success, ", address, listener) + defer listener.Close() + //grpc server + srv := grpc.NewServer() + defer srv.GracefulStop() + //Service registers with etcd + pbStatistics.RegisterUserServer(srv, s) + rpcRegisterIP := config.Config.RpcRegisterIP + if config.Config.RpcRegisterIP == "" { + rpcRegisterIP, err = utils.GetLocalIP() + if err != nil { + log.Error("", "GetLocalIP failed ", err.Error()) + } + } + err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) + if err != nil { + log.NewError("0", "RegisterEtcd failed ", err.Error()) + panic(utils.Wrap(err, "register statistics module rpc to etcd err")) + } + err = srv.Serve(listener) + if err != nil { + log.NewError("0", "Serve failed ", err.Error()) + return + } + log.NewInfo("0", "statistics rpc success") +} + +func (s *statisticsServer) GetActiveGroup(_ context.Context, req *pbStatistics.GetActiveGroupReq) (*pbStatistics.GetActiveGroupResp, error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req", req.String()) + resp := &pbStatistics.GetActiveGroupResp{} + fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "ParseTimeFromTo failed", err.Error()) + return resp, errors.WrapError(constant.ErrArgs) + } + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "time: ", fromTime, toTime) + activeGroups, err := imdb.GetActiveGroups(fromTime, toTime, 12) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetActiveGroups failed", err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + for _, activeGroup := range activeGroups { + resp.Groups = append(resp.Groups, + &pbStatistics.GroupResp{ + GroupName: activeGroup.Name, + GroupId: activeGroup.Id, + MessageNum: int32(activeGroup.MessageNum), + }) + } + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp.String()) + return resp, nil +} + +func (s *statisticsServer) GetActiveUser(_ context.Context, req *pbStatistics.GetActiveUserReq) (*pbStatistics.GetActiveUserResp, error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String()) + resp := &pbStatistics.GetActiveUserResp{} + fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "ParseTimeFromTo failed", err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "time: ", fromTime, toTime) + activeUsers, err := imdb.GetActiveUsers(fromTime, toTime, 12) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetActiveUsers failed", err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + for _, activeUser := range activeUsers { + resp.Users = append(resp.Users, + &pbStatistics.UserResp{ + UserId: activeUser.Id, + NickName: activeUser.Name, + MessageNum: int32(activeUser.MessageNum), + }, + ) + } + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp.String()) + return resp, nil +} + +func ParseTimeFromTo(from, to string) (time.Time, time.Time, error) { + var fromTime time.Time + var toTime time.Time + fromTime, err := utils.TimeStringToTime(from) + if err != nil { + return fromTime, toTime, err + } + toTime, err = utils.TimeStringToTime(to) + if err != nil { + return fromTime, toTime, err + } + return fromTime, toTime, nil +} + +func isInOneMonth(from, to time.Time) bool { + return from.Month() == to.Month() && from.Year() == to.Year() +} + +func GetRangeDate(from, to time.Time) [][2]time.Time { + interval := to.Sub(from) + var times [][2]time.Time + switch { + // today + case interval == 0: + times = append(times, [2]time.Time{ + from, from.Add(time.Hour * 24), + }) + // days + case isInOneMonth(from, to): + for i := 0; ; i++ { + fromTime := from.Add(time.Hour * 24 * time.Duration(i)) + toTime := from.Add(time.Hour * 24 * time.Duration(i+1)) + if toTime.After(to.Add(time.Hour * 24)) { + break + } + times = append(times, [2]time.Time{ + fromTime, toTime, + }) + } + // month + case !isInOneMonth(from, to): + if to.Sub(from) < time.Hour*24*30 { + for i := 0; ; i++ { + fromTime := from.Add(time.Hour * 24 * time.Duration(i)) + toTime := from.Add(time.Hour * 24 * time.Duration(i+1)) + if toTime.After(to.Add(time.Hour * 24)) { + break + } + times = append(times, [2]time.Time{ + fromTime, toTime, + }) + } + } else { + for i := 0; ; i++ { + if i == 0 { + fromTime := from + toTime := getFirstDateOfNextNMonth(fromTime, 1) + times = append(times, [2]time.Time{ + fromTime, toTime, + }) + } else { + fromTime := getFirstDateOfNextNMonth(from, i) + toTime := getFirstDateOfNextNMonth(fromTime, 1) + if toTime.After(to) { + toTime = to + times = append(times, [2]time.Time{ + fromTime, toTime, + }) + break + } + times = append(times, [2]time.Time{ + fromTime, toTime, + }) + } + + } + } + } + return times +} + +func getFirstDateOfNextNMonth(currentTime time.Time, n int) time.Time { + lastOfMonth := time.Date(currentTime.Year(), currentTime.Month(), 1, 0, 0, 0, 0, currentTime.Location()).AddDate(0, n, 0) + return lastOfMonth +} + +func (s *statisticsServer) GetGroupStatistics(_ context.Context, req *pbStatistics.GetGroupStatisticsReq) (*pbStatistics.GetGroupStatisticsResp, error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String()) + resp := &pbStatistics.GetGroupStatisticsResp{} + fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupStatistics failed", err.Error()) + return resp, errors.WrapError(constant.ErrArgs) + } + increaseGroupNum, err := imdb.GetIncreaseGroupNum(fromTime, toTime.Add(time.Hour*24)) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum failed", err.Error(), fromTime, toTime) + return resp, errors.WrapError(constant.ErrDB) + } + totalGroupNum, err := imdb.GetTotalGroupNum() + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + resp.IncreaseGroupNum = increaseGroupNum + resp.TotalGroupNum = totalGroupNum + times := GetRangeDate(fromTime, toTime) + log.NewDebug(req.OperationID, "times:", times) + wg := &sync.WaitGroup{} + resp.IncreaseGroupNumList = make([]*pbStatistics.DateNumList, len(times), len(times)) + resp.TotalGroupNumList = make([]*pbStatistics.DateNumList, len(times), len(times)) + wg.Add(len(times)) + for i, v := range times { + go func(wg *sync.WaitGroup, index int, v [2]time.Time) { + defer wg.Done() + num, err := imdb.GetIncreaseGroupNum(v[0], v[1]) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error()) + } + resp.IncreaseGroupNumList[index] = &pbStatistics.DateNumList{ + Date: v[0].String(), + Num: num, + } + num, err = imdb.GetGroupNum(v[1]) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error()) + } + resp.TotalGroupNumList[index] = &pbStatistics.DateNumList{ + Date: v[0].String(), + Num: num, + } + }(wg, i, v) + } + wg.Wait() + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp) + return resp, nil +} + +func (s *statisticsServer) GetMessageStatistics(_ context.Context, req *pbStatistics.GetMessageStatisticsReq) (*pbStatistics.GetMessageStatisticsResp, error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String()) + resp := &pbStatistics.GetMessageStatisticsResp{} + fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To) + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "times: ", fromTime, toTime) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "ParseTimeFromTo failed", err.Error()) + return resp, errors.WrapError(constant.ErrArgs) + } + privateMessageNum, err := imdb.GetPrivateMessageNum(fromTime, toTime.Add(time.Hour*24)) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetPrivateMessageNum failed", err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + groupMessageNum, err := imdb.GetGroupMessageNum(fromTime, toTime.Add(time.Hour*24)) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupMessageNum failed", err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), privateMessageNum, groupMessageNum) + resp.PrivateMessageNum = privateMessageNum + resp.GroupMessageNum = groupMessageNum + times := GetRangeDate(fromTime, toTime) + resp.GroupMessageNumList = make([]*pbStatistics.DateNumList, len(times), len(times)) + resp.PrivateMessageNumList = make([]*pbStatistics.DateNumList, len(times), len(times)) + wg := &sync.WaitGroup{} + wg.Add(len(times)) + for i, v := range times { + go func(wg *sync.WaitGroup, index int, v [2]time.Time) { + defer wg.Done() + + num, err := imdb.GetPrivateMessageNum(v[0], v[1]) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error()) + } + resp.PrivateMessageNumList[index] = &pbStatistics.DateNumList{ + Date: v[0].String(), + Num: num, + } + num, err = imdb.GetGroupMessageNum(v[0], v[1]) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error()) + } + resp.GroupMessageNumList[index] = &pbStatistics.DateNumList{ + Date: v[0].String(), + Num: num, + } + }(wg, i, v) + } + wg.Wait() + return resp, nil +} + +func (s *statisticsServer) GetUserStatistics(_ context.Context, req *pbStatistics.GetUserStatisticsReq) (*pbStatistics.GetUserStatisticsResp, error) { + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) + resp := &pbStatistics.GetUserStatisticsResp{} + fromTime, toTime, err := ParseTimeFromTo(req.StatisticsReq.From, req.StatisticsReq.To) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "ParseTimeFromTo failed", err.Error()) + return resp, errors.WrapError(constant.ErrArgs) + } + activeUserNum, err := imdb.GetActiveUserNum(fromTime, toTime.Add(time.Hour*24)) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetActiveUserNum failed", err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + increaseUserNum, err := imdb.GetIncreaseUserNum(fromTime, toTime.Add(time.Hour*24)) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseUserNum failed", err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + totalUserNum, err := imdb.GetTotalUserNum() + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetTotalUserNum failed", err.Error()) + return resp, errors.WrapError(constant.ErrDB) + } + resp.ActiveUserNum = activeUserNum + resp.TotalUserNum = totalUserNum + resp.IncreaseUserNum = increaseUserNum + times := GetRangeDate(fromTime, toTime) + resp.TotalUserNumList = make([]*pbStatistics.DateNumList, len(times), len(times)) + resp.ActiveUserNumList = make([]*pbStatistics.DateNumList, len(times), len(times)) + resp.IncreaseUserNumList = make([]*pbStatistics.DateNumList, len(times), len(times)) + wg := &sync.WaitGroup{} + wg.Add(len(times)) + for i, v := range times { + go func(wg *sync.WaitGroup, index int, v [2]time.Time) { + defer wg.Done() + num, err := imdb.GetActiveUserNum(v[0], v[1]) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error()) + } + resp.ActiveUserNumList[index] = &pbStatistics.DateNumList{ + Date: v[0].String(), + Num: num, + } + + num, err = imdb.GetTotalUserNumByDate(v[1]) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetTotalUserNumByDate", v, err.Error()) + } + resp.TotalUserNumList[index] = &pbStatistics.DateNumList{ + Date: v[0].String(), + Num: num, + } + num, err = imdb.GetIncreaseUserNum(v[0], v[1]) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseUserNum", v, err.Error()) + } + resp.IncreaseUserNumList[index] = &pbStatistics.DateNumList{ + Date: v[0].String(), + Num: num, + } + }(wg, i, v) + } + wg.Wait() + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp) + return resp, nil +} diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 10078a170..56670eb72 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -76,7 +76,7 @@ func (s *userServer) Run() { err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10) if err != nil { log.NewError("0", "RegisterEtcd failed ", err.Error(), s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName) - return + panic(utils.Wrap(err, "register user module rpc to etcd err")) } err = srv.Serve(listener) if err != nil { @@ -316,13 +316,18 @@ func (s *userServer) SetRecvMsgOpt(ctx context.Context, req *pbUser.SetRecvMsgOp conversation.ConversationType = constant.GroupChatType } } - err := imdb.SetRecvMsgOpt(conversation) + isUpdate, err := imdb.SetRecvMsgOpt(conversation) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error()) resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} return resp, nil } - if err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID); err != nil { + if isUpdate { + err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID) + } else { + err = rocksCache.DelUserConversationIDListFromCache(conversation.OwnerUserID) + } + if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), conversation.ConversationID, err.Error()) } chat.ConversationChangeNotification(req.OperationID, req.OwnerUserID) diff --git a/pkg/base_info/conversation_api_struct.go b/pkg/base_info/conversation_api_struct.go index c0cb9fccf..be913a3c0 100644 --- a/pkg/base_info/conversation_api_struct.go +++ b/pkg/base_info/conversation_api_struct.go @@ -45,7 +45,7 @@ type Conversation struct { IsPrivateChat bool `json:"isPrivateChat"` GroupAtType int32 `json:"groupAtType"` IsNotInGroup bool `json:"isNotInGroup"` - UpdateUnreadCountTime int64 ` json:"updateUnreadCountTime"` + UpdateUnreadCountTime int64 `json:"updateUnreadCountTime"` AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` } diff --git a/pkg/call_back_struct/common.go b/pkg/call_back_struct/common.go index ea61818e5..c3e5a4a71 100644 --- a/pkg/call_back_struct/common.go +++ b/pkg/call_back_struct/common.go @@ -17,6 +17,7 @@ type CommonCallbackReq struct { Seq uint32 `json:"seq"` AtUserIDList []string `json:"atUserList"` SenderFaceURL string `json:"faceURL"` + Ex string `json:"ex"` } type CommonCallbackResp struct { diff --git a/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go b/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go index 2a645d54f..52fe1dcb9 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go @@ -41,16 +41,18 @@ func PeerUserSetConversation(conversation db.Conversation) error { } -func SetRecvMsgOpt(conversation db.Conversation) error { +func SetRecvMsgOpt(conversation db.Conversation) (bool, error) { + var isUpdate bool newConversation := conversation if db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Find(&newConversation).RowsAffected == 0 { log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "not exist in db, create") - return db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error + return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error // if exist, then update record } else { log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "exist in db, update") //force update - return db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID). + isUpdate = true + return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID). Updates(map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt}).Error } } diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index 8ebfbc23a..08dcddd01 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -418,9 +418,12 @@ func GetJoinedSuperGroupListFromCache(userID string) ([]string, error) { return string(bytes), nil } joinedSuperGroupListStr, err := db.DB.Rc.Fetch(joinedSuperGroupListCache+userID, time.Second*30*60, getJoinedSuperGroupIDList) + if err != nil { + return nil, err + } var joinedSuperGroupList []string err = json.Unmarshal([]byte(joinedSuperGroupListStr), &joinedSuperGroupList) - return joinedSuperGroupList, err + return joinedSuperGroupList, utils.Wrap(err, "") } func DelJoinedSuperGroupIDListFromCache(userID string) error { @@ -493,7 +496,7 @@ func GetUserConversationIDListFromCache(userID string) ([]string, error) { } func DelUserConversationIDListFromCache(userID string) error { - return db.DB.Rc.TagAsDeleted(conversationIDListCache + userID) + return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationIDListCache+userID), "DelUserConversationIDListFromCache err") } func GetConversationFromCache(ownerUserID, conversationID string) (*db.Conversation, error) { @@ -550,5 +553,5 @@ func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) { } func DelConversationFromCache(ownerUserID, conversationID string) error { - return db.DB.Rc.TagAsDeleted(conversationCache + ownerUserID + ":" + conversationID) + return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err") } diff --git a/script/sdk_svr_start.sh b/script/sdk_svr_start.sh index 5ac90501d..115ceff82 100644 --- a/script/sdk_svr_start.sh +++ b/script/sdk_svr_start.sh @@ -4,17 +4,13 @@ source ./style_info.cfg source ./path_info.cfg source ./function.sh ulimit -n 200000 -list1=$(cat $config_path | grep openImApiPort | awk -F '[:]' '{print $NF}') -list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}') + +ws_address=$(cat $config_path | grep openImWsAddress | awk -F '[ ]' '{print $NF}') +api_address=$(cat $config_path | grep openImApiAddress | awk -F '[ ]' '{print $NF}') list3=$(cat $config_path | grep openImSdkWsPort | awk -F '[:]' '{print $NF}') logLevel=$(cat $config_path | grep remainLogLevel | awk -F '[:]' '{print $NF}') -list_to_string $list1 -api_ports=($ports_array) -list_to_string $list2 -ws_ports=($ports_array) list_to_string $list3 sdk_ws_ports=($ports_array) -list_to_string $list4 @@ -28,7 +24,7 @@ fi #Waiting port recycling sleep 1 cd ${sdk_server_binary_root} - nohup ./${sdk_server_name} -openIM_api_port ${api_ports[0]} -openIM_ws_port ${ws_ports[0]} -sdk_ws_port ${sdk_ws_ports[0]} -openIM_log_level ${logLevel} >>../logs/openIM.log 2>&1 & + nohup ./${sdk_server_name} -openIM_ws_address ${ws_address} -sdk_ws_port ${sdk_ws_ports[0]} -openIM_api_address ${api_address} -openIM_log_level ${logLevel} >>../logs/openIM.log 2>&1 & #Check launched service process sleep 3