diff --git a/internal/rpc/chat/send_msg.go b/internal/rpc/chat/send_msg.go index 298f3ab65..530db7fd4 100644 --- a/internal/rpc/chat/send_msg.go +++ b/internal/rpc/chat/send_msg.go @@ -5,6 +5,7 @@ import ( "Open_IM/internal/push/content_struct" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" http2 "Open_IM/pkg/common/http" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -105,9 +106,17 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* } switch pbData.SessionType { case constant.SingleChatType: - err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) + isSend := modifyMessageByUserMessageReceiveOpt(pbData.RecvID, pbData.SendID, constant.SingleChatType, &pbData) + if isSend { + err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) + if err1 != nil { + log.NewError(pbData.OperationID, "kafka send msg err:RecvID", pbData.RecvID, pbData.String()) + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } + } err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) - if err1 != nil || err2 != nil { + if err2 != nil { + log.NewError(pbData.OperationID, "kafka send msg err:SendID", pbData.SendID, pbData.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) @@ -154,16 +163,25 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* groupID := pbData.RecvID for i, v := range reply.MemberList { pbData.RecvID = v.UserId + " " + groupID - err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i)) - if err != nil { - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + isSend := modifyMessageByUserMessageReceiveOpt(v.UserId, groupID, constant.GroupChatType, &pbData) + if isSend { + err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i)) + if err != nil { + log.NewError(pbData.OperationID, "kafka send msg err:UserId", v.UserId, pbData.String()) + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } } + } for i, v := range addUidList { pbData.RecvID = v + " " + groupID - err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1)) - if err != nil { - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, &pbData) + if isSend { + err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1)) + if err != nil { + log.NewError(pbData.OperationID, "kafka send msg err:UserId", v, pbData.String()) + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } } } return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) @@ -193,3 +211,25 @@ func returnMsg(replay *pbChat.UserSendMsgResp, pb *pbChat.UserSendMsgReq, errCod replay.SendTime = sendTime return replay, nil } +func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, msg *pbChat.WSToMsgSvrChatMsg) bool { + conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType) + opt, err := db.DB.GetConversationMsgOpt(userID, conversationID) + if err != nil { + log.NewError(msg.OperationID, "GetConversationMsgOpt from redis err", msg.String()) + return true + } + switch opt { + case constant.ReceiveMessage: + return true + case constant.NotReceiveMessage: + return false + case constant.ReceiveNotNotifyMessage: + m := utils.JsonStringToMap(msg.OfflineInfo) + utils.SetSwitchFromOptions(m, "offlinePush", 0) + s := utils.MapToJsonString(m) + msg.OfflineInfo = s + return true + } + + return true +} diff --git a/pkg/utils/map.go b/pkg/utils/map.go index 2807805db..48acadc60 100644 --- a/pkg/utils/map.go +++ b/pkg/utils/map.go @@ -116,8 +116,11 @@ func JsonStringToMap(str string) (tempMap map[string]interface{}) { return tempMap } func GetSwitchFromOptions(Options map[string]interface{}, key string) (result bool) { - if flag, ok := Options[key]; !ok || flag == 1 { + if flag, ok := Options[key]; !ok || flag.(int) == 1 { return true } return false } +func SetSwitchFromOptions(Options map[string]interface{}, key string, value interface{}) { + Options[key] = value +} diff --git a/pkg/utils/strings.go b/pkg/utils/strings.go index 4249eee6b..5ee2a6503 100644 --- a/pkg/utils/strings.go +++ b/pkg/utils/strings.go @@ -7,6 +7,7 @@ package utils import ( + "Open_IM/pkg/common/constant" "encoding/json" "math/rand" "strconv" @@ -63,6 +64,15 @@ func GetMsgID(sendID string) string { t := int64ToString(GetCurrentTimestampByNano()) return Md5(t + sendID + int64ToString(rand.Int63n(GetCurrentTimestampByNano()))) } +func GetConversationIDBySessionType(sourceID string, sessionType int) string { + switch sessionType { + case constant.SingleChatType: + return "single_" + sourceID + case constant.GroupChatType: + return "group_" + sourceID + } + return "" +} func int64ToString(i int64) string { return strconv.FormatInt(i, 10) }