From 09460ccefaaf53b2cff21afd680198826baf2f0a Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Mon, 13 Apr 2026 23:14:18 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=A8=E6=88=B7=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/rtc-signaling.md | 288 ---------------------- internal/rpc/msg/send.go | 5 +- internal/rpc/msg/verify.go | 1 - internal/rpc/rtc/signal.go | 258 ++++++++++++++----- pkg/common/storage/database/mgo/signal.go | 6 + pkg/common/storage/model/signal.go | 5 + 6 files changed, 215 insertions(+), 348 deletions(-) delete mode 100644 docs/rtc-signaling.md diff --git a/docs/rtc-signaling.md b/docs/rtc-signaling.md deleted file mode 100644 index e29921bdf..000000000 --- a/docs/rtc-signaling.md +++ /dev/null @@ -1,288 +0,0 @@ -# OpenIM 音视频(RTC)信令与媒体 — 技术说明 - -## 1. 职责边界 - -| 维度 | 说明 | -|------|------| -| OpenIM | 呼叫信令编排、邀请状态(Mongo)、LiveKit 房间创建/删除、进房 JWT、通过消息链路把信令投递到对端(在线 WebSocket + 离线推送)。 | -| LiveKit | WebRTC 媒体面(客户端持 Token 连接 `externalAddress`)。 | -| 协议 | `protocol/rtc/rtc.proto`;通知类 `ContentType` 见 `protocol/constant/rtc.go`。 | - ---- - -## 2. 服务与配置 - -| 项 | 位置 | -|----|------| -| RTC 进程入口 | `pkg/common/cmd/rpc_rtc.go` → `internal/rpc/rtc.Start` | -| 实现 | `internal/rpc/rtc/server.go`、`internal/rpc/rtc/signal.go` | -| 注册名 | `config.Share.RpcRegisterName.Rtc` | -| LiveKit 配置 | `pkg/common/config/config.go`:`LiveKit`(`internalAddress`、`externalAddress`、`apiKey`、`apiSecret`、`tokenExpiry`) | - ---- - -## 3. 接入方式(概览) - -### 3.1 WebSocket - -1. `internal/msggateway/ws_server.go`:注入 `RtcServiceClient`。 -2. `internal/msggateway/client.go`:`ReqIdentifier == WSSendSignalMsg`(**1004**)。 -3. `internal/msggateway/message_handler.go`:`SendSignalMessage` → `SignalMessageAssemble`(体为 `SignalMessageAssembleReq` 或裸 `SignalReq`)。 -4. 返回:`SignalResp` 的 protobuf 二进制放在 `Resp.Data`。 - -### 3.2 HTTP(`/rtc`) - -路由:`internal/api/router.go`;封装:`internal/api/rtc.go`(`a2r.Call` 将 HTTP 体映射到同名 gRPC)。Prometheus 发现:`GET /prometheus_discovery/rtc`。 - -**详细接口与链路见第 4 节。** - ---- - -## 4. 接口清单与各接口调用链路 - -对外暴露形态包括:**gRPC 方法名**(服务 `openim.rtc.RtcService`)、**HTTP**(OpenIM API 网关)、以及 **WebSocket**(仅映射到 `SignalMessageAssemble`)。以下链路按代码真实调用顺序描述。 - -### 4.1 接口总表 - -| # | gRPC 方法 | HTTP 路径 | WebSocket | -|---|-----------|-----------|-----------| -| 1 | `SignalMessageAssemble` | `POST /rtc/signal_message_assemble` | `ReqIdentifier=1004`(`WSSendSignalMsg`) | -| 2 | `SignalGetRoomByGroupID` | `POST /rtc/signal_get_room_by_group_id` | — | -| 3 | `SignalGetTokenByRoomID` | `POST /rtc/signal_get_token_by_room_id` | — | -| 4 | `SignalGetRooms` | `POST /rtc/signal_get_rooms` | — | -| 5 | `GetSignalInvitationInfo` | `POST /rtc/get_signal_invitation_info` | — | -| 6 | `GetSignalInvitationInfoStartApp` | `POST /rtc/get_signal_invitation_info_start_app` | — | -| 7 | `SignalSendCustomSignal` | `POST /rtc/signal_send_custom_signal` | — | -| 8 | `GetSignalInvitationRecords` | `POST /rtc/get_signal_invitation_records` | — | -| 9 | `DeleteSignalRecords` | `POST /rtc/delete_signal_records` | — | - -说明:`SignalReq` 内嵌的 **`getTokenByRoomID`** 与独立 RPC **`SignalGetTokenByRoomID`** 在服务端均落到 `genToken` + 返回 `liveURL`,前者经 `SignalMessageAssemble` 分发,后者经 HTTP 直达同名 gRPC。 - ---- - -### 4.2 `SignalMessageAssemble` - -**作用**:处理一路信令请求,返回 `SignalResp`;部分分支会写 Mongo、调 LiveKit、并通过 Msg 服务发 1601 通知。 - -**入口 A — WebSocket** - -1. 客户端发送二进制帧 → `internal/msggateway/client.go` 按 `ReqIdentifier==1004` 分支。 -2. `LongConnServer.SendSignalMessage` → `GrpcHandler.SendSignalMessage`(`internal/msggateway/message_handler.go`)。 -3. `proto.Unmarshal`:`SignalMessageAssembleReq`;若失败则解 `SignalReq` 并填入 `assembleReq.SignalReq`。 -4. `RtcServiceClient.SignalMessageAssemble(ctx, assembleReq)`(gRPC 至 **rtc 进程**)。 -5. `internal/rpc/rtc/signal.go`:`rtcServer.SignalMessageAssemble` → `switch req.SignalReq.Payload` → `handleInvite` / `handleInviteInGroup` / `handleCancel` / `handleAccept` / `handleHungUp` / `handleReject` / `handleGetTokenByRoomID`。 -6. 返回 `SignalMessageAssembleResp` → 网关将 `SignalResp` `proto.Marshal` → `Resp.Data` 回客户端。 - -**入口 B — HTTP** - -1. `POST /rtc/signal_message_assemble` → `internal/api/rtc.go`:`RtcApi.SignalMessageAssemble`。 -2. `github.com/openimsdk/tools/a2r.Call`:解析 Gin 请求体 → 调用 `RtcServiceClient.SignalMessageAssemble`。 -3. 后续与步骤 5–6 相同(响应经 HTTP 返回,而非 WS `Resp`)。 - -**分支内典型下游(仅当对应 payload 触发时)** - -| 子逻辑 | LiveKit | Mongo(`controller.RtcDatabase` → `mgo/signal`) | Msg(`rpcli.MsgClient.SendMsg`) | -|--------|---------|---------------------------------------------------|----------------------------------| -| `handleInvite` | `CreateRoom` | `CreateInvitation` | 对每个被叫 `sendSignalingNotification`(1601) | -| `handleInviteInGroup` | `CreateRoom` | `CreateInvitation` | 同上(`SessionType` 为群) | -| `handleAccept` | — | — | 通知主叫 1601 | -| `handleReject` | — | `DeleteInvitation` / `RemoveInvitee` | 通知主叫 1601 | -| `handleCancel` | — | `DeleteInvitation` | 通知被叫 1601 | -| `handleHungUp` | `DeleteRoom` | `DeleteInvitation` | 通知对端 1601 | -| `handleGetTokenByRoomID` | — | — | — | - -**若发生 `SendMsg`(1601)**,后续链路见 **第 7 节**(Kafka → msg_transfer → push → 网关 `WSPushMsg` 2001)。 - ---- - -### 4.3 `SignalGetRoomByGroupID` - -**作用**:按群 ID 查当前(或最近)邀请信息,返回 `InvitationInfo` 与 `roomID`。 - -**HTTP 链路** - -1. `POST /rtc/signal_get_room_by_group_id` → `RtcApi.SignalGetRoomByGroupID` → `a2r.Call` → gRPC。 -2. `internal/rpc/rtc/signal.go`:`SignalGetRoomByGroupID` → `db.GetInvitationByGroupID` → Mongo `signal_invitation`(`mgo/signal.go`)。 -3. `modelToInvitationInfo` 填响应返回。 - -**不经过**:LiveKit、Msg、Kafka。 - ---- - -### 4.4 `SignalGetTokenByRoomID`(独立 RPC) - -**作用**:已有房间时,仅为指定用户签发 LiveKit JWT,并返回 `liveURL`(`ExternalAddress`)。 - -**HTTP 链路** - -1. `POST /rtc/signal_get_token_by_room_id` → `RtcApi.SignalGetTokenByRoomID` → `a2r.Call` → gRPC。 -2. `internal/rpc/rtc/signal.go`:`SignalGetTokenByRoomID`(与 `handleGetTokenByRoomID` 同源逻辑)→ `genToken(roomID, userID)`。 -3. 返回 `SignalGetTokenByRoomIDResp`。 - -**不经过**:Mongo(不校验邀请是否存在)、Msg、Kafka。 - ---- - -### 4.5 `SignalGetRooms` - -**作用**:批量 `roomID` 查询邀请信息列表。 - -**HTTP 链路** - -1. `POST /rtc/signal_get_rooms` → `RtcApi.SignalGetRooms` → `a2r.Call` → gRPC。 -2. `SignalGetRooms` → `db.GetInvitationsByRoomIDs` → Mongo。 -3. 组装 `[]*SignalGetRoomByGroupIDResp` 返回。 - -**不经过**:LiveKit、Msg、Kafka。 - ---- - -### 4.6 `GetSignalInvitationInfo` - -**作用**:按 **roomID** 查邀请详情及离线推送字段。 - -**HTTP 链路** - -1. `POST /rtc/get_signal_invitation_info` → `RtcApi.GetSignalInvitationInfo` → `a2r.Call` → gRPC。 -2. `GetSignalInvitationInfo` → `db.GetInvitationByRoomID` → Mongo。 -3. 填充 `InvitationInfo`、`OfflinePushInfo` 返回。 - -**不经过**:LiveKit、Msg、Kafka。 - ---- - -### 4.7 `GetSignalInvitationInfoStartApp` - -**作用**:按 **被叫 userID** 查其相关待处理邀请(冷启动拉铃场景)。 - -**HTTP 链路** - -1. `POST /rtc/get_signal_invitation_info_start_app` → `RtcApi.GetSignalInvitationInfoStartApp` → `a2r.Call` → gRPC。 -2. `GetSignalInvitationInfoStartApp` → `db.GetInvitationByInviteeUserID` → Mongo(`invitee_user_id_list` 查询)。 -3. 返回邀请与 `OfflinePushInfo`。 - -**不经过**:LiveKit、Msg、Kafka。 - ---- - -### 4.8 `SignalSendCustomSignal` - -**作用**:向房间内除操作者外的参与者广播 **自定义信令**(系统消息 **1605**)。 - -**HTTP 链路** - -1. `POST /rtc/signal_send_custom_signal` → `RtcApi.SignalSendCustomSignal` → `a2r.Call` → gRPC。 -2. `SignalSendCustomSignal` → `db.GetInvitationByRoomID`(取邀请内 `InviteeUserIDList` + `InviterUserID`)。 -3. `mcontext.GetOpUserID(ctx)` 排除发送者自己。 -4. 对每个目标用户 `sendCustomSignalNotification` → `MsgClient.SendMsg`(`ContentType=CustomSignalNotification`,JSON body)。 -5. 若第 2 步查无邀请:打日志后返回空成功(不报错)。 - -**若发生 `SendMsg`**:后续同第 7 节(1605 走消息总线与推送)。 - ---- - -### 4.9 `GetSignalInvitationRecords` - -**作用**:分页查询通话/信令话单(`signal_record`)。 - -**HTTP 链路** - -1. `POST /rtc/get_signal_invitation_records` → `RtcApi.GetSignalInvitationRecords` → `a2r.Call` → gRPC。 -2. `GetSignalInvitationRecords` → `db.SearchRecords`(`sendID` / `recvID` / `sessionType` / 时间范围 / 分页)→ Mongo `signal_record`。 -3. 映射为 `[]*rtc.SignalRecord` 返回。 - -**不经过**:LiveKit、Msg、Kafka。 - ---- - -### 4.10 `DeleteSignalRecords` - -**作用**:按话单主键 `SID` 列表删除记录。 - -**HTTP 链路** - -1. `POST /rtc/delete_signal_records` → `RtcApi.DeleteSignalRecords` → `a2r.Call` → gRPC。 -2. `DeleteSignalRecords` → `db.DeleteRecords(sIDs)` → Mongo。 - -**不经过**:LiveKit、Msg、Kafka。 - ---- - -## 5. `SignalMessageAssemble` 行为摘要(payload 与副作用) - -(实现:`internal/rpc/rtc/signal.go`) - -| 动作 | LiveKit | Mongo | 通知 | -|------|---------|-------|------| -| Invite | CreateRoom | CreateInvitation | 向被叫发 1601 | -| InviteInGroup | CreateRoom | CreateInvitation | 向被叫发 1601(群 SessionType) | -| Accept | — | — | 通知主叫 1601 | -| Reject | — | DeleteInvitation / RemoveInvitee | 通知主叫 | -| Cancel | — | DeleteInvitation | 通知被叫 | -| HungUp | **DeleteRoom** | DeleteInvitation | 通知对端 | -| GetTokenByRoomID(嵌在 SignalReq) | — | — | — | - -Token:`github.com/livekit/protocol/auth`,`VideoGrant`(`RoomJoin` + `Room` + `Identity`),有效期由配置决定。 - ---- - -## 6. Mongo - -- 集合:`signal_invitation`、`signal_record`(`pkg/common/storage/database/name.go`)。 -- 模型:`pkg/common/storage/model/signal.go`。 -- DAO:`pkg/common/storage/database/mgo/signal.go`。 -- 控制器:`pkg/common/storage/controller/rtc.go`。 - -话单 `SignalRecord` 的写入需结合业务;`GetSignalInvitationRecords` 依赖该集合已有数据。 - ---- - -## 7. 信令进消息链路(`SendMsg` 之后) - -适用于:`sendSignalingNotification`(1601)、`sendCustomSignalNotification`(1605)。 - -1. `MsgClient.SendMsg` → `internal/rpc/msg/send.go`(按 `SessionType` 走单聊/群聊分支)。 -2. `MsgToMQ` → Kafka **`toRedisTopic`**(key:单聊为 `GenConversationUniqueKeyForSingle`;群为 `GroupID`)。 -3. `msg_transfer`(`internal/msgtransfer/online_history_msg_handler.go`)消费 → Redis seq → **`toMongoTopic`** → **`toPushTopic`**。 -4. `push`(`internal/push/push_handler.go`)消费 `toPushTopic` → `Push2User` / `Push2Group` → 网关 RPC。 -5. 网关 `Client.PushMessage`(`internal/msggateway/client.go`):**`ReqIdentifier = WSPushMsg`(2001)**,`Data` 为 `sdkws.PushMessages` 的 protobuf。 - -离线推送:`SignalingNotification` 可走离线;`RoomParticipantsConnected/Disconnected`(1602/1603)在 push 逻辑中默认不触发离线推。 - ---- - -## 8. `MsgData.Options` 与会话 ID(缺省行为) - -`pkg/msgprocessor/options.go`:`Options.Is(key)` 在 **key 未设置时视为 true**。 - -RTC 侧 `sendSignalingNotification` 使用 `make(map[string]bool)` 空 map,故 `IsHistory` / `IsNotNotification` 等表现为 true,信令在 transfer 中多走**落库 + 带 seq 后推送**路径。 - -网关下行使用 `GetConversationIDByMsg`:单聊信令默认挂在 **`si_*`** 的 `PushMessages.Msgs` 中(`IsNotification` 为 false 的前缀规则)。 - ---- - -## 9. 已知风险与排查 - -- **群通话信令**:当前构造通知时若未设置 `MsgData.GroupID`,`sendMsgGroupChat` 的 Kafka key 与 `Push2Group(ctx, groupID, ...)` 可能异常;建议在发群信令时写入与 `InvitationInfo.group_id` 一致的 `GroupID`。 -- **常量 1602–1604**:协议与 push 有特殊分支,但 `internal/rpc/rtc` 主路径主要发 1601/1605;若产品需要房间成员/流状态通知,需在扩展路径发送。 - ---- - -## 10. 常量(节选) - -| 值 | 含义 | -|----|------| -| 1601 | `SignalingNotification` | -| 1605 | `CustomSignalNotification` | -| 1602–1604 | 房间参与者/流变更等(push 对 1602/1603 限制离线推) | - ---- - -## 11. 端到端链路(简图) - -```text -客户端 → [WS 1004 或 HTTP /rtc] → rtc RPC → LiveKit + Mongo - → msg SendMsg → Kafka(toRedis) → msg_transfer → Kafka(toPush) - → push → msg_gateway → WS 2001 (PushMessages) -客户端 ← LiveKit(media) + OpenIM(信令推送) -``` diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 98a9f7616..f754f9057 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -153,10 +153,13 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgR } func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { + if err := m.messageVerification(ctx, req); err != nil { + return nil, err + } + isSend := true isNotification := msgprocessor.IsNotificationByMsg(req.MsgData) log.ZInfo(ctx, "sendMsgSingleChat", "isNotification", isNotification, "msgdata", req.MsgData) - isSend := true if !isNotification { // 非通知类消息:执行发送权限校验 + 接收偏好校验(含 blacklist / MsgReceiveSetting / webhook / FriendVerify / globalOpt / convOpt) isSend, err = m.modifyMessageByUserMessageReceiveOpt( diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index 574cc79c7..8b4d53dd0 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -53,7 +53,6 @@ type MessageRevoked struct { func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error { switch data.MsgData.SessionType { case constant.SingleChatType: - // 单聊发送权限校验已迁移至 modifyMessageByUserMessageReceiveOpt return nil case constant.ReadGroupChatType: groupInfo, err := m.GroupLocalCache.GetGroupInfo(ctx, data.MsgData.GroupID) diff --git a/internal/rpc/rtc/signal.go b/internal/rpc/rtc/signal.go index 69551d57f..0aa131dc7 100644 --- a/internal/rpc/rtc/signal.go +++ b/internal/rpc/rtc/signal.go @@ -32,6 +32,7 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" + "go.mongodb.org/mongo-driver/mongo" "google.golang.org/protobuf/proto" ) @@ -76,11 +77,11 @@ func (s *rtcServer) SignalMessageAssemble(ctx context.Context, req *rtc.SignalMe resp.Payload = &rtc.SignalResp_GetTokenByRoomID{GetTokenByRoomID: r} respErr = err default: - log.ZError(ctx, "SignalMessageAssemble", respErr, "r", respErr.Error()) + // Fix P0: 原代码在此调用 respErr.Error(),而 respErr 为 nil,会直接 panic return nil, errs.ErrArgs.WrapMsg("unknown signal payload type") } if respErr != nil { - log.ZError(ctx, "SignalMessageAssemble", respErr, "r", respErr.Error()) + log.ZError(ctx, "SignalMessageAssemble", respErr, "err", respErr.Error()) return nil, respErr } return &rtc.SignalMessageAssembleResp{SignalResp: &resp}, nil @@ -93,9 +94,8 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, log.ZError(ctx, "handleInvite", errs.ErrArgs, "r", "invitation is nil") return nil, errs.ErrArgs.WrapMsg("invitation is nil") } - if inv.RoomID == "" { - inv.RoomID = newRoomID() - } + // Fix P3: RoomID 统一由服务端生成,忽略客户端传入的值(客户端不应决定 RoomID) + inv.RoomID = newRoomID() inv.InviterUserID = req.UserID inv.InitiateTime = time.Now().UnixMilli() @@ -118,19 +118,38 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, token, err := s.genToken(inv.RoomID, req.UserID) if err != nil { - log.ZError(ctx, "handleInvite", err, "r", err.Error()) + // LiveKit Room 已创建,需要回滚 + if _, delErr := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); delErr != nil { + log.ZWarn(ctx, "handleInvite: rollback DeleteRoom failed", delErr, "roomID", inv.RoomID) + } return nil, err } + // Fix P1/幂等: CreateInvitation 失败分两种情况: + // - 重复 key(相同 roomID 重试)→ 认为幂等成功,直接返回 + // - 其他错误 → 回滚 LiveKit Room,返回错误 + // Fix P0: 原代码对失败仅打 warn,导致 DB 无记录、Room 泄漏、后续流程断裂 if err := s.db.CreateInvitation(ctx, invitationToModel(inv, req.OfflinePushInfo)); err != nil { - log.ZWarn(ctx, "CreateInvitation failed", err, "roomID", inv.RoomID) + if mongo.IsDuplicateKeyError(err) { + log.ZWarn(ctx, "handleInvite: duplicate invitation (idempotent retry)", err, "roomID", inv.RoomID) + } else { + if _, delErr := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); delErr != nil { + log.ZWarn(ctx, "handleInvite: rollback DeleteRoom failed", delErr, "roomID", inv.RoomID) + } + return nil, errs.WrapMsg(err, "CreateInvitation failed", "roomID", inv.RoomID) + } } - content := marshalSignalReq(signalReq) + content, err := marshalSignalReq(signalReq) + if err != nil { + return nil, err + } + // Fix P1: 1v1 场景下,通知失败应返回错误(被叫收不到来电意味着主叫白等) for _, inviteeID := range inv.InviteeUserIDList { log.ZInfo(ctx, "sendSignalingNotification to invitee", "sendID", req.UserID, "recvID", inviteeID) if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.SingleChatType), req.OfflinePushInfo, content); err != nil { - log.ZWarn(ctx, "sendSignalingNotification to invitee failed", err, "inviteeID", inviteeID) + log.ZError(ctx, "sendSignalingNotification to invitee failed", err, "inviteeID", inviteeID) + return nil, errs.WrapMsg(err, "failed to notify invitee", "inviteeID", inviteeID) } } @@ -148,9 +167,8 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi if inv == nil { return nil, errs.ErrArgs.WrapMsg("invitation is nil") } - if inv.RoomID == "" { - inv.RoomID = newRoomID() - } + // Fix P3: RoomID 统一由服务端生成 + inv.RoomID = newRoomID() inv.InviterUserID = req.UserID inv.InitiateTime = time.Now().UnixMilli() @@ -160,14 +178,27 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi token, err := s.genToken(inv.RoomID, req.UserID) if err != nil { + if _, delErr := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); delErr != nil { + log.ZWarn(ctx, "handleInviteInGroup: rollback DeleteRoom failed", delErr, "roomID", inv.RoomID) + } return nil, err } + // Fix P0: CreateInvitation 失败需要回滚 LiveKit Room if err := s.db.CreateInvitation(ctx, invitationToModel(inv, req.OfflinePushInfo)); err != nil { - log.ZWarn(ctx, "CreateInvitation failed", err, "roomID", inv.RoomID) + if !mongo.IsDuplicateKeyError(err) { + if _, delErr := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); delErr != nil { + log.ZWarn(ctx, "handleInviteInGroup: rollback DeleteRoom failed", delErr, "roomID", inv.RoomID) + } + return nil, errs.WrapMsg(err, "CreateInvitation failed", "roomID", inv.RoomID) + } + log.ZWarn(ctx, "handleInviteInGroup: duplicate invitation (idempotent retry)", err, "roomID", inv.RoomID) } - content := marshalSignalReq(signalReq) + content, err := marshalSignalReq(signalReq) + if err != nil { + return nil, err + } for _, inviteeID := range inv.InviteeUserIDList { allowed, err := s.isCallAllowed(ctx, req.UserID, inviteeID) if err != nil { @@ -215,56 +246,90 @@ func (s *rtcServer) isCallAllowed(ctx context.Context, inviterID, inviteeID stri } // handleAccept processes a call acceptance. +// Fix P1(安全): 原代码完全信任客户端传入的 Invitation,未从 DB 校验邀请真实存在。 +// 攻击者可伪造任意 RoomID/InviterUserID 来获取 LiveKit Token 并加入房间。 func (s *rtcServer) handleAccept(ctx context.Context, req *rtc.SignalAcceptReq, signalReq *rtc.SignalReq) (*rtc.SignalAcceptResp, error) { - inv := req.Invitation - if inv == nil { + if req.Invitation == nil { return nil, errs.ErrArgs.WrapMsg("invitation is nil") } - token, err := s.genToken(inv.RoomID, req.UserID) + // 从 DB 获取权威邀请数据,验证邀请存在且 userID 在被邀请人列表中 + dbInv, err := s.db.GetInvitationByRoomID(ctx, req.Invitation.RoomID) + if err != nil { + return nil, errs.WrapMsg(err, "invitation not found or expired", "roomID", req.Invitation.RoomID) + } + if !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) { + return nil, errs.ErrNoPermission.WrapMsg("user not in invitee list", "userID", req.UserID) + } + + token, err := s.genToken(dbInv.RoomID, req.UserID) if err != nil { return nil, err } sessionType := int32(constant.SingleChatType) - if inv.GroupID != "" { + if dbInv.GroupID != "" { sessionType = int32(constant.ReadGroupChatType) } - content := marshalSignalReq(signalReq) - if err := s.sendSignalingNotification(ctx, req.UserID, inv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil { - log.ZWarn(ctx, "sendSignalingNotification accept to inviter failed", err, "inviterID", inv.InviterUserID) + + content, err := marshalSignalReq(signalReq) + if err != nil { + return nil, err + } + // 使用 DB 中的 InviterUserID,防止客户端伪造通知目标 + if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil { + log.ZWarn(ctx, "sendSignalingNotification accept to inviter failed", err, "inviterID", dbInv.InviterUserID) + } + + // Fix P2: 1v1 通话接受后删除邀请记录,避免冷启动时重复弹出已接通的来电 + // TODO: 群通话可通过 RemoveInvitee 实现精细化状态管理 + if dbInv.GroupID == "" { + if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil { + log.ZWarn(ctx, "handleAccept: DeleteInvitation failed (non-fatal)", err, "roomID", dbInv.RoomID) + } } return &rtc.SignalAcceptResp{ Token: token, - RoomID: inv.RoomID, + RoomID: dbInv.RoomID, LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress, }, nil } // handleReject processes a call rejection. +// Fix P1(安全): 从 DB 验证邀请存在,并使用 DB 中的 InviterUserID,防止客户端伪造通知目标。 func (s *rtcServer) handleReject(ctx context.Context, req *rtc.SignalRejectReq, signalReq *rtc.SignalReq) (*rtc.SignalRejectResp, error) { - inv := req.Invitation - if inv == nil { + if req.Invitation == nil { return nil, errs.ErrArgs.WrapMsg("invitation is nil") } + dbInv, err := s.db.GetInvitationByRoomID(ctx, req.Invitation.RoomID) + if err != nil { + return nil, errs.WrapMsg(err, "invitation not found or expired", "roomID", req.Invitation.RoomID) + } + if !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) { + return nil, errs.ErrNoPermission.WrapMsg("user not in invitee list", "userID", req.UserID) + } + sessionType := int32(constant.SingleChatType) - if inv.GroupID != "" { + if dbInv.GroupID != "" { sessionType = int32(constant.ReadGroupChatType) } - content := marshalSignalReq(signalReq) - if err := s.sendSignalingNotification(ctx, req.UserID, inv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil { - log.ZWarn(ctx, "sendSignalingNotification reject to inviter failed", err, "inviterID", inv.InviterUserID) + content, err := marshalSignalReq(signalReq) + if err != nil { + return nil, err + } + if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil { + log.ZWarn(ctx, "sendSignalingNotification reject to inviter failed", err, "inviterID", dbInv.InviterUserID) } - if inv.GroupID != "" { - if err := s.db.RemoveInvitee(ctx, inv.RoomID, req.UserID); err != nil { - log.ZWarn(ctx, "RemoveInvitee failed", err, "roomID", inv.RoomID, "userID", req.UserID) + if dbInv.GroupID != "" { + if err := s.db.RemoveInvitee(ctx, dbInv.RoomID, req.UserID); err != nil { + log.ZWarn(ctx, "RemoveInvitee failed", err, "roomID", dbInv.RoomID, "userID", req.UserID) } } else { - if err := s.db.DeleteInvitation(ctx, inv.RoomID); err != nil { - log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", inv.RoomID) + if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil { + log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", dbInv.RoomID) } } @@ -272,62 +337,94 @@ func (s *rtcServer) handleReject(ctx context.Context, req *rtc.SignalRejectReq, } // handleCancel processes a call cancellation. +// Fix P1(安全): 从 DB 验证操作者是邀请发起方,防止被叫方冒充取消通话。 func (s *rtcServer) handleCancel(ctx context.Context, req *rtc.SignalCancelReq, signalReq *rtc.SignalReq) (*rtc.SignalCancelResp, error) { - inv := req.Invitation - if inv == nil { + if req.Invitation == nil { return nil, errs.ErrArgs.WrapMsg("invitation is nil") } + dbInv, err := s.db.GetInvitationByRoomID(ctx, req.Invitation.RoomID) + if err != nil { + return nil, errs.WrapMsg(err, "invitation not found or expired", "roomID", req.Invitation.RoomID) + } + if req.UserID != dbInv.InviterUserID { + return nil, errs.ErrNoPermission.WrapMsg("only the inviter can cancel", "userID", req.UserID, "inviterUserID", dbInv.InviterUserID) + } + sessionType := int32(constant.SingleChatType) - if inv.GroupID != "" { + if dbInv.GroupID != "" { sessionType = int32(constant.ReadGroupChatType) } - content := marshalSignalReq(signalReq) - for _, inviteeID := range inv.InviteeUserIDList { + content, err := marshalSignalReq(signalReq) + if err != nil { + return nil, err + } + for _, inviteeID := range dbInv.InviteeUserIDList { if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, sessionType, req.OfflinePushInfo, content); err != nil { log.ZWarn(ctx, "sendSignalingNotification cancel to invitee failed", err, "inviteeID", inviteeID) } } - if err := s.db.DeleteInvitation(ctx, inv.RoomID); err != nil { - log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", inv.RoomID) + if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil { + log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", dbInv.RoomID) } return &rtc.SignalCancelResp{}, nil } // handleHungUp processes a call hang-up. +// Fix P1(安全): 从 DB 验证操作者是通话参与者,防止任意用户挂断他人通话并删除 LiveKit Room。 func (s *rtcServer) handleHungUp(ctx context.Context, req *rtc.SignalHungUpReq, signalReq *rtc.SignalReq) (*rtc.SignalHungUpResp, error) { - inv := req.Invitation - if inv == nil { + if req.Invitation == nil { return nil, errs.ErrArgs.WrapMsg("invitation is nil") } + dbInv, err := s.db.GetInvitationByRoomID(ctx, req.Invitation.RoomID) + if err != nil { + return nil, errs.WrapMsg(err, "invitation not found or expired", "roomID", req.Invitation.RoomID) + } + if req.UserID != dbInv.InviterUserID && !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) { + return nil, errs.ErrNoPermission.WrapMsg("user is not a participant of this call", "userID", req.UserID) + } + sessionType := int32(constant.SingleChatType) - if inv.GroupID != "" { + if dbInv.GroupID != "" { sessionType = int32(constant.ReadGroupChatType) } - content := marshalSignalReq(signalReq) - for _, peerID := range hungUpPeerIDs(inv, req.UserID) { + content, err := marshalSignalReq(signalReq) + if err != nil { + return nil, err + } + // 使用 DB 中的参与者列表,不信任客户端传入的 InviteeUserIDList + for _, peerID := range hungUpPeerIDsFromDB(dbInv, req.UserID) { if err := s.sendSignalingNotification(ctx, req.UserID, peerID, sessionType, req.OfflinePushInfo, content); err != nil { log.ZWarn(ctx, "sendSignalingNotification hungUp to peer failed", err, "peerID", peerID) } } // Terminate the LiveKit room - if _, err := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); err != nil { - log.ZWarn(ctx, "LiveKit DeleteRoom failed", err, "roomID", inv.RoomID) + if _, err := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: dbInv.RoomID}); err != nil { + log.ZWarn(ctx, "LiveKit DeleteRoom failed", err, "roomID", dbInv.RoomID) } - if err := s.db.DeleteInvitation(ctx, inv.RoomID); err != nil { - log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", inv.RoomID) + if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil { + log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", dbInv.RoomID) } return &rtc.SignalHungUpResp{}, nil } // handleGetTokenByRoomID returns a LiveKit token for an existing room. +// Fix P0(安全): 原代码无权限校验,任意已登录用户可获取任意房间的 Token,可窃听他人通话。 func (s *rtcServer) handleGetTokenByRoomID(ctx context.Context, req *rtc.SignalGetTokenByRoomIDReq) (*rtc.SignalGetTokenByRoomIDResp, error) { + dbInv, err := s.db.GetInvitationByRoomID(ctx, req.RoomID) + if err != nil { + return nil, errs.WrapMsg(err, "room not found or expired", "roomID", req.RoomID) + } + if req.UserID != dbInv.InviterUserID && !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) { + return nil, errs.ErrNoPermission.WrapMsg("user is not a participant of this room", "userID", req.UserID) + } + token, err := s.genToken(req.RoomID, req.UserID) if err != nil { return nil, err @@ -351,7 +448,16 @@ func (s *rtcServer) SignalGetRoomByGroupID(ctx context.Context, req *rtc.SignalG } // SignalGetTokenByRoomID returns a token for joining a room directly (HTTP API path). +// Fix P0(安全): 同 handleGetTokenByRoomID,添加参与者身份校验。 func (s *rtcServer) SignalGetTokenByRoomID(ctx context.Context, req *rtc.SignalGetTokenByRoomIDReq) (*rtc.SignalGetTokenByRoomIDResp, error) { + dbInv, err := s.db.GetInvitationByRoomID(ctx, req.RoomID) + if err != nil { + return nil, errs.WrapMsg(err, "room not found or expired", "roomID", req.RoomID) + } + if req.UserID != dbInv.InviterUserID && !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) { + return nil, errs.ErrNoPermission.WrapMsg("user is not a participant of this room", "userID", req.UserID) + } + token, err := s.genToken(req.RoomID, req.UserID) if err != nil { return nil, err @@ -421,10 +527,14 @@ func (s *rtcServer) SignalSendCustomSignal(ctx context.Context, req *rtc.SignalS return &rtc.SignalSendCustomSignalResp{}, nil } opUserID := mcontext.GetOpUserID(ctx) - content, _ := json.Marshal(map[string]any{ + // Fix P3: 处理 json.Marshal 错误 + content, err := json.Marshal(map[string]any{ "roomID": req.RoomID, "customInfo": req.CustomInfo, }) + if err != nil { + return nil, errs.WrapMsg(err, "marshal custom signal content failed") + } recipients := make([]string, 0, len(inv.InviteeUserIDList)+1) recipients = append(recipients, inv.InviteeUserIDList...) recipients = append(recipients, inv.InviterUserID) @@ -492,6 +602,29 @@ func (s *rtcServer) genToken(roomID, userID string) (string, error) { return at.ToJWT() } +// signalingMsgOptions 返回信令通知消息应设置的 Options。 +// +// Fix P2+P2(安全): 原代码传 make(map[string]bool) 空 map,导致: +// 1. IsNotificationByMsg 将信令消息误判为普通聊天消息,触发黑名单/好友关系等权限拦截 +// 2. IsHistory/IsPersistent 默认为 true,信令消息被写入历史记录占用存储 +// 3. IsUnreadCount/IsConversationUpdate 默认 true,污染未读数和会话列表 +// +// 信令消息应走 Notification 通道(对话 ID 前缀 "n_"),绕过聊天消息权限校验, +// 且不写历史、不计未读、不更新会话。离线推送根据 offlinePushInfo 控制,此处不强制关闭。 +func signalingMsgOptions() map[string]bool { + opts := make(map[string]bool, 8) + // IsNotNotification=false 表示"这是通知消息",让 IsNotificationByMsg 返回 true + // 从而跳过 modifyMessageByUserMessageReceiveOpt 中的黑名单/好友关系等校验 + datautil.SetSwitchFromOptions(opts, constant.IsNotNotification, false) + datautil.SetSwitchFromOptions(opts, constant.IsHistory, false) + datautil.SetSwitchFromOptions(opts, constant.IsPersistent, false) + datautil.SetSwitchFromOptions(opts, constant.IsUnreadCount, false) + datautil.SetSwitchFromOptions(opts, constant.IsConversationUpdate, false) + datautil.SetSwitchFromOptions(opts, constant.IsSenderConversationUpdate, false) + datautil.SetSwitchFromOptions(opts, constant.IsSenderSync, false) + return opts +} + // sendSignalingNotification sends a SignalingNotification message to a user via the msg service. func (s *rtcServer) sendSignalingNotification(ctx context.Context, sendID, recvID string, sessionType int32, offlinePush *sdkws.OfflinePushInfo, content []byte) error { now := time.Now().UnixMilli() @@ -506,7 +639,7 @@ func (s *rtcServer) sendSignalingNotification(ctx context.Context, sendID, recvI SendTime: now, ServerMsgID: uuid.New().String(), ClientMsgID: uuid.New().String(), - Options: make(map[string]bool), + Options: signalingMsgOptions(), } if offlinePush != nil { msgData.OfflinePushInfo = offlinePush @@ -536,15 +669,20 @@ func (s *rtcServer) sendCustomSignalNotification(ctx context.Context, sendID, re SendTime: now, ServerMsgID: uuid.New().String(), ClientMsgID: uuid.New().String(), - Options: make(map[string]bool), + Options: signalingMsgOptions(), } _, err := s.msgClient.MsgClient.SendMsg(ctx, &pbmsg.SendMsgReq{MsgData: msgData}) return err } -func marshalSignalReq(req *rtc.SignalReq) []byte { - b, _ := proto.Marshal(req) - return b +// marshalSignalReq serializes a SignalReq to bytes. +// Fix P2: 原代码使用 _ 吞掉错误,序列化失败时返回 nil,导致被叫收到空 Content 消息,来电通知丢失。 +func marshalSignalReq(req *rtc.SignalReq) ([]byte, error) { + b, err := proto.Marshal(req) + if err != nil { + return nil, errs.WrapMsg(err, "marshal SignalReq failed") + } + return b, nil } // newRoomID generates a unique room ID. @@ -554,6 +692,7 @@ func newRoomID() string { // invitationToModel converts a proto InvitationInfo to the database model. func invitationToModel(inv *rtc.InvitationInfo, push *sdkws.OfflinePushInfo) *model.SignalInvitation { + now := time.Now() m := &model.SignalInvitation{ RoomID: inv.RoomID, InviterUserID: inv.InviterUserID, @@ -566,7 +705,9 @@ func invitationToModel(inv *rtc.InvitationInfo, push *sdkws.OfflinePushInfo) *mo SessionType: inv.SessionType, InitiateTime: inv.InitiateTime, BusyLineUserIDList: inv.BusyLineUserIDList, - CreateTime: time.Now().UnixMilli(), + CreateTime: now.UnixMilli(), + // Fix P1(TTL): 根据 Timeout 设置过期时间,配合 MongoDB TTL 索引自动清理 + ExpireAt: now.Add(time.Duration(inv.Timeout+30) * time.Second), } if push != nil { m.OfflinePushTitle = push.Title @@ -596,8 +737,9 @@ func modelToInvitationInfo(m *model.SignalInvitation) *rtc.InvitationInfo { } } -// hungUpPeerIDs returns the IDs that should receive hang-up notification. -func hungUpPeerIDs(inv *rtc.InvitationInfo, callerID string) []string { +// hungUpPeerIDsFromDB returns IDs that should receive hang-up notification, based on authoritative DB data. +// Fix P1(安全): 原 hungUpPeerIDs 使用客户端传入的 inv,改为使用从 DB 获取的记录。 +func hungUpPeerIDsFromDB(inv *model.SignalInvitation, callerID string) []string { if callerID == inv.InviterUserID { return inv.InviteeUserIDList } diff --git a/pkg/common/storage/database/mgo/signal.go b/pkg/common/storage/database/mgo/signal.go index 2ae8b3feb..4a84b2cc9 100644 --- a/pkg/common/storage/database/mgo/signal.go +++ b/pkg/common/storage/database/mgo/signal.go @@ -39,6 +39,12 @@ func NewSignalMongo(db *mongo.Database) (database.SignalDatabase, error) { { Keys: bson.D{{Key: "create_time", Value: -1}}, }, + // Fix P1(TTL): expire_at 字段为 BSON Date,MongoDB 后台每 60s 扫描一次并自动删除过期文档。 + // 覆盖场景:被叫网络断开、主叫 App 被杀、任何异常中断导致没有 Cancel/Reject/HungUp 的情况。 + { + Keys: bson.D{{Key: "expire_at", Value: 1}}, + Options: options.Index().SetExpireAfterSeconds(0), + }, }) if err != nil { return nil, err diff --git a/pkg/common/storage/model/signal.go b/pkg/common/storage/model/signal.go index 91f241e98..1dc46c9e5 100644 --- a/pkg/common/storage/model/signal.go +++ b/pkg/common/storage/model/signal.go @@ -14,6 +14,8 @@ package model +import "time" + // SignalInvitation stores an ongoing or pending signal invitation, keyed by roomID. // It is created when a call is initiated and can be queried when the callee starts the app. type SignalInvitation struct { @@ -32,6 +34,9 @@ type SignalInvitation struct { OfflinePushDesc string `bson:"offline_push_desc"` OfflinePushEx string `bson:"offline_push_ex"` CreateTime int64 `bson:"create_time"` + // ExpireAt 是 MongoDB BSON Date 类型,供 TTL 索引自动清理过期邀请(无人响应/异常中断场景)。 + // 值 = 创建时间 + Timeout + 30s 缓冲,由 invitationToModel 负责填充。 + ExpireAt time.Time `bson:"expire_at"` } // SignalRecord stores a completed call record used for history queries.