You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/internal/push/push_to_client.go

336 lines
12 KiB

2 years ago
package push
4 years ago
import (
"context"
"encoding/json"
2 years ago
"errors"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/fcm"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/getui"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/jpush"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2 years ago
"google.golang.org/protobuf/proto"
4 years ago
)
2 years ago
type Pusher struct {
2 years ago
database controller.PushDatabase
2 years ago
client discoveryregistry.SvcDiscoveryRegistry
2 years ago
offlinePusher offlinepush.OfflinePusher
2 years ago
groupLocalCache *localcache.GroupLocalCache
conversationLocalCache *localcache.ConversationLocalCache
2 years ago
successCount int
}
4 years ago
2 years ago
func NewPusher(client discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
2 years ago
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache) *Pusher {
2 years ago
rpcclient.NewGroupClient(client)
2 years ago
return &Pusher{
2 years ago
database: database,
client: client,
offlinePusher: offlinePusher,
groupLocalCache: groupLocalCache,
conversationLocalCache: conversationLocalCache,
}
2 years ago
}
2 years ago
func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
2 years ago
var offlinePusher offlinepush.OfflinePusher
if config.Config.Push.Getui.Enable {
offlinePusher = getui.NewClient(cache)
}
if config.Config.Push.Fcm.Enable {
offlinePusher = fcm.NewClient(cache)
}
if config.Config.Push.Jpns.Enable {
offlinePusher = jpush.NewClient()
}
return offlinePusher
}
2 years ago
func (p *Pusher) DismissGroup(ctx context.Context, groupID string) error {
cc, err := p.client.GetConn(ctx, config.Config.RpcRegisterName.OpenImGroupName)
if err != nil {
return err
}
_, err = group.NewGroupClient(cc).DismissGroup(ctx, &group.DismissGroupReq{
GroupID: groupID,
DeleteMember: true,
})
return err
}
2 years ago
func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
2 years ago
// callback
2 years ago
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil && err != errs.ErrCallbackContinue {
2 years ago
return err
}
// push
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs)
if err != nil {
return err
4 years ago
}
2 years ago
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
2 years ago
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs)
2 years ago
p.successCount++
2 years ago
for _, userID := range userIDs {
if isOfflinePush && userID != msg.SendID {
// save invitation info for offline push
for _, v := range wsResults {
if v.OnlinePush {
return nil
}
3 years ago
}
2 years ago
if msg.ContentType == constant.SignalingNotification {
isSend, err := p.database.HandleSignalInvite(ctx, msg, userID)
if err != nil {
return err
}
if !isSend {
return nil
}
}
if err := callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil {
2 years ago
return err
2 years ago
}
2 years ago
err = p.offlinePushMsg(ctx, userID, msg, userIDs)
if err != nil {
return err
2 years ago
}
2 years ago
}
3 years ago
}
2 years ago
return nil
3 years ago
}
3 years ago
func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error {
var notificationElem struct {
Detail string `json:"detail,omitempty"`
}
if err := json.Unmarshal(bytes, &notificationElem); err != nil {
return err
}
return json.Unmarshal([]byte(notificationElem.Detail), t)
}
2 years ago
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
2 years ago
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
2 years ago
var pushToUserIDs []string
2 years ago
if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil && err != errs.ErrCallbackContinue {
2 years ago
return err
}
2 years ago
if len(pushToUserIDs) == 0 {
pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
if err != nil {
2 years ago
return err
}
switch msg.ContentType {
case constant.MemberQuitNotification:
var tips sdkws.MemberQuitTips
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
return err
}
pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID)
case constant.MemberKickedNotification:
var tips sdkws.MemberKickedTips
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
return err
}
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
pushToUserIDs = append(pushToUserIDs, kickedUsers...)
2 years ago
case constant.GroupDismissedNotification:
2 years ago
if utils.IsNotification(utils.GetConversationIDByMsg(msg)) { // 消息先到,通知后到
var tips sdkws.GroupDismissedTips
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
return err
}
log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs)
if len(config.Config.Manager.AppManagerUid) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.AppManagerUid[0])
}
if err := p.DismissGroup(ctx, groupID); err != nil {
log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID)
}
2 years ago
}
}
2 years ago
}
2 years ago
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
if err != nil {
return err
2 years ago
}
2 years ago
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
2 years ago
p.successCount++
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
2 years ago
if isOfflinePush {
2 years ago
var onlineSuccessUserIDs []string
var WebAndPcBackgroundUserIDs []string
onlineSuccessUserIDs = append(onlineSuccessUserIDs, msg.SendID)
for _, v := range wsResults {
if v.OnlinePush && v.UserID != msg.SendID {
onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID)
2 years ago
}
2 years ago
if !v.OnlinePush {
if len(v.Resp) != 0 {
for _, singleResult := range v.Resp {
if singleResult.ResultCode == -2 {
if constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC ||
singleResult.RecvPlatFormID == constant.WebPlatformID {
WebAndPcBackgroundUserIDs = append(WebAndPcBackgroundUserIDs, v.UserID)
}
}
}
}
2 years ago
}
2 years ago
}
needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
if msg.ContentType != constant.SignalingNotification {
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
2 years ago
log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID)
2 years ago
return err
}
2 years ago
needOfflinePushUserIDs = utils.DifferenceString(notNotificationUserIDs, needOfflinePushUserIDs)
}
//Use offline push messaging
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
2 years ago
if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs
2 years ago
}
2 years ago
err = p.offlinePushMsg(ctx, groupID, msg, offlinePushUserIDs)
2 years ago
if err != nil {
2 years ago
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
2 years ago
return err
}
2 years ago
_, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
2 years ago
if err != nil {
2 years ago
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
2 years ago
return err
2 years ago
}
2 years ago
}
3 years ago
}
2 years ago
return nil
}
2 years ago
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
2 years ago
conns, err := p.client.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
2 years ago
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
2 years ago
if err != nil {
return nil, err
}
//Online push message
for _, v := range conns {
2 years ago
msgClient := msggateway.NewMsgGatewayClient(v)
2 years ago
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs})
2 years ago
if err != nil {
continue
}
2 years ago
log.ZDebug(ctx, "push result", "reply", reply)
2 years ago
if reply != nil && reply.SinglePushResult != nil {
wsResults = append(wsResults, reply.SinglePushResult...)
}
}
return wsResults, nil
}
func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
title, content, opts, err := p.getOfflinePushInfos(conversationID, msg)
2 years ago
if err != nil {
return err
}
err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
if err != nil {
2 years ago
prome.Inc(prome.MsgOfflinePushFailedCounter)
2 years ago
return err
}
2 years ago
prome.Inc(prome.MsgOfflinePushSuccessCounter)
2 years ago
return nil
3 years ago
}
2 years ago
func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *offlinepush.Opts, err error) {
opts = &offlinepush.Opts{}
2 years ago
if msg.ContentType > constant.SignalingNotificationBegin && msg.ContentType < constant.SignalingNotificationEnd {
2 years ago
req := &sdkws.SignalReq{}
2 years ago
if err := proto.Unmarshal(msg.Content, req); err != nil {
2 years ago
return nil, utils.Wrap(err, "")
}
switch req.Payload.(type) {
2 years ago
case *sdkws.SignalReq_Invite, *sdkws.SignalReq_InviteInGroup:
2 years ago
opts.Signal = &offlinepush.Signal{ClientMsgID: msg.ClientMsgID}
}
}
2 years ago
if msg.OfflinePushInfo != nil {
opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
opts.Ex = msg.OfflinePushInfo.Ex
}
return opts, nil
}
2 years ago
func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) (title, content string, opts *offlinepush.Opts, err error) {
2 years ago
if p.offlinePusher == nil {
err = errors.New("no offlinePusher is configured")
return
}
type AtContent struct {
Text string `json:"text"`
AtUserList []string `json:"atUserList"`
IsAtSelf bool `json:"isAtSelf"`
}
opts, err = p.GetOfflinePushOpts(msg)
if err != nil {
return
}
if msg.OfflinePushInfo != nil {
title = msg.OfflinePushInfo.Title
content = msg.OfflinePushInfo.Desc
}
if title == "" {
switch msg.ContentType {
case constant.Text:
fallthrough
case constant.Picture:
fallthrough
case constant.Voice:
fallthrough
case constant.Video:
fallthrough
case constant.File:
title = constant.ContentType2PushContent[int64(msg.ContentType)]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(msg.Content), &a)
if utils.IsContain(conversationID, a.AtUserList) {
2 years ago
title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
title = constant.ContentType2PushContent[constant.GroupMsg]
}
case constant.SignalingNotification:
title = constant.ContentType2PushContent[constant.SignalMsg]
default:
title = constant.ContentType2PushContent[constant.Common]
}
}
if content == "" {
content = title
}
return
}