diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 33f987528..3f3ca11e8 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -28,6 +28,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/protocol/constant" pbchat "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/msggateway" pbpush "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" @@ -128,16 +129,11 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim s // Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType. func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) (err error) { - userIDs, err = c.onlineCache.GetUsersOnline(ctx, userIDs) - if err != nil { - return err - } - log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil { return err } - wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, userIDs) + wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, userIDs) if err != nil { return err } @@ -186,6 +182,38 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat return true } +func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) { + var ( + onlineUserIDs []string + offlineUserIDs []string + ) + for _, userID := range pushToUserIDs { + online, err := c.onlineCache.GetUserOnline(ctx, userID) + if err != nil { + return nil, err + } + if online { + onlineUserIDs = append(onlineUserIDs, userID) + } else { + offlineUserIDs = append(offlineUserIDs, userID) + } + } + var result []*msggateway.SingleMsgToUserResults + if len(onlineUserIDs) > 0 { + var err error + result, err = c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) + if err != nil { + return nil, err + } + } + for _, userID := range offlineUserIDs { + result = append(result, &msggateway.SingleMsgToUserResults{ + UserID: userID, + }) + } + return result, nil +} + func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string @@ -199,7 +227,7 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s return err } - wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) + wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) if err != nil { return err } @@ -240,8 +268,7 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s } func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) { if len(*pushToUserIDs) == 0 { - //*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID) - *pushToUserIDs, err = c.onlineCache.GetGroupOnline(ctx, groupID) // + *pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID) if err != nil { return err }