# Conflicts: # pkg/common/localcache/cache.gopull/2036/head
commit
41d2c7be7a
@ -1,15 +1,15 @@
|
||||
package cachekey
|
||||
|
||||
const (
|
||||
blackIDsKey = "BLACK_IDS:"
|
||||
isBlackKey = "IS_BLACK:"
|
||||
BlackIDsKey = "BLACK_IDS:"
|
||||
IsBlackKey = "IS_BLACK:" // local cache
|
||||
)
|
||||
|
||||
func GetBlackIDsKey(ownerUserID string) string {
|
||||
return blackIDsKey + ownerUserID
|
||||
return BlackIDsKey + ownerUserID
|
||||
|
||||
}
|
||||
|
||||
func GetIsBlackIDsKey(possibleBlackUserID, userID string) string {
|
||||
return isBlackKey + userID + "-" + possibleBlackUserID
|
||||
return IsBlackKey + userID + "-" + possibleBlackUserID
|
||||
}
|
||||
|
@ -1,48 +1,44 @@
|
||||
package cachekey
|
||||
|
||||
import "time"
|
||||
|
||||
const (
|
||||
conversationKey = "CONVERSATION:"
|
||||
conversationIDsKey = "CONVERSATION_IDS:"
|
||||
conversationIDsHashKey = "CONVERSATION_IDS_HASH:"
|
||||
conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
|
||||
recvMsgOptKey = "RECV_MSG_OPT:"
|
||||
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
|
||||
superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
|
||||
conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
|
||||
|
||||
conversationExpireTime = time.Second * 60 * 60 * 12
|
||||
ConversationKey = "CONVERSATION:"
|
||||
ConversationIDsKey = "CONVERSATION_IDS:"
|
||||
ConversationIDsHashKey = "CONVERSATION_IDS_HASH:"
|
||||
ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
|
||||
RecvMsgOptKey = "RECV_MSG_OPT:"
|
||||
SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
|
||||
SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
|
||||
ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
|
||||
)
|
||||
|
||||
func GetConversationKey(ownerUserID, conversationID string) string {
|
||||
return conversationKey + ownerUserID + ":" + conversationID
|
||||
return ConversationKey + ownerUserID + ":" + conversationID
|
||||
}
|
||||
|
||||
func GetConversationIDsKey(ownerUserID string) string {
|
||||
return conversationIDsKey + ownerUserID
|
||||
return ConversationIDsKey + ownerUserID
|
||||
}
|
||||
|
||||
func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
|
||||
return superGroupRecvMsgNotNotifyUserIDsKey + groupID
|
||||
return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID
|
||||
}
|
||||
|
||||
func GetRecvMsgOptKey(ownerUserID, conversationID string) string {
|
||||
return recvMsgOptKey + ownerUserID + ":" + conversationID
|
||||
return RecvMsgOptKey + ownerUserID + ":" + conversationID
|
||||
}
|
||||
|
||||
func GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string {
|
||||
return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID
|
||||
return SuperGroupRecvMsgNotNotifyUserIDsHashKey + groupID
|
||||
}
|
||||
|
||||
func GetConversationHasReadSeqKey(ownerUserID, conversationID string) string {
|
||||
return conversationHasReadSeqKey + ownerUserID + ":" + conversationID
|
||||
return ConversationHasReadSeqKey + ownerUserID + ":" + conversationID
|
||||
}
|
||||
|
||||
func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string {
|
||||
return conversationNotReceiveMessageUserIDsKey + conversationID
|
||||
return ConversationNotReceiveMessageUserIDsKey + conversationID
|
||||
}
|
||||
|
||||
func GetUserConversationIDsHashKey(ownerUserID string) string {
|
||||
return conversationIDsHashKey + ownerUserID
|
||||
return ConversationIDsHashKey + ownerUserID
|
||||
}
|
||||
|
@ -1,27 +1,24 @@
|
||||
package cachekey
|
||||
|
||||
import "time"
|
||||
|
||||
const (
|
||||
friendExpireTime = time.Second * 60 * 60 * 12
|
||||
friendIDsKey = "FRIEND_IDS:"
|
||||
twoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:"
|
||||
friendKey = "FRIEND_INFO:"
|
||||
isFriendKey = "IS_FRIEND:"
|
||||
FriendIDsKey = "FRIEND_IDS:"
|
||||
TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:"
|
||||
FriendKey = "FRIEND_INFO:"
|
||||
IsFriendKey = "IS_FRIEND:" // local cache key
|
||||
)
|
||||
|
||||
func GetFriendIDsKey(ownerUserID string) string {
|
||||
return friendIDsKey + ownerUserID
|
||||
return FriendIDsKey + ownerUserID
|
||||
}
|
||||
|
||||
func GetTwoWayFriendsIDsKey(ownerUserID string) string {
|
||||
return twoWayFriendsIDsKey + ownerUserID
|
||||
return TwoWayFriendsIDsKey + ownerUserID
|
||||
}
|
||||
|
||||
func GetFriendKey(ownerUserID, friendUserID string) string {
|
||||
return friendKey + ownerUserID + "-" + friendUserID
|
||||
return FriendKey + ownerUserID + "-" + friendUserID
|
||||
}
|
||||
|
||||
func GetIsFriendKey(possibleFriendUserID, userID string) string {
|
||||
return isFriendKey + possibleFriendUserID + "-" + userID
|
||||
return IsFriendKey + possibleFriendUserID + "-" + userID
|
||||
}
|
||||
|
@ -1,23 +0,0 @@
|
||||
package cachekey
|
||||
|
||||
const (
|
||||
maxSeq = "MAX_SEQ:"
|
||||
minSeq = "MIN_SEQ:"
|
||||
conversationUserMinSeq = "CON_USER_MIN_SEQ:"
|
||||
hasReadSeq = "HAS_READ_SEQ:"
|
||||
|
||||
//appleDeviceToken = "DEVICE_TOKEN"
|
||||
getuiToken = "GETUI_TOKEN"
|
||||
getuiTaskID = "GETUI_TASK_ID"
|
||||
//signalCache = "SIGNAL_CACHE:"
|
||||
//signalListCache = "SIGNAL_LIST_CACHE:"
|
||||
FCM_TOKEN = "FCM_TOKEN:"
|
||||
|
||||
messageCache = "MESSAGE_CACHE:"
|
||||
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
|
||||
userDelMessagesList = "USER_DEL_MESSAGES_LIST:"
|
||||
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
|
||||
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
|
||||
exTypeKeyLocker = "EX_LOCK:"
|
||||
uidPidToken = "UID_PID_TOKEN_STATUS:"
|
||||
)
|
@ -1 +0,0 @@
|
||||
package cachekey
|
@ -1,21 +0,0 @@
|
||||
package cachekey
|
||||
|
||||
import "time"
|
||||
|
||||
const (
|
||||
userExpireTime = time.Second * 60 * 60 * 12
|
||||
userInfoKey = "USER_INFO:"
|
||||
userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:"
|
||||
olineStatusKey = "ONLINE_STATUS:"
|
||||
userOlineStatusExpireTime = time.Second * 60 * 60 * 24
|
||||
statusMod = 501
|
||||
platformID = "_PlatformIDSuffix"
|
||||
)
|
||||
|
||||
func GetUserInfoKey(userID string) string {
|
||||
return userInfoKey + userID
|
||||
}
|
||||
|
||||
func GetUserGlobalRecvMsgOptKey(userID string) string {
|
||||
return userGlobalRecvMsgOptKey + userID
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
once sync.Once
|
||||
subscribe map[string][]string
|
||||
)
|
||||
|
||||
func getPublishKey(topic string, key []string) []string {
|
||||
if topic == "" || len(key) == 0 {
|
||||
return nil
|
||||
}
|
||||
once.Do(func() {
|
||||
list := []struct {
|
||||
Local config.LocalCache
|
||||
Keys []string
|
||||
}{
|
||||
{
|
||||
Local: config.Config.LocalCache.Group,
|
||||
Keys: []string{cachekey.GroupMemberIDsKey},
|
||||
},
|
||||
{
|
||||
Local: config.Config.LocalCache.Friend,
|
||||
Keys: []string{cachekey.FriendIDsKey, cachekey.BlackIDsKey},
|
||||
},
|
||||
{
|
||||
Local: config.Config.LocalCache.Conversation,
|
||||
Keys: []string{cachekey.ConversationIDsKey},
|
||||
},
|
||||
}
|
||||
subscribe = make(map[string][]string)
|
||||
for _, v := range list {
|
||||
if v.Local.Enable() {
|
||||
subscribe[v.Local.Topic] = v.Keys
|
||||
}
|
||||
}
|
||||
})
|
||||
prefix, ok := subscribe[topic]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
res := make([]string, 0, len(key))
|
||||
for _, k := range key {
|
||||
var exist bool
|
||||
for _, p := range prefix {
|
||||
if strings.HasPrefix(k, p) {
|
||||
exist = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if exist {
|
||||
res = append(res, k)
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
@ -1,87 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package localcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/conversation"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
)
|
||||
|
||||
type ConversationLocalCache struct {
|
||||
lock sync.Mutex
|
||||
superGroupRecvMsgNotNotifyUserIDs map[string]Hash
|
||||
conversationIDs map[string]Hash
|
||||
client *rpcclient.ConversationRpcClient
|
||||
}
|
||||
|
||||
type Hash struct {
|
||||
hash uint64
|
||||
ids []string
|
||||
}
|
||||
|
||||
func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache {
|
||||
return &ConversationLocalCache{
|
||||
superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash),
|
||||
conversationIDs: make(map[string]Hash),
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
|
||||
resp, err := g.client.Client.GetRecvMsgNotNotifyUserIDs(ctx, &conversation.GetRecvMsgNotNotifyUserIDsReq{
|
||||
GroupID: groupID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp.UserIDs, nil
|
||||
}
|
||||
|
||||
func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {
|
||||
resp, err := g.client.Client.GetUserConversationIDsHash(ctx, &conversation.GetUserConversationIDsHashReq{
|
||||
OwnerUserID: userID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
g.lock.Lock()
|
||||
hash, ok := g.conversationIDs[userID]
|
||||
g.lock.Unlock()
|
||||
|
||||
if !ok || hash.hash != resp.Hash {
|
||||
conversationIDsResp, err := g.client.Client.GetConversationIDs(ctx, &conversation.GetConversationIDsReq{
|
||||
UserID: userID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
g.lock.Lock()
|
||||
defer g.lock.Unlock()
|
||||
g.conversationIDs[userID] = Hash{
|
||||
hash: resp.Hash,
|
||||
ids: conversationIDsResp.ConversationIDs,
|
||||
}
|
||||
|
||||
return conversationIDsResp.ConversationIDs, nil
|
||||
}
|
||||
|
||||
return hash.ids, nil
|
||||
}
|
@ -1,15 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package localcache // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
|
@ -1,78 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package localcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/group"
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
)
|
||||
|
||||
type GroupLocalCache struct {
|
||||
lock sync.Mutex
|
||||
cache map[string]GroupMemberIDsHash
|
||||
client *rpcclient.GroupRpcClient
|
||||
}
|
||||
|
||||
type GroupMemberIDsHash struct {
|
||||
memberListHash uint64
|
||||
userIDs []string
|
||||
}
|
||||
|
||||
func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache {
|
||||
return &GroupLocalCache{
|
||||
cache: make(map[string]GroupMemberIDsHash, 0),
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {
|
||||
resp, err := g.client.Client.GetGroupAbstractInfo(ctx, &group.GetGroupAbstractInfoReq{
|
||||
GroupIDs: []string{groupID},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(resp.GroupAbstractInfos) < 1 {
|
||||
return nil, errs.ErrGroupIDNotFound
|
||||
}
|
||||
|
||||
g.lock.Lock()
|
||||
localHashInfo, ok := g.cache[groupID]
|
||||
if ok && localHashInfo.memberListHash == resp.GroupAbstractInfos[0].GroupMemberListHash {
|
||||
g.lock.Unlock()
|
||||
return localHashInfo.userIDs, nil
|
||||
}
|
||||
g.lock.Unlock()
|
||||
|
||||
groupMembersResp, err := g.client.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{
|
||||
GroupID: groupID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
g.lock.Lock()
|
||||
defer g.lock.Unlock()
|
||||
g.cache[groupID] = GroupMemberIDsHash{
|
||||
memberListHash: resp.GroupAbstractInfos[0].GroupMemberListHash,
|
||||
userIDs: groupMembersResp.UserIDs,
|
||||
}
|
||||
return g.cache[groupID].userIDs, nil
|
||||
}
|
@ -1,15 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package localcache
|
@ -1,70 +0,0 @@
|
||||
package localcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option {
|
||||
return WithDeleteLocal(func(fn func(key ...string)) {
|
||||
if fn == nil {
|
||||
log.ZDebug(context.Background(), "WithRedisDeleteSubscribe fn is nil", "topic", topic)
|
||||
return
|
||||
}
|
||||
msg := cli.Subscribe(context.Background(), topic).Channel()
|
||||
for m := range msg {
|
||||
log.ZDebug(context.Background(), "WithRedisDeleteSubscribe delete", "topic", m.Channel, "payload", m.Payload)
|
||||
var key []string
|
||||
if err := json.Unmarshal([]byte(m.Payload), &key); err != nil {
|
||||
log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload)
|
||||
continue
|
||||
}
|
||||
if len(key) == 0 {
|
||||
continue
|
||||
}
|
||||
fn(key...)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func WithRedisDeletePublish(topic string, cli redis.UniversalClient) Option {
|
||||
return WithDeleteKeyBefore(func(ctx context.Context, key ...string) {
|
||||
data, err := json.Marshal(key)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "json marshal error", err, "topic", topic, "key", key)
|
||||
return
|
||||
}
|
||||
if err := cli.Publish(ctx, topic, data).Err(); err != nil {
|
||||
log.ZError(ctx, "redis publish error", err, "topic", topic, "key", key)
|
||||
} else {
|
||||
log.ZDebug(ctx, "redis publish success", "topic", topic, "key", key)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func WithRedisDelete(cli redis.UniversalClient) Option {
|
||||
return WithDeleteKeyBefore(func(ctx context.Context, key ...string) {
|
||||
for _, s := range key {
|
||||
if err := cli.Del(ctx, s).Err(); err != nil {
|
||||
log.ZError(ctx, "redis delete error", err, "key", s)
|
||||
} else {
|
||||
log.ZDebug(ctx, "redis delete success", "key", s)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func WithRocksCacheDelete(cli *rockscache.Client) Option {
|
||||
return WithDeleteKeyBefore(func(ctx context.Context, key ...string) {
|
||||
for _, k := range key {
|
||||
if err := cli.TagAsDeleted2(ctx, k); err != nil {
|
||||
log.ZError(ctx, "rocksdb delete error", err, "key", k)
|
||||
} else {
|
||||
log.ZDebug(ctx, "rocksdb delete success", "key", k)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
@ -1,108 +0,0 @@
|
||||
package localcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link"
|
||||
opt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option"
|
||||
"hash/fnv"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
const TimingWheelSize = 500
|
||||
|
||||
type Cache[V any] interface {
|
||||
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error)
|
||||
Del(ctx context.Context, key ...string)
|
||||
}
|
||||
|
||||
func NewCache[V any](expire time.Duration, opts ...Option) Cache[V] {
|
||||
opt := defaultOption()
|
||||
for _, o := range opts {
|
||||
o(opt)
|
||||
}
|
||||
c := &cache[V]{
|
||||
opt: opt,
|
||||
link: link.New(opt.localSlotNum),
|
||||
n: uint64(opt.localSlotNum),
|
||||
}
|
||||
c.timingWheel = NewTimeWheel[string, V](TimingWheelSize, time.Second, c.exec)
|
||||
go c.timingWheel.Start()
|
||||
for i := 0; i < opt.localSlotNum; i++ {
|
||||
c.slots[i] = NewLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
||||
}
|
||||
go func() {
|
||||
c.opt.delCh(c.del)
|
||||
}()
|
||||
return c
|
||||
}
|
||||
|
||||
type cache[V any] struct {
|
||||
n uint64
|
||||
slots []*LRU[string, V]
|
||||
expire time.Duration
|
||||
opt *option
|
||||
link link.Link
|
||||
timingWheel *TimeWheel[string, V]
|
||||
}
|
||||
|
||||
func (c *cache[V]) index(key string) uint64 {
|
||||
h := fnv.New64a()
|
||||
_, _ = h.Write(*(*[]byte)(unsafe.Pointer(&key)))
|
||||
return h.Sum64() % c.n
|
||||
}
|
||||
|
||||
func (c *cache[V]) onEvict(key string, value V) {
|
||||
lks := c.link.Del(key)
|
||||
for k := range lks {
|
||||
if key != k { // prevent deadlock
|
||||
c.slots[c.index(k)].Del(k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache[V]) del(key ...string) {
|
||||
for _, k := range key {
|
||||
lks := c.link.Del(k)
|
||||
c.slots[c.index(k)].Del(k)
|
||||
for k := range lks {
|
||||
c.slots[c.index(k)].Del(k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) {
|
||||
enable := c.opt.enable
|
||||
if len(opts) > 0 && opts[0].Enable != nil {
|
||||
enable = *opts[0].Enable
|
||||
}
|
||||
if enable {
|
||||
if len(opts) > 0 && len(opts[0].Link) > 0 {
|
||||
c.link.Link(key, opts[0].Link...)
|
||||
}
|
||||
return c.slots[c.index(key)].Get(key, func() (V, error) {
|
||||
return fetch(ctx)
|
||||
})
|
||||
} else {
|
||||
return fetch(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache[V]) Del(ctx context.Context, key ...string) {
|
||||
if len(key) == 0 {
|
||||
return
|
||||
}
|
||||
for _, fn := range c.opt.delFn {
|
||||
fn(ctx, key...)
|
||||
}
|
||||
if c.opt.enable {
|
||||
c.del(key...)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache[V]) set(key string, value V) {
|
||||
|
||||
}
|
||||
func (c *cache[V]) exec(key string, _ V) {
|
||||
c.del(key)
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
package localcache
|
||||
|
||||
//func TestName(t *testing.T) {
|
||||
// target := &cacheTarget{}
|
||||
// l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil)
|
||||
// //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target)
|
||||
//
|
||||
// fn := func(key string, n int, fetch func() (string, error)) {
|
||||
// for i := 0; i < n; i++ {
|
||||
// //v, err := l.Get(key, fetch)
|
||||
// //if err == nil {
|
||||
// // t.Log("key", key, "value", v)
|
||||
// //} else {
|
||||
// // t.Error("key", key, err)
|
||||
// //}
|
||||
// l.Get(key, fetch)
|
||||
// //time.Sleep(time.Second / 100)
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// tmp := make(map[string]struct{})
|
||||
//
|
||||
// var wg sync.WaitGroup
|
||||
// for i := 0; i < 10000; i++ {
|
||||
// wg.Add(1)
|
||||
// key := fmt.Sprintf("key_%d", i%200)
|
||||
// tmp[key] = struct{}{}
|
||||
// go func() {
|
||||
// defer wg.Done()
|
||||
// //t.Log(key)
|
||||
// fn(key, 10000, func() (string, error) {
|
||||
// //time.Sleep(time.Second * 3)
|
||||
// //t.Log(time.Now(), "key", key, "fetch")
|
||||
// //if rand.Uint32()%5 == 0 {
|
||||
// // return "value_" + key, nil
|
||||
// //}
|
||||
// //return "", errors.New("rand error")
|
||||
// return "value_" + key, nil
|
||||
// })
|
||||
// }()
|
||||
//
|
||||
// //wg.Add(1)
|
||||
// //go func() {
|
||||
// // defer wg.Done()
|
||||
// // for i := 0; i < 10; i++ {
|
||||
// // l.Del(key)
|
||||
// // time.Sleep(time.Second / 3)
|
||||
// // }
|
||||
// //}()
|
||||
// }
|
||||
// wg.Wait()
|
||||
// t.Log(len(tmp))
|
||||
// t.Log(target.String())
|
||||
//
|
||||
//}
|
@ -1,43 +0,0 @@
|
||||
package localcache
|
||||
|
||||
import "sync"
|
||||
|
||||
type call[K comparable, V any] struct {
|
||||
wg sync.WaitGroup
|
||||
val V
|
||||
err error
|
||||
}
|
||||
|
||||
type SingleFlight[K comparable, V any] struct {
|
||||
mu sync.Mutex
|
||||
m map[K]*call[K, V]
|
||||
}
|
||||
|
||||
func NewSingleFlight[K comparable, V any]() *SingleFlight[K, V] {
|
||||
return &SingleFlight[K, V]{m: make(map[K]*call[K, V])}
|
||||
}
|
||||
|
||||
func (r *SingleFlight[K, V]) Do(key K, fn func() (V, error)) (V, error) {
|
||||
r.mu.Lock()
|
||||
if r.m == nil {
|
||||
r.m = make(map[K]*call[K, V])
|
||||
}
|
||||
if c, ok := r.m[key]; ok {
|
||||
r.mu.Unlock()
|
||||
c.wg.Wait()
|
||||
return c.val, c.err
|
||||
}
|
||||
c := new(call[K, V])
|
||||
c.wg.Add(1)
|
||||
r.m[key] = c
|
||||
r.mu.Unlock()
|
||||
|
||||
c.val, c.err = fn()
|
||||
c.wg.Done()
|
||||
|
||||
r.mu.Lock()
|
||||
delete(r.m, key)
|
||||
r.mu.Unlock()
|
||||
|
||||
return c.val, c.err
|
||||
}
|
@ -1,59 +0,0 @@
|
||||
package localcache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type Target interface {
|
||||
IncrGetHit()
|
||||
IncrGetSuccess()
|
||||
IncrGetFailed()
|
||||
|
||||
IncrDelHit()
|
||||
IncrDelNotFound()
|
||||
}
|
||||
|
||||
type cacheTarget struct {
|
||||
getHit int64
|
||||
getSuccess int64
|
||||
getFailed int64
|
||||
delHit int64
|
||||
delNotFound int64
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrGetHit() {
|
||||
atomic.AddInt64(&r.getHit, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrGetSuccess() {
|
||||
atomic.AddInt64(&r.getSuccess, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrGetFailed() {
|
||||
atomic.AddInt64(&r.getFailed, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrDelHit() {
|
||||
atomic.AddInt64(&r.delHit, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrDelNotFound() {
|
||||
atomic.AddInt64(&r.delNotFound, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) String() string {
|
||||
return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound)
|
||||
}
|
||||
|
||||
type emptyTarget struct{}
|
||||
|
||||
func (e emptyTarget) IncrGetHit() {}
|
||||
|
||||
func (e emptyTarget) IncrGetSuccess() {}
|
||||
|
||||
func (e emptyTarget) IncrGetFailed() {}
|
||||
|
||||
func (e emptyTarget) IncrDelHit() {}
|
||||
|
||||
func (e emptyTarget) IncrDelNotFound() {}
|
@ -1,71 +0,0 @@
|
||||
package localcache
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Execute[K comparable, V any] func(K, V)
|
||||
|
||||
type Task[K comparable, V any] struct {
|
||||
key K
|
||||
value V
|
||||
}
|
||||
|
||||
type TimeWheel[K comparable, V any] struct {
|
||||
ticker *time.Ticker
|
||||
slots [][]Task[K, V]
|
||||
currentPos int
|
||||
size int
|
||||
slotMutex sync.Mutex
|
||||
execute Execute[K, V]
|
||||
}
|
||||
|
||||
func NewTimeWheel[K comparable, V any](size int, tickDuration time.Duration, execute Execute[K, V]) *TimeWheel[K, V] {
|
||||
return &TimeWheel[K, V]{
|
||||
ticker: time.NewTicker(tickDuration),
|
||||
slots: make([][]Task[K, V], size),
|
||||
currentPos: 0,
|
||||
size: size,
|
||||
execute: execute,
|
||||
}
|
||||
}
|
||||
|
||||
func (tw *TimeWheel[K, V]) Start() {
|
||||
for range tw.ticker.C {
|
||||
tw.tick()
|
||||
}
|
||||
}
|
||||
|
||||
func (tw *TimeWheel[K, V]) Stop() {
|
||||
tw.ticker.Stop()
|
||||
}
|
||||
|
||||
func (tw *TimeWheel[K, V]) tick() {
|
||||
tw.slotMutex.Lock()
|
||||
defer tw.slotMutex.Unlock()
|
||||
|
||||
tasks := tw.slots[tw.currentPos]
|
||||
tw.slots[tw.currentPos] = nil
|
||||
if len(tasks) > 0 {
|
||||
go func(tasks []Task[K, V]) {
|
||||
for _, task := range tasks {
|
||||
tw.execute(task.key, task.value)
|
||||
}
|
||||
}(tasks)
|
||||
}
|
||||
|
||||
tw.currentPos = (tw.currentPos + 1) % tw.size
|
||||
}
|
||||
|
||||
func (tw *TimeWheel[K, V]) AddTask(delay int, task Task[K, V]) {
|
||||
if delay < 0 || delay >= tw.size {
|
||||
return
|
||||
}
|
||||
|
||||
tw.slotMutex.Lock()
|
||||
defer tw.slotMutex.Unlock()
|
||||
|
||||
pos := (tw.currentPos + delay) % tw.size
|
||||
tw.slots[pos] = append(tw.slots[pos], task)
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
package localcache
|
||||
|
||||
//
|
||||
//import (
|
||||
// "context"
|
||||
// "encoding/json"
|
||||
// "github.com/OpenIMSDK/tools/log"
|
||||
// "github.com/dtm-labs/rockscache"
|
||||
// "github.com/redis/go-redis/v9"
|
||||
//)
|
||||
//
|
||||
//func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option {
|
||||
// return WithDeleteLocal(func(fn func(key ...string)) {
|
||||
// if fn == nil {
|
||||
// log.ZDebug(context.Background(), "WithRedisDeleteSubscribe fn is nil", "topic", topic)
|
||||
// return
|
||||
// }
|
||||
// msg := cli.Subscribe(context.Background(), topic).Channel()
|
||||
// for m := range msg {
|
||||
// log.ZDebug(context.Background(), "WithRedisDeleteSubscribe delete", "topic", m.Channel, "payload", m.Payload)
|
||||
// var key []string
|
||||
// if err := json.Unmarshal([]byte(m.Payload), &key); err != nil {
|
||||
// log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload)
|
||||
// continue
|
||||
// }
|
||||
// if len(key) == 0 {
|
||||
// continue
|
||||
// }
|
||||
// fn(key...)
|
||||
// }
|
||||
// })
|
||||
//}
|
||||
//
|
||||
//func WithRedisDeletePublish(topic string, cli redis.UniversalClient) Option {
|
||||
// return WithDeleteKeyBefore(func(ctx context.Context, key ...string) {
|
||||
// data, err := json.Marshal(key)
|
||||
// if err != nil {
|
||||
// log.ZError(ctx, "json marshal error", err, "topic", topic, "key", key)
|
||||
// return
|
||||
// }
|
||||
// if err := cli.Publish(ctx, topic, data).Err(); err != nil {
|
||||
// log.ZError(ctx, "redis publish error", err, "topic", topic, "key", key)
|
||||
// } else {
|
||||
// log.ZDebug(ctx, "redis publish success", "topic", topic, "key", key)
|
||||
// }
|
||||
// })
|
||||
//}
|
||||
//
|
||||
//func WithRedisDelete(cli redis.UniversalClient) Option {
|
||||
// return WithDeleteKeyBefore(func(ctx context.Context, key ...string) {
|
||||
// for _, s := range key {
|
||||
// if err := cli.Del(ctx, s).Err(); err != nil {
|
||||
// log.ZError(ctx, "redis delete error", err, "key", s)
|
||||
// } else {
|
||||
// log.ZDebug(ctx, "redis delete success", "key", s)
|
||||
// }
|
||||
// }
|
||||
// })
|
||||
//}
|
||||
//
|
||||
//func WithRocksCacheDelete(cli *rockscache.Client) Option {
|
||||
// return WithDeleteKeyBefore(func(ctx context.Context, key ...string) {
|
||||
// for _, k := range key {
|
||||
// if err := cli.TagAsDeleted2(ctx, k); err != nil {
|
||||
// log.ZError(ctx, "rocksdb delete error", err, "key", k)
|
||||
// } else {
|
||||
// log.ZDebug(ctx, "rocksdb delete success", "key", k)
|
||||
// }
|
||||
// }
|
||||
// })
|
||||
//}
|
@ -0,0 +1,85 @@
|
||||
package localcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/localcache/link"
|
||||
"github.com/openimsdk/localcache/local"
|
||||
lopt "github.com/openimsdk/localcache/option"
|
||||
)
|
||||
|
||||
type Cache[V any] interface {
|
||||
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error)
|
||||
Del(ctx context.Context, key ...string)
|
||||
}
|
||||
|
||||
func New[V any](opts ...Option) Cache[V] {
|
||||
opt := defaultOption()
|
||||
for _, o := range opts {
|
||||
o(opt)
|
||||
}
|
||||
c := cache[V]{opt: opt}
|
||||
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
|
||||
c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
||||
go func() {
|
||||
c.opt.delCh(c.del)
|
||||
}()
|
||||
if opt.linkSlotNum > 0 {
|
||||
c.link = link.New(opt.linkSlotNum)
|
||||
}
|
||||
}
|
||||
return &c
|
||||
}
|
||||
|
||||
type cache[V any] struct {
|
||||
opt *option
|
||||
link link.Link
|
||||
local local.Cache[V]
|
||||
}
|
||||
|
||||
func (c *cache[V]) onEvict(key string, value V) {
|
||||
if c.link != nil {
|
||||
lks := c.link.Del(key)
|
||||
for k := range lks {
|
||||
if key != k { // prevent deadlock
|
||||
c.local.Del(k)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache[V]) del(key ...string) {
|
||||
for _, k := range key {
|
||||
lks := c.link.Del(k)
|
||||
c.local.Del(k)
|
||||
for k := range lks {
|
||||
c.local.Del(k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) {
|
||||
if c.local != nil {
|
||||
return c.local.Get(key, func() (V, error) {
|
||||
if c.link != nil {
|
||||
for _, o := range opts {
|
||||
c.link.Link(key, o.Link...)
|
||||
}
|
||||
}
|
||||
return fetch(ctx)
|
||||
})
|
||||
} else {
|
||||
return fetch(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache[V]) Del(ctx context.Context, key ...string) {
|
||||
if len(key) == 0 {
|
||||
return
|
||||
}
|
||||
for _, fn := range c.opt.delFn {
|
||||
fn(ctx, key...)
|
||||
}
|
||||
if c.local != nil {
|
||||
c.del(key...)
|
||||
}
|
||||
}
|
@ -0,0 +1,5 @@
|
||||
module github.com/openimsdk/localcache
|
||||
|
||||
go 1.19
|
||||
|
||||
require github.com/hashicorp/golang-lru/v2 v2.0.7
|
@ -0,0 +1,50 @@
|
||||
package local
|
||||
|
||||
import (
|
||||
"hash/fnv"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
type Cache[V any] interface {
|
||||
Get(key string, fetch func() (V, error)) (V, error)
|
||||
Del(key string) bool
|
||||
}
|
||||
|
||||
func NewCache[V any](slotNum, slotSize int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[string, V]) Cache[V] {
|
||||
c := &slot[V]{
|
||||
n: uint64(slotNum),
|
||||
slots: make([]*LRU[string, V], slotNum),
|
||||
target: target,
|
||||
}
|
||||
for i := 0; i < slotNum; i++ {
|
||||
c.slots[i] = NewLRU[string, V](slotSize, successTTL, failedTTL, c.target, onEvict)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
type slot[V any] struct {
|
||||
n uint64
|
||||
slots []*LRU[string, V]
|
||||
target Target
|
||||
}
|
||||
|
||||
func (c *slot[V]) index(s string) uint64 {
|
||||
h := fnv.New64a()
|
||||
_, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s)))
|
||||
return h.Sum64() % c.n
|
||||
}
|
||||
|
||||
func (c *slot[V]) Get(key string, fetch func() (V, error)) (V, error) {
|
||||
return c.slots[c.index(key)].Get(key, fetch)
|
||||
}
|
||||
|
||||
func (c *slot[V]) Del(key string) bool {
|
||||
if c.slots[c.index(key)].Del(key) {
|
||||
c.target.IncrDelHit()
|
||||
return true
|
||||
} else {
|
||||
c.target.IncrDelNotFound()
|
||||
return false
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package localcache
|
||||
package local
|
||||
|
||||
import "github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
|
@ -0,0 +1,95 @@
|
||||
package local
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type cacheTarget struct {
|
||||
getHit int64
|
||||
getSuccess int64
|
||||
getFailed int64
|
||||
delHit int64
|
||||
delNotFound int64
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrGetHit() {
|
||||
atomic.AddInt64(&r.getHit, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrGetSuccess() {
|
||||
atomic.AddInt64(&r.getSuccess, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrGetFailed() {
|
||||
atomic.AddInt64(&r.getFailed, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrDelHit() {
|
||||
atomic.AddInt64(&r.delHit, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) IncrDelNotFound() {
|
||||
atomic.AddInt64(&r.delNotFound, 1)
|
||||
}
|
||||
|
||||
func (r *cacheTarget) String() string {
|
||||
return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound)
|
||||
}
|
||||
|
||||
func TestName(t *testing.T) {
|
||||
target := &cacheTarget{}
|
||||
l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil)
|
||||
//l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target)
|
||||
|
||||
fn := func(key string, n int, fetch func() (string, error)) {
|
||||
for i := 0; i < n; i++ {
|
||||
//v, err := l.Get(key, fetch)
|
||||
//if err == nil {
|
||||
// t.Log("key", key, "value", v)
|
||||
//} else {
|
||||
// t.Error("key", key, err)
|
||||
//}
|
||||
l.Get(key, fetch)
|
||||
//time.Sleep(time.Second / 100)
|
||||
}
|
||||
}
|
||||
|
||||
tmp := make(map[string]struct{})
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 10000; i++ {
|
||||
wg.Add(1)
|
||||
key := fmt.Sprintf("key_%d", i%200)
|
||||
tmp[key] = struct{}{}
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
//t.Log(key)
|
||||
fn(key, 10000, func() (string, error) {
|
||||
//time.Sleep(time.Second * 3)
|
||||
//t.Log(time.Now(), "key", key, "fetch")
|
||||
//if rand.Uint32()%5 == 0 {
|
||||
// return "value_" + key, nil
|
||||
//}
|
||||
//return "", errors.New("rand error")
|
||||
return "value_" + key, nil
|
||||
})
|
||||
}()
|
||||
|
||||
//wg.Add(1)
|
||||
//go func() {
|
||||
// defer wg.Done()
|
||||
// for i := 0; i < 10; i++ {
|
||||
// l.Del(key)
|
||||
// time.Sleep(time.Second / 3)
|
||||
// }
|
||||
//}()
|
||||
}
|
||||
wg.Wait()
|
||||
t.Log(len(tmp))
|
||||
t.Log(target.String())
|
||||
|
||||
}
|
@ -0,0 +1,10 @@
|
||||
package local
|
||||
|
||||
type Target interface {
|
||||
IncrGetHit()
|
||||
IncrGetSuccess()
|
||||
IncrGetFailed()
|
||||
|
||||
IncrDelHit()
|
||||
IncrDelNotFound()
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package rpccache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/localcache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache {
|
||||
return &ConversationLocalCache{
|
||||
local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Conversation.Topic, cli)),
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
type ConversationLocalCache struct {
|
||||
local localcache.Cache[any]
|
||||
client rpcclient.ConversationRpcClient
|
||||
}
|
||||
|
||||
func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
|
||||
return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) {
|
||||
return c.client.GetConversationIDs(ctx, ownerUserID)
|
||||
}))
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package rpccache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/localcache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache {
|
||||
return &GroupLocalCache{
|
||||
local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Group.Topic, cli)),
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
type GroupLocalCache struct {
|
||||
local localcache.Cache[any]
|
||||
client rpcclient.GroupRpcClient
|
||||
}
|
||||
|
||||
func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {
|
||||
return localcache.AnyValue[[]string](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) {
|
||||
return g.client.GetGroupMemberIDs(ctx, groupID)
|
||||
}))
|
||||
}
|
Loading…
Reference in new issue