diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go index 47afbbdaa..6be933548 100644 --- a/internal/rpc/msg/extend_msg.go +++ b/internal/rpc/msg/extend_msg.go @@ -16,6 +16,7 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String()) var rResp msg.SetMessageReactionExtensionsResp rResp.ClientMsgID = req.ClientMsgID + rResp.MsgFirstModifyTime = req.MsgFirstModifyTime isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType) if err != nil { rResp.ErrCode = 100 @@ -54,32 +55,67 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String()) } } else { - for k, v := range req.ReactionExtensionList { - err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) - if err != nil { - setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) - continue + err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID) + if err != nil { + rResp.ErrCode = 100 + rResp.ErrMsg = err.Error() + for _, value := range req.ReactionExtensionList { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + temp.ErrMsg = err.Error() + temp.ErrCode = 100 + rResp.Result = append(rResp.Result, temp) } - redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k) - if err != nil && err != go_redis.Nil { - setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v) - continue + return &rResp, nil + } + mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) + if err != nil { + rResp.ErrCode = 200 + rResp.ErrMsg = err.Error() + for _, value := range req.ReactionExtensionList { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + temp.ErrMsg = err.Error() + temp.ErrCode = 100 + rResp.Result = append(rResp.Result, temp) } + return &rResp, nil + } + setValue := make(map[string]*server_api_params.KeyValue) + for k, v := range req.ReactionExtensionList { + temp := new(server_api_params.KeyValue) - utils.JsonStringToStruct(redisValue, temp) - if v.LatestUpdateTime != temp.LatestUpdateTime { - setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp) - continue - } else { - v.LatestUpdateTime = utils.GetCurrentTimestampByMill() - newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) - if newerr != nil { - setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp) + if vv, ok := mongoValue.ReactionExtensionList[k]; ok { + utils.CopyStructFields(temp, &vv) + if v.LatestUpdateTime != vv.LatestUpdateTime { + setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp) continue } - setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v) } - + temp.TypeKey = k + temp.Value = v.Value + temp.LatestUpdateTime = utils.GetCurrentTimestampByMill() + setValue[k] = temp + } + err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue) + if err != nil { + for _, value := range setValue { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + temp.ErrMsg = err.Error() + temp.ErrCode = 100 + rResp.Result = append(rResp.Result, temp) + } + } else { + for _, value := range setValue { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + rResp.Result = append(rResp.Result, temp) + } + } + lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID) + if lockErr != nil { + log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error()) } } @@ -114,10 +150,14 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S } } - if !isExists && !req.IsReact { - ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true) + if !isExists { + if !req.IsReact { + ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true, true) + } else { + ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false) + } } else { - ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false) + ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, true) } log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) return &rResp, nil @@ -171,7 +211,23 @@ func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *m oneMessage.ReactionExtensionList = keyMap } else { + mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime) + if err != nil { + oneMessage.ErrCode = 100 + oneMessage.ErrMsg = err.Error() + rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) + continue + } + keyMap := make(map[string]*server_api_params.KeyValue) + for k, v := range mongoValue.ReactionExtensionList { + temp := new(server_api_params.KeyValue) + temp.TypeKey = v.TypeKey + temp.Value = v.Value + temp.LatestUpdateTime = v.LatestUpdateTime + keyMap[k] = temp + } + oneMessage.ReactionExtensionList = keyMap } rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) } @@ -230,9 +286,72 @@ func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *ms } } } else { + err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID) + if err != nil { + rResp.ErrCode = 100 + rResp.ErrMsg = err.Error() + for _, value := range req.ReactionExtensionList { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + temp.ErrMsg = err.Error() + temp.ErrCode = 100 + rResp.Result = append(rResp.Result, temp) + } + return &rResp, nil + } + mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) + if err != nil { + rResp.ErrCode = 200 + rResp.ErrMsg = err.Error() + for _, value := range req.ReactionExtensionList { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + temp.ErrMsg = err.Error() + temp.ErrCode = 100 + rResp.Result = append(rResp.Result, temp) + } + return &rResp, nil + } + setValue := make(map[string]*server_api_params.KeyValue) + for _, v := range req.ReactionExtensionList { + + temp := new(server_api_params.KeyValue) + if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok { + utils.CopyStructFields(temp, &vv) + if v.LatestUpdateTime != vv.LatestUpdateTime { + setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp) + continue + } + } else { + setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v) + continue + } + temp.TypeKey = v.TypeKey + setValue[v.TypeKey] = temp + } + err = db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue) + if err != nil { + for _, value := range setValue { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + temp.ErrMsg = err.Error() + temp.ErrCode = 100 + rResp.Result = append(rResp.Result, temp) + } + } else { + for _, value := range setValue { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + rResp.Result = append(rResp.Result, temp) + } + } + lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID) + if lockErr != nil { + log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error()) + } } - ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false) + ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists) log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) return &rResp, nil } diff --git a/internal/rpc/msg/extend_msg.notification.go b/internal/rpc/msg/extend_msg.notification.go index f4f8bd88b..6867de511 100644 --- a/internal/rpc/msg/extend_msg.notification.go +++ b/internal/rpc/msg/extend_msg.notification.go @@ -14,7 +14,7 @@ import ( ) func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID string, sessionType int32, - req *msg.SetMessageReactionExtensionsReq, resp *msg.SetMessageReactionExtensionsResp, isHistory bool) { + req *msg.SetMessageReactionExtensionsReq, resp *msg.SetMessageReactionExtensionsResp, isHistory bool, isReactionFromCache bool) { var m base_info.ReactionMessageModifierNotification m.SourceID = req.SourceID m.OpUserID = req.OpUserID @@ -30,10 +30,10 @@ func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID strin m.IsReact = resp.IsReact m.IsExternalExtensions = req.IsExternalExtensions m.MsgFirstModifyTime = resp.MsgFirstModifyTime - messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory) + messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory, isReactionFromCache) } func ExtendMessageDeleteNotification(operationID, sendID string, sourceID string, sessionType int32, - req *msg.DeleteMessageListReactionExtensionsReq, resp *msg.DeleteMessageListReactionExtensionsResp, isHistory bool) { + req *msg.DeleteMessageListReactionExtensionsReq, resp *msg.DeleteMessageListReactionExtensionsResp, isHistory bool, isReactionFromCache bool) { var m base_info.ReactionMessageDeleteNotification m.SourceID = req.SourceID m.OpUserID = req.OpUserID @@ -48,14 +48,15 @@ func ExtendMessageDeleteNotification(operationID, sendID string, sourceID string m.ClientMsgID = req.ClientMsgID m.MsgFirstModifyTime = req.MsgFirstModifyTime - messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageDeleter, utils.StructToJsonString(m), isHistory) + messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageDeleter, utils.StructToJsonString(m), isHistory, isReactionFromCache) } -func messageReactionSender(operationID, sendID string, sourceID string, sessionType, contentType int32, content string, isHistory bool) { +func messageReactionSender(operationID, sendID string, sourceID string, sessionType, contentType int32, content string, isHistory bool, isReactionFromCache bool) { options := make(map[string]bool, 5) utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false) utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, false) utils.SetSwitchFromOptions(options, constant.IsSenderConversationUpdate, false) utils.SetSwitchFromOptions(options, constant.IsUnreadCount, false) + utils.SetSwitchFromOptions(options, constant.IsReactionFromCache, isReactionFromCache) if !isHistory { utils.SetSwitchFromOptions(options, constant.IsHistory, false) utils.SetSwitchFromOptions(options, constant.IsPersistent, false) diff --git a/internal/rpc/msg/lock.go b/internal/rpc/msg/lock.go index 4f40b62a0..cef59cb1a 100644 --- a/internal/rpc/msg/lock.go +++ b/internal/rpc/msg/lock.go @@ -5,9 +5,13 @@ import ( "time" ) +const GlOBLLOCK = "GLOBAL_LOCK" + type MessageLocker interface { LockMessageTypeKey(clientMsgID, typeKey string) (err error) UnLockMessageTypeKey(clientMsgID string, typeKey string) error + LockGlobalMessage(clientMsgID string) (err error) + UnLockGlobalMessage(clientMsgID string) (err error) } type LockerMessage struct{} @@ -26,7 +30,23 @@ func (l *LockerMessage) LockMessageTypeKey(clientMsgID, typeKey string) (err err } return err +} +func (l *LockerMessage) LockGlobalMessage(clientMsgID string) (err error) { + for i := 0; i < 3; i++ { + err = db.DB.LockMessageTypeKey(clientMsgID, GlOBLLOCK) + if err != nil { + time.Sleep(time.Millisecond * 100) + continue + } else { + break + } + } + return err + } func (l *LockerMessage) UnLockMessageTypeKey(clientMsgID string, typeKey string) error { return db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) } +func (l *LockerMessage) UnLockGlobalMessage(clientMsgID string) error { + return db.DB.UnLockMessageTypeKey(clientMsgID, GlOBLLOCK) +} diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 522b942a7..9ca71ea19 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -171,6 +171,7 @@ const ( IsNotPrivate = "notPrivate" IsSenderConversationUpdate = "senderConversationUpdate" IsSenderNotificationPush = "senderNotificationPush" + IsReactionFromCache = "reactionFromCache" //GroupStatus GroupOk = 0