From 45b07bc3cc0076d117bb12861dc76881f6b99969 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 1 Jul 2024 17:23:35 +0800 Subject: [PATCH] online cache --- internal/push/push_handler.go | 15 +++++++++------ pkg/rpccache/online.go | 5 ++++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 029330f1a..33f987528 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -102,13 +102,11 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { var pushUserIDList []string isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID { - pushUserIDList, err = c.onlineCache.GetUsersOnline(ctx, []string{pbData.MsgData.RecvID}) + pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID) } else { - pushUserIDList, err = c.onlineCache.GetUsersOnline(ctx, []string{pbData.MsgData.RecvID, pbData.MsgData.SendID}) - } - if err == nil { - err = c.Push2User(ctx, pushUserIDList, pbData.MsgData) + pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID) } + err = c.Push2User(ctx, pushUserIDList, pbData.MsgData) } if err != nil { log.ZWarn(ctx, "push failed", err, "msg", pbData.String()) @@ -129,7 +127,12 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim s } // Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType. -func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { +func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) (err error) { + userIDs, err = c.onlineCache.GetUsersOnline(ctx, userIDs) + if err != nil { + return err + } + log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil { return err diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 79d799bbe..aeeafcf2b 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -48,9 +48,10 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { userID, platformIDs, err := parseUserOnlineStatus(message.Payload) if err != nil { - log.ZError(ctx, "redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) + log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) continue } + log.ZDebug(ctx, "OnlineCache setUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload) x.setUserOnline(userID, platformIDs) //if err := x.setUserOnline(ctx, userID, platformIDs); err != nil { // log.ZError(ctx, "redis subscribe setUserOnline", err, "payload", message.Payload, "channel", message.Channel) @@ -95,6 +96,7 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s onlineUserIDs = append(onlineUserIDs, userID) } } + log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs) return onlineUserIDs, nil } @@ -113,6 +115,7 @@ func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]str onlineUserIDs = append(onlineUserIDs, userID) } } + log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs) return onlineUserIDs, nil }