fix: redis cache

pull/1258/head
withchao 2 years ago
parent 89a98fcfca
commit 3f52d83a19

@ -61,12 +61,7 @@ func NewBlackCacheRedis(
} }
func (b *BlackCacheRedis) NewCache() BlackCache { func (b *BlackCacheRedis) NewCache() BlackCache {
return &BlackCacheRedis{ return &BlackCacheRedis{expireTime: b.expireTime, rcClient: b.rcClient, blackDB: b.blackDB, metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDelKeys()...)}
expireTime: b.expireTime,
rcClient: b.rcClient,
blackDB: b.blackDB,
metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDelKeys()...),
}
} }
func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string { func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string {
@ -74,15 +69,9 @@ func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string {
} }
func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) {
return getCache( return getCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) {
ctx, return b.blackDB.FindBlackUserIDs(ctx, userID)
b.rcClient, })
b.getBlackIDsKey(userID),
b.expireTime,
func(ctx context.Context) ([]string, error) {
return b.blackDB.FindBlackUserIDs(ctx, userID)
},
)
} }
func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) BlackCache { func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) BlackCache {

@ -73,7 +73,7 @@ type ConversationCache interface {
GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache
GetConversationsByConversationID(ctx context.Context, GetConversationsByConversationID(ctx context.Context,
@ -83,11 +83,7 @@ type ConversationCache interface {
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
} }
func NewConversationRedis( func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache {
rdb redis.UniversalClient,
opts rockscache.Options,
db relationtb.ConversationModelInterface,
) ConversationCache {
rcClient := rockscache.NewClient(rdb, opts) rcClient := rockscache.NewClient(rdb, opts)
return &ConversationRedisCache{ return &ConversationRedisCache{
rcClient: rcClient, rcClient: rcClient,
@ -156,15 +152,9 @@ func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conv
} }
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
return getCache( return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
ctx, return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
c.rcClient, })
c.getConversationIDsKey(ownerUserID),
c.expireTime,
func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
},
)
} }
func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache {
@ -181,10 +171,7 @@ func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID strin
return conversationIDsHashKey + ownerUserID return conversationIDsHashKey + ownerUserID
} }
func (c *ConversationRedisCache) GetUserConversationIDsHash( func (c *ConversationRedisCache) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
ctx context.Context,
ownerUserID string,
) (hash uint64, err error) {
return getCache( return getCache(
ctx, ctx,
c.rcClient, c.rcClient,
@ -213,19 +200,10 @@ func (c *ConversationRedisCache) DelUserConversationIDsHash(ownerUserIDs ...stri
return cache return cache
} }
func (c *ConversationRedisCache) GetConversation( func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationtb.ConversationModel, error) {
ctx context.Context, return getCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationtb.ConversationModel, error) {
ownerUserID, conversationID string, return c.conversationDB.Take(ctx, ownerUserID, conversationID)
) (*relationtb.ConversationModel, error) { })
return getCache(
ctx,
c.rcClient,
c.getConversationKey(ownerUserID, conversationID),
c.expireTime,
func(ctx context.Context) (*relationtb.ConversationModel, error) {
return c.conversationDB.Take(ctx, ownerUserID, conversationID)
},
)
} }
func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache {
@ -238,10 +216,7 @@ func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversati
return cache return cache
} }
func (c *ConversationRedisCache) getConversationIndex( func (c *ConversationRedisCache) getConversationIndex(convsation *relationtb.ConversationModel, keys []string) (int, error) {
convsation *relationtb.ConversationModel,
keys []string,
) (int, error) {
key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID) key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
for _i, _key := range keys { for _i, _key := range keys {
if _key == key { if _key == key {
@ -251,79 +226,60 @@ func (c *ConversationRedisCache) getConversationIndex(
return 0, errors.New("not found key:" + key + " in keys") return 0, errors.New("not found key:" + key + " in keys")
} }
func (c *ConversationRedisCache) GetConversations( func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error) {
ctx context.Context, //var keys []string
ownerUserID string, //for _, conversarionID := range conversationIDs {
conversationIDs []string, // keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
) ([]*relationtb.ConversationModel, error) { //}
var keys []string //return batchGetCache(
for _, conversarionID := range conversationIDs { // ctx,
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) // c.rcClient,
} // keys,
return batchGetCache( // c.expireTime,
ctx, // c.getConversationIndex,
c.rcClient, // func(ctx context.Context) ([]*relationtb.ConversationModel, error) {
keys, // return c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
c.expireTime, // },
c.getConversationIndex, //)
func(ctx context.Context) ([]*relationtb.ConversationModel, error) { return batchGetCache2(ctx, c.rcClient, c.expireTime, conversationIDs, func(conversationID string) string {
return c.conversationDB.Find(ctx, ownerUserID, conversationIDs) return c.getConversationKey(ownerUserID, conversationID)
}, }, func(ctx context.Context, conversationID string) (*relationtb.ConversationModel, error) {
) return c.conversationDB.Take(ctx, ownerUserID, conversationID)
} })
}
func (c *ConversationRedisCache) GetUserAllConversations(
ctx context.Context, func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationtb.ConversationModel, error) {
ownerUserID string,
) ([]*relationtb.ConversationModel, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var keys []string //var keys []string
for _, conversarionID := range conversationIDs { //for _, conversarionID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) // keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
} //}
return batchGetCache( //return batchGetCache(
ctx, // ctx,
c.rcClient, // c.rcClient,
keys, // keys,
c.expireTime, // c.expireTime,
c.getConversationIndex, // c.getConversationIndex,
func(ctx context.Context) ([]*relationtb.ConversationModel, error) { // func(ctx context.Context) ([]*relationtb.ConversationModel, error) {
return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID) // return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID)
}, // },
) //)
} return c.GetConversations(ctx, ownerUserID, conversationIDs)
}
func (c *ConversationRedisCache) GetUserRecvMsgOpt(
ctx context.Context, func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
ownerUserID, conversationID string, return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) {
) (opt int, err error) { return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID)
return getCache( })
ctx, }
c.rcClient,
c.getRecvMsgOptKey(ownerUserID, conversationID), func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
c.expireTime, return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) {
func(ctx context.Context) (opt int, err error) { return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID) })
},
)
}
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(
ctx context.Context,
groupID string,
) (userIDs []string, err error) {
return getCache(
ctx,
c.rcClient,
c.getSuperGroupRecvNotNotifyUserIDsKey(groupID),
c.expireTime,
func(ctx context.Context) (userIDs []string, err error) {
return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
},
)
} }
func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache {
@ -348,25 +304,17 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID st
return cache return cache
} }
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash( func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) {
ctx context.Context, return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) {
groupID string, userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
) (hash uint64, err error) { if err != nil {
return getCache( return 0, err
ctx, }
c.rcClient, utils.Sort(userIDs, true)
c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), bi := big.NewInt(0)
c.expireTime, bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
func(ctx context.Context) (hash uint64, err error) { return bi.Uint64(), nil
userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) },
if err != nil {
return 0, err
}
utils.Sort(userIDs, true)
bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
return bi.Uint64(), nil
},
) )
} }
@ -376,10 +324,7 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupI
return cache return cache
} }
func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex( func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) {
conversationID string,
conversationIDs []string,
) (int, error) {
for _i, _conversationID := range conversationIDs { for _i, _conversationID := range conversationIDs {
if _conversationID == conversationID { if _conversationID == conversationID {
return _i, nil return _i, nil
@ -388,34 +333,21 @@ func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(
return 0, errors.New("not found key:" + conversationID + " in keys") return 0, errors.New("not found key:" + conversationID + " in keys")
} }
func (c *ConversationRedisCache) GetUserAllHasReadSeqs( //func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
ctx context.Context, // conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
ownerUserID string, // if err != nil {
) (map[string]int64, error) { // return nil, err
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) // }
if err != nil { // var keys []string
return nil, err // for _, conversarionID := range conversationIDs {
} // keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID))
var keys []string // }
for _, conversarionID := range conversationIDs { // return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) {
keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID)) // return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
} // })
return batchGetCacheMap( //}
ctx,
c.rcClient, func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache {
keys,
conversationIDs,
c.expireTime,
c.getUserAllHasReadSeqsIndex,
func(ctx context.Context) (map[string]int64, error) {
return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
},
)
}
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string,
conversationIDs ...string,
) ConversationCache {
cache := c.NewCache() cache := c.NewCache()
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID)) cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID))
@ -423,10 +355,7 @@ func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string,
return cache return cache
} }
func (c *ConversationRedisCache) GetConversationsByConversationID( func (c *ConversationRedisCache) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) {
ctx context.Context,
conversationIDs []string,
) ([]*relationtb.ConversationModel, error) {
panic("implement me") panic("implement me")
} }
@ -435,15 +364,9 @@ func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs
} }
func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
return getCache( return getCache(ctx, c.rcClient, c.getConversationNotReceiveMessageUserIDsKey(conversationID), c.expireTime, func(ctx context.Context) ([]string, error) {
ctx, return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
c.rcClient, })
c.getConversationNotReceiveMessageUserIDsKey(conversationID),
c.expireTime,
func(ctx context.Context) ([]string, error) {
return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
},
)
} }
func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache {

@ -53,11 +53,7 @@ type FriendCacheRedis struct {
rcClient *rockscache.Client rcClient *rockscache.Client
} }
func NewFriendCacheRedis( func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendModelInterface, options rockscache.Options) FriendCache {
rdb redis.UniversalClient,
friendDB relationtb.FriendModelInterface,
options rockscache.Options,
) FriendCache {
rcClient := rockscache.NewClient(rdb, options) rcClient := rockscache.NewClient(rdb, options)
return &FriendCacheRedis{ return &FriendCacheRedis{
metaCache: NewMetaCacheRedis(rcClient), metaCache: NewMetaCacheRedis(rcClient),
@ -89,15 +85,9 @@ func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string
} }
func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
return getCache( return getCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) {
ctx, return f.friendDB.FindFriendUserIDs(ctx, ownerUserID)
f.rcClient, })
f.getFriendIDsKey(ownerUserID),
f.expireTime,
func(ctx context.Context) ([]string, error) {
return f.friendDB.FindFriendUserIDs(ctx, ownerUserID)
},
)
} }
func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache { func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache {
@ -111,10 +101,7 @@ func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache {
} }
// todo. // todo.
func (f *FriendCacheRedis) GetTwoWayFriendIDs( func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) {
ctx context.Context,
ownerUserID string,
) (twoWayFriendIDs []string, err error) {
friendIDs, err := f.GetFriendIDs(ctx, ownerUserID) friendIDs, err := f.GetFriendIDs(ctx, ownerUserID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -137,19 +124,10 @@ func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID s
return new return new
} }
func (f *FriendCacheRedis) GetFriend( func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationtb.FriendModel, err error) {
ctx context.Context, return getCache(ctx, f.rcClient, f.getFriendKey(ownerUserID, friendUserID), f.expireTime, func(ctx context.Context) (*relationtb.FriendModel, error) {
ownerUserID, friendUserID string, return f.friendDB.Take(ctx, ownerUserID, friendUserID)
) (friend *relationtb.FriendModel, err error) { })
return getCache(
ctx,
f.rcClient,
f.getFriendKey(ownerUserID, friendUserID),
f.expireTime,
func(ctx context.Context) (*relationtb.FriendModel, error) {
return f.friendDB.Take(ctx, ownerUserID, friendUserID)
},
)
} }
func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache { func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache {

@ -65,22 +65,10 @@ type GroupCache interface {
GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error)
DelJoinedGroupID(userID ...string) GroupCache DelJoinedGroupID(userID ...string) GroupCache
GetGroupMemberInfo( GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationtb.GroupMemberModel, err error)
ctx context.Context, GetGroupMembersInfo(ctx context.Context, groupID string, userID []string) (groupMembers []*relationtb.GroupMemberModel, err error)
groupID, userID string,
) (groupMember *relationtb.GroupMemberModel, err error)
GetGroupMembersInfo(
ctx context.Context,
groupID string,
userID []string,
) (groupMembers []*relationtb.GroupMemberModel, err error)
GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error) GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error)
GetGroupMembersPage( GetGroupMembersPage(ctx context.Context, groupID string, userID []string, showNumber, pageNumber int32) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error)
ctx context.Context,
groupID string,
userID []string,
showNumber, pageNumber int32,
) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error)
DelGroupMembersInfo(groupID string, userID ...string) GroupCache DelGroupMembersInfo(groupID string, userID ...string) GroupCache
@ -183,36 +171,25 @@ func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationtb.GroupMembe
} }
// / groupInfo. // / groupInfo.
func (g *GroupCacheRedis) GetGroupsInfo( func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error) {
ctx context.Context, //var keys []string
groupIDs []string, //for _, group := range groupIDs {
) (groups []*relationtb.GroupModel, err error) { // keys = append(keys, g.getGroupInfoKey(group))
var keys []string //}
for _, group := range groupIDs { //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupIndex, func(ctx context.Context) ([]*relationtb.GroupModel, error) {
keys = append(keys, g.getGroupInfoKey(group)) // return g.groupDB.Find(ctx, groupIDs)
} //})
return batchGetCache( return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
ctx, return g.getGroupInfoKey(groupID)
g.rcClient, }, func(ctx context.Context, groupID string) (*relationtb.GroupModel, error) {
keys, return g.groupDB.Take(ctx, groupID)
g.expireTime, })
g.GetGroupIndex,
func(ctx context.Context) ([]*relationtb.GroupModel, error) {
return g.groupDB.Find(ctx, groupIDs)
},
)
} }
func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationtb.GroupModel, err error) { func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationtb.GroupModel, err error) {
return getCache( return getCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationtb.GroupModel, error) {
ctx, return g.groupDB.Take(ctx, groupID)
g.rcClient, })
g.getGroupInfoKey(groupID),
g.expireTime,
func(ctx context.Context) (*relationtb.GroupModel, error) {
return g.groupDB.Take(ctx, groupID)
},
)
} }
func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache { func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache {
@ -225,50 +202,38 @@ func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache {
return new return new
} }
func (g *GroupCacheRedis) GetJoinedSuperGroupIDs( func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
ctx context.Context, return getCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
userID string, userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
) (joinedSuperGroupIDs []string, err error) { if err != nil {
return getCache( return nil, err
ctx, }
g.rcClient, return userGroup.GroupIDs, nil
g.getJoinedSuperGroupsIDKey(userID), },
g.expireTime,
func(ctx context.Context) ([]string, error) {
userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
if err != nil {
return nil, err
}
return userGroup.GroupIDs, nil
},
) )
} }
func (g *GroupCacheRedis) GetSuperGroupMemberIDs( func (g *GroupCacheRedis) GetSuperGroupMemberIDs(ctx context.Context, groupIDs ...string) (models []*unrelationtb.SuperGroupModel, err error) {
ctx context.Context, //var keys []string
groupIDs ...string, //for _, group := range groupIDs {
) (models []*unrelationtb.SuperGroupModel, err error) { // keys = append(keys, g.getSuperGroupMemberIDsKey(group))
var keys []string //}
for _, group := range groupIDs { //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, func(model *unrelationtb.SuperGroupModel, keys []string) (int, error) {
keys = append(keys, g.getSuperGroupMemberIDsKey(group)) // for i, key := range keys {
} // if g.getSuperGroupMemberIDsKey(model.GroupID) == key {
return batchGetCache( // return i, nil
ctx, // }
g.rcClient, // }
keys, // return 0, errIndex
g.expireTime, //},
func(model *unrelationtb.SuperGroupModel, keys []string) (int, error) { // func(ctx context.Context) ([]*unrelationtb.SuperGroupModel, error) {
for i, key := range keys { // return g.mongoDB.FindSuperGroup(ctx, groupIDs)
if g.getSuperGroupMemberIDsKey(model.GroupID) == key { // })
return i, nil return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
} return g.getSuperGroupMemberIDsKey(groupID)
} }, func(ctx context.Context, groupID string) (*unrelationtb.SuperGroupModel, error) {
return 0, errIndex return g.mongoDB.TakeSuperGroup(ctx, groupID)
}, })
func(ctx context.Context) ([]*unrelationtb.SuperGroupModel, error) {
return g.mongoDB.FindSuperGroup(ctx, groupIDs)
},
)
} }
// userJoinSuperGroup. // userJoinSuperGroup.
@ -351,10 +316,7 @@ func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID strin
//) //)
} }
func (g *GroupCacheRedis) GetGroupMemberHashMap( func (g *GroupCacheRedis) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationtb.GroupSimpleUserID, error) {
ctx context.Context,
groupIDs []string,
) (map[string]*relationtb.GroupSimpleUserID, error) {
res := make(map[string]*relationtb.GroupSimpleUserID) res := make(map[string]*relationtb.GroupSimpleUserID)
for _, groupID := range groupIDs { for _, groupID := range groupIDs {
hash, err := g.GetGroupMembersHash(ctx, groupID) hash, err := g.GetGroupMembersHash(ctx, groupID)
@ -379,15 +341,9 @@ func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache {
// groupMemberIDs. // groupMemberIDs.
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
return getCache( return getCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) {
ctx, return g.groupMemberDB.FindMemberUserID(ctx, groupID)
g.rcClient, })
g.getGroupMemberIDsKey(groupID),
g.expireTime,
func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindMemberUserID(ctx, groupID)
},
)
} }
func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) { func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) {
@ -409,15 +365,9 @@ func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) GroupCache {
} }
func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) { func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
return getCache( return getCache(ctx, g.rcClient, g.getJoinedGroupsKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
ctx, return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID)
g.rcClient, })
g.getJoinedGroupsKey(userID),
g.expireTime,
func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID)
},
)
} }
func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache { func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache {
@ -430,48 +380,28 @@ func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache {
return cache return cache
} }
func (g *GroupCacheRedis) GetGroupMemberInfo( func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationtb.GroupMemberModel, err error) {
ctx context.Context, return getCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationtb.GroupMemberModel, error) {
groupID, userID string, return g.groupMemberDB.Take(ctx, groupID, userID)
) (groupMember *relationtb.GroupMemberModel, err error) { })
return getCache(
ctx,
g.rcClient,
g.getGroupMemberInfoKey(groupID, userID),
g.expireTime,
func(ctx context.Context) (*relationtb.GroupMemberModel, error) {
return g.groupMemberDB.Take(ctx, groupID, userID)
},
)
} }
func (g *GroupCacheRedis) GetGroupMembersInfo( func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]*relationtb.GroupMemberModel, error) {
ctx context.Context, //var keys []string
groupID string, //for _, userID := range userIDs {
userIDs []string, // keys = append(keys, g.getGroupMemberInfoKey(groupID, userID))
) ([]*relationtb.GroupMemberModel, error) { //}
var keys []string //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
for _, userID := range userIDs { // return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, nil)
keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) //})
} return batchGetCache2(ctx, g.rcClient, g.expireTime, userIDs, func(userID string) string {
return batchGetCache( return g.getGroupMemberInfoKey(groupID, userID)
ctx, }, func(ctx context.Context, userID string) (*relationtb.GroupMemberModel, error) {
g.rcClient, return g.groupMemberDB.Take(ctx, groupID, userID)
keys, })
g.expireTime,
g.GetGroupMemberIndex,
func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, nil)
},
)
} }
func (g *GroupCacheRedis) GetGroupMembersPage( func (g *GroupCacheRedis) GetGroupMembersPage(ctx context.Context, groupID string, userIDs []string, showNumber, pageNumber int32) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error) {
ctx context.Context,
groupID string,
userIDs []string,
showNumber, pageNumber int32,
) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -485,10 +415,7 @@ func (g *GroupCacheRedis) GetGroupMembersPage(
return uint32(len(userIDs)), groupMembers, err return uint32(len(userIDs)), groupMembers, err
} }
func (g *GroupCacheRedis) GetAllGroupMembersInfo( func (g *GroupCacheRedis) GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error) {
ctx context.Context,
groupID string,
) (groupMembers []*relationtb.GroupMemberModel, err error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -496,28 +423,19 @@ func (g *GroupCacheRedis) GetAllGroupMembersInfo(
return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs) return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs)
} }
func (g *GroupCacheRedis) GetAllGroupMemberInfo( func (g *GroupCacheRedis) GetAllGroupMemberInfo(ctx context.Context, groupID string) ([]*relationtb.GroupMemberModel, error) {
ctx context.Context,
groupID string,
) ([]*relationtb.GroupMemberModel, error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var keys []string //var keys []string
for _, groupMemberID := range groupMemberIDs { //for _, groupMemberID := range groupMemberIDs {
keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID)) // keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID))
} //}
return batchGetCache( //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
ctx, // return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil)
g.rcClient, //})
keys, return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs)
g.expireTime,
g.GetGroupMemberIndex,
func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil)
},
)
} }
func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) GroupCache { func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) GroupCache {
@ -531,15 +449,9 @@ func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string)
} }
func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) { func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) {
return getCache( return getCache(ctx, g.rcClient, g.getGroupMemberNumKey(groupID), g.expireTime, func(ctx context.Context) (int64, error) {
ctx, return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID)
g.rcClient, })
g.getGroupMemberNumKey(groupID),
g.expireTime,
func(ctx context.Context) (int64, error) {
return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID)
},
)
} }
func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache { func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache {

@ -18,7 +18,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"time" "time"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
@ -59,25 +58,37 @@ type metaCacheRedis struct {
func (m *metaCacheRedis) ExecDel(ctx context.Context) error { func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
if len(m.keys) > 0 { if len(m.keys) > 0 {
log.ZDebug(ctx, "delete cache", "keys", m.keys) log.ZDebug(ctx, "delete cache", "keys", m.keys)
retryTimes := 0 for _, key := range m.keys {
for { for i := 0; i < m.maxRetryTimes; i++ {
if err := m.rcClient.TagAsDeletedBatch2(ctx, m.keys); err != nil { if err := m.rcClient.TagAsDeleted(key); err != nil {
if retryTimes >= m.maxRetryTimes { log.ZError(ctx, "delete cache failed", err, "key", key)
err = errs.ErrInternalServer.Wrap( time.Sleep(m.retryInterval)
fmt.Sprintf( continue
"delete cache error: %v, keys: %v, retry times %d, please check redis server",
err,
m.keys,
retryTimes,
),
)
log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", m.keys)
return err
} }
retryTimes++
} else {
break break
} }
//retryTimes := 0
//for {
// m.rcClient.TagAsDeleted()
// if err := m.rcClient.TagAsDeletedBatch2(ctx, []string{key}); err != nil {
// if retryTimes >= m.maxRetryTimes {
// err = errs.ErrInternalServer.Wrap(
// fmt.Sprintf(
// "delete cache error: %v, keys: %v, retry times %d, please check redis server",
// err,
// key,
// retryTimes,
// ),
// )
// log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", key)
// return err
// }
// retryTimes++
// } else {
// break
// }
//}
} }
} }
return nil return nil
@ -138,83 +149,100 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
return t, nil return t, nil
} }
func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) { //func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) {
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { // batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
values := make(map[int]string) // values := make(map[int]string)
tArrays, err := fn(ctx) // tArrays, err := fn(ctx)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
for _, v := range tArrays { // for _, v := range tArrays {
index, err := keyIndexFn(v, keys) // index, err := keyIndexFn(v, keys)
if err != nil { // if err != nil {
continue // continue
} // }
bs, err := json.Marshal(v) // bs, err := json.Marshal(v)
if err != nil { // if err != nil {
return nil, utils.Wrap(err, "marshal failed") // return nil, utils.Wrap(err, "marshal failed")
} // }
values[index] = string(bs) // values[index] = string(bs)
} // }
return values, nil // return values, nil
}) // })
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
var tArrays []T // var tArrays []T
for _, v := range batchMap { // for _, v := range batchMap {
if v != "" { // if v != "" {
var t T // var t T
err = json.Unmarshal([]byte(v), &t) // err = json.Unmarshal([]byte(v), &t)
if err != nil { // if err != nil {
return nil, utils.Wrap(err, "unmarshal failed") // return nil, utils.Wrap(err, "unmarshal failed")
} // }
tArrays = append(tArrays, t) // tArrays = append(tArrays, t)
} // }
// }
// return tArrays, nil
//}
func batchGetCache2[T any, K comparable](ctx context.Context, rcClient *rockscache.Client, expire time.Duration, keys []K, keyFn func(key K) string, fns func(ctx context.Context, key K) (T, error)) ([]T, error) {
if len(keys) == 0 {
return nil, nil
} }
return tArrays, nil res := make([]T, 0, len(keys))
} for _, key := range keys {
val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) {
func batchGetCacheMap[T any]( return fns(ctx, key)
ctx context.Context, })
rcClient *rockscache.Client,
keys, originKeys []string,
expire time.Duration,
keyIndexFn func(s string, keys []string) (int, error),
fn func(ctx context.Context) (map[string]T, error),
) (map[string]T, error) {
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
tArrays, err := fn(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
values := make(map[int]string) res = append(res, val)
for k, v := range tArrays {
index, err := keyIndexFn(k, originKeys)
if err != nil {
continue
}
bs, err := json.Marshal(v)
if err != nil {
return nil, utils.Wrap(err, "marshal failed")
}
values[index] = string(bs)
}
return values, nil
})
if err != nil {
return nil, err
} }
tMap := make(map[string]T) return res, nil
for i, v := range batchMap {
if v != "" {
var t T
err = json.Unmarshal([]byte(v), &t)
if err != nil {
return nil, utils.Wrap(err, "unmarshal failed")
}
tMap[originKeys[i]] = t
}
}
return tMap, nil
} }
//func batchGetCacheMap[T any](
// ctx context.Context,
// rcClient *rockscache.Client,
// keys, originKeys []string,
// expire time.Duration,
// keyIndexFn func(s string, keys []string) (int, error),
// fn func(ctx context.Context) (map[string]T, error),
//) (map[string]T, error) {
// batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
// tArrays, err := fn(ctx)
// if err != nil {
// return nil, err
// }
// values := make(map[int]string)
// for k, v := range tArrays {
// index, err := keyIndexFn(k, originKeys)
// if err != nil {
// continue
// }
// bs, err := json.Marshal(v)
// if err != nil {
// return nil, utils.Wrap(err, "marshal failed")
// }
// values[index] = string(bs)
// }
// return values, nil
// })
// if err != nil {
// return nil, err
// }
// tMap := make(map[string]T)
// for i, v := range batchMap {
// if v != "" {
// var t T
// err = json.Unmarshal([]byte(v), &t)
// if err != nil {
// return nil, utils.Wrap(err, "unmarshal failed")
// }
// tMap[originKeys[i]] = t
// }
// }
// return tMap, nil
//}

@ -105,11 +105,7 @@ type MsgModel interface {
GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessagesBySeq( GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
ctx context.Context,
conversationID string,
seqs []int64,
) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error
DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
@ -122,12 +118,7 @@ type MsgModel interface {
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
SetMessageReactionExpire( SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
ctx context.Context,
clientMsgID string,
sessionType int32,
expiration time.Duration,
) (bool, error)
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
@ -158,28 +149,15 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string
return hasReadSeq + userID + ":" + conversationID return hasReadSeq + userID + ":" + conversationID
} }
func (c *msgCache) setSeq( func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
ctx context.Context,
conversationID string,
seq int64,
getkey func(conversationID string) string,
) error {
return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
} }
func (c *msgCache) getSeq( func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
ctx context.Context,
conversationID string,
getkey func(conversationID string) string,
) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64()) return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64())
} }
func (c *msgCache) getSeqs( func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
ctx context.Context,
items []string,
getkey func(s string) string,
) (m map[string]int64, err error) {
pipe := c.rdb.Pipeline() pipe := c.rdb.Pipeline()
for _, v := range items { for _, v := range items {
if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil { if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil {
@ -252,30 +230,17 @@ func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID
return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()) return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64())
} }
func (c *msgCache) GetConversationUserMinSeqs( func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
ctx context.Context,
conversationID string,
userIDs []string,
) (m map[string]int64, err error) {
return c.getSeqs(ctx, userIDs, func(userID string) string { return c.getSeqs(ctx, userIDs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID) return c.getConversationUserMinSeqKey(conversationID, userID)
}) })
} }
func (c *msgCache) SetConversationUserMinSeq( func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
ctx context.Context,
conversationID string,
userID string,
minSeq int64,
) error {
return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
} }
func (c *msgCache) SetConversationUserMinSeqs( func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
ctx context.Context,
conversationID string,
seqs map[string]int64,
) (err error) {
return c.setSeqs(ctx, seqs, func(userID string) string { return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID) return c.getConversationUserMinSeqKey(conversationID, userID)
}) })
@ -303,11 +268,7 @@ func (c *msgCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasRea
}) })
} }
func (c *msgCache) GetHasReadSeqs( func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
ctx context.Context,
userID string,
conversationIDs []string,
) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string { return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID) return c.getHasReadSeqKey(conversationID, userID)
}) })
@ -357,11 +318,7 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string {
return messageCache + conversationID + "_*" return messageCache + conversationID + "_*"
} }
func (c *msgCache) GetMessagesBySeq( func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
ctx context.Context,
conversationID string,
seqs []int64,
) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
pipe := c.rdb.Pipeline() pipe := c.rdb.Pipeline()
for _, v := range seqs { for _, v := range seqs {
// MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
@ -574,17 +531,8 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro
return int32(result), errs.Wrap(err) return int32(result), errs.Wrap(err)
} }
func (c *msgCache) SetFcmToken( func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
ctx context.Context, return errs.Wrap(c.rdb.Set(ctx, fcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
account string,
platformID int,
fcmToken string,
expireTime int64,
) (err error) {
return errs.Wrap(
c.rdb.Set(ctx, fcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).
Err(),
)
} }
func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {

@ -113,23 +113,6 @@ func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userIn
) )
} }
func batchGetCache2[T any, K comparable](ctx context.Context, rcClient *rockscache.Client, expire time.Duration, keys []K, keyFn func(key K) string, fns func(ctx context.Context, key K) (T, error)) ([]T, error) {
if len(keys) == 0 {
return nil, nil
}
res := make([]T, 0, len(keys))
for _, key := range keys {
val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) {
return fns(ctx, key)
})
if err != nil {
return nil, err
}
res = append(res, val)
}
return res, nil
}
func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) { func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) {
//var keys []string //var keys []string
//for _, userID := range userIDs { //for _, userID := range userIDs {

@ -50,7 +50,7 @@ type ConversationDatabase interface {
GetConversationIDs(ctx context.Context, userID string) ([]string, error) GetConversationIDs(ctx context.Context, userID string) ([]string, error)
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
GetAllConversationIDs(ctx context.Context) ([]string, error) GetAllConversationIDs(ctx context.Context) ([]string, error)
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error)
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
@ -295,9 +295,9 @@ func (c *conversationDatabase) GetAllConversationIDs(ctx context.Context) ([]str
return c.conversationDB.GetAllConversationIDs(ctx) return c.conversationDB.GetAllConversationIDs(ctx)
} }
func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { //func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID) // return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID)
} //}
func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) { func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) {
return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs) return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs)

Loading…
Cancel
Save