diff --git a/config/openim-push.yml b/config/openim-push.yml
index 70aa5997f..f2dd39dbf 100644
--- a/config/openim-push.yml
+++ b/config/openim-push.yml
@@ -1,16 +1,16 @@
 rpc:
   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
-  registerIP:
+  registerIP:
   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
   listenIP: 0.0.0.0
   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
-  ports: [ 10170, 10171, 10172, 10173 ]
+  ports: [ 10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177 ]

 prometheus:
   # Enable or disable Prometheus monitoring
   enable: true
   # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
-  ports: [ 20670, 20671, 20672, 20673 ]
+  ports: [ 20670, 20671, 20672, 20673, 20674, 20675, 20676, 20677 ]

 maxConcurrentWorkers: 3
 #Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified.
@@ -38,7 +38,7 @@ iosPush:
   badgeCount: true
   production: false
-
+fullUserCache: true
diff --git a/go.mod b/go.mod
index 76166fa4f..a0217a22b 100644
--- a/go.mod
+++ b/go.mod
@@ -12,8 +12,8 @@ require (
     github.com/gorilla/websocket v1.5.1
     github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
     github.com/mitchellh/mapstructure v1.5.0
-    github.com/openimsdk/protocol v0.0.72-alpha.12
-    github.com/openimsdk/tools v0.0.50-alpha.11
+    github.com/openimsdk/protocol v0.0.72-alpha.14
+    github.com/openimsdk/tools v0.0.50-alpha.12
     github.com/pkg/errors v0.9.1 // indirect
     github.com/prometheus/client_golang v1.18.0
     github.com/stretchr/testify v1.9.0
diff --git a/go.sum b/go.sum
index bfd625d49..2cfab7fd4 100644
--- a/go.sum
+++ b/go.sum
@@ -319,10 +319,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
 github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
 github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
 github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
-github.com/openimsdk/protocol v0.0.72-alpha.12 h1:GXUtSFXlh1AeOmMjN1CsRfRZMTQYBWZ8mTuRoB7KxLQ=
-github.com/openimsdk/protocol v0.0.72-alpha.12/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
-github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc=
-github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
+github.com/openimsdk/protocol v0.0.72-alpha.14 h1:XnmTUJXxxqxVqvpaO90Y+pn6b4Sz5+kvCb73p3ot1/4=
+github.com/openimsdk/protocol v0.0.72-alpha.14/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
+github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo=
+github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
 github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
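The port lists above grow from four to eight entries, and the comments in openim-push.yml require rpc.ports and prometheus.ports to stay the same length, since each push instance gets a matching metrics port. A minimal sketch of that invariant; checkPorts is a hypothetical helper, not something this PR adds:

package main

import "fmt"

// checkPorts is a hypothetical helper (not part of this PR). It enforces the
// invariant stated in the YAML comments: every RPC instance launched from
// rpc.ports needs exactly one matching Prometheus port.
func checkPorts(rpcPorts, promPorts []int) error {
    if len(rpcPorts) != len(promPorts) {
        return fmt.Errorf("rpc.ports has %d entries but prometheus.ports has %d; they must match", len(rpcPorts), len(promPorts))
    }
    return nil
}

func main() {
    rpc := []int{10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177}
    prom := []int{20670, 20671, 20672, 20673, 20674, 20675, 20676, 20677}
    if err := checkPorts(rpc, prom); err != nil {
        panic(err)
    }
    fmt.Println("port lists aligned:", len(rpc), "push instances")
}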
diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go
index 44e79e412..50da06097 100644
--- a/internal/msggateway/init.go
+++ b/internal/msggateway/init.go
@@ -58,7 +58,7 @@ func Start(ctx context.Context, index int, conf *Config) error {
     )
     hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error {
-        longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, longServer.subscriberUserOnlineStatusChanges)
+        longServer.online, _ = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges)
         return nil
     })
diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go
index 14e6ecc3b..46a75e3f5 100644
--- a/internal/push/push_handler.go
+++ b/internal/push/push_handler.go
@@ -40,6 +40,8 @@ import (
     "github.com/openimsdk/tools/utils/timeutil"
     "github.com/redis/go-redis/v9"
     "google.golang.org/protobuf/proto"
+    "math/rand"
+    "strconv"
     "time"
 )
@@ -67,6 +69,20 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
         return nil, err
     }
     userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
+    for {
+        ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))
+        conns, err := userRpcClient.Discov.GetConns(
+            ctx,
+            config.Share.RpcRegisterName.User,
+        )
+        if err != nil || len(conns) == 0 {
+            time.Sleep(time.Second)
+            log.ZWarn(ctx, "waiting for user rpc", err)
+        } else {
+            break
+        }
+    }
+
     consumerHandler.offlinePusher = offlinePusher
     consumerHandler.onlinePusher = NewOnlinePusher(client, config)
     consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
@@ -76,7 +92,10 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
     consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb)
     consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
     consumerHandler.config = config
-    consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
+    consumerHandler.onlineCache, err = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil)
+    if err != nil {
+        return nil, err
+    }
     return &consumerHandler, nil
 }
@@ -212,7 +231,7 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
     log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
     defer func(duration time.Time) {
         t := time.Since(duration)
-        if t.Seconds() > 5 {
+        if t.Seconds() > 1 {
             log.ZWarn(ctx, "Get group msg from msg_transfer and push msg end", nil, "msg", msg.String(), "groupID", groupID, "time cost", t)
         } else {
             log.ZDebug(ctx, "Get group msg from msg_transfer and push msg end", "msg", msg.String(), "groupID", groupID, "time cost", t)
diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go
index 8bd16178d..b567d8ce6 100644
--- a/pkg/common/config/config.go
+++ b/pkg/common/config/config.go
@@ -220,6 +220,7 @@ type Push struct {
         BadgeCount bool `mapstructure:"badgeCount"`
         Production bool `mapstructure:"production"`
     } `mapstructure:"iosPush"`
+    FullUserCache bool `mapstructure:"fullUserCache"`
 }

 type Auth struct {
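Two changes in push_handler.go above work together: NewConsumerHandler now spins until service discovery reports at least one live connection for the user RPC (the warm-up added in online.go below calls that service immediately on startup), and the error from NewOnlineCache is now propagated instead of ignored. A simplified sketch of the same wait-until-ready pattern; waitForReady is hypothetical, and unlike the PR's loop (which retries forever) it honors context cancellation:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// waitForReady polls check until it succeeds, sleeping between attempts.
// Hypothetical helper: the PR inlines this logic in NewConsumerHandler.
func waitForReady(ctx context.Context, check func(context.Context) error, interval time.Duration) error {
    for {
        if err := check(ctx); err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err() // unlike the PR's loop, give up when the context is canceled
        case <-time.After(interval):
        }
    }
}

func main() {
    tries := 0
    check := func(context.Context) error {
        tries++
        if tries < 3 {
            return errors.New("user rpc not discoverable yet")
        }
        return nil
    }
    if err := waitForReady(context.Background(), check, 10*time.Millisecond); err != nil {
        panic(err)
    }
    fmt.Println("user rpc ready after", tries, "attempts")
}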
diff --git a/pkg/localcache/lru/lru.go b/pkg/localcache/lru/lru.go
index 5182bed5f..943151246 100644
--- a/pkg/localcache/lru/lru.go
+++ b/pkg/localcache/lru/lru.go
@@ -20,6 +20,7 @@ type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V]

 type LRU[K comparable, V any] interface {
     Get(key K, fetch func() (V, error)) (V, error)
+    Set(key K, value V)
     GetBatch(key []K, fetchBatch func([]K) (map[string]V, error)) (map[string]V, error)
     SetHas(key K, value V) bool
     Del(key K) bool
diff --git a/pkg/localcache/lru/lru_expiration.go b/pkg/localcache/lru/lru_expiration.go
index d27e67057..98f170cc8 100644
--- a/pkg/localcache/lru/lru_expiration.go
+++ b/pkg/localcache/lru/lru_expiration.go
@@ -99,5 +99,11 @@ func (x *ExpirationLRU[K, V]) SetHas(key K, value V) bool {
     return false
 }

+func (x *ExpirationLRU[K, V]) Set(key K, value V) {
+    x.lock.Lock()
+    defer x.lock.Unlock()
+    x.core.Add(key, &expirationLruItem[V]{value: value})
+}
+
 func (x *ExpirationLRU[K, V]) Stop() {
 }
diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go
index ce3f7cff6..43a8c412a 100644
--- a/pkg/localcache/lru/lru_lazy.go
+++ b/pkg/localcache/lru/lru_lazy.go
@@ -176,6 +176,12 @@ func (x *LayLRU[K, V]) SetHasBatch(data map[K]V) {
 //    return x.core.Contains(key)
 //}

+func (x *LayLRU[K, V]) Set(key K, value V) {
+    x.lock.Lock()
+    defer x.lock.Unlock()
+    x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
+}
+
 func (x *LayLRU[K, V]) SetHas(key K, value V) bool {
     x.lock.Lock()
     defer x.lock.Unlock()
diff --git a/pkg/localcache/lru/lru_slot.go b/pkg/localcache/lru/lru_slot.go
index 4538ca20e..30ba25c41 100644
--- a/pkg/localcache/lru/lru_slot.go
+++ b/pkg/localcache/lru/lru_slot.go
@@ -40,6 +40,10 @@ func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
     return x.slots[x.getIndex(key)].Get(key, fetch)
 }

+func (x *slotLRU[K, V]) Set(key K, value V) {
+    x.slots[x.getIndex(key)].Set(key, value)
+}
+
 func (x *slotLRU[K, V]) SetHas(key K, value V) bool {
     return x.slots[x.getIndex(key)].SetHas(key, value)
 }
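The Set method added to the LRU interface and its three implementations above writes unconditionally, whereas the pre-existing SetHas reports via its bool return whether the write took effect. The warm-up path needs the unconditional form because the keys it writes are not in the cache yet. A usage sketch against the interface, under the assumption that it runs inside this repo; the user ID, platform values, and TTLs are made up:

package main

import (
    "fmt"
    "time"

    "github.com/openimsdk/open-im-server/v3/pkg/localcache"
    "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
)

func main() {
    // Build a LayLRU the same way online.go does, with made-up TTLs.
    cache := lru.NewLayLRU[string, []int32](2048, time.Minute, time.Second*3,
        localcache.EmptyTarget{}, func(key string, value []int32) {})

    // Set writes unconditionally — suitable for warming keys that were
    // never cached (this is why the interface grew the method).
    cache.Set("user1", []int32{1, 5})

    // SetHas reports through its return value whether the write took effect.
    stored := cache.SetHas("user1", []int32{1, 5, 9})
    fmt.Println("SetHas stored:", stored)
}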
diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go
index e767b9a92..f8b9ff710 100644
--- a/pkg/rpccache/online.go
+++ b/pkg/rpccache/online.go
@@ -11,45 +11,112 @@ import (
     "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
     "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
     "github.com/openimsdk/open-im-server/v3/pkg/util/useronline"
+    "github.com/openimsdk/protocol/constant"
+    "github.com/openimsdk/tools/db/cacheutil"
     "github.com/openimsdk/tools/log"
     "github.com/openimsdk/tools/mcontext"
     "github.com/redis/go-redis/v9"
 )

-func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache {
+func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) {
     x := &OnlineCache{
-        user:  user,
-        group: group,
-        local: lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
+        user:          user,
+        group:         group,
+        fullUserCache: fullUserCache,
+    }
+
+    switch x.fullUserCache {
+    case true:
+        x.mapCache = cacheutil.NewCache[string, []int32]()
+        if err := x.initUsersOnlineStatus(mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))); err != nil {
+            return nil, err
+        }
+    case false:
+        x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
             return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
-        }),
+        })
     }
+
     go func() {
         ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10))
         for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
             userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
             if err != nil {
-                log.ZError(ctx, "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
+                log.ZError(ctx, "OnlineCache setHasUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
                 continue
             }
-            storageCache := x.setUserOnline(userID, platformIDs)
-            log.ZDebug(ctx, "OnlineCache setUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache)
-            if fn != nil {
-                fn(ctx, userID, platformIDs)
+
+            switch x.fullUserCache {
+            case true:
+                if len(platformIDs) == 0 {
+                    // offline
+                    x.mapCache.Delete(userID)
+                } else {
+                    x.mapCache.Store(userID, platformIDs)
+                }
+            case false:
+                storageCache := x.setHasUserOnline(userID, platformIDs)
+                log.ZDebug(ctx, "OnlineCache setHasUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache)
+                if fn != nil {
+                    fn(ctx, userID, platformIDs)
+                }
             }
+
         }
     }()
-    return x
+    return x, nil
 }

 type OnlineCache struct {
     user  rpcclient.UserRpcClient
     group *GroupLocalCache
-    local lru.LRU[string, []int32]
+
+    // fullUserCache if enabled, caches the online status of all users using mapCache;
+    // otherwise, only a portion of users' online statuses (regardless of whether they are online) will be cached using lruCache.
+    fullUserCache bool
+
+    lruCache lru.LRU[string, []int32]
+    mapCache *cacheutil.Cache[string, []int32]
+}
+
+func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) error {
+    log.ZDebug(ctx, "init users online status begin")
+
+    var (
+        totalSet int
+    )
+
+    defer func(t time.Time) {
+        log.ZWarn(ctx, "init users online status end", nil, "cost", time.Since(t), "totalSet", totalSet)
+    }(time.Now())
+
+    for page := int32(1); ; page++ {
+        resp, err := o.user.GetAllUserID(ctx, page, constant.ParamMaxLength)
+        if err != nil {
+            return err
+        }
+
+        usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, resp.UserIDs)
+        if err != nil {
+            return err
+        }
+
+        for _, user := range usersStatus {
+            if user.Status == constant.Online {
+                o.setUserOnline(user.UserID, user.PlatformIDs)
+            }
+            totalSet++
+        }
+
+        if len(resp.UserIDs) < constant.ParamMaxLength {
+            break
+        }
+    }
+    return nil
 }

 func (o *OnlineCache) getUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
-    platformIDs, err := o.local.Get(userID, func() ([]int32, error) {
+    platformIDs, err := o.lruCache.Get(userID, func() ([]int32, error) {
         return o.user.GetUserOnlinePlatform(ctx, userID)
     })
     if err != nil {
@@ -91,7 +158,7 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
 // ----------------------

 func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) {
-    platformIDsMap, err := o.local.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) {
+    platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) {
         platformIDsMap := make(map[string][]int32)

         usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers)
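initUsersOnlineStatus above warms mapCache by paging through GetAllUserID in batches of constant.ParamMaxLength and stopping at the first short page; only users reported Online are stored, so a key missing from mapCache later means offline. The paging pattern in isolation — pageAll, fetch, and the sample data are all invented for illustration:

package main

import "fmt"

// pageAll is a hypothetical, generic version of the loop in
// initUsersOnlineStatus above: fetch pages of size pageSize until a short
// (or empty) page signals the end of the data set.
func pageAll(pageSize int, fetch func(page int) []string, handle func(string)) {
    for page := 1; ; page++ {
        ids := fetch(page)
        for _, id := range ids {
            handle(id)
        }
        if len(ids) < pageSize {
            break // a short page means there is no next page
        }
    }
}

func main() {
    data := []string{"u1", "u2", "u3", "u4", "u5"}
    fetch := func(page int) []string {
        lo := (page - 1) * 2
        if lo >= len(data) {
            return nil
        }
        hi := lo + 2
        if hi > len(data) {
            hi = len(data)
        }
        return data[lo:hi]
    }
    pageAll(2, fetch, func(id string) { fmt.Println("warmed", id) })
}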
@@ -134,6 +201,19 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s
         }
     }

+    switch o.fullUserCache {
+    case true:
+        for _, userID := range userIDs {
+            if _, ok := o.mapCache.Load(userID); ok {
+                onlineUserIDs = append(onlineUserIDs, userID)
+            } else {
+                offlineUserIDs = append(offlineUserIDs, userID)
+            }
+        }
+    case false:
+    }
+
+    log.ZWarn(ctx, "get users online", nil, "online users length", len(onlineUserIDs), "offline users length", len(offlineUserIDs))
     return onlineUserIDs, offlineUserIDs, nil
 }

@@ -171,6 +251,15 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s
 //    return onlineUserIDs, nil
 //}

-func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool {
-    return o.local.SetHas(userID, platformIDs)
+func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) {
+    switch o.fullUserCache {
+    case true:
+        o.mapCache.Store(userID, platformIDs)
+    case false:
+        o.lruCache.Set(userID, platformIDs)
+    }
+}
+
+func (o *OnlineCache) setHasUserOnline(userID string, platformIDs []int32) bool {
+    return o.lruCache.SetHas(userID, platformIDs)
 }
diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go
index eabe77b94..a02873ac5 100644
--- a/pkg/rpcclient/user.go
+++ b/pkg/rpcclient/user.go
@@ -169,6 +169,15 @@ func (u *UserRpcClient) Access(ctx context.Context, ownerUserID string) error {
     return authverify.CheckAccessV3(ctx, ownerUserID, u.imAdminUserID)
 }

+// GetAllUserID retrieves all user IDs with pagination options.
+func (u *UserRpcClient) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (*user.GetAllUserIDResp, error) {
+    resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}})
+    if err != nil {
+        return nil, err
+    }
+    return resp, nil
+}
+
 // GetAllUserIDs retrieves all user IDs with pagination options.
 func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) {
     resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}})
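The net effect for the push path: with fullUserCache: true, GetUsersOnline splits a recipient list using only the in-process map, with no Redis or RPC round trip per group message. A caller-side sketch — onlineSplitter, fanOut, and fakeCache are all invented for illustration, but *rpccache.OnlineCache satisfies the interface with the signature shown above:

package main

import (
    "context"
    "fmt"
)

// onlineSplitter captures the one method this sketch needs;
// *rpccache.OnlineCache satisfies it.
type onlineSplitter interface {
    GetUsersOnline(ctx context.Context, userIDs []string) ([]string, []string, error)
}

// fanOut is a hypothetical caller: route online users to the gateway push
// and offline users to the offline pusher (FCM / GeTui / JPush).
func fanOut(ctx context.Context, cache onlineSplitter, memberIDs []string) error {
    online, offline, err := cache.GetUsersOnline(ctx, memberIDs)
    if err != nil {
        return err
    }
    fmt.Println("gateway push:", online)
    fmt.Println("offline push:", offline)
    return nil
}

// fakeCache stands in for OnlineCache in this example only.
type fakeCache map[string]bool

func (f fakeCache) GetUsersOnline(_ context.Context, ids []string) (on, off []string, _ error) {
    for _, id := range ids {
        if f[id] {
            on = append(on, id)
        } else {
            off = append(off, id)
        }
    }
    return on, off, nil
}

func main() {
    _ = fanOut(context.Background(), fakeCache{"u1": true, "u3": true}, []string{"u1", "u2", "u3"})
}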