|
|
|
@ -3,11 +3,12 @@ package push
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
|
|
|
|
"math/rand"
|
|
|
|
|
"strconv"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
|
|
|
|
|
|
|
|
|
"github.com/IBM/sarama"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
|
|
|
@ -152,24 +153,24 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
|
|
|
|
|
log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
|
|
|
|
|
defer func(duration time.Time) {
|
|
|
|
|
t := time.Since(duration)
|
|
|
|
|
log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "msg", msg.String(), "time cost", t)
|
|
|
|
|
log.ZInfo(ctx, "Get msg from msg_transfer And push msg end", "msg", msg.String(), "time cost", t)
|
|
|
|
|
}(time.Now())
|
|
|
|
|
if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
log.ZInfo(ctx, "webhookBeforeOnlinePush end")
|
|
|
|
|
|
|
|
|
|
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, userIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.ZInfo(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
|
|
|
|
|
log.ZDebug(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
|
|
|
|
|
log.ZInfo(ctx, "single and notification push end")
|
|
|
|
|
|
|
|
|
|
if !c.shouldPushOffline(ctx, msg) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
log.ZInfo(ctx, "shouldPushOffline end")
|
|
|
|
|
log.ZInfo(ctx, "pushOffline start")
|
|
|
|
|
|
|
|
|
|
for _, v := range wsResults {
|
|
|
|
|
//message sender do not need offline push
|
|
|
|
@ -188,14 +189,14 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
|
|
|
|
|
if err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserID, msg, &offlinePushUserID); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
log.ZInfo(ctx, "webhookBeforeOfflinePush end")
|
|
|
|
|
|
|
|
|
|
if len(offlinePushUserID) > 0 {
|
|
|
|
|
needOfflinePushUserID = offlinePushUserID
|
|
|
|
|
}
|
|
|
|
|
err = c.offlinePushMsg(ctx, msg, needOfflinePushUserID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID", needOfflinePushUserID, "msg", msg)
|
|
|
|
|
log.ZDebug(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID", needOfflinePushUserID, "msg", msg)
|
|
|
|
|
log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID length", len(needOfflinePushUserID), "msg", msg)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -250,26 +251,24 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
|
|
|
|
|
&pushToUserIDs); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
log.ZInfo(ctx, "webhookBeforeGroupOnlinePush end")
|
|
|
|
|
|
|
|
|
|
err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
log.ZInfo(ctx, "groupMessagesHandler end")
|
|
|
|
|
|
|
|
|
|
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.ZInfo(ctx, "group push result", "result", wsResults, "msg", msg)
|
|
|
|
|
log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg)
|
|
|
|
|
log.ZInfo(ctx, "online group push end")
|
|
|
|
|
|
|
|
|
|
if !c.shouldPushOffline(ctx, msg) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs)
|
|
|
|
|
log.ZInfo(ctx, "GetOnlinePushFailedUserIDs end")
|
|
|
|
|
//filter some user, like don not disturb or don't need offline push etc.
|
|
|
|
|
needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -297,9 +296,11 @@ func (c *ConsumerHandler) asyncOfflinePush(ctx context.Context, needOfflinePushU
|
|
|
|
|
needOfflinePushUserIDs = offlinePushUserIDs
|
|
|
|
|
}
|
|
|
|
|
if err := c.pushDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil {
|
|
|
|
|
log.ZError(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs",
|
|
|
|
|
log.ZDebug(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs",
|
|
|
|
|
needOfflinePushUserIDs, "msg", msg)
|
|
|
|
|
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
|
|
|
|
log.ZWarn(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs length",
|
|
|
|
|
len(needOfflinePushUserIDs), "msg", msg)
|
|
|
|
|
prommetrics.GroupChatMsgProcessFailedCounter.Inc()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|