fix: not support redis cluster. CROSSSLOT Keys in request don't hash to the same slot

pull/1277/head
withchao 2 years ago committed by Gordon
parent afc2284204
commit b46bb893b3

@ -52,6 +52,7 @@ func NewBlackCacheRedis(
options rockscache.Options, options rockscache.Options,
) BlackCache { ) BlackCache {
rcClient := rockscache.NewClient(rdb, options) rcClient := rockscache.NewClient(rdb, options)
return &BlackCacheRedis{ return &BlackCacheRedis{
expireTime: blackExpireTime, expireTime: blackExpireTime,
rcClient: rcClient, rcClient: rcClient,
@ -61,8 +62,12 @@ func NewBlackCacheRedis(
} }
func (b *BlackCacheRedis) NewCache() BlackCache { func (b *BlackCacheRedis) NewCache() BlackCache {
return &BlackCacheRedis{expireTime: b.expireTime, rcClient: b.rcClient, blackDB: b.blackDB, return &BlackCacheRedis{
metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDelKeys()...)} expireTime: b.expireTime,
rcClient: b.rcClient,
blackDB: b.blackDB,
metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDelKeys()...),
}
} }
func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string { func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string {
@ -70,13 +75,20 @@ func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string {
} }
func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) {
return getCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) { return getCache(
ctx,
b.rcClient,
b.getBlackIDsKey(userID),
b.expireTime,
func(ctx context.Context) ([]string, error) {
return b.blackDB.FindBlackUserIDs(ctx, userID) return b.blackDB.FindBlackUserIDs(ctx, userID)
}) },
)
} }
func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) BlackCache { func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) BlackCache {
cache := b.NewCache() cache := b.NewCache()
cache.AddKeys(b.getBlackIDsKey(userID)) cache.AddKeys(b.getBlackIDsKey(userID))
return cache return cache
} }

@ -73,7 +73,7 @@ type ConversationCache interface {
GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache
GetConversationsByConversationID(ctx context.Context, GetConversationsByConversationID(ctx context.Context,
@ -83,12 +83,9 @@ type ConversationCache interface {
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
} }
func NewConversationRedis( func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache {
rdb redis.UniversalClient,
opts rockscache.Options,
db relationtb.ConversationModelInterface,
) ConversationCache {
rcClient := rockscache.NewClient(rdb, opts) rcClient := rockscache.NewClient(rdb, opts)
return &ConversationRedisCache{ return &ConversationRedisCache{
rcClient: rcClient, rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient), metaCache: NewMetaCacheRedis(rcClient),
@ -110,6 +107,7 @@ func NewNewConversationRedis(
options rockscache.Options, options rockscache.Options,
) ConversationCache { ) ConversationCache {
rcClient := rockscache.NewClient(rdb, options) rcClient := rockscache.NewClient(rdb, options)
return &ConversationRedisCache{ return &ConversationRedisCache{
rcClient: rcClient, rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient), metaCache: NewMetaCacheRedis(rcClient),
@ -156,24 +154,19 @@ func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conv
} }
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
return getCache( return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
ctx,
c.rcClient,
c.getConversationIDsKey(ownerUserID),
c.expireTime,
func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
}, })
)
} }
func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache {
var keys []string keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs { for _, userID := range userIDs {
keys = append(keys, c.getConversationIDsKey(userID)) keys = append(keys, c.getConversationIDsKey(userID))
} }
cache := c.NewCache() cache := c.NewCache()
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache return cache
} }
@ -181,10 +174,7 @@ func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID strin
return conversationIDsHashKey + ownerUserID return conversationIDsHashKey + ownerUserID
} }
func (c *ConversationRedisCache) GetUserConversationIDsHash( func (c *ConversationRedisCache) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
ctx context.Context,
ownerUserID string,
) (hash uint64, err error) {
return getCache( return getCache(
ctx, ctx,
c.rcClient, c.rcClient,
@ -204,160 +194,127 @@ func (c *ConversationRedisCache) GetUserConversationIDsHash(
} }
func (c *ConversationRedisCache) DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache {
var keys []string keys := make([]string, 0, len(ownerUserIDs))
for _, ownerUserID := range ownerUserIDs { for _, ownerUserID := range ownerUserIDs {
keys = append(keys, c.getUserConversationIDsHashKey(ownerUserID)) keys = append(keys, c.getUserConversationIDsHashKey(ownerUserID))
} }
cache := c.NewCache() cache := c.NewCache()
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache return cache
} }
func (c *ConversationRedisCache) GetConversation( func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationtb.ConversationModel, error) {
ctx context.Context, return getCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationtb.ConversationModel, error) {
ownerUserID, conversationID string,
) (*relationtb.ConversationModel, error) {
return getCache(
ctx,
c.rcClient,
c.getConversationKey(ownerUserID, conversationID),
c.expireTime,
func(ctx context.Context) (*relationtb.ConversationModel, error) {
return c.conversationDB.Take(ctx, ownerUserID, conversationID) return c.conversationDB.Take(ctx, ownerUserID, conversationID)
}, })
)
} }
func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache {
var keys []string keys := make([]string, 0, len(conversationIDs))
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
} }
cache := c.NewCache() cache := c.NewCache()
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache return cache
} }
func (c *ConversationRedisCache) getConversationIndex( func (c *ConversationRedisCache) getConversationIndex(convsation *relationtb.ConversationModel, keys []string) (int, error) {
convsation *relationtb.ConversationModel,
keys []string,
) (int, error) {
key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID) key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
for _i, _key := range keys { for _i, _key := range keys {
if _key == key { if _key == key {
return _i, nil return _i, nil
} }
} }
return 0, errors.New("not found key:" + key + " in keys") return 0, errors.New("not found key:" + key + " in keys")
} }
func (c *ConversationRedisCache) GetConversations( func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error) {
ctx context.Context, //var keys []string
ownerUserID string, //for _, conversarionID := range conversationIDs {
conversationIDs []string, // keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
) ([]*relationtb.ConversationModel, error) { //}
var keys []string //return batchGetCache(
for _, conversarionID := range conversationIDs { // ctx,
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) // c.rcClient,
} // keys,
return batchGetCache( // c.expireTime,
ctx, // c.getConversationIndex,
c.rcClient, // func(ctx context.Context) ([]*relationtb.ConversationModel, error) {
keys, // return c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
c.expireTime, // },
c.getConversationIndex, //)
func(ctx context.Context) ([]*relationtb.ConversationModel, error) { return batchGetCache2(ctx, c.rcClient, c.expireTime, conversationIDs, func(conversationID string) string {
return c.conversationDB.Find(ctx, ownerUserID, conversationIDs) return c.getConversationKey(ownerUserID, conversationID)
}, }, func(ctx context.Context, conversationID string) (*relationtb.ConversationModel, error) {
) return c.conversationDB.Take(ctx, ownerUserID, conversationID)
})
} }
func (c *ConversationRedisCache) GetUserAllConversations( func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationtb.ConversationModel, error) {
ctx context.Context,
ownerUserID string,
) ([]*relationtb.ConversationModel, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var keys []string //var keys []string
for _, conversarionID := range conversationIDs { //for _, conversarionID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) // keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
} //}
return batchGetCache( //return batchGetCache(
ctx, // ctx,
c.rcClient, // c.rcClient,
keys, // keys,
c.expireTime, // c.expireTime,
c.getConversationIndex, // c.getConversationIndex,
func(ctx context.Context) ([]*relationtb.ConversationModel, error) { // func(ctx context.Context) ([]*relationtb.ConversationModel, error) {
return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID) // return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID)
}, // },
) //)
} return c.GetConversations(ctx, ownerUserID, conversationIDs)
}
func (c *ConversationRedisCache) GetUserRecvMsgOpt(
ctx context.Context, func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
ownerUserID, conversationID string, return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) {
) (opt int, err error) {
return getCache(
ctx,
c.rcClient,
c.getRecvMsgOptKey(ownerUserID, conversationID),
c.expireTime,
func(ctx context.Context) (opt int, err error) {
return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID) return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID)
}, })
)
} }
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs( func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
ctx context.Context, return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) {
groupID string,
) (userIDs []string, err error) {
return getCache(
ctx,
c.rcClient,
c.getSuperGroupRecvNotNotifyUserIDsKey(groupID),
c.expireTime,
func(ctx context.Context) (userIDs []string, err error) {
return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
}, })
)
} }
func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache {
var keys []string keys := make([]string, 0, len(ownerUserIDs))
for _, ownerUserID := range ownerUserIDs { for _, ownerUserID := range ownerUserIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
} }
cache := c.NewCache() cache := c.NewCache()
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache return cache
} }
func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache { func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache {
cache := c.NewCache() cache := c.NewCache()
cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID)) cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID))
return cache return cache
} }
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache { func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache {
cache := c.NewCache() cache := c.NewCache()
cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID)) cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID))
return cache return cache
} }
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash( func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) {
ctx context.Context, return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) {
groupID string,
) (hash uint64, err error) {
return getCache(
ctx,
c.rcClient,
c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID),
c.expireTime,
func(ctx context.Context) (hash uint64, err error) {
userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil { if err != nil {
return 0, err return 0, err
@ -373,60 +330,44 @@ func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache { func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache {
cache := c.NewCache() cache := c.NewCache()
cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID)) cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID))
return cache return cache
} }
func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex( func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) {
conversationID string,
conversationIDs []string,
) (int, error) {
for _i, _conversationID := range conversationIDs { for _i, _conversationID := range conversationIDs {
if _conversationID == conversationID { if _conversationID == conversationID {
return _i, nil return _i, nil
} }
} }
return 0, errors.New("not found key:" + conversationID + " in keys")
}
func (c *ConversationRedisCache) GetUserAllHasReadSeqs( return 0, errors.New("not found key:" + conversationID + " in keys")
ctx context.Context,
ownerUserID string,
) (map[string]int64, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return nil, err
}
var keys []string
for _, conversarionID := range conversationIDs {
keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID))
}
return batchGetCacheMap(
ctx,
c.rcClient,
keys,
conversationIDs,
c.expireTime,
c.getUserAllHasReadSeqsIndex,
func(ctx context.Context) (map[string]int64, error) {
return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
},
)
} }
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, //func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
conversationIDs ...string, // conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
) ConversationCache { // if err != nil {
// return nil, err
// }
// var keys []string
// for _, conversarionID := range conversationIDs {
// keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID))
// }
// return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) {
// return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
// })
//}
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache {
cache := c.NewCache() cache := c.NewCache()
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID)) cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID))
} }
return cache return cache
} }
func (c *ConversationRedisCache) GetConversationsByConversationID( func (c *ConversationRedisCache) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) {
ctx context.Context,
conversationIDs []string,
) ([]*relationtb.ConversationModel, error) {
panic("implement me") panic("implement me")
} }
@ -435,15 +376,9 @@ func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs
} }
func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
return getCache( return getCache(ctx, c.rcClient, c.getConversationNotReceiveMessageUserIDsKey(conversationID), c.expireTime, func(ctx context.Context) ([]string, error) {
ctx,
c.rcClient,
c.getConversationNotReceiveMessageUserIDsKey(conversationID),
c.expireTime,
func(ctx context.Context) ([]string, error) {
return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID) return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
}, })
)
} }
func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache {
@ -451,5 +386,6 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID)) cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID))
} }
return cache return cache
} }

@ -64,12 +64,12 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendMo
} }
} }
func (c *FriendCacheRedis) NewCache() FriendCache { func (f *FriendCacheRedis) NewCache() FriendCache {
return &FriendCacheRedis{ return &FriendCacheRedis{
rcClient: c.rcClient, rcClient: f.rcClient,
metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), metaCache: NewMetaCacheRedis(f.rcClient, f.metaCache.GetPreDelKeys()...),
friendDB: c.friendDB, friendDB: f.friendDB,
expireTime: c.expireTime, expireTime: f.expireTime,
} }
} }
@ -86,25 +86,24 @@ func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string
} }
func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
return getCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, return getCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) {
func(ctx context.Context) ([]string, error) {
return f.friendDB.FindFriendUserIDs(ctx, ownerUserID) return f.friendDB.FindFriendUserIDs(ctx, ownerUserID)
}) })
} }
func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache { func (f *FriendCacheRedis) DelFriendIDs(ownerUserIDs ...string) FriendCache {
new := f.NewCache() newGroupCache := f.NewCache()
var keys []string keys := make([]string, 0, len(ownerUserIDs))
for _, userID := range ownerUserID { for _, userID := range ownerUserIDs {
keys = append(keys, f.getFriendIDsKey(userID)) keys = append(keys, f.getFriendIDsKey(userID))
} }
new.AddKeys(keys...) newGroupCache.AddKeys(keys...)
return new
return newGroupCache
} }
// todo. // todo.
func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) {
ownerUserID string) (twoWayFriendIDs []string, err error) {
friendIDs, err := f.GetFriendIDs(ctx, ownerUserID) friendIDs, err := f.GetFriendIDs(ctx, ownerUserID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -118,13 +117,15 @@ func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context,
twoWayFriendIDs = append(twoWayFriendIDs, ownerUserID) twoWayFriendIDs = append(twoWayFriendIDs, ownerUserID)
} }
} }
return twoWayFriendIDs, nil return twoWayFriendIDs, nil
} }
func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) FriendCache { func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) FriendCache {
new := f.NewCache() newFriendCache := f.NewCache()
new.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID)) newFriendCache.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID))
return new
return newFriendCache
} }
func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID,
@ -136,7 +137,8 @@ func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID,
} }
func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache { func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache {
new := f.NewCache() newFriendCache := f.NewCache()
new.AddKeys(f.getFriendKey(ownerUserID, friendUserID)) newFriendCache.AddKeys(f.getFriendKey(ownerUserID, friendUserID))
return new
return newFriendCache
} }

@ -65,22 +65,10 @@ type GroupCache interface {
GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error)
DelJoinedGroupID(userID ...string) GroupCache DelJoinedGroupID(userID ...string) GroupCache
GetGroupMemberInfo( GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationtb.GroupMemberModel, err error)
ctx context.Context, GetGroupMembersInfo(ctx context.Context, groupID string, userID []string) (groupMembers []*relationtb.GroupMemberModel, err error)
groupID, userID string,
) (groupMember *relationtb.GroupMemberModel, err error)
GetGroupMembersInfo(
ctx context.Context,
groupID string,
userID []string,
) (groupMembers []*relationtb.GroupMemberModel, err error)
GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error) GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error)
GetGroupMembersPage( GetGroupMembersPage(ctx context.Context, groupID string, userID []string, showNumber, pageNumber int32) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error)
ctx context.Context,
groupID string,
userID []string,
showNumber, pageNumber int32,
) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error)
DelGroupMembersInfo(groupID string, userID ...string) GroupCache DelGroupMembersInfo(groupID string, userID ...string) GroupCache
@ -109,6 +97,7 @@ func NewGroupCacheRedis(
opts rockscache.Options, opts rockscache.Options,
) GroupCache { ) GroupCache {
rcClient := rockscache.NewClient(rdb, opts) rcClient := rockscache.NewClient(rdb, opts)
return &GroupCacheRedis{ return &GroupCacheRedis{
rcClient: rcClient, expireTime: groupExpireTime, rcClient: rcClient, expireTime: groupExpireTime,
groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB,
@ -169,6 +158,7 @@ func (g *GroupCacheRedis) GetGroupIndex(group *relationtb.GroupModel, keys []str
return i, nil return i, nil
} }
} }
return 0, errIndex return 0, errIndex
} }
@ -179,62 +169,45 @@ func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationtb.GroupMembe
return i, nil return i, nil
} }
} }
return 0, errIndex return 0, errIndex
} }
// / groupInfo. // / groupInfo.
func (g *GroupCacheRedis) GetGroupsInfo( func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error) {
ctx context.Context, //var keys []string
groupIDs []string, //for _, group := range groupIDs {
) (groups []*relationtb.GroupModel, err error) { // keys = append(keys, g.getGroupInfoKey(group))
var keys []string //}
for _, group := range groupIDs { //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupIndex, func(ctx context.Context) ([]*relationtb.GroupModel, error) {
keys = append(keys, g.getGroupInfoKey(group)) // return g.groupDB.Find(ctx, groupIDs)
} //})
return batchGetCache( return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
ctx, return g.getGroupInfoKey(groupID)
g.rcClient, }, func(ctx context.Context, groupID string) (*relationtb.GroupModel, error) {
keys, return g.groupDB.Take(ctx, groupID)
g.expireTime, })
g.GetGroupIndex,
func(ctx context.Context) ([]*relationtb.GroupModel, error) {
return g.groupDB.Find(ctx, groupIDs)
},
)
} }
func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationtb.GroupModel, err error) { func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationtb.GroupModel, err error) {
return getCache( return getCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationtb.GroupModel, error) {
ctx,
g.rcClient,
g.getGroupInfoKey(groupID),
g.expireTime,
func(ctx context.Context) (*relationtb.GroupModel, error) {
return g.groupDB.Take(ctx, groupID) return g.groupDB.Take(ctx, groupID)
}, })
)
} }
func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache { func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache {
new := g.NewCache() newGroupCache := g.NewCache()
var keys []string keys := make([]string, 0, len(groupIDs))
for _, groupID := range groupIDs { for _, groupID := range groupIDs {
keys = append(keys, g.getGroupInfoKey(groupID)) keys = append(keys, g.getGroupInfoKey(groupID))
} }
new.AddKeys(keys...) newGroupCache.AddKeys(keys...)
return new
return newGroupCache
} }
func (g *GroupCacheRedis) GetJoinedSuperGroupIDs( func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
ctx context.Context, return getCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
userID string,
) (joinedSuperGroupIDs []string, err error) {
return getCache(
ctx,
g.rcClient,
g.getJoinedSuperGroupsIDKey(userID),
g.expireTime,
func(ctx context.Context) ([]string, error) {
userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID) userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -244,52 +217,50 @@ func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(
) )
} }
func (g *GroupCacheRedis) GetSuperGroupMemberIDs( func (g *GroupCacheRedis) GetSuperGroupMemberIDs(ctx context.Context, groupIDs ...string) (models []*unrelationtb.SuperGroupModel, err error) {
ctx context.Context, //var keys []string
groupIDs ...string, //for _, group := range groupIDs {
) (models []*unrelationtb.SuperGroupModel, err error) { // keys = append(keys, g.getSuperGroupMemberIDsKey(group))
var keys []string //}
for _, group := range groupIDs { //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, func(model *unrelationtb.SuperGroupModel, keys []string) (int, error) {
keys = append(keys, g.getSuperGroupMemberIDsKey(group)) // for i, key := range keys {
} // if g.getSuperGroupMemberIDsKey(model.GroupID) == key {
return batchGetCache( // return i, nil
ctx, // }
g.rcClient, // }
keys, // return 0, errIndex
g.expireTime, //},
func(model *unrelationtb.SuperGroupModel, keys []string) (int, error) { // func(ctx context.Context) ([]*unrelationtb.SuperGroupModel, error) {
for i, key := range keys { // return g.mongoDB.FindSuperGroup(ctx, groupIDs)
if g.getSuperGroupMemberIDsKey(model.GroupID) == key { // })
return i, nil return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
} return g.getSuperGroupMemberIDsKey(groupID)
} }, func(ctx context.Context, groupID string) (*unrelationtb.SuperGroupModel, error) {
return 0, errIndex return g.mongoDB.TakeSuperGroup(ctx, groupID)
}, })
func(ctx context.Context) ([]*unrelationtb.SuperGroupModel, error) {
return g.mongoDB.FindSuperGroup(ctx, groupIDs)
},
)
} }
// userJoinSuperGroup. // userJoinSuperGroup.
func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache { func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache {
new := g.NewCache() newGroupCache := g.NewCache()
var keys []string keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs { for _, userID := range userIDs {
keys = append(keys, g.getJoinedSuperGroupsIDKey(userID)) keys = append(keys, g.getJoinedSuperGroupsIDKey(userID))
} }
new.AddKeys(keys...) newGroupCache.AddKeys(keys...)
return new
return newGroupCache
} }
func (g *GroupCacheRedis) DelSuperGroupMemberIDs(groupIDs ...string) GroupCache { func (g *GroupCacheRedis) DelSuperGroupMemberIDs(groupIDs ...string) GroupCache {
new := g.NewCache() newGroupCache := g.NewCache()
var keys []string keys := make([]string, 0, len(groupIDs))
for _, groupID := range groupIDs { for _, groupID := range groupIDs {
keys = append(keys, g.getSuperGroupMemberIDsKey(groupID)) keys = append(keys, g.getSuperGroupMemberIDsKey(groupID))
} }
new.AddKeys(keys...) newGroupCache.AddKeys(keys...)
return new
return newGroupCache
} }
// groupMembersHash. // groupMembersHash.
@ -351,10 +322,7 @@ func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID strin
//) //)
} }
func (g *GroupCacheRedis) GetGroupMemberHashMap( func (g *GroupCacheRedis) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationtb.GroupSimpleUserID, error) {
ctx context.Context,
groupIDs []string,
) (map[string]*relationtb.GroupSimpleUserID, error) {
res := make(map[string]*relationtb.GroupSimpleUserID) res := make(map[string]*relationtb.GroupSimpleUserID)
for _, groupID := range groupIDs { for _, groupID := range groupIDs {
hash, err := g.GetGroupMembersHash(ctx, groupID) hash, err := g.GetGroupMembersHash(ctx, groupID)
@ -368,26 +336,22 @@ func (g *GroupCacheRedis) GetGroupMemberHashMap(
} }
res[groupID] = &relationtb.GroupSimpleUserID{Hash: hash, MemberNum: uint32(num)} res[groupID] = &relationtb.GroupSimpleUserID{Hash: hash, MemberNum: uint32(num)}
} }
return res, nil return res, nil
} }
func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache { func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache {
cache := g.NewCache() cache := g.NewCache()
cache.AddKeys(g.getGroupMembersHashKey(groupID)) cache.AddKeys(g.getGroupMembersHashKey(groupID))
return cache return cache
} }
// groupMemberIDs. // groupMemberIDs.
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
return getCache( return getCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) {
ctx,
g.rcClient,
g.getGroupMemberIDsKey(groupID),
g.expireTime,
func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindMemberUserID(ctx, groupID) return g.groupMemberDB.FindMemberUserID(ctx, groupID)
}, })
)
} }
func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) { func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) {
@ -399,79 +363,56 @@ func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []str
} }
m[groupID] = userIDs m[groupID] = userIDs
} }
return m, nil return m, nil
} }
func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) GroupCache { func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) GroupCache {
cache := g.NewCache() cache := g.NewCache()
cache.AddKeys(g.getGroupMemberIDsKey(groupID)) cache.AddKeys(g.getGroupMemberIDsKey(groupID))
return cache return cache
} }
func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) { func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
return getCache( return getCache(ctx, g.rcClient, g.getJoinedGroupsKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
ctx,
g.rcClient,
g.getJoinedGroupsKey(userID),
g.expireTime,
func(ctx context.Context) ([]string, error) {
return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID) return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID)
}, })
)
} }
func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache { func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache {
var keys []string keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs { for _, userID := range userIDs {
keys = append(keys, g.getJoinedGroupsKey(userID)) keys = append(keys, g.getJoinedGroupsKey(userID))
} }
cache := g.NewCache() cache := g.NewCache()
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache return cache
} }
func (g *GroupCacheRedis) GetGroupMemberInfo( func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationtb.GroupMemberModel, err error) {
ctx context.Context, return getCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationtb.GroupMemberModel, error) {
groupID, userID string,
) (groupMember *relationtb.GroupMemberModel, err error) {
return getCache(
ctx,
g.rcClient,
g.getGroupMemberInfoKey(groupID, userID),
g.expireTime,
func(ctx context.Context) (*relationtb.GroupMemberModel, error) {
return g.groupMemberDB.Take(ctx, groupID, userID) return g.groupMemberDB.Take(ctx, groupID, userID)
}, })
)
} }
func (g *GroupCacheRedis) GetGroupMembersInfo( func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]*relationtb.GroupMemberModel, error) {
ctx context.Context, //var keys []string
groupID string, //for _, userID := range userIDs {
userIDs []string, // keys = append(keys, g.getGroupMemberInfoKey(groupID, userID))
) ([]*relationtb.GroupMemberModel, error) { //}
var keys []string //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
for _, userID := range userIDs { // return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, nil)
keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) //})
} return batchGetCache2(ctx, g.rcClient, g.expireTime, userIDs, func(userID string) string {
return batchGetCache( return g.getGroupMemberInfoKey(groupID, userID)
ctx, }, func(ctx context.Context, userID string) (*relationtb.GroupMemberModel, error) {
g.rcClient, return g.groupMemberDB.Take(ctx, groupID, userID)
keys, })
g.expireTime,
g.GetGroupMemberIndex,
func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, nil)
},
)
} }
func (g *GroupCacheRedis) GetGroupMembersPage( func (g *GroupCacheRedis) GetGroupMembersPage(ctx context.Context, groupID string, userIDs []string, showNumber, pageNumber int32) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error) {
ctx context.Context,
groupID string,
userIDs []string,
showNumber, pageNumber int32,
) (total uint32, groupMembers []*relationtb.GroupMemberModel, err error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -482,72 +423,58 @@ func (g *GroupCacheRedis) GetGroupMembersPage(
userIDs = groupMemberIDs userIDs = groupMemberIDs
} }
groupMembers, err = g.GetGroupMembersInfo(ctx, groupID, utils.Paginate(userIDs, int(showNumber), int(showNumber))) groupMembers, err = g.GetGroupMembersInfo(ctx, groupID, utils.Paginate(userIDs, int(showNumber), int(showNumber)))
return uint32(len(userIDs)), groupMembers, err return uint32(len(userIDs)), groupMembers, err
} }
func (g *GroupCacheRedis) GetAllGroupMembersInfo( func (g *GroupCacheRedis) GetAllGroupMembersInfo(ctx context.Context, groupID string) (groupMembers []*relationtb.GroupMemberModel, err error) {
ctx context.Context,
groupID string,
) (groupMembers []*relationtb.GroupMemberModel, err error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs) return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs)
} }
func (g *GroupCacheRedis) GetAllGroupMemberInfo( func (g *GroupCacheRedis) GetAllGroupMemberInfo(ctx context.Context, groupID string) ([]*relationtb.GroupMemberModel, error) {
ctx context.Context,
groupID string,
) ([]*relationtb.GroupMemberModel, error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var keys []string //var keys []string
for _, groupMemberID := range groupMemberIDs { //for _, groupMemberID := range groupMemberIDs {
keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID)) // keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID))
} //}
return batchGetCache( //return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
ctx, // return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil)
g.rcClient, //})
keys, return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs)
g.expireTime,
g.GetGroupMemberIndex,
func(ctx context.Context) ([]*relationtb.GroupMemberModel, error) {
return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil)
},
)
} }
func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) GroupCache { func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) GroupCache {
var keys []string keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs { for _, userID := range userIDs {
keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) keys = append(keys, g.getGroupMemberInfoKey(groupID, userID))
} }
cache := g.NewCache() cache := g.NewCache()
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache return cache
} }
func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) { func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) {
return getCache( return getCache(ctx, g.rcClient, g.getGroupMemberNumKey(groupID), g.expireTime, func(ctx context.Context) (int64, error) {
ctx,
g.rcClient,
g.getGroupMemberNumKey(groupID),
g.expireTime,
func(ctx context.Context) (int64, error) {
return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID) return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID)
}, })
)
} }
func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache { func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache {
var keys []string keys := make([]string, 0, len(groupID))
for _, groupID := range groupID { for _, groupID := range groupID {
keys = append(keys, g.getGroupMemberNumKey(groupID)) keys = append(keys, g.getGroupMemberNumKey(groupID))
} }
cache := g.NewCache() cache := g.NewCache()
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache return cache
} }

@ -18,7 +18,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"time" "time"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
@ -59,27 +58,40 @@ type metaCacheRedis struct {
func (m *metaCacheRedis) ExecDel(ctx context.Context) error { func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
if len(m.keys) > 0 { if len(m.keys) > 0 {
log.ZDebug(ctx, "delete cache", "keys", m.keys) log.ZDebug(ctx, "delete cache", "keys", m.keys)
retryTimes := 0 for _, key := range m.keys {
for { for i := 0; i < m.maxRetryTimes; i++ {
if err := m.rcClient.TagAsDeletedBatch2(ctx, m.keys); err != nil { if err := m.rcClient.TagAsDeleted(key); err != nil {
if retryTimes >= m.maxRetryTimes { log.ZError(ctx, "delete cache failed", err, "key", key)
err = errs.ErrInternalServer.Wrap( time.Sleep(m.retryInterval)
fmt.Sprintf( continue
"delete cache error: %v, keys: %v, retry times %d, please check redis server",
err,
m.keys,
retryTimes,
),
)
log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", m.keys)
return err
} }
retryTimes++
} else {
break break
} }
//retryTimes := 0
//for {
// m.rcClient.TagAsDeleted()
// if err := m.rcClient.TagAsDeletedBatch2(ctx, []string{key}); err != nil {
// if retryTimes >= m.maxRetryTimes {
// err = errs.ErrInternalServer.Wrap(
// fmt.Sprintf(
// "delete cache error: %v, keys: %v, retry times %d, please check redis server",
// err,
// key,
// retryTimes,
// ),
// )
// log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", key)
// return err
// }
// retryTimes++
// } else {
// break
// }
//}
} }
} }
return nil return nil
} }
@ -103,16 +115,11 @@ func GetDefaultOpt() rockscache.Options {
opts := rockscache.NewDefaultOptions() opts := rockscache.NewDefaultOptions()
opts.StrongConsistency = true opts.StrongConsistency = true
opts.RandomExpireAdjustment = 0.2 opts.RandomExpireAdjustment = 0.2
return opts return opts
} }
func getCache[T any]( func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
ctx context.Context,
rcClient *rockscache.Client,
key string,
expire time.Duration,
fn func(ctx context.Context) (T, error),
) (T, error) {
var t T var t T
var write bool var write bool
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) { v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
@ -125,6 +132,7 @@ func getCache[T any](
return "", utils.Wrap(err, "") return "", utils.Wrap(err, "")
} }
write = true write = true
return string(bs), nil return string(bs), nil
}) })
if err != nil { if err != nil {
@ -139,95 +147,108 @@ func getCache[T any](
err = json.Unmarshal([]byte(v), &t) err = json.Unmarshal([]byte(v), &t)
if err != nil { if err != nil {
log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire)
return t, utils.Wrap(err, "") return t, utils.Wrap(err, "")
} }
return t, nil
}
func batchGetCache[T any]( return t, nil
ctx context.Context,
rcClient *rockscache.Client,
keys []string,
expire time.Duration,
keyIndexFn func(t T, keys []string) (int, error),
fn func(ctx context.Context) ([]T, error),
) ([]T, error) {
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
values := make(map[int]string)
tArrays, err := fn(ctx)
if err != nil {
return nil, err
}
for _, v := range tArrays {
index, err := keyIndexFn(v, keys)
if err != nil {
continue
}
bs, err := json.Marshal(v)
if err != nil {
return nil, utils.Wrap(err, "marshal failed")
}
values[index] = string(bs)
}
return values, nil
})
if err != nil {
return nil, err
}
var tArrays []T
for _, v := range batchMap {
if v != "" {
var t T
err = json.Unmarshal([]byte(v), &t)
if err != nil {
return nil, utils.Wrap(err, "unmarshal failed")
}
tArrays = append(tArrays, t)
}
}
return tArrays, nil
} }
func batchGetCacheMap[T any]( //func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) {
ctx context.Context, // batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
rcClient *rockscache.Client, // values := make(map[int]string)
keys, originKeys []string, // tArrays, err := fn(ctx)
expire time.Duration, // if err != nil {
keyIndexFn func(s string, keys []string) (int, error), // return nil, err
fn func(ctx context.Context) (map[string]T, error), // }
) (map[string]T, error) { // for _, v := range tArrays {
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { // index, err := keyIndexFn(v, keys)
tArrays, err := fn(ctx) // if err != nil {
if err != nil { // continue
return nil, err // }
} // bs, err := json.Marshal(v)
values := make(map[int]string) // if err != nil {
for k, v := range tArrays { // return nil, utils.Wrap(err, "marshal failed")
index, err := keyIndexFn(k, originKeys) // }
if err != nil { // values[index] = string(bs)
continue // }
} // return values, nil
bs, err := json.Marshal(v) // })
if err != nil { // if err != nil {
return nil, utils.Wrap(err, "marshal failed") // return nil, err
} // }
values[index] = string(bs) // var tArrays []T
} // for _, v := range batchMap {
return values, nil // if v != "" {
// var t T
// err = json.Unmarshal([]byte(v), &t)
// if err != nil {
// return nil, utils.Wrap(err, "unmarshal failed")
// }
// tArrays = append(tArrays, t)
// }
// }
// return tArrays, nil
//}
func batchGetCache2[T any, K comparable](ctx context.Context, rcClient *rockscache.Client, expire time.Duration, keys []K, keyFn func(key K) string, fns func(ctx context.Context, key K) (T, error)) ([]T, error) {
if len(keys) == 0 {
return nil, nil
}
res := make([]T, 0, len(keys))
for _, key := range keys {
val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) {
return fns(ctx, key)
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
tMap := make(map[string]T) res = append(res, val)
for i, v := range batchMap { }
if v != "" {
var t T return res, nil
err = json.Unmarshal([]byte(v), &t) }
if err != nil {
return nil, utils.Wrap(err, "unmarshal failed") //func batchGetCacheMap[T any](
} // ctx context.Context,
tMap[originKeys[i]] = t // rcClient *rockscache.Client,
} // keys, originKeys []string,
} // expire time.Duration,
return tMap, nil // keyIndexFn func(s string, keys []string) (int, error),
} // fn func(ctx context.Context) (map[string]T, error),
//) (map[string]T, error) {
// batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
// tArrays, err := fn(ctx)
// if err != nil {
// return nil, err
// }
// values := make(map[int]string)
// for k, v := range tArrays {
// index, err := keyIndexFn(k, originKeys)
// if err != nil {
// continue
// }
// bs, err := json.Marshal(v)
// if err != nil {
// return nil, utils.Wrap(err, "marshal failed")
// }
// values[index] = string(bs)
// }
// return values, nil
// })
// if err != nil {
// return nil, err
// }
// tMap := make(map[string]T)
// for i, v := range batchMap {
// if v != "" {
// var t T
// err = json.Unmarshal([]byte(v), &t)
// if err != nil {
// return nil, utils.Wrap(err, "unmarshal failed")
// }
// tMap[originKeys[i]] = t
// }
// }
// return tMap, nil
//}

@ -16,13 +16,14 @@ package cache
import ( import (
"context" "context"
"errors"
"github.com/dtm-labs/rockscache"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"strconv" "strconv"
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/dtm-labs/rockscache"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/jsonpb"
@ -33,7 +34,6 @@ import (
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
@ -105,11 +105,7 @@ type MsgModel interface {
GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessagesBySeq( GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
ctx context.Context,
conversationID string,
seqs []int64,
) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error
DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
@ -122,12 +118,7 @@ type MsgModel interface {
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
SetMessageReactionExpire( SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
ctx context.Context,
clientMsgID string,
sessionType int32,
expiration time.Duration,
) (bool, error)
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
@ -158,50 +149,51 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string
return hasReadSeq + userID + ":" + conversationID return hasReadSeq + userID + ":" + conversationID
} }
func (c *msgCache) setSeq( func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
ctx context.Context,
conversationID string,
seq int64,
getkey func(conversationID string) string,
) error {
return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
} }
func (c *msgCache) getSeq( func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
ctx context.Context,
conversationID string,
getkey func(conversationID string) string,
) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64()) return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64())
} }
func (c *msgCache) getSeqs( func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
ctx context.Context, m = make(map[string]int64, len(items))
items []string, for i, v := range items {
getkey func(s string) string, res, err := c.rdb.Get(ctx, getkey(v)).Result()
) (m map[string]int64, err error) {
pipe := c.rdb.Pipeline()
for _, v := range items {
if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil {
return nil, errs.Wrap(err)
}
}
result, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil { if err != nil && err != redis.Nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
m = make(map[string]int64, len(items)) val := utils.StringToInt64(res)
for i, v := range result {
seq := v.(*redis.StringCmd)
if seq.Err() != nil && seq.Err() != redis.Nil {
return nil, errs.Wrap(v.Err())
}
val := utils.StringToInt64(seq.Val())
if val != 0 { if val != 0 {
m[items[i]] = val m[items[i]] = val
} }
} }
return m, nil return m, nil
//pipe := c.rdb.Pipeline()
//for _, v := range items {
// if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil {
// return nil, errs.Wrap(err)
// }
//}
//result, err := pipe.Exec(ctx)
//if err != nil && err != redis.Nil {
// return nil, errs.Wrap(err)
//}
//m = make(map[string]int64, len(items))
//for i, v := range result {
// seq := v.(*redis.StringCmd)
// if seq.Err() != nil && seq.Err() != redis.Nil {
// return nil, errs.Wrap(v.Err())
// }
// val := utils.StringToInt64(seq.Val())
// if val != 0 {
// m[items[i]] = val
// }
//}
//return m, nil
} }
func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
@ -221,15 +213,21 @@ func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq
} }
func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
pipe := c.rdb.Pipeline() for conversationID, seq := range seqs {
for k, seq := range seqs { if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
err := pipe.Set(ctx, getkey(k), seq, 0).Err()
if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
_, err := pipe.Exec(ctx) return nil
return err //pipe := c.rdb.Pipeline()
//for k, seq := range seqs {
// err := pipe.Set(ctx, getkey(k), seq, 0).Err()
// if err != nil {
// return errs.Wrap(err)
// }
//}
//_, err := pipe.Exec(ctx)
//return err
} }
func (c *msgCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { func (c *msgCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
@ -252,30 +250,17 @@ func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID
return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()) return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64())
} }
func (c *msgCache) GetConversationUserMinSeqs( func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
ctx context.Context,
conversationID string,
userIDs []string,
) (m map[string]int64, err error) {
return c.getSeqs(ctx, userIDs, func(userID string) string { return c.getSeqs(ctx, userIDs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID) return c.getConversationUserMinSeqKey(conversationID, userID)
}) })
} }
func (c *msgCache) SetConversationUserMinSeq( func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
ctx context.Context,
conversationID string,
userID string,
minSeq int64,
) error {
return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
} }
func (c *msgCache) SetConversationUserMinSeqs( func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
ctx context.Context,
conversationID string,
seqs map[string]int64,
) (err error) {
return c.setSeqs(ctx, seqs, func(userID string) string { return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID) return c.getConversationUserMinSeqKey(conversationID, userID)
}) })
@ -303,11 +288,7 @@ func (c *msgCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasRea
}) })
} }
func (c *msgCache) GetHasReadSeqs( func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
ctx context.Context,
userID string,
conversationIDs []string,
) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string { return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID) return c.getHasReadSeqKey(conversationID, userID)
}) })
@ -319,6 +300,7 @@ func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversatio
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
} }
@ -332,6 +314,7 @@ func (c *msgCache) GetTokensWithoutError(ctx context.Context, userID string, pla
for k, v := range m { for k, v := range m {
mm[k] = utils.StringToInt(v) mm[k] = utils.StringToInt(v)
} }
return mm, nil return mm, nil
} }
@ -341,11 +324,13 @@ func (c *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platf
for k, v := range m { for k, v := range m {
mm[k] = v mm[k] = v
} }
return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err()) return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err())
} }
func (c *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform int, fields []string) error { func (c *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform int, fields []string) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platform) key := uidPidToken + userID + ":" + constant.PlatformIDToName(platform)
return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err()) return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err())
} }
@ -357,58 +342,86 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string {
return messageCache + conversationID + "_*" return messageCache + conversationID + "_*"
} }
func (c *msgCache) GetMessagesBySeq( func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
ctx context.Context, for _, seq := range seqs {
conversationID string, res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result()
seqs []int64, if err != nil {
) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq)
pipe := c.rdb.Pipeline() failedSeqs = append(failedSeqs, seq)
for _, v := range seqs {
// MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := c.getMessageCacheKey(conversationID, v)
if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil {
return nil, nil, err
}
}
result, err := pipe.Exec(ctx)
for i, v := range result {
cmd := v.(*redis.StringCmd)
if cmd.Err() != nil {
failedSeqs = append(failedSeqs, seqs[i])
} else {
msg := sdkws.MsgData{}
err = msgprocessor.String2Pb(cmd.Val(), &msg)
if err == nil {
if msg.Status != constant.MsgDeleted {
seqMsgs = append(seqMsgs, &msg)
continue continue
} }
} else { msg := sdkws.MsgData{}
log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val()) if err = msgprocessor.String2Pb(res, &msg); err != nil {
log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
failedSeqs = append(failedSeqs, seq)
continue
} }
failedSeqs = append(failedSeqs, seqs[i]) if msg.Status == constant.MsgDeleted {
failedSeqs = append(failedSeqs, seq)
continue
} }
seqMsgs = append(seqMsgs, &msg)
} }
return seqMsgs, failedSeqs, err
return
//pipe := c.rdb.Pipeline()
//for _, v := range seqs {
// // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
// key := c.getMessageCacheKey(conversationID, v)
// if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil {
// return nil, nil, err
// }
//}
//result, err := pipe.Exec(ctx)
//for i, v := range result {
// cmd := v.(*redis.StringCmd)
// if cmd.Err() != nil {
// failedSeqs = append(failedSeqs, seqs[i])
// } else {
// msg := sdkws.MsgData{}
// err = msgprocessor.String2Pb(cmd.Val(), &msg)
// if err == nil {
// if msg.Status != constant.MsgDeleted {
// seqMsgs = append(seqMsgs, &msg)
// continue
// }
// } else {
// log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val())
// }
// failedSeqs = append(failedSeqs, seqs[i])
// }
//}
//return seqMsgs, failedSeqs, err
} }
func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
pipe := c.rdb.Pipeline()
var failedMsgs []*sdkws.MsgData
for _, msg := range msgs { for _, msg := range msgs {
key := c.getMessageCacheKey(conversationID, msg.Seq)
s, err := msgprocessor.Pb2String(msg) s, err := msgprocessor.Pb2String(msg)
if err != nil { if err != nil {
return 0, errs.Wrap(err) return 0, errs.Wrap(err)
} }
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() key := c.getMessageCacheKey(conversationID, msg.Seq)
if err != nil { if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
failedMsgs = append(failedMsgs, msg) return 0, errs.Wrap(err)
log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs)
} }
} }
_, err := pipe.Exec(ctx) return len(msgs), nil
return len(failedMsgs), err //pipe := c.rdb.Pipeline()
//var failedMsgs []*sdkws.MsgData
//for _, msg := range msgs {
// key := c.getMessageCacheKey(conversationID, msg.Seq)
// s, err := msgprocessor.Pb2String(msg)
// if err != nil {
// return 0, errs.Wrap(err)
// }
// err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
// if err != nil {
// failedMsgs = append(failedMsgs, msg)
// log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs)
// }
//}
//_, err := pipe.Exec(ctx)
//return len(failedMsgs), err
} }
func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string { func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {
@ -420,27 +433,47 @@ func (c *msgCache) getUserDelList(conversationID, userID string) string {
} }
func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error { func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error {
pipe := c.rdb.Pipeline()
for _, seq := range seqs { for _, seq := range seqs {
delUserListKey := c.getMessageDelUserListKey(conversationID, seq) delUserListKey := c.getMessageDelUserListKey(conversationID, seq)
userDelListKey := c.getUserDelList(conversationID, userID) userDelListKey := c.getUserDelList(conversationID, userID)
err := pipe.SAdd(ctx, delUserListKey, userID).Err() err := c.rdb.SAdd(ctx, delUserListKey, userID).Err()
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
err = pipe.SAdd(ctx, userDelListKey, seq).Err() err = c.rdb.SAdd(ctx, userDelListKey, seq).Err()
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
_, err := pipe.Exec(ctx)
return errs.Wrap(err) return nil
//pipe := c.rdb.Pipeline()
//for _, seq := range seqs {
// delUserListKey := c.getMessageDelUserListKey(conversationID, seq)
// userDelListKey := c.getUserDelList(conversationID, userID)
// err := pipe.SAdd(ctx, delUserListKey, userID).Err()
// if err != nil {
// return errs.Wrap(err)
// }
// err = pipe.SAdd(ctx, userDelListKey, seq).Err()
// if err != nil {
// return errs.Wrap(err)
// }
// if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
// return errs.Wrap(err)
// }
// if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
// return errs.Wrap(err)
// }
//}
//_, err := pipe.Exec(ctx)
//return errs.Wrap(err)
} }
func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) { func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) {
@ -452,6 +485,7 @@ func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID st
for i, v := range result { for i, v := range result {
seqs[i] = utils.StringToInt64(v) seqs[i] = utils.StringToInt64(v)
} }
return seqs, nil return seqs, nil
} }
@ -460,67 +494,102 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str
delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result() delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
if err != nil { if err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
continue continue
} }
if len(delUsers) > 0 { if len(delUsers) > 0 {
pipe := c.rdb.Pipeline()
var failedFlag bool var failedFlag bool
for _, userID := range delUsers { for _, userID := range delUsers {
err = pipe.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err() err = c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err()
if err != nil { if err != nil {
failedFlag = true failedFlag = true
log.ZWarn( log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID)
ctx,
"DelUserDeleteMsgsList failed",
err,
"conversationID",
conversationID,
"seq",
seq,
"userID",
userID,
)
} }
} }
if !failedFlag { if !failedFlag {
if err := pipe.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil { if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil {
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
} }
} }
if _, err := pipe.Exec(ctx); err != nil {
log.ZError(ctx, "pipe exec failed", err, "conversationID", conversationID, "seq", seq)
}
} }
} }
//for _, seq := range seqs {
// delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
// if err != nil {
// log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
// continue
// }
// if len(delUsers) > 0 {
// pipe := c.rdb.Pipeline()
// var failedFlag bool
// for _, userID := range delUsers {
// err = pipe.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err()
// if err != nil {
// failedFlag = true
// log.ZWarn(
// ctx,
// "DelUserDeleteMsgsList failed",
// err,
// "conversationID",
// conversationID,
// "seq",
// seq,
// "userID",
// userID,
// )
// }
// }
// if !failedFlag {
// if err := pipe.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil {
// log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
// }
// }
// if _, err := pipe.Exec(ctx); err != nil {
// log.ZError(ctx, "pipe exec failed", err, "conversationID", conversationID, "seq", seq)
// }
// }
//}
} }
func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
pipe := c.rdb.Pipeline()
for _, seq := range seqs { for _, seq := range seqs {
if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil { if err := c.rdb.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
_, err := pipe.Exec(ctx) return nil
return errs.Wrap(err) //pipe := c.rdb.Pipeline()
//for _, seq := range seqs {
// if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil {
// return errs.Wrap(err)
// }
//}
//_, err := pipe.Exec(ctx)
//return errs.Wrap(err)
} }
func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error { func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result() vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result()
if err == redis.Nil { if errors.Is(err, redis.Nil) {
return nil return nil
} }
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
pipe := c.rdb.Pipeline()
for _, v := range vals { for _, v := range vals {
if err := pipe.Del(ctx, v).Err(); err != nil { if err := c.rdb.Del(ctx, v).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
_, err = pipe.Exec(ctx) return nil
return errs.Wrap(err) //pipe := c.rdb.Pipeline()
//for _, v := range vals {
// if err := pipe.Del(ctx, v).Err(); err != nil {
// return errs.Wrap(err)
// }
//}
//_, err = pipe.Exec(ctx)
//return errs.Wrap(err)
} }
func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
@ -528,13 +597,15 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in
key := c.getMessageCacheKey(userID, seq) key := c.getMessageCacheKey(userID, seq)
result, err := c.rdb.Get(ctx, key).Result() result, err := c.rdb.Get(ctx, key).Result()
if err != nil { if err != nil {
if err == redis.Nil { if errors.Is(err, redis.Nil) {
continue continue
} }
return errs.Wrap(err) return errs.Wrap(err)
} }
var msg sdkws.MsgData var msg sdkws.MsgData
if err := jsonpb.UnmarshalString(result, &msg); err != nil { err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
return err return err
} }
msg.Status = constant.MsgDeleted msg.Status = constant.MsgDeleted
@ -546,6 +617,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
return nil return nil
} }
@ -571,20 +643,12 @@ func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32
func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := c.rdb.Get(ctx, sendMsgFailedFlag+id).Int() result, err := c.rdb.Get(ctx, sendMsgFailedFlag+id).Int()
return int32(result), errs.Wrap(err) return int32(result), errs.Wrap(err)
} }
func (c *msgCache) SetFcmToken( func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
ctx context.Context, return errs.Wrap(c.rdb.Set(ctx, fcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
account string,
platformID int,
fcmToken string,
expireTime int64,
) (err error) {
return errs.Wrap(
c.rdb.Set(ctx, fcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).
Err(),
)
} }
func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
@ -597,6 +661,7 @@ func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID i
func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
return int(seq), errs.Wrap(err) return int(seq), errs.Wrap(err)
} }
@ -610,11 +675,13 @@ func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err()) return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err())
} }
func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs.Wrap(c.rdb.Del(ctx, key).Err()) return errs.Wrap(c.rdb.Del(ctx, key).Err())
} }
@ -629,6 +696,7 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in
case constant.NotificationChatType: case constant.NotificationChatType:
return "EX_NOTIFICATION" + clientMsgID return "EX_NOTIFICATION" + clientMsgID
} }
return "" return ""
} }
@ -637,6 +705,7 @@ func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID st
if err != nil { if err != nil {
return false, utils.Wrap(err, "") return false, utils.Wrap(err, "")
} }
return n > 0, nil return n > 0, nil
} }
@ -649,21 +718,11 @@ func (c *msgCache) SetMessageTypeKeyValue(
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
} }
func (c *msgCache) SetMessageReactionExpire( func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
ctx context.Context,
clientMsgID string,
sessionType int32,
expiration time.Duration,
) (bool, error) {
return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
} }
func (c *msgCache) GetMessageTypeKeyValue( func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
ctx context.Context,
clientMsgID string,
sessionType int32,
typeKey string,
) (string, error) {
return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
} }

@ -17,6 +17,7 @@ package cache
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"hash/crc32" "hash/crc32"
"strconv" "strconv"
"time" "time"
@ -70,6 +71,7 @@ func NewUserCacheRedis(
options rockscache.Options, options rockscache.Options,
) UserCache { ) UserCache {
rcClient := rockscache.NewClient(rdb, options) rcClient := rockscache.NewClient(rdb, options)
return &UserCacheRedis{ return &UserCacheRedis{
rdb: rdb, rdb: rdb,
metaCache: NewMetaCacheRedis(rcClient), metaCache: NewMetaCacheRedis(rcClient),
@ -97,10 +99,6 @@ func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string {
return userGlobalRecvMsgOptKey + userID return userGlobalRecvMsgOptKey + userID
} }
func (u *UserCacheRedis) getUserStatusHashKey(userID string, Id int32) string {
return userID + "_" + string(Id) + platformID
}
func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationtb.UserModel, err error) { func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationtb.UserModel, err error) {
return getCache( return getCache(
ctx, ctx,
@ -114,36 +112,42 @@ func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userIn
} }
func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) { func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) {
var keys []string //var keys []string
for _, userID := range userIDs { //for _, userID := range userIDs {
keys = append(keys, u.getUserInfoKey(userID)) // keys = append(keys, u.getUserInfoKey(userID))
} //}
return batchGetCache( //return batchGetCache(
ctx, // ctx,
u.rcClient, // u.rcClient,
keys, // keys,
u.expireTime, // u.expireTime,
func(user *relationtb.UserModel, keys []string) (int, error) { // func(user *relationtb.UserModel, keys []string) (int, error) {
for i, key := range keys { // for i, key := range keys {
if key == u.getUserInfoKey(user.UserID) { // if key == u.getUserInfoKey(user.UserID) {
return i, nil // return i, nil
} // }
} // }
return 0, errIndex // return 0, errIndex
}, // },
func(ctx context.Context) ([]*relationtb.UserModel, error) { // func(ctx context.Context) ([]*relationtb.UserModel, error) {
return u.userDB.Find(ctx, userIDs) // return u.userDB.Find(ctx, userIDs)
}, // },
) //)
return batchGetCache2(ctx, u.rcClient, u.expireTime, userIDs, func(userID string) string {
return u.getUserInfoKey(userID)
}, func(ctx context.Context, userID string) (*relationtb.UserModel, error) {
return u.userDB.Take(ctx, userID)
})
} }
func (u *UserCacheRedis) DelUsersInfo(userIDs ...string) UserCache { func (u *UserCacheRedis) DelUsersInfo(userIDs ...string) UserCache {
var keys []string keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs { for _, userID := range userIDs {
keys = append(keys, u.getUserInfoKey(userID)) keys = append(keys, u.getUserInfoKey(userID))
} }
cache := u.NewCache() cache := u.NewCache()
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache return cache
} }
@ -160,22 +164,19 @@ func (u *UserCacheRedis) GetUserGlobalRecvMsgOpt(ctx context.Context, userID str
} }
func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache { func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache {
var keys []string keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs { for _, userID := range userIDs {
keys = append(keys, u.getUserGlobalRecvMsgOptKey(userID)) keys = append(keys, u.getUserGlobalRecvMsgOptKey(userID))
} }
cache := u.NewCache() cache := u.NewCache()
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache
}
func (u *UserCacheRedis) getOnlineStatusKey(userID string) string { return cache
return olineStatusKey + userID
} }
// GetUserStatus get user status. // GetUserStatus get user status.
func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) {
var res []*user.OnlineStatus userStatus := make([]*user.OnlineStatus, 0, len(userIDs))
for _, userID := range userIDs { for _, userID := range userIDs {
UserIDNum := crc32.ChecksumIEEE([]byte(userID)) UserIDNum := crc32.ChecksumIEEE([]byte(userID))
modKey := strconv.Itoa(int(UserIDNum % statusMod)) modKey := strconv.Itoa(int(UserIDNum % statusMod))
@ -183,13 +184,14 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([
key := olineStatusKey + modKey key := olineStatusKey + modKey
result, err := u.rdb.HGet(ctx, key, userID).Result() result, err := u.rdb.HGet(ctx, key, userID).Result()
if err != nil { if err != nil {
if err == redis.Nil { if errors.Is(err, redis.Nil) {
// key or field does not exist // key or field does not exist
res = append(res, &user.OnlineStatus{ userStatus = append(userStatus, &user.OnlineStatus{
UserID: userID, UserID: userID,
Status: constant.Offline, Status: constant.Offline,
PlatformIDs: nil, PlatformIDs: nil,
}) })
continue continue
} else { } else {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
@ -201,9 +203,10 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([
} }
onlineStatus.UserID = userID onlineStatus.UserID = userID
onlineStatus.Status = constant.Online onlineStatus.Status = constant.Online
res = append(res, &onlineStatus) userStatus = append(userStatus, &onlineStatus)
} }
return res, nil
return userStatus, nil
} }
// SetUserStatus Set the user status and save it in redis. // SetUserStatus Set the user status and save it in redis.
@ -224,15 +227,16 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu
Status: constant.Online, Status: constant.Online,
PlatformIDs: []int32{platformID}, PlatformIDs: []int32{platformID},
} }
jsonData, err := json.Marshal(onlineStatus) jsonData, err2 := json.Marshal(&onlineStatus)
if err != nil { if err2 != nil {
return errs.Wrap(err) return errs.Wrap(err2)
} }
_, err = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result() _, err2 = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result()
if err != nil { if err2 != nil {
return errs.Wrap(err) return errs.Wrap(err2)
} }
u.rdb.Expire(ctx, key, userOlineStatusExpireTime) u.rdb.Expire(ctx, key, userOlineStatusExpireTime)
return nil return nil
} }
} }
@ -240,7 +244,7 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu
isNil := false isNil := false
result, err := u.rdb.HGet(ctx, key, userID).Result() result, err := u.rdb.HGet(ctx, key, userID).Result()
if err != nil { if err != nil {
if err == redis.Nil { if errors.Is(err, redis.Nil) {
isNil = true isNil = true
} else { } else {
return errs.Wrap(err) return errs.Wrap(err)
@ -248,9 +252,25 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu
} }
if status == constant.Offline { if status == constant.Offline {
err = u.refreshStatusOffline(ctx, userID, status, platformID, isNil, err, result, key)
if err != nil {
return err
}
} else {
err = u.refreshStatusOnline(ctx, userID, platformID, isNil, err, result, key)
if err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (u *UserCacheRedis) refreshStatusOffline(ctx context.Context, userID string, status, platformID int32, isNil bool, err error, result, key string) error {
if isNil { if isNil {
log.ZWarn(ctx, "this user not online,maybe trigger order not right", log.ZWarn(ctx, "this user not online,maybe trigger order not right",
err, "userStatus", status) err, "userStatus", status)
return nil return nil
} }
var onlineStatus user.OnlineStatus var onlineStatus user.OnlineStatus
@ -280,12 +300,16 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
} else {
return nil
}
func (u *UserCacheRedis) refreshStatusOnline(ctx context.Context, userID string, platformID int32, isNil bool, err error, result, key string) error {
var onlineStatus user.OnlineStatus var onlineStatus user.OnlineStatus
if !isNil { if !isNil {
err = json.Unmarshal([]byte(result), &onlineStatus) err2 := json.Unmarshal([]byte(result), &onlineStatus)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err2)
} }
onlineStatus.PlatformIDs = RemoveRepeatedElementsInList(append(onlineStatus.PlatformIDs, platformID)) onlineStatus.PlatformIDs = RemoveRepeatedElementsInList(append(onlineStatus.PlatformIDs, platformID))
} else { } else {
@ -302,8 +326,6 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, statu
return errs.Wrap(err) return errs.Wrap(err)
} }
}
return nil return nil
} }

Loading…
Cancel
Save