You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
426 lines
15 KiB
426 lines
15 KiB
package msg
|
|
|
|
import (
|
|
"Open_IM/internal/common/notification"
|
|
"Open_IM/pkg/common/constant"
|
|
"Open_IM/pkg/common/db"
|
|
"Open_IM/pkg/common/log"
|
|
"Open_IM/pkg/proto/msg"
|
|
"Open_IM/pkg/proto/sdkws"
|
|
"Open_IM/pkg/utils"
|
|
"context"
|
|
go_redis "github.com/go-redis/redis/v8"
|
|
|
|
"time"
|
|
)
|
|
|
|
func (rpc *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) {
|
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
|
|
var rResp msg.SetMessageReactionExtensionsResp
|
|
rResp.ClientMsgID = req.ClientMsgID
|
|
rResp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
|
callbackResp := notification.callbackSetMessageReactionExtensions(req)
|
|
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
|
|
rResp.ErrCode = int32(callbackResp.ErrCode)
|
|
rResp.ErrMsg = callbackResp.ErrMsg
|
|
for _, value := range req.ReactionExtensionList {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = callbackResp.ErrMsg
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
return &rResp, nil
|
|
}
|
|
//if ExternalExtension
|
|
if req.IsExternalExtensions {
|
|
var isHistory bool
|
|
if req.IsReact {
|
|
isHistory = false
|
|
} else {
|
|
isHistory = true
|
|
}
|
|
rResp.MsgFirstModifyTime = callbackResp.MsgFirstModifyTime
|
|
rResp.Result = callbackResp.ResultReactionExtensionList
|
|
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, isHistory, false)
|
|
return &rResp, nil
|
|
}
|
|
for _, v := range callbackResp.ResultReactionExtensionList {
|
|
if v.ErrCode == 0 {
|
|
req.ReactionExtensionList[v.KeyValue.TypeKey] = v.KeyValue
|
|
} else {
|
|
delete(req.ReactionExtensionList, v.KeyValue.TypeKey)
|
|
rResp.Result = append(rResp.Result, v)
|
|
}
|
|
}
|
|
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
|
|
if err != nil {
|
|
rResp.ErrCode = 100
|
|
rResp.ErrMsg = err.Error()
|
|
for _, value := range req.ReactionExtensionList {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = err.Error()
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
return &rResp, nil
|
|
}
|
|
|
|
if !isExists {
|
|
if !req.IsReact {
|
|
log.Debug(req.OperationID, "redis handle firstly", req.String())
|
|
rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
|
for k, v := range req.ReactionExtensionList {
|
|
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
|
|
if err != nil {
|
|
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
|
|
continue
|
|
}
|
|
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
|
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
|
if newerr != nil {
|
|
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, v)
|
|
continue
|
|
}
|
|
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
|
|
}
|
|
rResp.IsReact = true
|
|
_, err := db.DB.SetMessageReactionExpire(req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
|
|
if err != nil {
|
|
log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String())
|
|
}
|
|
} else {
|
|
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
|
|
if err != nil {
|
|
rResp.ErrCode = 100
|
|
rResp.ErrMsg = err.Error()
|
|
for _, value := range req.ReactionExtensionList {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = err.Error()
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
return &rResp, nil
|
|
}
|
|
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
|
if err != nil {
|
|
rResp.ErrCode = 200
|
|
rResp.ErrMsg = err.Error()
|
|
for _, value := range req.ReactionExtensionList {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = err.Error()
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
return &rResp, nil
|
|
}
|
|
setValue := make(map[string]*sdkws.KeyValue)
|
|
for k, v := range req.ReactionExtensionList {
|
|
|
|
temp := new(sdkws.KeyValue)
|
|
if vv, ok := mongoValue.ReactionExtensionList[k]; ok {
|
|
utils.CopyStructFields(temp, &vv)
|
|
if v.LatestUpdateTime != vv.LatestUpdateTime {
|
|
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
|
|
continue
|
|
}
|
|
}
|
|
temp.TypeKey = k
|
|
temp.Value = v.Value
|
|
temp.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
|
setValue[k] = temp
|
|
}
|
|
err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
|
|
if err != nil {
|
|
for _, value := range setValue {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = err.Error()
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
} else {
|
|
for _, value := range setValue {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
}
|
|
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
|
if lockErr != nil {
|
|
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
|
|
}
|
|
}
|
|
|
|
} else {
|
|
log.Debug(req.OperationID, "redis handle secondly", req.String())
|
|
|
|
for k, v := range req.ReactionExtensionList {
|
|
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
|
|
if err != nil {
|
|
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
|
|
continue
|
|
}
|
|
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
|
|
if err != nil && err != go_redis.Nil {
|
|
setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v)
|
|
continue
|
|
}
|
|
temp := new(sdkws.KeyValue)
|
|
utils.JsonStringToStruct(redisValue, temp)
|
|
if v.LatestUpdateTime != temp.LatestUpdateTime {
|
|
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
|
|
continue
|
|
} else {
|
|
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
|
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
|
if newerr != nil {
|
|
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp)
|
|
continue
|
|
}
|
|
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
|
|
}
|
|
|
|
}
|
|
}
|
|
if !isExists {
|
|
if !req.IsReact {
|
|
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true, true)
|
|
} else {
|
|
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
|
|
}
|
|
} else {
|
|
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, true)
|
|
}
|
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
|
|
return &rResp, nil
|
|
|
|
}
|
|
func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = keyValue
|
|
temp.ErrCode = errCode
|
|
temp.ErrMsg = errMsg
|
|
r.Result = append(r.Result, temp)
|
|
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
|
}
|
|
func setDeleteKeyResultInfo(r *msg.DeleteMessageListReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = keyValue
|
|
temp.ErrCode = errCode
|
|
temp.ErrMsg = errMsg
|
|
r.Result = append(r.Result, temp)
|
|
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
|
}
|
|
|
|
func (rpc *msgServer) GetMessageListReactionExtensions(ctx context.Context, req *msg.GetMessageListReactionExtensionsReq) (resp *msg.GetMessageListReactionExtensionsResp, err error) {
|
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
|
|
var rResp msg.GetMessageListReactionExtensionsResp
|
|
for _, messageValue := range req.MessageReactionKeyList {
|
|
var oneMessage msg.SingleMessageExtensionResult
|
|
oneMessage.ClientMsgID = messageValue.ClientMsgID
|
|
|
|
isExists, err := db.DB.JudgeMessageReactionEXISTS(messageValue.ClientMsgID, req.SessionType)
|
|
if err != nil {
|
|
rResp.ErrCode = 100
|
|
rResp.ErrMsg = err.Error()
|
|
return &rResp, nil
|
|
}
|
|
if isExists {
|
|
redisValue, err := db.DB.GetOneMessageAllReactionList(messageValue.ClientMsgID, req.SessionType)
|
|
if err != nil {
|
|
oneMessage.ErrCode = 100
|
|
oneMessage.ErrMsg = err.Error()
|
|
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
|
continue
|
|
}
|
|
keyMap := make(map[string]*sdkws.KeyValue)
|
|
|
|
for k, v := range redisValue {
|
|
temp := new(sdkws.KeyValue)
|
|
utils.JsonStringToStruct(v, temp)
|
|
keyMap[k] = temp
|
|
}
|
|
oneMessage.ReactionExtensionList = keyMap
|
|
|
|
} else {
|
|
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime)
|
|
if err != nil {
|
|
oneMessage.ErrCode = 100
|
|
oneMessage.ErrMsg = err.Error()
|
|
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
|
continue
|
|
}
|
|
keyMap := make(map[string]*sdkws.KeyValue)
|
|
|
|
for k, v := range mongoValue.ReactionExtensionList {
|
|
temp := new(sdkws.KeyValue)
|
|
temp.TypeKey = v.TypeKey
|
|
temp.Value = v.Value
|
|
temp.LatestUpdateTime = v.LatestUpdateTime
|
|
keyMap[k] = temp
|
|
}
|
|
oneMessage.ReactionExtensionList = keyMap
|
|
}
|
|
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
|
}
|
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
|
|
return &rResp, nil
|
|
|
|
}
|
|
|
|
func (rpc *msgServer) AddMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) {
|
|
return
|
|
}
|
|
|
|
func (rpc *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) {
|
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
|
|
var rResp msg.DeleteMessageListReactionExtensionsResp
|
|
callbackResp := notification.callbackDeleteMessageReactionExtensions(req)
|
|
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
|
|
rResp.ErrCode = int32(callbackResp.ErrCode)
|
|
rResp.ErrMsg = callbackResp.ErrMsg
|
|
for _, value := range req.ReactionExtensionList {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = callbackResp.ErrMsg
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
return &rResp, nil
|
|
}
|
|
//if ExternalExtension
|
|
if req.IsExternalExtensions {
|
|
rResp.Result = callbackResp.ResultReactionExtensionList
|
|
notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
|
|
return &rResp, nil
|
|
|
|
}
|
|
for _, v := range callbackResp.ResultReactionExtensionList {
|
|
if v.ErrCode != 0 {
|
|
func(req *[]*sdkws.KeyValue, typeKey string) {
|
|
for i := 0; i < len(*req); i++ {
|
|
if (*req)[i].TypeKey == typeKey {
|
|
*req = append((*req)[:i], (*req)[i+1:]...)
|
|
}
|
|
}
|
|
}(&req.ReactionExtensionList, v.KeyValue.TypeKey)
|
|
rResp.Result = append(rResp.Result, v)
|
|
}
|
|
}
|
|
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
|
|
if err != nil {
|
|
rResp.ErrCode = 100
|
|
rResp.ErrMsg = err.Error()
|
|
for _, value := range req.ReactionExtensionList {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = err.Error()
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
return &rResp, nil
|
|
}
|
|
|
|
if isExists {
|
|
log.Debug(req.OperationID, "redis handle this delete", req.String())
|
|
for _, v := range req.ReactionExtensionList {
|
|
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey)
|
|
if err != nil {
|
|
setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
|
continue
|
|
}
|
|
|
|
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey)
|
|
if err != nil && err != go_redis.Nil {
|
|
setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
|
continue
|
|
}
|
|
temp := new(sdkws.KeyValue)
|
|
utils.JsonStringToStruct(redisValue, temp)
|
|
if v.LatestUpdateTime != temp.LatestUpdateTime {
|
|
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
|
|
continue
|
|
} else {
|
|
newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey)
|
|
if newErr != nil {
|
|
setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp)
|
|
continue
|
|
}
|
|
setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v)
|
|
}
|
|
}
|
|
} else {
|
|
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
|
|
if err != nil {
|
|
rResp.ErrCode = 100
|
|
rResp.ErrMsg = err.Error()
|
|
for _, value := range req.ReactionExtensionList {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = err.Error()
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
return &rResp, nil
|
|
}
|
|
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
|
if err != nil {
|
|
rResp.ErrCode = 200
|
|
rResp.ErrMsg = err.Error()
|
|
for _, value := range req.ReactionExtensionList {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = err.Error()
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
return &rResp, nil
|
|
}
|
|
setValue := make(map[string]*sdkws.KeyValue)
|
|
for _, v := range req.ReactionExtensionList {
|
|
|
|
temp := new(sdkws.KeyValue)
|
|
if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok {
|
|
utils.CopyStructFields(temp, &vv)
|
|
if v.LatestUpdateTime != vv.LatestUpdateTime {
|
|
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
|
|
continue
|
|
}
|
|
} else {
|
|
setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v)
|
|
continue
|
|
}
|
|
temp.TypeKey = v.TypeKey
|
|
setValue[v.TypeKey] = temp
|
|
}
|
|
err = db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
|
|
if err != nil {
|
|
for _, value := range setValue {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
temp.ErrMsg = err.Error()
|
|
temp.ErrCode = 100
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
} else {
|
|
for _, value := range setValue {
|
|
temp := new(msg.KeyValueResp)
|
|
temp.KeyValue = value
|
|
rResp.Result = append(rResp.Result, temp)
|
|
}
|
|
}
|
|
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
|
if lockErr != nil {
|
|
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
|
|
}
|
|
|
|
}
|
|
notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists)
|
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
|
|
return &rResp, nil
|
|
}
|