diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 03c299b7a..029330f1a 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -45,6 +45,7 @@ type ConsumerHandler struct { pushConsumerGroup *kafka.MConsumerGroup offlinePusher offlinepush.OfflinePusher onlinePusher OnlinePusher + onlineCache *rpccache.OnlineCache groupLocalCache *rpccache.GroupLocalCache conversationLocalCache *rpccache.ConversationLocalCache msgRpcClient rpcclient.MessageRpcClient @@ -63,16 +64,17 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, if err != nil { return nil, err } + userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) consumerHandler.offlinePusher = offlinePusher consumerHandler.onlinePusher = NewOnlinePusher(client, config) consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupRpcClient, &config.LocalCacheConfig, rdb) consumerHandler.msgRpcClient = rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) consumerHandler.conversationRpcClient = rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) - consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, - &config.LocalCacheConfig, rdb) + consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.config = config + consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb) return &consumerHandler, nil } @@ -100,11 +102,13 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { var pushUserIDList []string isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID { - pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID) + pushUserIDList, err = c.onlineCache.GetUsersOnline(ctx, []string{pbData.MsgData.RecvID}) } else { - pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID) + pushUserIDList, err = c.onlineCache.GetUsersOnline(ctx, []string{pbData.MsgData.RecvID, pbData.MsgData.SendID}) + } + if err == nil { + err = c.Push2User(ctx, pushUserIDList, pbData.MsgData) } - err = c.Push2User(ctx, pushUserIDList, pbData.MsgData) } if err != nil { log.ZWarn(ctx, "push failed", err, "msg", pbData.String()) @@ -233,7 +237,8 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s } func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) { if len(*pushToUserIDs) == 0 { - *pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID) + //*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID) + *pushToUserIDs, err = c.onlineCache.GetGroupOnline(ctx, groupID) // if err != nil { return err } diff --git a/pkg/common/storage/cache/redis/seq_user_test.go b/pkg/common/storage/cache/redis/seq_user_test.go index b4c852cd0..cfbea004c 100644 --- a/pkg/common/storage/cache/redis/seq_user_test.go +++ b/pkg/common/storage/cache/redis/seq_user_test.go @@ -3,6 +3,7 @@ package redis import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/redis/go-redis/v9" "log" "strconv" @@ -15,7 +16,7 @@ func newTestOnline() *userOnline { opt := &redis.Options{ Addr: "172.16.8.48:16379", Password: "openIM123", - DB: 1, + DB: 0, } rdb := redis.NewClient(opt) if err := rdb.Ping(context.Background()).Err(); err != nil { @@ -63,7 +64,7 @@ func TestGetOnline(t *testing.T) { func TestRecvOnline(t *testing.T) { ts := newTestOnline() ctx := context.Background() - pubsub := ts.rdb.Subscribe(ctx, "user_online") + pubsub := ts.rdb.Subscribe(ctx, cachekey.OnlineChannel) // 等待订阅确认 _, err := pubsub.Receive(ctx) diff --git a/pkg/localcache/cache.go b/pkg/localcache/cache.go index 0e040ad38..ba849f892 100644 --- a/pkg/localcache/cache.go +++ b/pkg/localcache/cache.go @@ -31,6 +31,12 @@ type Cache[V any] interface { Stop() } +func LRUStringHash(key string) uint64 { + h := fnv.New64a() + h.Write(*(*[]byte)(unsafe.Pointer(&key))) + return h.Sum64() +} + func New[V any](opts ...Option) Cache[V] { opt := defaultOption() for _, o := range opts { @@ -49,11 +55,7 @@ func New[V any](opts ...Option) Cache[V] { if opt.localSlotNum == 1 { c.local = createSimpleLRU() } else { - c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, func(key string) uint64 { - h := fnv.New64a() - h.Write(*(*[]byte)(unsafe.Pointer(&key))) - return h.Sum64() - }, createSimpleLRU) + c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU) } if opt.linkSlotNum > 0 { c.link = link.New(opt.linkSlotNum) diff --git a/pkg/localcache/lru/lru.go b/pkg/localcache/lru/lru.go index 64280f238..2fedffc48 100644 --- a/pkg/localcache/lru/lru.go +++ b/pkg/localcache/lru/lru.go @@ -20,6 +20,7 @@ type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V] type LRU[K comparable, V any] interface { Get(key K, fetch func() (V, error)) (V, error) + SetHas(key K, value V) bool Del(key K) bool Stop() } diff --git a/pkg/localcache/lru/lru_expiration.go b/pkg/localcache/lru/lru_expiration.go index 970ac083e..d27e67057 100644 --- a/pkg/localcache/lru/lru_expiration.go +++ b/pkg/localcache/lru/lru_expiration.go @@ -89,5 +89,15 @@ func (x *ExpirationLRU[K, V]) Del(key K) bool { return ok } +func (x *ExpirationLRU[K, V]) SetHas(key K, value V) bool { + x.lock.Lock() + defer x.lock.Unlock() + if x.core.Contains(key) { + x.core.Add(key, &expirationLruItem[V]{value: value}) + return true + } + return false +} + func (x *ExpirationLRU[K, V]) Stop() { } diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go index d6e64aae4..e935c687c 100644 --- a/pkg/localcache/lru/lru_lazy.go +++ b/pkg/localcache/lru/lru_lazy.go @@ -88,6 +88,28 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return v.value, v.err } +//func (x *LayLRU[K, V]) Set(key K, value V) { +// x.lock.Lock() +// x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) +// x.lock.Unlock() +//} +// +//func (x *LayLRU[K, V]) Has(key K) bool { +// x.lock.Lock() +// defer x.lock.Unlock() +// return x.core.Contains(key) +//} + +func (x *LayLRU[K, V]) SetHas(key K, value V) bool { + x.lock.Lock() + defer x.lock.Unlock() + if x.core.Contains(key) { + x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) + return true + } + return false +} + func (x *LayLRU[K, V]) Del(key K) bool { x.lock.Lock() ok := x.core.Remove(key) diff --git a/pkg/localcache/lru/lru_slot.go b/pkg/localcache/lru/lru_slot.go index d034e94d3..4538ca20e 100644 --- a/pkg/localcache/lru/lru_slot.go +++ b/pkg/localcache/lru/lru_slot.go @@ -40,6 +40,10 @@ func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return x.slots[x.getIndex(key)].Get(key, fetch) } +func (x *slotLRU[K, V]) SetHas(key K, value V) bool { + return x.slots[x.getIndex(key)].SetHas(key, value) +} + func (x *slotLRU[K, V]) Del(key K) bool { return x.slots[x.getIndex(key)].Del(key) } diff --git a/pkg/localcache/option.go b/pkg/localcache/option.go index 00bb9d044..7d91aba6c 100644 --- a/pkg/localcache/option.go +++ b/pkg/localcache/option.go @@ -30,7 +30,7 @@ func defaultOption() *option { localSuccessTTL: time.Minute, localFailedTTL: time.Second * 5, delFn: make([]func(ctx context.Context, key ...string), 0, 2), - target: emptyTarget{}, + target: EmptyTarget{}, } } @@ -123,14 +123,14 @@ func WithDeleteKeyBefore(fn func(ctx context.Context, key ...string)) Option { } } -type emptyTarget struct{} +type EmptyTarget struct{} -func (e emptyTarget) IncrGetHit() {} +func (e EmptyTarget) IncrGetHit() {} -func (e emptyTarget) IncrGetSuccess() {} +func (e EmptyTarget) IncrGetSuccess() {} -func (e emptyTarget) IncrGetFailed() {} +func (e EmptyTarget) IncrGetFailed() {} -func (e emptyTarget) IncrDelHit() {} +func (e EmptyTarget) IncrDelHit() {} -func (e emptyTarget) IncrDelNotFound() {} +func (e EmptyTarget) IncrDelNotFound() {} diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go new file mode 100644 index 000000000..79d799bbe --- /dev/null +++ b/pkg/rpccache/online.go @@ -0,0 +1,121 @@ +package rpccache + +import ( + "context" + "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" + "github.com/redis/go-redis/v9" + "math/rand" + "strconv" + "strings" + "time" +) + +func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient) *OnlineCache { + x := &OnlineCache{ + user: user, + group: group, + local: lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { + return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) + }), + } + go func() { + parseUserOnlineStatus := func(payload string) (string, []int32, error) { + arr := strings.Split(payload, ":") + if len(arr) == 0 { + return "", nil, errors.New("invalid data") + } + userID := arr[len(arr)-1] + if userID == "" { + return "", nil, errors.New("userID is empty") + } + platformIDs := make([]int32, len(arr)-1) + for i := range platformIDs { + platformID, err := strconv.Atoi(arr[i]) + if err != nil { + return "", nil, err + } + platformIDs[i] = int32(platformID) + } + return userID, platformIDs, nil + } + ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) + for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { + userID, platformIDs, err := parseUserOnlineStatus(message.Payload) + if err != nil { + log.ZError(ctx, "redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) + continue + } + x.setUserOnline(userID, platformIDs) + //if err := x.setUserOnline(ctx, userID, platformIDs); err != nil { + // log.ZError(ctx, "redis subscribe setUserOnline", err, "payload", message.Payload, "channel", message.Channel) + //} + } + }() + return x +} + +type OnlineCache struct { + user rpcclient.UserRpcClient + group *GroupLocalCache + local lru.LRU[string, []int32] +} + +func (o *OnlineCache) getUserOnlineKey(userID string) string { + return "" + userID +} + +func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { + return o.local.Get(userID, func() ([]int32, error) { + return o.user.GetUserOnlinePlatform(ctx, userID) + }) +} + +func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) { + platformIDs, err := o.GetUserOnlinePlatform(ctx, userID) + if err != nil { + return false, err + } + return len(platformIDs) > 0, nil +} + +func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) { + onlineUserIDs := make([]string, 0, len(userIDs)) + for _, userID := range userIDs { + online, err := o.GetUserOnline(ctx, userID) + if err != nil { + return nil, err + } + if online { + onlineUserIDs = append(onlineUserIDs, userID) + } + } + return onlineUserIDs, nil +} + +func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) { + userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + var onlineUserIDs []string + for _, userID := range userIDs { + online, err := o.GetUserOnline(ctx, userID) + if err != nil { + return nil, err + } + if online { + onlineUserIDs = append(onlineUserIDs, userID) + } + } + return onlineUserIDs, nil +} + +func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) { + o.local.SetHas(o.getUserOnlineKey(userID), platformIDs) +} diff --git a/pkg/rpccache/user.go b/pkg/rpccache/user.go index 25a8eb20d..6126f5891 100644 --- a/pkg/rpccache/user.go +++ b/pkg/rpccache/user.go @@ -110,3 +110,18 @@ func (u *UserLocalCache) GetUsersInfoMap(ctx context.Context, userIDs []string) } return users, nil } + +//func (u *UserLocalCache) GetUserOnlinePlatform(ctx context.Context, userID string) (val []int32, err error) { +// log.ZDebug(ctx, "UserLocalCache GetUserOnlinePlatform req", "userID", userID) +// defer func() { +// if err == nil { +// log.ZDebug(ctx, "UserLocalCache GetUserOnlinePlatform return", "value", val) +// } else { +// log.ZError(ctx, "UserLocalCache GetUserOnlinePlatform return", err) +// } +// }() +// return localcache.AnyValue[[]int32](u.local.Get(ctx, cachekey.GetOnlineKey(userID), func(ctx context.Context) (any, error) { +// log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID) +// return u.client.GetUserGlobalMsgRecvOpt(ctx, userID) +// })) +//} diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index aab96603e..cac44fce0 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -193,3 +193,25 @@ func (u *UserRpcClient) GetNotificationByID(ctx context.Context, userID string) }) return err } + +func (u *UserRpcClient) GetUsersOnlinePlatform(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { + if len(userIDs) == 0 { + return nil, nil + } + resp, err := u.Client.GetUserStatus(ctx, &user.GetUserStatusReq{UserIDs: userIDs}) + if err != nil { + return nil, err + } + return resp.StatusList, nil +} + +func (u *UserRpcClient) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { + resp, err := u.GetUsersOnlinePlatform(ctx, []string{userID}) + if err != nil { + return nil, err + } + if len(resp) == 0 { + return nil, nil + } + return resp[0].PlatformIDs, nil +}