|
|
|
@ -18,14 +18,9 @@ import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"errors"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy"
|
|
|
|
|
|
|
|
|
|
"github.com/OpenIMSDK/protocol/conversation"
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
|
|
|
|
|
|
|
|
|
"github.com/OpenIMSDK/protocol/constant"
|
|
|
|
|
"github.com/OpenIMSDK/protocol/conversation"
|
|
|
|
|
"github.com/OpenIMSDK/protocol/msggateway"
|
|
|
|
|
"github.com/OpenIMSDK/protocol/sdkws"
|
|
|
|
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
|
|
|
@ -34,6 +29,7 @@ import (
|
|
|
|
|
"github.com/OpenIMSDK/tools/utils"
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/getui"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush"
|
|
|
|
@ -41,6 +37,8 @@ import (
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -53,7 +51,6 @@ type Pusher struct {
|
|
|
|
|
msgRpcClient *rpcclient.MessageRpcClient
|
|
|
|
|
conversationRpcClient *rpcclient.ConversationRpcClient
|
|
|
|
|
groupRpcClient *rpcclient.GroupRpcClient
|
|
|
|
|
successCount int
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var errNoOfflinePusher = errors.New("no offlinePusher is configured")
|
|
|
|
@ -104,24 +101,29 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
|
|
|
|
|
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// push
|
|
|
|
|
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
|
|
|
|
|
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs)
|
|
|
|
|
p.successCount++
|
|
|
|
|
if isOfflinePush {
|
|
|
|
|
for _, v := range wsResults {
|
|
|
|
|
if msg.SendID != v.UserID && (!v.OnlinePush) {
|
|
|
|
|
if err := callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
err = p.offlinePushMsg(ctx, msg.SendID, msg, []string{v.UserID})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !isOfflinePush {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, v := range wsResults {
|
|
|
|
|
if msg.SendID != v.UserID && (!v.OnlinePush) {
|
|
|
|
|
if err = callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = p.offlinePushMsg(ctx, msg.SendID, msg, []string{v.UserID})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -140,14 +142,16 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error {
|
|
|
|
|
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
|
|
|
|
|
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
|
|
|
|
|
var pushToUserIDs []string
|
|
|
|
|
if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil {
|
|
|
|
|
if err = callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(pushToUserIDs) == 0 {
|
|
|
|
|
pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
switch msg.ContentType {
|
|
|
|
|
case constant.MemberQuitNotification:
|
|
|
|
|
var tips sdkws.MemberQuitTips
|
|
|
|
@ -155,7 +159,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer func(groupID string, userIDs []string) {
|
|
|
|
|
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
|
|
|
|
|
if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
|
|
|
|
|
log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
|
|
|
|
|
}
|
|
|
|
|
}(groupID, []string{tips.QuitUser.UserID})
|
|
|
|
@ -167,7 +171,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
|
|
|
|
}
|
|
|
|
|
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
|
|
|
|
|
defer func(groupID string, userIDs []string) {
|
|
|
|
|
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
|
|
|
|
|
if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
|
|
|
|
|
log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
|
|
|
|
|
}
|
|
|
|
|
}(groupID, kickedUsers)
|
|
|
|
@ -183,48 +187,61 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
|
|
|
|
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0])
|
|
|
|
|
}
|
|
|
|
|
defer func(groupID string) {
|
|
|
|
|
if err := p.groupRpcClient.DismissGroup(ctx, groupID); err != nil {
|
|
|
|
|
if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil {
|
|
|
|
|
log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID)
|
|
|
|
|
}
|
|
|
|
|
}(groupID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
|
|
|
|
|
p.successCount++
|
|
|
|
|
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
|
|
|
|
|
if isOfflinePush {
|
|
|
|
|
var onlineSuccessUserIDs []string
|
|
|
|
|
var WebAndPcBackgroundUserIDs []string
|
|
|
|
|
onlineSuccessUserIDs = append(onlineSuccessUserIDs, msg.SendID)
|
|
|
|
|
var (
|
|
|
|
|
onlineSuccessUserIDs = []string{msg.SendID}
|
|
|
|
|
webAndPcBackgroundUserIDs []string
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for _, v := range wsResults {
|
|
|
|
|
if v.OnlinePush && v.UserID != msg.SendID {
|
|
|
|
|
onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID)
|
|
|
|
|
}
|
|
|
|
|
if !v.OnlinePush {
|
|
|
|
|
if len(v.Resp) != 0 {
|
|
|
|
|
for _, singleResult := range v.Resp {
|
|
|
|
|
if singleResult.ResultCode == -2 {
|
|
|
|
|
if constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC ||
|
|
|
|
|
singleResult.RecvPlatFormID == constant.WebPlatformID {
|
|
|
|
|
WebAndPcBackgroundUserIDs = append(WebAndPcBackgroundUserIDs, v.UserID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if v.OnlinePush {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(v.Resp) == 0 {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, singleResult := range v.Resp {
|
|
|
|
|
if singleResult.ResultCode != -2 {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
isPC := constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC
|
|
|
|
|
isWebID := singleResult.RecvPlatFormID == constant.WebPlatformID
|
|
|
|
|
|
|
|
|
|
if isPC || isWebID {
|
|
|
|
|
webAndPcBackgroundUserIDs = append(webAndPcBackgroundUserIDs, v.UserID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
|
|
|
|
|
if msg.ContentType != constant.SignalingNotification {
|
|
|
|
|
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs)
|
|
|
|
|
}
|
|
|
|
|
// Use offline push messaging
|
|
|
|
@ -234,6 +251,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(offlinePushUserIDs) > 0 {
|
|
|
|
|
needOfflinePushUserIDs = offlinePushUserIDs
|
|
|
|
|
}
|
|
|
|
@ -250,8 +268,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
|
|
|
|
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, WebAndPcBackgroundUserIDs)); err != nil {
|
|
|
|
|
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
|
|
|
|
|
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
|
|
|
|
|
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -319,15 +337,18 @@ func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData)
|
|
|
|
|
err = errNoOfflinePusher
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
type AtContent struct {
|
|
|
|
|
|
|
|
|
|
type atContent struct {
|
|
|
|
|
Text string `json:"text"`
|
|
|
|
|
AtUserList []string `json:"atUserList"`
|
|
|
|
|
IsAtSelf bool `json:"isAtSelf"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
opts, err = p.GetOfflinePushOpts(msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if msg.OfflinePushInfo != nil {
|
|
|
|
|
title = msg.OfflinePushInfo.Title
|
|
|
|
|
content = msg.OfflinePushInfo.Desc
|
|
|
|
@ -345,9 +366,9 @@ func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData)
|
|
|
|
|
case constant.File:
|
|
|
|
|
title = constant.ContentType2PushContent[int64(msg.ContentType)]
|
|
|
|
|
case constant.AtText:
|
|
|
|
|
a := AtContent{}
|
|
|
|
|
_ = utils.JsonStringToStruct(string(msg.Content), &a)
|
|
|
|
|
if utils.IsContain(conversationID, a.AtUserList) {
|
|
|
|
|
ac := atContent{}
|
|
|
|
|
_ = utils.JsonStringToStruct(string(msg.Content), &ac)
|
|
|
|
|
if utils.IsContain(conversationID, ac.AtUserList) {
|
|
|
|
|
title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
|
|
|
|
|
} else {
|
|
|
|
|
title = constant.ContentType2PushContent[constant.GroupMsg]
|
|
|
|
|