From 7f8d26c36b3b6f8f883ec20490eb5edd8adb8ffe Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Thu, 5 Sep 2024 14:35:40 +0800 Subject: [PATCH] feat: cache batch --- internal/push/push_handler.go | 21 +++++++-------------- pkg/localcache/lru/lru_lazy.go | 16 ++++++++++++++++ pkg/rpccache/online.go | 9 +++++++++ 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index f11ee0b3c..14e6ecc3b 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -185,21 +185,12 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat } func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) { - var ( - onlineUserIDs []string - offlineUserIDs []string - ) - for _, userID := range pushToUserIDs { - online, err := c.onlineCache.GetUserOnline(ctx, userID) - if err != nil { - return nil, err - } - if online { - onlineUserIDs = append(onlineUserIDs, userID) - } else { - offlineUserIDs = append(offlineUserIDs, userID) - } + + onlineUserIDs, offlineUserIDs, err := c.onlineCache.GetUsersOnline(ctx, pushToUserIDs) + if err != nil { + return nil, err } + log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs) var result []*msggateway.SingleMsgToUserResults if len(onlineUserIDs) > 0 { @@ -398,6 +389,7 @@ func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, conten } return } + func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) maxSeq, err := c.msgRpcClient.GetConversationMaxSeq(ctx, conversationID) @@ -406,6 +398,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, } return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) } + func unmarshalNotificationElem(bytes []byte, t any) error { var notification sdkws.NotificationElem if err := json.Unmarshal(bytes, ¬ification); err != nil { diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go index e935c687c..430778b87 100644 --- a/pkg/localcache/lru/lru_lazy.go +++ b/pkg/localcache/lru/lru_lazy.go @@ -88,6 +88,22 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return v.value, v.err } +func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func() ([]V, error)) ([]V, error) { + return nil, nil +} + +func (x *LayLRU[K, V]) SetHasBatch(data map[K]V) bool { + x.lock.Lock() + defer x.lock.Unlock() + for key, value := range data { + if x.core.Contains(key) { + x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) + return true + } + } + return false +} + //func (x *LayLRU[K, V]) Set(key K, value V) { // x.lock.Lock() // x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index fec345b98..9598299d9 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -77,6 +77,15 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e return len(platformIDs) > 0, nil } +func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]string, []string, error) { + var ( + onlineUserIDS []string + offlineUserIDs []string + ) + + return onlineUserIDS, offlineUserIDs, nil +} + //func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) { // onlineUserIDs := make([]string, 0, len(userIDs)) // for _, userID := range userIDs {