diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go index ba3ad3a7c..07e974540 100644 --- a/internal/rpc/msg/extend_msg.go +++ b/internal/rpc/msg/extend_msg.go @@ -14,107 +14,50 @@ import ( ) func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) { - log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String()) - var rResp msg.SetMessageReactionExtensionsResp - rResp.ClientMsgID = req.ClientMsgID - rResp.MsgFirstModifyTime = req.MsgFirstModifyTime - callbackResp := notification.callbackSetMessageReactionExtensions(req) - if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 { - rResp.ErrCode = int32(callbackResp.ErrCode) - rResp.ErrMsg = callbackResp.ErrMsg - for _, value := range req.ReactionExtensionList { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = callbackResp.ErrMsg - temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) - } - return &rResp, nil + resp = &msg.SetMessageReactionExtensionsResp{} + //resp.ClientMsgID = req.ClientMsgID + //resp.MsgFirstModifyTime = req.MsgFirstModifyTime + + if err := CallbackSetMessageReactionExtensions(ctx, req); err != nil { + return nil, err } //if ExternalExtension if req.IsExternalExtensions { - var isHistory bool - if req.IsReact { - isHistory = false - } else { - isHistory = true - } - rResp.MsgFirstModifyTime = callbackResp.MsgFirstModifyTime - rResp.Result = callbackResp.ResultReactionExtensionList - notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, isHistory, false) - return &rResp, nil - } - for _, v := range callbackResp.ResultReactionExtensionList { - if v.ErrCode == 0 { - req.ReactionExtensionList[v.KeyValue.TypeKey] = v.KeyValue - } else { - delete(req.ReactionExtensionList, v.KeyValue.TypeKey) - rResp.Result = append(rResp.Result, v) - } + resp.MsgFirstModifyTime = req.MsgFirstModifyTime + notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false) + return resp, nil } - isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType) + isExists, err := m.MsgInterface.JudgeMessageReactionEXISTS(ctx, req.ClientMsgID, req.SessionType) if err != nil { - rResp.ErrCode = 100 - rResp.ErrMsg = err.Error() - for _, value := range req.ReactionExtensionList { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = err.Error() - temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) - } - return &rResp, nil + return nil, err } if !isExists { if !req.IsReact { - log.Debug(req.OperationID, "redis handle firstly", req.String()) - rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() + resp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() for k, v := range req.ReactionExtensionList { - err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) + err := m.MessageLocker.LockMessageTypeKey(ctx, req.ClientMsgID, k) if err != nil { - setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) - continue + return nil, err } v.LatestUpdateTime = utils.GetCurrentTimestampByMill() - newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) - if newerr != nil { - setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, v) - continue + if err := m.MsgInterface.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil { + return nil, err } - setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v) } - rResp.IsReact = true - _, err := db.DB.SetMessageReactionExpire(req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour) + resp.IsReact = true + _, err := m.MsgInterface.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour) if err != nil { - log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String()) + return nil, err } } else { - err := m.dMessageLocker.LockGlobalMessage(req.ClientMsgID) + err := m.MessageLocker.LockGlobalMessage(ctx, req.ClientMsgID) if err != nil { - rResp.ErrCode = 100 - rResp.ErrMsg = err.Error() - for _, value := range req.ReactionExtensionList { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = err.Error() - temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) - } - return &rResp, nil + return nil, err } - mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) + mongoValue, err := m.MsgInterface.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime) if err != nil { - rResp.ErrCode = 200 - rResp.ErrMsg = err.Error() - for _, value := range req.ReactionExtensionList { - temp := new(msg.KeyValueResp) - temp.KeyValue = value - temp.ErrMsg = err.Error() - temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) - } - return &rResp, nil + return nil, err } setValue := make(map[string]*sdkws.KeyValue) for k, v := range req.ReactionExtensionList { @@ -123,7 +66,7 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S if vv, ok := mongoValue.ReactionExtensionList[k]; ok { utils.CopyStructFields(temp, &vv) if v.LatestUpdateTime != vv.LatestUpdateTime { - setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp) + setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp) continue } } @@ -139,13 +82,13 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S temp.KeyValue = value temp.ErrMsg = err.Error() temp.ErrCode = 100 - rResp.Result = append(rResp.Result, temp) + resp.Result = append(resp.Result, temp) } } else { for _, value := range setValue { temp := new(msg.KeyValueResp) temp.KeyValue = value - rResp.Result = append(rResp.Result, temp) + resp.Result = append(resp.Result, temp) } } lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID) @@ -160,42 +103,42 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S for k, v := range req.ReactionExtensionList { err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) if err != nil { - setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) + setKeyResultInfo(&resp, 100, err.Error(), req.ClientMsgID, k, v) continue } redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k) if err != nil && err != go_redis.Nil { - setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v) + setKeyResultInfo(&resp, 200, err.Error(), req.ClientMsgID, k, v) continue } temp := new(sdkws.KeyValue) utils.JsonStringToStruct(redisValue, temp) if v.LatestUpdateTime != temp.LatestUpdateTime { - setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp) + setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp) continue } else { v.LatestUpdateTime = utils.GetCurrentTimestampByMill() newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) if newerr != nil { - setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp) + setKeyResultInfo(&resp, 201, newerr.Error(), req.ClientMsgID, k, temp) continue } - setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v) + setKeyResultInfo(&resp, 0, "", req.ClientMsgID, k, v) } } } if !isExists { if !req.IsReact { - notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true, true) + notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, true, true) } else { - notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false) + notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, false) } } else { - notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, true) + notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, true) } - log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String()) - return &rResp, nil + log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", resp.String()) + return &resp, nil } func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) { diff --git a/internal/rpc/msg/extend_msg_callback.go b/internal/rpc/msg/extend_msg_callback.go index 4027cfa4a..790ee017a 100644 --- a/internal/rpc/msg/extend_msg_callback.go +++ b/internal/rpc/msg/extend_msg_callback.go @@ -27,7 +27,11 @@ func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMe MsgFirstModifyTime: setReq.MsgFirstModifyTime, } resp := &cbapi.CallbackBeforeSetMessageReactionExtResp{} - return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) + if err := http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil { + return err + } + setReq.MsgFirstModifyTime = resp.MsgFirstModifyTime + return nil } func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) error { diff --git a/internal/rpc/msg/lock.go b/internal/rpc/msg/lock.go index bee152b15..7c4677dba 100644 --- a/internal/rpc/msg/lock.go +++ b/internal/rpc/msg/lock.go @@ -1,16 +1,17 @@ package msg import ( + "context" "time" ) const GlOBLLOCK = "GLOBAL_LOCK" type MessageLocker interface { - LockMessageTypeKey(clientMsgID, typeKey string) (err error) - UnLockMessageTypeKey(clientMsgID string, typeKey string) error - LockGlobalMessage(clientMsgID string) (err error) - UnLockGlobalMessage(clientMsgID string) (err error) + LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error) + UnLockMessageTypeKey(ctx context.Context, clientMsgID string, typeKey string) error + LockGlobalMessage(ctx context.Context, clientMsgID string) (err error) + UnLockGlobalMessage(ctx context.Context, clientMsgID string) (err error) } type LockerMessage struct{} diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index cba7e619c..da57150e2 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -13,6 +13,7 @@ import ( ) func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { + resp = &msg.SendMsgResp{} promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) // callback if err = CallbackBeforeSendGroupMsg(ctx, req); err != nil { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index f9304b5f0..7423a393f 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,13 +16,14 @@ import ( type msgServer struct { RegisterCenter discoveryRegistry.SvcDiscoveryRegistry - MsgInterface controller.MsgInterface + MsgInterface controller.MsgDatabaseInterface Group *check.GroupChecker User *check.UserCheck Conversation *check.ConversationChecker friend *check.FriendChecker *localcache.GroupLocalCache - black *check.BlackChecker + black *check.BlackChecker + MessageLocker MessageLocker } type deleteMsg struct { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index e1c45dbfb..b8ed937e5 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -10,6 +10,7 @@ import ( "Open_IM/pkg/common/tracelog" "github.com/gogo/protobuf/sortkeys" "sync" + "time" pbMsg "Open_IM/pkg/proto/msg" "Open_IM/pkg/proto/sdkws" @@ -22,96 +23,96 @@ import ( "github.com/golang/protobuf/proto" ) -type MsgInterface interface { - // 批量插入消息到db - BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error - // 刪除redis中消息缓存 - DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error - // incrSeq然后批量插入缓存 - BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) - // 删除消息 返回不存在的seqList - DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) - // 通过seqList获取db中写扩散消息 - GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) - // 通过seqList获取大群在db里面的消息 没找到返回错误 - GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) - // 删除用户所有消息/cache/db然后重置seq - CleanUpUserMsg(ctx context.Context, userID string) error - // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) - DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error - // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) - DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error - // 获取用户 seq mongo和redis - GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) - // 获取群 seq mongo和redis - GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) - // 设置群用户最小seq 直接调用cache - SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) - // 设置用户最小seq 直接调用cache - SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) - - MsgToMQ(ctx context.Context, key string, data *pbMsg.MsgDataToMQ) (err error) -} - -func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface { - return &MsgController{} -} - -type MsgController struct { - database MsgDatabase -} - -func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { - return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq) -} - -func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error { - return m.database.DeleteMessageFromCache(ctx, sourceID, msgList) -} - -func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { - return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList) -} - -func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { - return m.database.DelMsgBySeqs(ctx, userID, seqs) -} - -func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { - return m.database.GetMsgBySeqs(ctx, userID, seqs) -} - -func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { - return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs) -} - -func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error { - return m.database.CleanUpUserMsg(ctx, userID) -} - -func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { - return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime) -} - -func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { - return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime) -} - -func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { - return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID) -} - -func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { - return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID) -} - -func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { - return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) -} - -func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { - return m.database.SetUserMinSeq(ctx, userID, minSeq) -} +//type MsgInterface interface { +// // 批量插入消息到db +// BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error +// // 刪除redis中消息缓存 +// DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error +// // incrSeq然后批量插入缓存 +// BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) +// // 删除消息 返回不存在的seqList +// DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) +// // 通过seqList获取db中写扩散消息 +// GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) +// // 通过seqList获取大群在db里面的消息 没找到返回错误 +// GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) +// // 删除用户所有消息/cache/db然后重置seq +// CleanUpUserMsg(ctx context.Context, userID string) error +// // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) +// DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error +// // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) +// DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error +// // 获取用户 seq mongo和redis +// GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) +// // 获取群 seq mongo和redis +// GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) +// // 设置群用户最小seq 直接调用cache +// SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) +// // 设置用户最小seq 直接调用cache +// SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) +// +// MsgToMQ(ctx context.Context, key string, data *pbMsg.MsgDataToMQ) (err error) +//} +// +//func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface { +// return &MsgController{} +//} +// +//type MsgController struct { +// database MsgDatabase +//} +// +//func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { +// return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq) +//} +// +//func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error { +// return m.database.DeleteMessageFromCache(ctx, sourceID, msgList) +//} +// +//func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { +// return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList) +//} +// +//func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { +// return m.database.DelMsgBySeqs(ctx, userID, seqs) +//} +// +//func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { +// return m.database.GetMsgBySeqs(ctx, userID, seqs) +//} +// +//func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { +// return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs) +//} +// +//func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error { +// return m.database.CleanUpUserMsg(ctx, userID) +//} +// +//func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { +// return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime) +//} +// +//func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { +// return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime) +//} +// +//func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { +// return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID) +//} +// +//func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { +// return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID) +//} +// +//func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { +// return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) +//} +// +//func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { +// return m.database.SetUserMinSeq(ctx, userID, minSeq) +//} type MsgDatabaseInterface interface { // 批量插入消息 @@ -141,8 +142,14 @@ type MsgDatabaseInterface interface { SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) // 设置用户最小seq 直接调用cache SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) -} + JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) + + SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error + + SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) + GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) +} type MsgDatabase struct { mgo unRelationTb.MsgDocModelInterface cache cache.Cache