From 3a23153e488f64f16761d299b19c6a44d5d3395d Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 3 Mar 2023 18:44:14 +0800 Subject: [PATCH] proto modify --- internal/api/route.go | 8 +- internal/common/check/msg.go | 4 +- internal/rpc/msg/delete.go | 2 +- internal/rpc/msg/extend_msg.go | 659 ++++++++++++++++----------------- pkg/common/db/cache/group.go | 2 +- pkg/common/db/cache/redis.go | 2 +- pkg/proto/friend/friend.proto | 15 +- pkg/proto/group/group.proto | 6 +- 8 files changed, 348 insertions(+), 350 deletions(-) diff --git a/internal/api/route.go b/internal/api/route.go index 4ac9e3bf2..a71bc9f08 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -129,10 +129,10 @@ func NewGinRouter() *gin.Engine { chatGroup.POST("/check_msg_is_send_success", manage.CheckMsgIsSendSuccess) chatGroup.POST("/set_msg_min_seq", msg.SetMsgMinSeq) - chatGroup.POST("/set_message_reaction_extensions", msg.SetMessageReactionExtensions) - chatGroup.POST("/get_message_list_reaction_extensions", msg.GetMessageListReactionExtensions) - chatGroup.POST("/add_message_reaction_extensions", msg.AddMessageReactionExtensions) - chatGroup.POST("/delete_message_reaction_extensions", msg.DeleteMessageReactionExtensions) + //chatGroup.POST("/set_message_reaction_extensions", msg.SetMessageReactionExtensions) + //chatGroup.POST("/get_message_list_reaction_extensions", msg.GetMessageListReactionExtensions) + //chatGroup.POST("/add_message_reaction_extensions", msg.AddMessageReactionExtensions) + //chatGroup.POST("/delete_message_reaction_extensions", msg.DeleteMessageReactionExtensions) } ////Conversation conversationGroup := r.Group("/conversation") diff --git a/internal/common/check/msg.go b/internal/common/check/msg.go index e9c5a17ec..12a3761a6 100644 --- a/internal/common/check/msg.go +++ b/internal/common/check/msg.go @@ -39,12 +39,12 @@ func (m *MsgCheck) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinS return resp, err } -func (m *MsgCheck) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) { +func (m *MsgCheck) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { cc, err := m.getConn() if err != nil { return nil, err } - resp, err := msg.NewMsgClient(cc).PullMessageBySeqList(ctx, req) + resp, err := msg.NewMsgClient(cc).PullMessageBySeqs(ctx, req) return resp, err } diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index 74c4f2c07..68af72f50 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -11,7 +11,7 @@ func (m *msgServer) DelMsgs(ctx context.Context, req *msg.DelMsgsReq) (*msg.DelM if _, err := m.MsgDatabase.DelMsgBySeqs(ctx, req.UserID, req.Seqs); err != nil { return nil, err } - DeleteMessageNotification(ctx, req.UserID, req.Seqs) + //DeleteMessageNotification(ctx, req.UserID, req.Seqs) return resp, nil } diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go index eebd42361..25d223ff6 100644 --- a/internal/rpc/msg/extend_msg.go +++ b/internal/rpc/msg/extend_msg.go @@ -1,216 +1,209 @@ package msg import ( - "OpenIM/internal/common/notification" - "OpenIM/pkg/common/constant" - "OpenIM/pkg/common/log" "OpenIM/pkg/proto/msg" "OpenIM/pkg/proto/sdkws" - "OpenIM/pkg/utils" "context" - go_redis "github.com/go-redis/redis/v8" - - "time" ) func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) { - resp = &msg.SetMessageReactionExtensionsResp{} - //resp.ClientMsgID = req.ClientMsgID - //resp.MsgFirstModifyTime = req.MsgFirstModifyTime - - if err := CallbackSetMessageReactionExtensions(ctx, req); err != nil { - return nil, err - } - //if ExternalExtension - if req.IsExternalExtensions { - resp.MsgFirstModifyTime = req.MsgFirstModifyTime - notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false) - return resp, nil - } - isExists, err := m.MsgDatabase.JudgeMessageReactionExist(ctx, req.ClientMsgID, req.SessionType) - if err != nil { - return nil, err - } - - if !isExists { - if !req.IsReact { - resp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() - for k, v := range req.ReactionExtensions { - err := m.MessageLocker.LockMessageTypeKey(ctx, req.ClientMsgID, k) - if err != nil { - return nil, err - } - v.LatestUpdateTime = utils.GetCurrentTimestampByMill() - if err := m.MsgDatabase.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil { - return nil, err - } - } - resp.IsReact = true - _, err := m.MsgDatabase.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour) - if err != nil { - return nil, err - } - } else { - err := m.MessageLocker.LockGlobalMessage(ctx, req.ClientMsgID) - if err != nil { - return nil, err - } - mongoValue, err := m.MsgDatabase.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) - if err != nil { - return nil, err - } - setValue := make(map[string]*sdkws.KeyValue) - for k, v := range req.ReactionExtensions { - - temp := new(sdkws.KeyValue) - if vv, ok := mongoValue.ReactionExtensions[k]; ok { - utils.CopyStructFields(temp, &vv) - if v.LatestUpdateTime != vv.LatestUpdateTime { - setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp) - continue - } - } - temp.TypeKey = k - temp.Value = v.Value - temp.LatestUpdateTime = utils.GetCurrentTimestampByMill() - setValue[k] = temp - } - err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue) - if err != nil { - for _, value := range setValue { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = err.Error() - temp.ErrCode = 100 - resp.Result = append(resp.Result, temp) - } - } else { - for _, value := range setValue { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - resp.Result = append(resp.Result, temp) - } - } - lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID) - if lockErr != nil { - log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error()) - } - } - - } else { - log.Debug(req.OperationID, "redis handle secondly", req.String()) - - for k, v := range req.ReactionExtensionList { - err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) - if err != nil { - setKeyResultInfo(&resp, 100, err.Error(), req.ClientMsgID, k, v) - continue - } - redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k) - if err != nil && err != go_redis.Nil { - setKeyResultInfo(&resp, 200, err.Error(), req.ClientMsgID, k, v) - continue - } - temp := new(sdkws.KeyValue) - utils.JsonStringToStruct(redisValue, temp) - if v.LatestUpdateTime != temp.LatestUpdateTime { - setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp) - continue - } else { - v.LatestUpdateTime = utils.GetCurrentTimestampByMill() - newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) - if newerr != nil { - setKeyResultInfo(&resp, 201, newerr.Error(), req.ClientMsgID, k, temp) - continue - } - setKeyResultInfo(&resp, 0, "", req.ClientMsgID, k, v) - } - - } - } - if !isExists { - if !req.IsReact { - notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, true, true) - } else { - notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, false) - } - } else { - notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, true) - } - log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", resp.String()) - return &resp, nil + //resp = &msg.SetMessageReactionExtensionsResp{} + ////resp.ClientMsgID = req.ClientMsgID + ////resp.MsgFirstModifyTime = req.MsgFirstModifyTime + // + //if err := CallbackSetMessageReactionExtensions(ctx, req); err != nil { + // return nil, err + //} + ////if ExternalExtension + //if req.IsExternalExtensions { + // resp.MsgFirstModifyTime = req.MsgFirstModifyTime + // notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false) + // return resp, nil + //} + //isExists, err := m.MsgDatabase.JudgeMessageReactionExist(ctx, req.ClientMsgID, req.SessionType) + //if err != nil { + // return nil, err + //} + // + //if !isExists { + // if !req.IsReact { + // resp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() + // for k, v := range req.ReactionExtensions { + // err := m.MessageLocker.LockMessageTypeKey(ctx, req.ClientMsgID, k) + // if err != nil { + // return nil, err + // } + // v.LatestUpdateTime = utils.GetCurrentTimestampByMill() + // if err := m.MsgDatabase.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil { + // return nil, err + // } + // } + // resp.IsReact = true + // _, err := m.MsgDatabase.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour) + // if err != nil { + // return nil, err + // } + // } else { + // err := m.MessageLocker.LockGlobalMessage(ctx, req.ClientMsgID) + // if err != nil { + // return nil, err + // } + // mongoValue, err := m.MsgDatabase.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) + // if err != nil { + // return nil, err + // } + // setValue := make(map[string]*sdkws.KeyValue) + // for k, v := range req.ReactionExtensions { + // + // temp := new(sdkws.KeyValue) + // if vv, ok := mongoValue.ReactionExtensions[k]; ok { + // utils.CopyStructFields(temp, &vv) + // if v.LatestUpdateTime != vv.LatestUpdateTime { + // setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp) + // continue + // } + // } + // temp.TypeKey = k + // temp.Value = v.Value + // temp.LatestUpdateTime = utils.GetCurrentTimestampByMill() + // setValue[k] = temp + // } + // err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue) + // if err != nil { + // for _, value := range setValue { + // temp := new(msg.KeyValueResp) + // temp.KeyValue = value + // temp.ErrMsg = err.Error() + // temp.ErrCode = 100 + // resp.Result = append(resp.Result, temp) + // } + // } else { + // for _, value := range setValue { + // temp := new(msg.KeyValueResp) + // temp.KeyValue = value + // resp.Result = append(resp.Result, temp) + // } + // } + // lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID) + // if lockErr != nil { + // log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error()) + // } + // } + // + //} else { + // log.Debug(req.OperationID, "redis handle secondly", req.String()) + // + // for k, v := range req.ReactionExtensionList { + // err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) + // if err != nil { + // setKeyResultInfo(&resp, 100, err.Error(), req.ClientMsgID, k, v) + // continue + // } + // redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k) + // if err != nil && err != go_redis.Nil { + // setKeyResultInfo(&resp, 200, err.Error(), req.ClientMsgID, k, v) + // continue + // } + // temp := new(sdkws.KeyValue) + // utils.JsonStringToStruct(redisValue, temp) + // if v.LatestUpdateTime != temp.LatestUpdateTime { + // setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp) + // continue + // } else { + // v.LatestUpdateTime = utils.GetCurrentTimestampByMill() + // newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) + // if newerr != nil { + // setKeyResultInfo(&resp, 201, newerr.Error(), req.ClientMsgID, k, temp) + // continue + // } + // setKeyResultInfo(&resp, 0, "", req.ClientMsgID, k, v) + // } + // + // } + //} + //if !isExists { + // if !req.IsReact { + // notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, true, true) + // } else { + // notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, false) + // } + //} else { + // notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, true) + //} + //log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", resp.String()) + return resp, nil } -func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) { +func (m *msgServer) setKeyResultInfo(ctx context.Context, r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) { temp := new(msg.KeyValueResp) temp.KeyValue = keyValue temp.ErrCode = errCode temp.ErrMsg = errMsg r.Result = append(r.Result, temp) - _ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) + _ = m.MessageLocker.UnLockMessageTypeKey(ctx, clientMsgID, typeKey) } -func setDeleteKeyResultInfo(r *msg.DeleteMessageListReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) { +func (m *msgServer) setDeleteKeyResultInfo(ctx context.Context, r *msg.DeleteMessagesReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) { temp := new(msg.KeyValueResp) temp.KeyValue = keyValue temp.ErrCode = errCode temp.ErrMsg = errMsg r.Result = append(r.Result, temp) - _ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) + _ = m.MessageLocker.UnLockMessageTypeKey(ctx, clientMsgID, typeKey) } func (m *msgServer) GetMessagesReactionExtensions(ctx context.Context, req *msg.GetMessagesReactionExtensionsReq) (resp *msg.GetMessagesReactionExtensionsResp, err error) { - log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String()) - var rResp msg.GetMessageListReactionExtensionsResp - for _, messageValue := range req.MessageReactionKeyList { - var oneMessage msg.SingleMessageExtensionResult - oneMessage.ClientMsgID = messageValue.ClientMsgID - - isExists, err := db.DB.JudgeMessageReactionExist(messageValue.ClientMsgID, req.SessionType) - if err != nil { - rResp.ErrCode = 100 - rResp.ErrMsg = err.Error() - return &rResp, nil - } - if isExists { - redisValue, err := db.DB.GetOneMessageAllReactionList(messageValue.ClientMsgID, req.SessionType) - if err != nil { - oneMessage.ErrCode = 100 - oneMessage.ErrMsg = err.Error() - rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) - continue - } - keyMap := make(map[string]*sdkws.KeyValue) - - for k, v := range redisValue { - temp := new(sdkws.KeyValue) - utils.JsonStringToStruct(v, temp) - keyMap[k] = temp - } - oneMessage.ReactionExtensionList = keyMap - - } else { - mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime) - if err != nil { - oneMessage.ErrCode = 100 - oneMessage.ErrMsg = err.Error() - rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) - continue - } - keyMap := make(map[string]*sdkws.KeyValue) - - for k, v := range mongoValue.ReactionExtensionList { - temp := new(sdkws.KeyValue) - temp.TypeKey = v.TypeKey - temp.Value = v.Value - temp.LatestUpdateTime = v.LatestUpdateTime - keyMap[k] = temp - } - oneMessage.ReactionExtensionList = keyMap - } - rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) - } - log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String()) - return &rResp, nil + //log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String()) + //var rResp msg.GetMessageListReactionExtensionsResp + //for _, messageValue := range req.MessageReactionKeyList { + // var oneMessage msg.SingleMessageExtensionResult + // oneMessage.ClientMsgID = messageValue.ClientMsgID + // + // isExists, err := db.DB.JudgeMessageReactionExist(messageValue.ClientMsgID, req.SessionType) + // if err != nil { + // rResp.ErrCode = 100 + // rResp.ErrMsg = err.Error() + // return &rResp, nil + // } + // if isExists { + // redisValue, err := db.DB.GetOneMessageAllReactionList(messageValue.ClientMsgID, req.SessionType) + // if err != nil { + // oneMessage.ErrCode = 100 + // oneMessage.ErrMsg = err.Error() + // rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) + // continue + // } + // keyMap := make(map[string]*sdkws.KeyValue) + // + // for k, v := range redisValue { + // temp := new(sdkws.KeyValue) + // utils.JsonStringToStruct(v, temp) + // keyMap[k] = temp + // } + // oneMessage.ReactionExtensionList = keyMap + // + // } else { + // mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime) + // if err != nil { + // oneMessage.ErrCode = 100 + // oneMessage.ErrMsg = err.Error() + // rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) + // continue + // } + // keyMap := make(map[string]*sdkws.KeyValue) + // + // for k, v := range mongoValue.ReactionExtensionList { + // temp := new(sdkws.KeyValue) + // temp.TypeKey = v.TypeKey + // temp.Value = v.Value + // temp.LatestUpdateTime = v.LatestUpdateTime + // keyMap[k] = temp + // } + // oneMessage.ReactionExtensionList = keyMap + // } + // rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) + //} + //log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String()) + return resp, nil } @@ -219,149 +212,149 @@ func (m *msgServer) AddMessageReactionExtensions(ctx context.Context, req *msg.M } func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessagesReactionExtensionsReq) (resp *msg.DeleteMessagesReactionExtensionsResp, err error) { - log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String()) - var rResp msg.DeleteMessagesReactionExtensionsResp - callbackResp := notification.callbackDeleteMessageReactionExtensions(req) - if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 { - rResp.ErrCode = int32(callbackResp.ErrCode) - rResp.ErrMsg = callbackResp.ErrMsg - for _, value := range req.ReactionExtensionList { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = callbackResp.ErrMsg - temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) - } - return &rResp, nil - } - //if ExternalExtension - if req.IsExternalExtensions { - rResp.Result = callbackResp.ResultReactionExtensionList - notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false) - return &rResp, nil - - } - for _, v := range callbackResp.ResultReactionExtensions { - if v.ErrCode != 0 { - func(req *[]*sdkws.KeyValue, typeKey string) { - for i := 0; i < len(*req); i++ { - if (*req)[i].TypeKey == typeKey { - *req = append((*req)[:i], (*req)[i+1:]...) - } - } - }(&req.ReactionExtensionList, v.KeyValue.TypeKey) - rResp.Result = append(rResp.Result, v) - } - } - isExists, err := db.DB.JudgeMessageReactionExist(req.ClientMsgID, req.SessionType) - if err != nil { - rResp.ErrCode = 100 - rResp.ErrMsg = err.Error() - for _, value := range req.ReactionExtensionList { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = err.Error() - temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) - } - return &rResp, nil - } - - if isExists { - log.Debug(req.OperationID, "redis handle this delete", req.String()) - for _, v := range req.ReactionExtensionList { - err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey) - if err != nil { - setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v) - continue - } - - redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey) - if err != nil && err != go_redis.Nil { - setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v) - continue - } - temp := new(sdkws.KeyValue) - utils.JsonStringToStruct(redisValue, temp) - if v.LatestUpdateTime != temp.LatestUpdateTime { - setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp) - continue - } else { - newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey) - if newErr != nil { - setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp) - continue - } - setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v) - } - } - } else { - err := m.dMessageLocker.LockGlobalMessage(req.ClientMsgID) - if err != nil { - rResp.ErrCode = 100 - rResp.ErrMsg = err.Error() - for _, value := range req.ReactionExtensionList { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = err.Error() - temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) - } - return &rResp, nil - } - mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) - if err != nil { - rResp.ErrCode = 200 - rResp.ErrMsg = err.Error() - for _, value := range req.ReactionExtensionList { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = err.Error() - temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) - } - return &rResp, nil - } - setValue := make(map[string]*sdkws.KeyValue) - for _, v := range req.ReactionExtensionList { - - temp := new(sdkws.KeyValue) - if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok { - utils.CopyStructFields(temp, &vv) - if v.LatestUpdateTime != vv.LatestUpdateTime { - setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp) - continue - } - } else { - setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v) - continue - } - temp.TypeKey = v.TypeKey - setValue[v.TypeKey] = temp - } - err = db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue) - if err != nil { - for _, value := range setValue { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = err.Error() - temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) - } - } else { - for _, value := range setValue { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - rResp.Result = append(rResp.Result, temp) - } - } - lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID) - if lockErr != nil { - log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error()) - } - - } - notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists) - log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String()) - return &rResp, nil + //log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String()) + //var rResp msg.DeleteMessagesReactionExtensionsResp + //callbackResp := notification.callbackDeleteMessageReactionExtensions(req) + //if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 { + // rResp.ErrCode = int32(callbackResp.ErrCode) + // rResp.ErrMsg = callbackResp.ErrMsg + // for _, value := range req.ReactionExtensionList { + // temp := new(msg.KeyValueResp) + // temp.KeyValue = value + // temp.ErrMsg = callbackResp.ErrMsg + // temp.ErrCode = 100 + // rResp.Result = append(rResp.Result, temp) + // } + // return &rResp, nil + //} + ////if ExternalExtension + //if req.IsExternalExtensions { + // rResp.Result = callbackResp.ResultReactionExtensionList + // notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false) + // return &rResp, nil + // + //} + //for _, v := range callbackResp.ResultReactionExtensions { + // if v.ErrCode != 0 { + // func(req *[]*sdkws.KeyValue, typeKey string) { + // for i := 0; i < len(*req); i++ { + // if (*req)[i].TypeKey == typeKey { + // *req = append((*req)[:i], (*req)[i+1:]...) + // } + // } + // }(&req.ReactionExtensionList, v.KeyValue.TypeKey) + // rResp.Result = append(rResp.Result, v) + // } + //} + //isExists, err := db.DB.JudgeMessageReactionExist(req.ClientMsgID, req.SessionType) + //if err != nil { + // rResp.ErrCode = 100 + // rResp.ErrMsg = err.Error() + // for _, value := range req.ReactionExtensionList { + // temp := new(msg.KeyValueResp) + // temp.KeyValue = value + // temp.ErrMsg = err.Error() + // temp.ErrCode = 100 + // rResp.Result = append(rResp.Result, temp) + // } + // return &rResp, nil + //} + // + //if isExists { + // log.Debug(req.OperationID, "redis handle this delete", req.String()) + // for _, v := range req.ReactionExtensionList { + // err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey) + // if err != nil { + // setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v) + // continue + // } + // + // redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey) + // if err != nil && err != go_redis.Nil { + // setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v) + // continue + // } + // temp := new(sdkws.KeyValue) + // utils.JsonStringToStruct(redisValue, temp) + // if v.LatestUpdateTime != temp.LatestUpdateTime { + // setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp) + // continue + // } else { + // newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey) + // if newErr != nil { + // setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp) + // continue + // } + // setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v) + // } + // } + //} else { + // err := m.dMessageLocker.LockGlobalMessage(req.ClientMsgID) + // if err != nil { + // rResp.ErrCode = 100 + // rResp.ErrMsg = err.Error() + // for _, value := range req.ReactionExtensionList { + // temp := new(msg.KeyValueResp) + // temp.KeyValue = value + // temp.ErrMsg = err.Error() + // temp.ErrCode = 100 + // rResp.Result = append(rResp.Result, temp) + // } + // return &rResp, nil + // } + // mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) + // if err != nil { + // rResp.ErrCode = 200 + // rResp.ErrMsg = err.Error() + // for _, value := range req.ReactionExtensionList { + // temp := new(msg.KeyValueResp) + // temp.KeyValue = value + // temp.ErrMsg = err.Error() + // temp.ErrCode = 100 + // rResp.Result = append(rResp.Result, temp) + // } + // return &rResp, nil + // } + // setValue := make(map[string]*sdkws.KeyValue) + // for _, v := range req.ReactionExtensionList { + // + // temp := new(sdkws.KeyValue) + // if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok { + // utils.CopyStructFields(temp, &vv) + // if v.LatestUpdateTime != vv.LatestUpdateTime { + // setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp) + // continue + // } + // } else { + // setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v) + // continue + // } + // temp.TypeKey = v.TypeKey + // setValue[v.TypeKey] = temp + // } + // err = db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue) + // if err != nil { + // for _, value := range setValue { + // temp := new(msg.KeyValueResp) + // temp.KeyValue = value + // temp.ErrMsg = err.Error() + // temp.ErrCode = 100 + // rResp.Result = append(rResp.Result, temp) + // } + // } else { + // for _, value := range setValue { + // temp := new(msg.KeyValueResp) + // temp.KeyValue = value + // rResp.Result = append(rResp.Result, temp) + // } + // } + // lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID) + // if lockErr != nil { + // log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error()) + // } + // + //} + //notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists) + //log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String()) + return resp, nil } diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 1a4c5df04..7020d0924 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -52,7 +52,7 @@ type GroupCacheRedis struct { rcClient *rockscache.Client } -func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelation2.SuperGroupModelInterface, opts rockscache.Options) GroupCacheRedisInterface { +func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelation2.SuperGroupModelInterface, opts rockscache.Options) GroupCache { return &GroupCacheRedis{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime, group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, mongoDB: mongoClient, diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 16b3b99f0..06922ba13 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -218,7 +218,7 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList [] } } if len(failedMsgs) != 0 { - return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx))) + return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedMsgs, tracelog.GetOperationID(ctx))) } _, err := pipe.Exec(ctx) return 0, err diff --git a/pkg/proto/friend/friend.proto b/pkg/proto/friend/friend.proto index c8f363fc2..59de54546 100644 --- a/pkg/proto/friend/friend.proto +++ b/pkg/proto/friend/friend.proto @@ -134,6 +134,14 @@ message getPaginationFriendsApplyFromResp{ int32 total = 2; } +message getFriendIDsReq { + string userID = 1; +} + +message getFriendIDsResp { + repeated string = 1; +} + service friend{ //申请加好友 rpc applyToAddFriend(applyToAddFriendReq) returns(applyToAddFriendResp); @@ -163,9 +171,6 @@ service friend{ rpc getDesignatedFriends(getDesignatedFriendsReq) returns(getDesignatedFriendsResp); //获取指定好友信息 有id不存在也返回错误 rpc getPaginationFriends(getPaginationFriendsReq) returns (getPaginationFriendsResp); - - - - - + // 获取好友ID列表 + rpc getFriendIDs(getFriendIDsReq) returns (getFriendIDsResp); } \ No newline at end of file diff --git a/pkg/proto/group/group.proto b/pkg/proto/group/group.proto index c3d980e08..01769468e 100644 --- a/pkg/proto/group/group.proto +++ b/pkg/proto/group/group.proto @@ -283,11 +283,11 @@ message GetUserInGroupMembersResp{ repeated sdkws.GroupMemberFullInfo members = 1; } -message GetGroupMemberUserIDReq{ +message GetGroupMemberUserIDsReq{ string groupID = 1; } -message GetGroupMemberUserIDResp{ +message GetGroupMemberUserIDsResp{ repeated string userIDs = 1; } @@ -356,7 +356,7 @@ service group{ //获取某个用户在指定群中的信息 rpc getUserInGroupMembers(GetUserInGroupMembersReq) returns (GetUserInGroupMembersResp); //获取群成员用户ID - rpc getGroupMemberUserID(GetGroupMemberUserIDReq) returns (GetGroupMemberUserIDResp); + rpc getGroupMemberUserIDs(GetGroupMemberUserIDsReq) returns (GetGroupMemberUserIDsResp); //查询群组中对应级别的成员 rpc GetGroupMemberRoleLevel(GetGroupMemberRoleLevelReq)returns (GetGroupMemberRoleLevelResp); }