package push import ( "context" "encoding/json" "errors" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/fcm" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/getui" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/jpush" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) type Pusher struct { database controller.PushDatabase discov discoveryregistry.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher groupLocalCache *localcache.GroupLocalCache conversationLocalCache *localcache.ConversationLocalCache msgRpcClient *rpcclient.MessageRpcClient conversationRpcClient *rpcclient.ConversationRpcClient groupRpcClient *rpcclient.GroupRpcClient successCount int } var errNoOfflinePusher = errors.New("no offlinePusher is configured") func NewPusher( discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, ) *Pusher { return &Pusher{ discov: discov, database: database, offlinePusher: offlinePusher, groupLocalCache: groupLocalCache, conversationLocalCache: conversationLocalCache, msgRpcClient: msgRpcClient, conversationRpcClient: conversationRpcClient, groupRpcClient: groupRpcClient, } } func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher { var offlinePusher offlinepush.OfflinePusher switch config.Config.Push.Enable { case "getui": offlinePusher = getui.NewClient(cache) case "fcm": offlinePusher = fcm.NewClient(cache) case "jpush": offlinePusher = jpush.NewClient() } return offlinePusher } func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) maxSeq, err := p.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID) if err != nil { return err } return p.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) } func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) // callback if err := callbackOnlinePush(ctx, userIDs, msg); err != nil { return err } // push wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs) if err != nil { return err } isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) log.ZDebug( ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs, ) p.successCount++ for _, userID := range userIDs { if isOfflinePush && userID != msg.SendID { // save invitation info for offline push for _, v := range wsResults { if v.OnlinePush { return nil } } if err := callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil { return err } err = p.offlinePushMsg(ctx, userID, msg, userIDs) if err != nil { return err } } } return nil } func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error { var notificationElem struct { Detail string `json:"detail,omitempty"` } if err := json.Unmarshal(bytes, ¬ificationElem); err != nil { return err } return json.Unmarshal([]byte(notificationElem.Detail), t) } func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil { return err } if len(pushToUserIDs) == 0 { pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID) if err != nil { return err } switch msg.ContentType { case constant.MemberQuitNotification: var tips sdkws.MemberQuitTips if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { return err } defer func(groupID string, userIDs []string) { if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { log.ZError( ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs, ) } }(groupID, []string{tips.QuitUser.UserID}) pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID) case constant.MemberKickedNotification: var tips sdkws.MemberKickedTips if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { return err } kickedUsers := utils.Slice( tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }, ) defer func(groupID string, userIDs []string) { if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { log.ZError( ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs, ) } }(groupID, kickedUsers) pushToUserIDs = append(pushToUserIDs, kickedUsers...) case constant.GroupDismissedNotification: if utils.IsNotification(utils.GetConversationIDByMsg(msg)) { // 消息先到,通知后到 var tips sdkws.GroupDismissedTips if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { return err } log.ZInfo( ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs, ) if len(config.Config.Manager.UserID) > 0 { ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0]) } defer func(groupID string) { if err := p.groupRpcClient.DismissGroup(ctx, groupID); err != nil { log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID) } }(groupID) } } } wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) if err != nil { return err } log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) p.successCount++ isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) if isOfflinePush { var onlineSuccessUserIDs []string var WebAndPcBackgroundUserIDs []string onlineSuccessUserIDs = append(onlineSuccessUserIDs, msg.SendID) for _, v := range wsResults { if v.OnlinePush && v.UserID != msg.SendID { onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID) } if !v.OnlinePush { if len(v.Resp) != 0 { for _, singleResult := range v.Resp { if singleResult.ResultCode == -2 { if constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC || singleResult.RecvPlatFormID == constant.WebPlatformID { WebAndPcBackgroundUserIDs = append(WebAndPcBackgroundUserIDs, v.UserID) } } } } } } needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) if msg.ContentType != constant.SignalingNotification { notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID) if err != nil { // log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID) return err } needOfflinePushUserIDs = utils.DifferenceString(notNotificationUserIDs, needOfflinePushUserIDs) } //Use offline push messaging if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs) if err != nil { return err } if len(offlinePushUserIDs) > 0 { needOfflinePushUserIDs = offlinePushUserIDs } err = p.offlinePushMsg(ctx, groupID, msg, offlinePushUserIDs) if err != nil { log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) return err } _, err := p.GetConnsAndOnlinePush( ctx, msg, utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs), ) if err != nil { log.ZError( ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs), ) return err } } } return nil } func (p *Pusher) GetConnsAndOnlinePush( ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string, ) (wsResults []*msggateway.SingleMsgToUserResults, err error) { conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) if err != nil { return nil, err } //Online push message for _, v := range conns { msgClient := msggateway.NewMsgGatewayClient(v) reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg( ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}, ) p.discov.CloseConn(v) if err != nil { continue } log.ZDebug(ctx, "push result", "reply", reply) if reply != nil && reply.SinglePushResult != nil { wsResults = append(wsResults, reply.SinglePushResult...) } } return wsResults, nil } func (p *Pusher) offlinePushMsg( ctx context.Context, conversationID string, msg *sdkws.MsgData, offlinePushUserIDs []string, ) error { title, content, opts, err := p.getOfflinePushInfos(conversationID, msg) if err != nil { return err } err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) if err != nil { prome.Inc(prome.MsgOfflinePushFailedCounter) return err } prome.Inc(prome.MsgOfflinePushSuccessCounter) return nil } func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *offlinepush.Opts, err error) { opts = &offlinepush.Opts{} // if msg.ContentType > constant.SignalingNotificationBegin && msg.ContentType < constant.SignalingNotificationEnd { // req := &sdkws.SignalReq{} // if err := proto.Unmarshal(msg.Content, req); err != nil { // return nil, utils.Wrap(err, "") // } // switch req.Payload.(type) { // case *sdkws.SignalReq_Invite, *sdkws.SignalReq_InviteInGroup: // opts.Signal = &offlinepush.Signal{ClientMsgID: msg.ClientMsgID} // } // } if msg.OfflinePushInfo != nil { opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound opts.Ex = msg.OfflinePushInfo.Ex } return opts, nil } func (p *Pusher) getOfflinePushInfos( conversationID string, msg *sdkws.MsgData, ) (title, content string, opts *offlinepush.Opts, err error) { if p.offlinePusher == nil { err = errNoOfflinePusher return } type AtContent struct { Text string `json:"text"` AtUserList []string `json:"atUserList"` IsAtSelf bool `json:"isAtSelf"` } opts, err = p.GetOfflinePushOpts(msg) if err != nil { return } if msg.OfflinePushInfo != nil { title = msg.OfflinePushInfo.Title content = msg.OfflinePushInfo.Desc } if title == "" { switch msg.ContentType { case constant.Text: fallthrough case constant.Picture: fallthrough case constant.Voice: fallthrough case constant.Video: fallthrough case constant.File: title = constant.ContentType2PushContent[int64(msg.ContentType)] case constant.AtText: a := AtContent{} _ = utils.JsonStringToStruct(string(msg.Content), &a) if utils.IsContain(conversationID, a.AtUserList) { title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] } else { title = constant.ContentType2PushContent[constant.GroupMsg] } case constant.SignalingNotification: title = constant.ContentType2PushContent[constant.SignalMsg] default: title = constant.ContentType2PushContent[constant.Common] } } if content == "" { content = title } return }