diff --git a/config/openim-push.yml b/config/openim-push.yml index 70aa5997f..36a9dafbc 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -38,7 +38,7 @@ iosPush: badgeCount: true production: false - +fullUserCache: true diff --git a/go.mod b/go.mod index 76166fa4f..a0217a22b 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.11 + github.com/openimsdk/protocol v0.0.72-alpha.14 + github.com/openimsdk/tools v0.0.50-alpha.12 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index bfd625d49..2cfab7fd4 100644 --- a/go.sum +++ b/go.sum @@ -319,10 +319,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.12 h1:GXUtSFXlh1AeOmMjN1CsRfRZMTQYBWZ8mTuRoB7KxLQ= -github.com/openimsdk/protocol v0.0.72-alpha.12/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc= -github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= +github.com/openimsdk/protocol v0.0.72-alpha.14 h1:XnmTUJXxxqxVqvpaO90Y+pn6b4Sz5+kvCb73p3ot1/4= +github.com/openimsdk/protocol v0.0.72-alpha.14/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo= +github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 44e79e412..58e626082 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -58,7 +58,7 @@ func Start(ctx context.Context, index int, conf *Config) error { ) hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error { - longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, longServer.subscriberUserOnlineStatusChanges) + longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges) return nil }) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 14e6ecc3b..4b621361e 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -76,7 +76,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.config = config - consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil) + consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil) return &consumerHandler, nil } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 8bd16178d..b567d8ce6 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -220,6 +220,7 @@ type Push struct { BadgeCount bool `mapstructure:"badgeCount"` Production bool `mapstructure:"production"` } `mapstructure:"iosPush"` + FullUserCache bool `mapstructure:"fullUserCache"` } type Auth struct { diff --git a/pkg/localcache/lru/lru.go b/pkg/localcache/lru/lru.go index 2fedffc48..41de3c8fd 100644 --- a/pkg/localcache/lru/lru.go +++ b/pkg/localcache/lru/lru.go @@ -20,6 +20,7 @@ type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V] type LRU[K comparable, V any] interface { Get(key K, fetch func() (V, error)) (V, error) + Set(key K, value V) SetHas(key K, value V) bool Del(key K) bool Stop() diff --git a/pkg/localcache/lru/lru_expiration.go b/pkg/localcache/lru/lru_expiration.go index d27e67057..98f170cc8 100644 --- a/pkg/localcache/lru/lru_expiration.go +++ b/pkg/localcache/lru/lru_expiration.go @@ -99,5 +99,11 @@ func (x *ExpirationLRU[K, V]) SetHas(key K, value V) bool { return false } +func (x *ExpirationLRU[K, V]) Set(key K, value V) { + x.lock.Lock() + defer x.lock.Unlock() + x.core.Add(key, &expirationLruItem[V]{value: value}) +} + func (x *ExpirationLRU[K, V]) Stop() { } diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go index 430778b87..f44722999 100644 --- a/pkg/localcache/lru/lru_lazy.go +++ b/pkg/localcache/lru/lru_lazy.go @@ -116,6 +116,12 @@ func (x *LayLRU[K, V]) SetHasBatch(data map[K]V) bool { // return x.core.Contains(key) //} +func (x *LayLRU[K, V]) Set(key K, value V) { + x.lock.Lock() + defer x.lock.Unlock() + x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) +} + func (x *LayLRU[K, V]) SetHas(key K, value V) bool { x.lock.Lock() defer x.lock.Unlock() diff --git a/pkg/localcache/lru/lru_slot.go b/pkg/localcache/lru/lru_slot.go index 4538ca20e..30ba25c41 100644 --- a/pkg/localcache/lru/lru_slot.go +++ b/pkg/localcache/lru/lru_slot.go @@ -40,6 +40,10 @@ func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return x.slots[x.getIndex(key)].Get(key, fetch) } +func (x *slotLRU[K, V]) Set(key K, value V) { + x.slots[x.getIndex(key)].Set(key, value) +} + func (x *slotLRU[K, V]) SetHas(key K, value V) bool { return x.slots[x.getIndex(key)].SetHas(key, value) } diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 9598299d9..0b01b5e51 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -7,6 +7,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/util/useronline" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/db/cacheutil" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/redis/go-redis/v9" @@ -15,27 +17,47 @@ import ( "time" ) -func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache { +func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache { x := &OnlineCache{ - user: user, - group: group, - local: lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { + user: user, + group: group, + fullUserCache: fullUserCache, + } + + switch x.fullUserCache { + case true: + x.mapCache = cacheutil.NewCache[string, []int32]() + case false: + x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) - }), + }) } + go func() { ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload) if err != nil { - log.ZError(ctx, "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) + log.ZError(ctx, "OnlineCache setHasUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) continue } - storageCache := x.setUserOnline(userID, platformIDs) - log.ZDebug(ctx, "OnlineCache setUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache) - if fn != nil { - fn(ctx, userID, platformIDs) + + switch x.fullUserCache { + case true: + if len(platformIDs) == 0 { + // offline + x.mapCache.Delete(userID) + } else { + x.mapCache.Store(userID, platformIDs) + } + case false: + storageCache := x.setHasUserOnline(userID, platformIDs) + log.ZDebug(ctx, "OnlineCache setHasUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache) + if fn != nil { + fn(ctx, userID, platformIDs) + } } + } }() return x @@ -44,11 +66,53 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re type OnlineCache struct { user rpcclient.UserRpcClient group *GroupLocalCache - local lru.LRU[string, []int32] + + // fullUserCache if enabled, caches the online status of all users using mapCache; + // otherwise, only a portion of users' online statuses (regardless of whether they are online) will be cached using lruCache. + fullUserCache bool + + lruCache lru.LRU[string, []int32] + mapCache *cacheutil.Cache[string, []int32] +} + +func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) error { + log.ZDebug(ctx, "init users online status begin") + + var ( + totalSet int + ) + + defer func(t time.Time) { + log.ZDebug(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet) + }(time.Now()) + + for page := int32(1); ; page++ { + resp, err := o.user.GetAllUserID(ctx, page, constant.ParamMaxLength) + if err != nil { + return err + } + + usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, resp.UserIDs) + if err != nil { + return err + } + + for _, user := range usersStatus { + if user.Status == constant.Online { + o.setUserOnline(user.UserID, user.PlatformIDs) + } + totalSet++ + } + + if len(resp.UserIDs) < constant.ParamMaxLength { + break + } + } + return nil } func (o *OnlineCache) getUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { - platformIDs, err := o.local.Get(userID, func() ([]int32, error) { + platformIDs, err := o.lruCache.Get(userID, func() ([]int32, error) { return o.user.GetUserOnlinePlatform(ctx, userID) }) if err != nil { @@ -83,6 +147,19 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]s offlineUserIDs []string ) + switch o.fullUserCache { + case true: + for _, userID := range usersID { + if _, ok := o.mapCache.Load(userID); ok { + onlineUserIDS = append(onlineUserIDS, userID) + } else { + offlineUserIDs = append(offlineUserIDs, userID) + } + } + case false: + } + + log.ZDebug(ctx, "get users online", "online users length", len(onlineUserIDS), "offline users length", len(offlineUserIDs)) return onlineUserIDS, offlineUserIDs, nil } @@ -120,6 +197,15 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]s // return onlineUserIDs, nil //} -func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool { - return o.local.SetHas(userID, platformIDs) +func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) { + switch o.fullUserCache { + case true: + o.mapCache.Store(userID, platformIDs) + case false: + o.lruCache.Set(userID, platformIDs) + } +} + +func (o *OnlineCache) setHasUserOnline(userID string, platformIDs []int32) bool { + return o.lruCache.SetHas(userID, platformIDs) } diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index eabe77b94..a02873ac5 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -169,6 +169,15 @@ func (u *UserRpcClient) Access(ctx context.Context, ownerUserID string) error { return authverify.CheckAccessV3(ctx, ownerUserID, u.imAdminUserID) } +// GetAllUserID retrieves all user IDs with pagination options. +func (u *UserRpcClient) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (*user.GetAllUserIDResp, error) { + resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}}) + if err != nil { + return nil, err + } + return resp, nil +} + // GetAllUserIDs retrieves all user IDs with pagination options. func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) { resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}})