diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index b622ba721..17f0df085 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -2,6 +2,7 @@ package logic import ( "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/proto/push" @@ -49,7 +50,12 @@ func (r *RPCServer) run() { } func (r *RPCServer) PushMsg(_ context.Context, pbData *pbPush.PushMsgReq) (*pbPush.PushMsgResp, error) { //Call push module to send message to the user - MsgToUser(pbData) + switch pbData.MsgData.SessionType { + case constant.SuperGroupChatType: + MsgToSuperGroupUser(pbData) + default: + MsgToUser(pbData) + } return &pbPush.PushMsgResp{ ResultCode: 0, }, nil diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 3cd8b024a..419e817ca 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -133,6 +133,104 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } } +func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { + return + var wsResult []*pbRelay.SingleMsgToUser + isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) + log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String()) + if len(grpcCons) == 0 { + log.NewWarn(pushMsg.OperationID, "first GetConn4Unique ") + grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) + } + //Online push message + log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) + for _, v := range grpcCons { + msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) + reply, err := msgClient.OnlinePushMsg(context.Background(), &pbRelay.OnlinePushMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserID: pushMsg.PushToUserID}) + if err != nil { + log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err) + continue + } + if reply != nil && reply.Resp != nil { + wsResult = append(wsResult, reply.Resp...) + } + } + log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData) + successCount++ + if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { + for _, v := range wsResult { + if v.ResultCode == 0 { + if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) { + break + } + continue + } + if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) { + //Use offline push messaging + var UIDList []string + UIDList = append(UIDList, v.RecvID) + customContent := OpenIMContent{ + SessionType: int(pushMsg.MsgData.SessionType), + From: pushMsg.MsgData.SendID, + To: pushMsg.MsgData.RecvID, + Seq: pushMsg.MsgData.Seq, + } + bCustomContent, _ := json.Marshal(customContent) + jsonCustomContent := string(bCustomContent) + var content string + if pushMsg.MsgData.OfflinePushInfo != nil { + content = pushMsg.MsgData.OfflinePushInfo.Title + + } else { + switch pushMsg.MsgData.ContentType { + case constant.Text: + content = constant.ContentType2PushContent[constant.Text] + case constant.Picture: + content = constant.ContentType2PushContent[constant.Picture] + case constant.Voice: + content = constant.ContentType2PushContent[constant.Voice] + case constant.Video: + content = constant.ContentType2PushContent[constant.Video] + case constant.File: + content = constant.ContentType2PushContent[constant.File] + case constant.AtText: + a := AtContent{} + _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) + if utils.IsContain(v.RecvID, a.AtUserList) { + content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] + } else { + content = constant.ContentType2PushContent[constant.GroupMsg] + } + default: + content = constant.ContentType2PushContent[constant.Common] + } + } + callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList[0], pushMsg.MsgData.OfflinePushInfo, v.RecvPlatFormID) + log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp") + if callbackResp.ErrCode != 0 { + log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp) + } + if callbackResp.ActionCode != constant.ActionAllow { + log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop") + break + } + + if offlinePusher == nil { + offlinePusher = jpush.JPushClient + } + pushResult, err := offlinePusher.Push(UIDList, content, jsonCustomContent, pushMsg.OperationID) + if err != nil { + log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) + } else { + log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) + } + break + } + + } + + } +} //func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) { // m.MsgID = rpcChat.GetMsgID(m.SendID)