diff --git a/internal/common/notification/msg.go b/internal/common/notification/msg.go index 137faf02f..8eb6c0213 100644 --- a/internal/common/notification/msg.go +++ b/internal/common/notification/msg.go @@ -8,8 +8,8 @@ import ( "github.com/golang/protobuf/proto" ) -func (c *Check) DeleteMessageNotification(ctx context.Context, userID string, seqList []uint32, operationID string) { - DeleteMessageTips := sdkws.DeleteMessageTips{UserID: userID, SeqList: seqList} +func (c *Check) DeleteMessageNotification(ctx context.Context, userID string, seqs []int64, operationID string) { + DeleteMessageTips := sdkws.DeleteMessageTips{UserID: userID, Seqs: seqs} c.MessageNotification(ctx, userID, userID, constant.DeleteMessageNotification, &DeleteMessageTips) } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index dfa58f7ef..7f0df6f66 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -626,7 +626,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) if group.GroupType == constant.SuperGroup { return nil, constant.ErrGroupTypeNotSupport.Wrap() } - user, err := relation.GetUserByUserID(tracelog.GetOpUserID(ctx)) + user, err := s.UserCheck.GetUsersInfo(ctx, tracelog.GetOpUserID(ctx)) if err != nil { return nil, err } diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index aedc616b6..bcd72b529 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -1,10 +1,10 @@ package cache import ( + "Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" - "encoding/json" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" "time" @@ -26,6 +26,7 @@ type BlackCache interface { type BlackCacheRedis struct { expireTime time.Duration rcClient *rockscache.Client + black *relation.BlackGorm } func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB BlackCache, options rockscache.Options) *BlackCacheRedis { @@ -40,31 +41,14 @@ func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string { } func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { - getBlackIDList := func() (string, error) { - blackIDs, err := b.blackDB.GetBlackIDs(ctx, userID) - if err != nil { - return "", utils.Wrap(err, "") - } - bytes, err := json.Marshal(blackIDs) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "blackIDList", blackIDs) - }() - blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, b.expireTime, getBlackIDList) - if err != nil { - return nil, utils.Wrap(err, "") - } - err = json.Unmarshal([]byte(blackIDListStr), &blackIDs) - return blackIDs, utils.Wrap(err, "") + return GetCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) { + return b.black.FindBlackUserIDs(ctx, userID) + }) } func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) }() - return b.rcClient.TagAsDeleted(blackListCache + userID) + return b.rcClient.TagAsDeleted(b.getBlackIDsKey(userID)) } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 94e07f78e..6d19bddb1 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -6,10 +6,8 @@ import ( "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" - "encoding/json" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" - "strconv" "time" ) @@ -53,7 +51,8 @@ type ConversationCache interface { } type ConversationRedis struct { - rcClient *rockscache.Client + rcClient *rockscache.Client + expireTime time.Duration } func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error { @@ -88,7 +87,7 @@ func (c *ConversationRedis) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) return superGroupRecvMsgNotNotifyUserIDsKey + groupID } -func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string, f func(userID string) ([]string, error)) (conversationIDs []string, err error) { +func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { //getConversationIDs := func() (string, error) { // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) // if err != nil { @@ -110,7 +109,7 @@ func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUse //} //return conversationIDs, nil return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) { - return f(ownerUserID) + panic("implement me") }) } @@ -172,28 +171,10 @@ func (c *ConversationRedis) DelUserConversationIDs(ctx context.Context, ownerUse return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err") } -func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.Conversation, err error) { - getConversation := func() (string, error) { - conversation, err := relation.GetConversation(ownerUserID, conversationID) - if err != nil { - return "", err - } - bytes, err := json.Marshal(conversation) - if err != nil { - return "", utils.Wrap(err, "conversation Marshal failed") - } - return string(bytes), nil - } - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "conversation", *conversation) - }() - conversationStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) - if err != nil { - return nil, err - } - conversation = &relationTb.ConversationModel{} - err = json.Unmarshal([]byte(conversationStr), &conversation) - return conversation, utils.Wrap(err, "Unmarshal failed") +func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.ConversationModel, err error) { + return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) { + panic("implement me") + }) } func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) { @@ -237,21 +218,25 @@ func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUs } func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { - getConversation := func() (string, error) { - conversation, err := relation.GetConversation(ownerUserID, conversationID) - if err != nil { - return "", err - } - return strconv.Itoa(int(conversation.RecvMsgOpt)), nil - } - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt) - }() - optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) - if err != nil { - return 0, err - } - return strconv.Atoi(optStr) + //getConversation := func() (string, error) { + // conversation, err := relation.GetConversation(ownerUserID, conversationID) + // if err != nil { + // return "", err + // } + // return strconv.Itoa(int(conversation.RecvMsgOpt)), nil + //} + //defer func() { + // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt) + //}() + //optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) + //if err != nil { + // return 0, err + //} + //return strconv.Atoi(optStr) + // panic("implement me") + return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (int, error) { + panic("implement me") + }) } func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error { @@ -259,17 +244,17 @@ func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, } func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { - return nil, nil + panic("implement me") } func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) { - return nil + panic("implement me") } func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) { - return + panic("implement me") } func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) { - return + panic("implement me") } diff --git a/pkg/common/db/cache/extend_msg_set.go b/pkg/common/db/cache/extend_msg_set.go index 6a02f0af3..8d254f54f 100644 --- a/pkg/common/db/cache/extend_msg_set.go +++ b/pkg/common/db/cache/extend_msg_set.go @@ -19,34 +19,42 @@ type ExtendMsgSetCache struct { rcClient *rockscache.Client } -func (e *ExtendMsgSetCache) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsg, err error) { - getExtendMsg := func() (string, error) { - extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime) - if err != nil { - return "", utils.Wrap(err, "GetExtendMsgList failed") - } - bytes, err := json.Marshal(extendMsg) - if err != nil { - return "", utils.Wrap(err, "Marshal failed") - } - return string(bytes), nil - } - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "sourceID", sourceID, "sessionType", - sessionType, "clientMsgID", clientMsgID, "firstModifyTime", firstModifyTime, "extendMsg", extendMsg) - }() - extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg) - if err != nil { - return nil, utils.Wrap(err, "Fetch failed") - } - extendMsg = &mongoDB.ExtendMsg{} - err = json.Unmarshal([]byte(extendMsgStr), extendMsg) - return extendMsg, utils.Wrap(err, "Unmarshal failed") +func (e *ExtendMsgSetCache) getKey(clientMsgID string) string { + return extendMsgCache + clientMsgID +} + +func (e *ExtendMsgSetCache) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error) { + //getExtendMsg := func() (string, error) { + // extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime) + // if err != nil { + // return "", utils.Wrap(err, "GetExtendMsgList failed") + // } + // bytes, err := json.Marshal(extendMsg) + // if err != nil { + // return "", utils.Wrap(err, "Marshal failed") + // } + // return string(bytes), nil + //} + //defer func() { + // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "sourceID", sourceID, "sessionType", + // sessionType, "clientMsgID", clientMsgID, "firstModifyTime", firstModifyTime, "extendMsg", extendMsg) + //}() + //extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg) + //if err != nil { + // return nil, utils.Wrap(err, "Fetch failed") + //} + //extendMsg = &mongoDB.ExtendMsg{} + //err = json.Unmarshal([]byte(extendMsgStr), extendMsg) + //return extendMsg, utils.Wrap(err, "Unmarshal failed") + return GetCache(ctx, e.rcClient, e.getKey(clientMsgID), e.expireTime, func(ctx context.Context) (*unrelation.ExtendMsgModel, error) { + panic("") + }) + } func (e *ExtendMsgSetCache) DelExtendMsg(ctx context.Context, clientMsgID string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "clientMsgID", clientMsgID) }() - return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+clientMsgID), "DelExtendMsg err") + return utils.Wrap(e.rcClient.TagAsDeleted(e.getKey(clientMsgID)), "DelExtendMsg err") } diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 5f7fda202..dfe33f653 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -56,27 +56,10 @@ func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string return friendKey + ownerUserID + "-" + friendUserID } -func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserID string) (friendIDs []string, err error)) (friendIDs []string, err error) { - getFriendIDs := func() (string, error) { - friendIDs, err := f.friendDB.GetFriendIDs(ctx, ownerUserID) - if err != nil { - return "", err - } - bytes, err := json.Marshal(friendIDs) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendIDs", friendIDs) - }() - friendIDsStr, err := f.rcClient.Fetch(f.getFriendIDsKey(ownerUserID), f.expireTime, getFriendIDs) - if err != nil { - return nil, err - } - err = json.Unmarshal([]byte(friendIDsStr), &friendIDs) - return friendIDs, utils.Wrap(err, "") +func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { + return GetCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) { + return f.friendDB.FindFriendUserIDs(ctx, ownerUserID) + }) } func (f *FriendCacheRedis) DelFriendIDs(ctx context.Context, ownerUserID string) (err error) { diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index c649368f5..31e3f9517 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -139,6 +139,10 @@ func NewRedisClient(rdb redis.UniversalClient) *RedisClient { return &RedisClient{rdb: rdb} } +func (r *RedisClient) GetClient() redis.UniversalClient { + return r.rdb +} + // Perform seq auto-increment operation of user messages func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (int64, error) { key := userIncrSeq + uid diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index 066c7367f..1984113fe 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -10,38 +10,36 @@ import ( const scanCount = 3000 - -func (rc *RcClient) DelKeys() { - for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache, - groupMemberInfoCache, groupAllMemberInfoCache, "ALL_FRIEND_INFO_CACHE:"} { - fName := utils.GetSelfFuncName() - var cursor uint64 - var n int - for { - var keys []string - var err error - keys, cursor, err = rc.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() - if err != nil { - panic(err.Error()) - } - n += len(keys) - // for each for redis cluster - for _, key := range keys { - if err = rc.rdb.Del(context.Background(), key).Err(); err != nil { - log.NewError("", fName, key, err.Error()) - err = rc.rdb.Del(context.Background(), key).Err() - if err != nil { - panic(err.Error()) - } - } - } - if cursor == 0 { - break - } - } - } -} - +//func (rc *RcClient) DelKeys() { +// for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache, +// groupMemberInfoCache, groupAllMemberInfoCache, "ALL_FRIEND_INFO_CACHE:"} { +// fName := utils.GetSelfFuncName() +// var cursor uint64 +// var n int +// for { +// var keys []string +// var err error +// keys, cursor, err = rc.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() +// if err != nil { +// panic(err.Error()) +// } +// n += len(keys) +// // for each for redis cluster +// for _, key := range keys { +// if err = rc.rdb.Del(context.Background(), key).Err(); err != nil { +// log.NewError("", fName, key, err.Error()) +// err = rc.rdb.Del(context.Background(), key).Err() +// if err != nil { +// panic(err.Error()) +// } +// } +// } +// if cursor == 0 { +// break +// } +// } +// } +//} func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { var t T diff --git a/pkg/common/db/relation/black_model.go b/pkg/common/db/relation/black_model.go index 2972c15fa..07498bb48 100644 --- a/pkg/common/db/relation/black_model.go +++ b/pkg/common/db/relation/black_model.go @@ -76,3 +76,10 @@ func (b *BlackGorm) FindOwnerBlacks(ctx context.Context, ownerUserID string, pag err = utils.Wrap(b.DB.Model(&relation.BlackModel{}).Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&blacks).Error, "") return } + +func (b *BlackGorm) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "blackUserIDs", blackUserIDs) + }() + return blackUserIDs, utils.Wrap(b.DB.Model(&relation.BlackModel{}).Where("owner_user_id = ?", blackUserIDs).Pluck("block_user_id", &blackUserIDs).Error, "") +} diff --git a/pkg/common/db/relation/friend_model.go b/pkg/common/db/relation/friend_model.go index b452bae4e..a6f423d16 100644 --- a/pkg/common/db/relation/friend_model.go +++ b/pkg/common/db/relation/friend_model.go @@ -133,3 +133,10 @@ func (f *FriendGorm) FindInWhoseFriends(ctx context.Context, friendUserID string err = utils.Wrap(getDBConn(f.DB, tx).Where("friend_user_id = ? ", friendUserID).Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&friends).Error, "") return } + +func (f *FriendGorm) FindFriendUserIDs(ctx context.Context, ownerUserID string, tx ...any) (friendUserIDs []string, err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserIDs", friendUserIDs) + }() + return friendUserIDs, utils.Wrap(getDBConn(f.DB, tx).Model(&relation.FriendModel{}).Where("owner_user_id = ? ", ownerUserID).Pluck("friend_user_id", &friendUserIDs).Error, "") +}