From 7cb011fd5ea5d3ef1728b6ace4eb95ee55ed9c8d Mon Sep 17 00:00:00 2001 From: cncsmonster Date: Mon, 23 Oct 2023 00:11:17 +0800 Subject: [PATCH] fix: fix lint erros in pkg/common/db/cache --- pkg/common/db/cache/black.go | 2 + pkg/common/db/cache/conversation.go | 32 +++++-- pkg/common/db/cache/friend.go | 29 +++--- pkg/common/db/cache/group.go | 59 ++++++++---- pkg/common/db/cache/meta_cache.go | 10 ++ pkg/common/db/cache/msg.go | 61 ++++++++---- pkg/common/db/cache/user.go | 141 ++++++++++++++++------------ 7 files changed, 217 insertions(+), 117 deletions(-) diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index 6da7d5d05..d1abe945c 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -52,6 +52,7 @@ func NewBlackCacheRedis( options rockscache.Options, ) BlackCache { rcClient := rockscache.NewClient(rdb, options) + return &BlackCacheRedis{ expireTime: blackExpireTime, rcClient: rcClient, @@ -88,5 +89,6 @@ func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (black func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) BlackCache { cache := b.NewCache() cache.AddKeys(b.getBlackIDsKey(userID)) + return cache } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index d755de645..890552160 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -89,6 +89,7 @@ func NewConversationRedis( db relationtb.ConversationModelInterface, ) ConversationCache { rcClient := rockscache.NewClient(rdb, opts) + return &ConversationRedisCache{ rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), @@ -110,6 +111,7 @@ func NewNewConversationRedis( options rockscache.Options, ) ConversationCache { rcClient := rockscache.NewClient(rdb, options) + return &ConversationRedisCache{ rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), @@ -168,12 +170,13 @@ func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, own } func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, c.getConversationIDsKey(userID)) } cache := c.NewCache() cache.AddKeys(keys...) + return cache } @@ -198,18 +201,20 @@ func (c *ConversationRedisCache) GetUserConversationIDsHash( utils.Sort(conversationIDs, true) bi := big.NewInt(0) bi.SetString(utils.Md5(strings.Join(conversationIDs, ";"))[0:8], 16) + return bi.Uint64(), nil }, ) } func (c *ConversationRedisCache) DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache { - var keys []string + keys := make([]string, 0, len(ownerUserIDs)) for _, ownerUserID := range ownerUserIDs { keys = append(keys, c.getUserConversationIDsHashKey(ownerUserID)) } cache := c.NewCache() cache.AddKeys(keys...) + return cache } @@ -229,12 +234,13 @@ func (c *ConversationRedisCache) GetConversation( } func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache { - var keys []string + keys := make([]string, 0, len(conversationIDs)) for _, conversationID := range conversationIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) } cache := c.NewCache() cache.AddKeys(keys...) + return cache } @@ -248,6 +254,7 @@ func (c *ConversationRedisCache) getConversationIndex( return _i, nil } } + return 0, errors.New("not found key:" + key + " in keys") } @@ -256,10 +263,11 @@ func (c *ConversationRedisCache) GetConversations( ownerUserID string, conversationIDs []string, ) ([]*relationtb.ConversationModel, error) { - var keys []string + keys := make([]string, 0, len(conversationIDs)) for _, conversarionID := range conversationIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) } + return batchGetCache( ctx, c.rcClient, @@ -280,10 +288,11 @@ func (c *ConversationRedisCache) GetUserAllConversations( if err != nil { return nil, err } - var keys []string + keys := make([]string, 0, len(conversationIDs)) for _, conversarionID := range conversationIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) } + return batchGetCache( ctx, c.rcClient, @@ -327,24 +336,27 @@ func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs( } func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache { - var keys []string + keys := make([]string, 0, len(ownerUserIDs)) for _, ownerUserID := range ownerUserIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) } cache := c.NewCache() cache.AddKeys(keys...) + return cache } func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache { cache := c.NewCache() cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID)) + return cache } func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache { cache := c.NewCache() cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID)) + return cache } @@ -365,6 +377,7 @@ func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash( utils.Sort(userIDs, true) bi := big.NewInt(0) bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) + return bi.Uint64(), nil }, ) @@ -373,6 +386,7 @@ func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash( func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache { cache := c.NewCache() cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID)) + return cache } @@ -385,6 +399,7 @@ func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex( return _i, nil } } + return 0, errors.New("not found key:" + conversationID + " in keys") } @@ -396,10 +411,11 @@ func (c *ConversationRedisCache) GetUserAllHasReadSeqs( if err != nil { return nil, err } - var keys []string + keys := make([]string, 0, len(conversationIDs)) for _, conversarionID := range conversationIDs { keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID)) } + return batchGetCacheMap( ctx, c.rcClient, @@ -420,6 +436,7 @@ func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, for _, conversationID := range conversationIDs { cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID)) } + return cache } @@ -451,5 +468,6 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers for _, conversationID := range conversationIDs { cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID)) } + return cache } diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index fd8c1d3c0..37f5b0a98 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -59,6 +59,7 @@ func NewFriendCacheRedis( options rockscache.Options, ) FriendCache { rcClient := rockscache.NewClient(rdb, options) + return &FriendCacheRedis{ metaCache: NewMetaCacheRedis(rcClient), friendDB: friendDB, @@ -100,14 +101,15 @@ func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) ) } -func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache { - new := f.NewCache() - var keys []string - for _, userID := range ownerUserID { +func (f *FriendCacheRedis) DelFriendIDs(ownerUserIDs ...string) FriendCache { + newGroupCache := f.NewCache() + keys := make([]string, 0, len(ownerUserIDs)) + for _, userID := range ownerUserIDs { keys = append(keys, f.getFriendIDsKey(userID)) } - new.AddKeys(keys...) - return new + newGroupCache.AddKeys(keys...) + + return newGroupCache } // todo. @@ -128,13 +130,15 @@ func (f *FriendCacheRedis) GetTwoWayFriendIDs( twoWayFriendIDs = append(twoWayFriendIDs, ownerUserID) } } + return twoWayFriendIDs, nil } func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) FriendCache { - new := f.NewCache() - new.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID)) - return new + newFriendCache := f.NewCache() + newFriendCache.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID)) + + return newFriendCache } func (f *FriendCacheRedis) GetFriend( @@ -153,7 +157,8 @@ func (f *FriendCacheRedis) GetFriend( } func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache { - new := f.NewCache() - new.AddKeys(f.getFriendKey(ownerUserID, friendUserID)) - return new + newFriendCache := f.NewCache() + newFriendCache.AddKeys(f.getFriendKey(ownerUserID, friendUserID)) + + return newFriendCache } diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 7d4c2b043..0505241d0 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -109,6 +109,7 @@ func NewGroupCacheRedis( opts rockscache.Options, ) GroupCache { rcClient := rockscache.NewClient(rdb, opts) + return &GroupCacheRedis{ rcClient: rcClient, expireTime: groupExpireTime, groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, @@ -169,6 +170,7 @@ func (g *GroupCacheRedis) GetGroupIndex(group *relationtb.GroupModel, keys []str return i, nil } } + return 0, errIndex } @@ -179,6 +181,7 @@ func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationtb.GroupMembe return i, nil } } + return 0, errIndex } @@ -187,10 +190,11 @@ func (g *GroupCacheRedis) GetGroupsInfo( ctx context.Context, groupIDs []string, ) (groups []*relationtb.GroupModel, err error) { - var keys []string + keys := make([]string, 0, len(groupIDs)) for _, group := range groupIDs { keys = append(keys, g.getGroupInfoKey(group)) } + return batchGetCache( ctx, g.rcClient, @@ -216,13 +220,14 @@ func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (gro } func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache { - new := g.NewCache() - var keys []string + newGroupCache := g.NewCache() + keys := make([]string, 0, len(groupIDs)) for _, groupID := range groupIDs { keys = append(keys, g.getGroupInfoKey(groupID)) } - new.AddKeys(keys...) - return new + newGroupCache.AddKeys(keys...) + + return newGroupCache } func (g *GroupCacheRedis) GetJoinedSuperGroupIDs( @@ -239,6 +244,7 @@ func (g *GroupCacheRedis) GetJoinedSuperGroupIDs( if err != nil { return nil, err } + return userGroup.GroupIDs, nil }, ) @@ -248,10 +254,11 @@ func (g *GroupCacheRedis) GetSuperGroupMemberIDs( ctx context.Context, groupIDs ...string, ) (models []*unrelationtb.SuperGroupModel, err error) { - var keys []string + keys := make([]string, 0, len(groupIDs)) for _, group := range groupIDs { keys = append(keys, g.getSuperGroupMemberIDsKey(group)) } + return batchGetCache( ctx, g.rcClient, @@ -263,6 +270,7 @@ func (g *GroupCacheRedis) GetSuperGroupMemberIDs( return i, nil } } + return 0, errIndex }, func(ctx context.Context) ([]*unrelationtb.SuperGroupModel, error) { @@ -273,23 +281,25 @@ func (g *GroupCacheRedis) GetSuperGroupMemberIDs( // userJoinSuperGroup. func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache { - new := g.NewCache() - var keys []string + newGroupCache := g.NewCache() + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, g.getJoinedSuperGroupsIDKey(userID)) } - new.AddKeys(keys...) - return new + newGroupCache.AddKeys(keys...) + + return newGroupCache } func (g *GroupCacheRedis) DelSuperGroupMemberIDs(groupIDs ...string) GroupCache { - new := g.NewCache() - var keys []string + newGroupCache := g.NewCache() + keys := make([]string, 0, len(groupIDs)) for _, groupID := range groupIDs { keys = append(keys, g.getSuperGroupMemberIDsKey(groupID)) } - new.AddKeys(keys...) - return new + newGroupCache.AddKeys(keys...) + + return newGroupCache } // groupMembersHash. @@ -368,12 +378,14 @@ func (g *GroupCacheRedis) GetGroupMemberHashMap( } res[groupID] = &relationtb.GroupSimpleUserID{Hash: hash, MemberNum: uint32(num)} } + return res, nil } func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache { cache := g.NewCache() cache.AddKeys(g.getGroupMembersHashKey(groupID)) + return cache } @@ -399,12 +411,14 @@ func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []str } m[groupID] = userIDs } + return m, nil } func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) GroupCache { cache := g.NewCache() cache.AddKeys(g.getGroupMemberIDsKey(groupID)) + return cache } @@ -421,12 +435,13 @@ func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) } func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, g.getJoinedGroupsKey(userID)) } cache := g.NewCache() cache.AddKeys(keys...) + return cache } @@ -450,10 +465,11 @@ func (g *GroupCacheRedis) GetGroupMembersInfo( groupID string, userIDs []string, ) ([]*relationtb.GroupMemberModel, error) { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) } + return batchGetCache( ctx, g.rcClient, @@ -482,6 +498,7 @@ func (g *GroupCacheRedis) GetGroupMembersPage( userIDs = groupMemberIDs } groupMembers, err = g.GetGroupMembersInfo(ctx, groupID, utils.Paginate(userIDs, int(showNumber), int(showNumber))) + return uint32(len(userIDs)), groupMembers, err } @@ -493,6 +510,7 @@ func (g *GroupCacheRedis) GetAllGroupMembersInfo( if err != nil { return nil, err } + return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs) } @@ -504,10 +522,11 @@ func (g *GroupCacheRedis) GetAllGroupMemberInfo( if err != nil { return nil, err } - var keys []string + keys := make([]string, 0, len(groupMemberIDs)) for _, groupMemberID := range groupMemberIDs { keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID)) } + return batchGetCache( ctx, g.rcClient, @@ -521,12 +540,13 @@ func (g *GroupCacheRedis) GetAllGroupMemberInfo( } func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) GroupCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) } cache := g.NewCache() cache.AddKeys(keys...) + return cache } @@ -543,11 +563,12 @@ func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) } func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache { - var keys []string + keys := make([]string, 0, len(groupID)) for _, groupID := range groupID { keys = append(keys, g.getGroupMemberNumKey(groupID)) } cache := g.NewCache() cache.AddKeys(keys...) + return cache } diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index ca742d4a3..3d62255a7 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -72,6 +72,7 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context) error { ), ) log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", m.keys) + return err } retryTimes++ @@ -80,6 +81,7 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context) error { } } } + return nil } @@ -103,6 +105,7 @@ func GetDefaultOpt() rockscache.Options { opts := rockscache.NewDefaultOptions() opts.StrongConsistency = true opts.RandomExpireAdjustment = 0.2 + return opts } @@ -125,6 +128,7 @@ func getCache[T any]( return "", utils.Wrap(err, "") } write = true + return string(bs), nil }) if err != nil { @@ -139,8 +143,10 @@ func getCache[T any]( err = json.Unmarshal([]byte(v), &t) if err != nil { log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) + return t, utils.Wrap(err, "") } + return t, nil } @@ -169,6 +175,7 @@ func batchGetCache[T any]( } values[index] = string(bs) } + return values, nil }) if err != nil { @@ -185,6 +192,7 @@ func batchGetCache[T any]( tArrays = append(tArrays, t) } } + return tArrays, nil } @@ -213,6 +221,7 @@ func batchGetCacheMap[T any]( } values[index] = string(bs) } + return values, nil }) if err != nil { @@ -229,5 +238,6 @@ func batchGetCacheMap[T any]( tMap[originKeys[i]] = t } } + return tMap, nil } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 65b8d63de..66161c424 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -16,13 +16,12 @@ package cache import ( "context" + "errors" "strconv" "time" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/dtm-labs/rockscache" - "github.com/OpenIMSDK/tools/errs" "github.com/gogo/protobuf/jsonpb" @@ -33,7 +32,6 @@ import ( "github.com/OpenIMSDK/tools/utils" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/redis/go-redis/v9" ) @@ -140,10 +138,10 @@ func NewMsgCacheModel(client redis.UniversalClient) MsgModel { type msgCache struct { metaCache - rdb redis.UniversalClient - expireTime time.Duration - rcClient *rockscache.Client - msgDocDatabase unrelationtb.MsgDocModelInterface + rdb redis.UniversalClient + // expireTime time.Duration + // rcClient *rockscache.Client + // msgDocDatabase unrelationtb.MsgDocModelInterface } func (c *msgCache) getMaxSeqKey(conversationID string) string { @@ -182,18 +180,20 @@ func (c *msgCache) getSeqs( ) (m map[string]int64, err error) { pipe := c.rdb.Pipeline() for _, v := range items { - if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil { - return nil, errs.Wrap(err) + err2 := pipe.Get(ctx, getkey(v)).Err() + if err2 != nil && !errors.Is(err2, redis.Nil) { + return nil, errs.Wrap(err2) } } result, err := pipe.Exec(ctx) - if err != nil && err != redis.Nil { + if err != nil && !errors.Is(err, redis.Nil) { return nil, errs.Wrap(err) } m = make(map[string]int64, len(items)) for i, v := range result { seq := v.(*redis.StringCmd) - if seq.Err() != nil && seq.Err() != redis.Nil { + + if seq.Err() != nil && !errors.Is(seq.Err(), redis.Nil) { return nil, errs.Wrap(v.Err()) } val := utils.StringToInt64(seq.Val()) @@ -201,6 +201,7 @@ func (c *msgCache) getSeqs( m[items[i]] = val } } + return m, nil } @@ -229,6 +230,7 @@ func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey fu } } _, err := pipe.Exec(ctx) + return err } @@ -319,6 +321,7 @@ func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversatio func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) } @@ -332,6 +335,7 @@ func (c *msgCache) GetTokensWithoutError(ctx context.Context, userID string, pla for k, v := range m { mm[k] = utils.StringToInt(v) } + return mm, nil } @@ -341,11 +345,13 @@ func (c *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platf for k, v := range m { mm[k] = v } + return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err()) } func (c *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform int, fields []string) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platform) + return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err()) } @@ -366,8 +372,9 @@ func (c *msgCache) GetMessagesBySeq( for _, v := range seqs { // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := c.getMessageCacheKey(conversationID, v) - if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { - return nil, nil, err + err2 := pipe.Get(ctx, key).Err() + if err2 != nil && errors.Is(err2, redis.Nil) { + return nil, nil, err2 } } result, err := pipe.Exec(ctx) @@ -381,6 +388,7 @@ func (c *msgCache) GetMessagesBySeq( if err == nil { if msg.Status != constant.MsgDeleted { seqMsgs = append(seqMsgs, &msg) + continue } } else { @@ -389,6 +397,7 @@ func (c *msgCache) GetMessagesBySeq( failedSeqs = append(failedSeqs, seqs[i]) } } + return seqMsgs, failedSeqs, err } @@ -408,6 +417,7 @@ func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, } } _, err := pipe.Exec(ctx) + return len(failedMsgs), err } @@ -440,6 +450,7 @@ func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, se } } _, err := pipe.Exec(ctx) + return errs.Wrap(err) } @@ -452,6 +463,7 @@ func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID st for i, v := range result { seqs[i] = utils.StringToInt64(v) } + return seqs, nil } @@ -460,6 +472,7 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result() if err != nil { log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + continue } if len(delUsers) > 0 { @@ -502,12 +515,13 @@ func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, se } } _, err := pipe.Exec(ctx) + return errs.Wrap(err) } func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error { vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result() - if err == redis.Nil { + if errors.Is(err, redis.Nil) { return nil } if err != nil { @@ -515,11 +529,13 @@ func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversatio } pipe := c.rdb.Pipeline() for _, v := range vals { - if err := pipe.Del(ctx, v).Err(); err != nil { - return errs.Wrap(err) + err2 := pipe.Del(ctx, v).Err() + if err2 != nil { + return errs.Wrap(err2) } } _, err = pipe.Exec(ctx) + return errs.Wrap(err) } @@ -528,13 +544,15 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in key := c.getMessageCacheKey(userID, seq) result, err := c.rdb.Get(ctx, key).Result() if err != nil { - if err == redis.Nil { + if errors.Is(err, redis.Nil) { continue } + return errs.Wrap(err) } var msg sdkws.MsgData - if err := jsonpb.UnmarshalString(result, &msg); err != nil { + err = jsonpb.UnmarshalString(result, &msg) + if err != nil { return err } msg.Status = constant.MsgDeleted @@ -546,6 +564,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in return errs.Wrap(err) } } + return nil } @@ -571,6 +590,7 @@ func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32 func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { result, err := c.rdb.Get(ctx, sendMsgFailedFlag+id).Int() + return int32(result), errs.Wrap(err) } @@ -597,6 +617,7 @@ func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID i func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() + return int(seq), errs.Wrap(err) } @@ -610,11 +631,13 @@ func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err()) } func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return errs.Wrap(c.rdb.Del(ctx, key).Err()) } @@ -629,6 +652,7 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in case constant.NotificationChatType: return "EX_NOTIFICATION" + clientMsgID } + return "" } @@ -637,6 +661,7 @@ func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID st if err != nil { return false, utils.Wrap(err, "") } + return n > 0, nil } diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index b821b4a52..0afbd595e 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -17,6 +17,7 @@ package cache import ( "context" "encoding/json" + "errors" "hash/crc32" "strconv" "time" @@ -70,6 +71,7 @@ func NewUserCacheRedis( options rockscache.Options, ) UserCache { rcClient := rockscache.NewClient(rdb, options) + return &UserCacheRedis{ rdb: rdb, metaCache: NewMetaCacheRedis(rcClient), @@ -97,10 +99,6 @@ func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string { return userGlobalRecvMsgOptKey + userID } -func (u *UserCacheRedis) getUserStatusHashKey(userID string, Id int32) string { - return userID + "_" + string(Id) + platformID -} - func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationtb.UserModel, err error) { return getCache( ctx, @@ -114,10 +112,11 @@ func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userIn } func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, u.getUserInfoKey(userID)) } + return batchGetCache( ctx, u.rcClient, @@ -129,6 +128,7 @@ func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([] return i, nil } } + return 0, errIndex }, func(ctx context.Context) ([]*relationtb.UserModel, error) { @@ -138,12 +138,13 @@ func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([] } func (u *UserCacheRedis) DelUsersInfo(userIDs ...string) UserCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, u.getUserInfoKey(userID)) } cache := u.NewCache() cache.AddKeys(keys...) + return cache } @@ -160,22 +161,19 @@ func (u *UserCacheRedis) GetUserGlobalRecvMsgOpt(ctx context.Context, userID str } func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache { - var keys []string + keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { keys = append(keys, u.getUserGlobalRecvMsgOptKey(userID)) } cache := u.NewCache() cache.AddKeys(keys...) - return cache -} -func (u *UserCacheRedis) getOnlineStatusKey(userID string) string { - return olineStatusKey + userID + return cache } // GetUserStatus get user status. func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { - var res []*user.OnlineStatus + userStatus := make([]*user.OnlineStatus, 0, len(userIDs)) for _, userID := range userIDs { UserIDNum := crc32.ChecksumIEEE([]byte(userID)) modKey := strconv.Itoa(int(UserIDNum % statusMod)) @@ -183,13 +181,14 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([ key := olineStatusKey + modKey result, err := u.rdb.HGet(ctx, key, userID).Result() if err != nil { - if err == redis.Nil { + if errors.Is(err, redis.Nil) { // key or field does not exist - res = append(res, &user.OnlineStatus{ + userStatus = append(userStatus, &user.OnlineStatus{ UserID: userID, Status: constant.Offline, PlatformIDs: nil, }) + continue } else { return nil, errs.Wrap(err) @@ -201,9 +200,10 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([ } onlineStatus.UserID = userID onlineStatus.Status = constant.Online - res = append(res, &onlineStatus) + userStatus = append(userStatus, &onlineStatus) } - return res, nil + + return userStatus, nil } // SetUserStatus Set the user status and save it in redis. @@ -224,15 +224,16 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu Status: constant.Online, PlatformIDs: []int32{platformID}, } - jsonData, err := json.Marshal(onlineStatus) - if err != nil { - return errs.Wrap(err) + jsonData, err2 := json.Marshal(&onlineStatus) + if err2 != nil { + return errs.Wrap(err2) } - _, err = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result() - if err != nil { - return errs.Wrap(err) + _, err2 = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result() + if err2 != nil { + return errs.Wrap(err2) } u.rdb.Expire(ctx, key, userOlineStatusExpireTime) + return nil } } @@ -240,7 +241,7 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu isNil := false result, err := u.rdb.HGet(ctx, key, userID).Result() if err != nil { - if err == redis.Nil { + if errors.Is(err, redis.Nil) { isNil = true } else { return errs.Wrap(err) @@ -248,51 +249,45 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu } if status == constant.Offline { - if isNil { - log.ZWarn(ctx, "this user not online,maybe trigger order not right", - err, "userStatus", status) - return nil + err = u.refreshStatusOffline(ctx, userID, status, platformID, isNil, err, result, key) + if err != nil { + return err } - var onlineStatus user.OnlineStatus - err = json.Unmarshal([]byte(result), &onlineStatus) + } else { + err = u.refreshStatusOnline(ctx, userID, platformID, isNil, err, result, key) if err != nil { return errs.Wrap(err) } - var newPlatformIDs []int32 - for _, val := range onlineStatus.PlatformIDs { - if val != platformID { - newPlatformIDs = append(newPlatformIDs, val) - } + } + + return nil +} + +func (u *UserCacheRedis) refreshStatusOffline(ctx context.Context, userID string, status, platformID int32, isNil bool, err error, result, key string) error { + if isNil { + log.ZWarn(ctx, "this user not online,maybe trigger order not right", + err, "userStatus", status) + + return nil + } + var onlineStatus user.OnlineStatus + err = json.Unmarshal([]byte(result), &onlineStatus) + if err != nil { + return errs.Wrap(err) + } + var newPlatformIDs []int32 + for _, val := range onlineStatus.PlatformIDs { + if val != platformID { + newPlatformIDs = append(newPlatformIDs, val) } - if newPlatformIDs == nil { - _, err = u.rdb.HDel(ctx, key, userID).Result() - if err != nil { - return errs.Wrap(err) - } - } else { - onlineStatus.PlatformIDs = newPlatformIDs - newjsonData, err := json.Marshal(&onlineStatus) - if err != nil { - return errs.Wrap(err) - } - _, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result() - if err != nil { - return errs.Wrap(err) - } + } + if newPlatformIDs == nil { + _, err = u.rdb.HDel(ctx, key, userID).Result() + if err != nil { + return errs.Wrap(err) } } else { - var onlineStatus user.OnlineStatus - if !isNil { - err = json.Unmarshal([]byte(result), &onlineStatus) - if err != nil { - return errs.Wrap(err) - } - onlineStatus.PlatformIDs = RemoveRepeatedElementsInList(append(onlineStatus.PlatformIDs, platformID)) - } else { - onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, platformID) - } - onlineStatus.Status = constant.Online - onlineStatus.UserID = userID + onlineStatus.PlatformIDs = newPlatformIDs newjsonData, err := json.Marshal(&onlineStatus) if err != nil { return errs.Wrap(err) @@ -301,7 +296,31 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu if err != nil { return errs.Wrap(err) } + } + + return nil +} +func (u *UserCacheRedis) refreshStatusOnline(ctx context.Context, userID string, platformID int32, isNil bool, err error, result, key string) error { + var onlineStatus user.OnlineStatus + if !isNil { + err2 := json.Unmarshal([]byte(result), &onlineStatus) + if err != nil { + return errs.Wrap(err2) + } + onlineStatus.PlatformIDs = RemoveRepeatedElementsInList(append(onlineStatus.PlatformIDs, platformID)) + } else { + onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, platformID) + } + onlineStatus.Status = constant.Online + onlineStatus.UserID = userID + newjsonData, err := json.Marshal(&onlineStatus) + if err != nil { + return errs.Wrap(err) + } + _, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result() + if err != nil { + return errs.Wrap(err) } return nil