From 170392f0bcfe14453fa9285a0201131643d07ded Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 30 Mar 2022 18:23:05 +0800 Subject: [PATCH] oa notification add --- .../msg_transfer/logic/history_msg_handler.go | 16 +++++++++++++ .../logic/persistent_msg_handler.go | 23 +++++++++++-------- internal/rpc/msg/send_msg.go | 17 ++++++++++++++ 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/history_msg_handler.go index 7d729f5cd..0b9f5c1c3 100644 --- a/internal/msg_transfer/logic/history_msg_handler.go +++ b/internal/msg_transfer/logic/history_msg_handler.go @@ -80,6 +80,22 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) mc.groupMsgCount++ } go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) + case constant.NotificationChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) + if isHistory { + err := saveUserChat(msgKey, &msgFromMQ) + if err != nil { + log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + return + } + mc.singleMsgCount++ + log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time) + } + if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { + } else { + go sendMessageToPush(&msgFromMQ, msgKey) + } + log.NewDebug(operationID, "saveUserChat cost time ", utils.GetCurrentTimestampByNano()-time) default: log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) return diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 59c338294..5400d54d6 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -34,26 +34,31 @@ func (pc *PersistentConsumerHandler) Init() { func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) { log.NewInfo("msg come here mysql!!!", "", "msg", string(msg)) + var tag bool msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) if err != nil { - log.ErrorByKv("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) + log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error()) return } //Control whether to store history messages (mysql) isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) //Only process receiver data if isPersist { - if msgKey == msgFromMQ.MsgData.RecvID && msgFromMQ.MsgData.SessionType == constant.SingleChatType { - log.InfoByKv("msg_transfer msg persisting", msgFromMQ.OperationID) - if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil { - log.ErrorByKv("Message insert failed", msgFromMQ.OperationID, "err", err.Error(), "msg", msgFromMQ.String()) - return + switch msgFromMQ.MsgData.SessionType { + case constant.SingleChatType, constant.NotificationChatType: + if msgKey == msgFromMQ.MsgData.RecvID { + tag = true } - } else if msgFromMQ.MsgData.SessionType == constant.GroupChatType && msgKey == msgFromMQ.MsgData.SendID { - log.InfoByKv("msg_transfer msg persisting", msgFromMQ.OperationID) + case constant.GroupChatType: + if msgKey == msgFromMQ.MsgData.SendID || utils.IsContain(msgFromMQ.MsgData.SendID, config.Config.Manager.AppManagerUid) { + tag = true + } + } + if tag { + log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg)) if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil { - log.ErrorByKv("Message insert failed", msgFromMQ.OperationID, "err", err.Error(), "msg", msgFromMQ.String()) + log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String()) return } } diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 6cd321d07..c84549f6c 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -281,6 +281,23 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error()) } return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) + case constant.NotificationChatType: + msgToMQ.MsgData = pb.MsgData + log.NewInfo(msgToMQ.OperationID, msgToMQ) + err1 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.RecvID) + if err1 != nil { + log.NewError(msgToMQ.OperationID, "kafka send msg err:RecvID", msgToMQ.MsgData.RecvID, msgToMQ.String()) + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } + + if msgToMQ.MsgData.SendID != msgToMQ.MsgData.RecvID { //Filter messages sent to yourself + err2 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.SendID) + if err2 != nil { + log.NewError(msgToMQ.OperationID, "kafka send msg err:SendID", msgToMQ.MsgData.SendID, msgToMQ.String()) + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } + } + return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) default: return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0) }