用户设置

pull/3727/head
hawklin2017 2 months ago
parent 48e023882a
commit 09460ccefa

@ -1,288 +0,0 @@
# OpenIM 音视频RTC信令与媒体 — 技术说明
## 1. 职责边界
| 维度 | 说明 |
|------|------|
| OpenIM | 呼叫信令编排、邀请状态Mongo、LiveKit 房间创建/删除、进房 JWT、通过消息链路把信令投递到对端在线 WebSocket + 离线推送)。 |
| LiveKit | WebRTC 媒体面(客户端持 Token 连接 `externalAddress`)。 |
| 协议 | `protocol/rtc/rtc.proto`;通知类 `ContentType``protocol/constant/rtc.go`。 |
---
## 2. 服务与配置
| 项 | 位置 |
|----|------|
| RTC 进程入口 | `pkg/common/cmd/rpc_rtc.go``internal/rpc/rtc.Start` |
| 实现 | `internal/rpc/rtc/server.go`、`internal/rpc/rtc/signal.go` |
| 注册名 | `config.Share.RpcRegisterName.Rtc` |
| LiveKit 配置 | `pkg/common/config/config.go``LiveKit``internalAddress`、`externalAddress`、`apiKey`、`apiSecret`、`tokenExpiry` |
---
## 3. 接入方式(概览)
### 3.1 WebSocket
1. `internal/msggateway/ws_server.go`:注入 `RtcServiceClient`
2. `internal/msggateway/client.go``ReqIdentifier == WSSendSignalMsg`**1004**)。
3. `internal/msggateway/message_handler.go``SendSignalMessage` → `SignalMessageAssemble`(体为 `SignalMessageAssembleReq` 或裸 `SignalReq`)。
4. 返回:`SignalResp` 的 protobuf 二进制放在 `Resp.Data`
### 3.2 HTTP`/rtc`
路由:`internal/api/router.go`;封装:`internal/api/rtc.go``a2r.Call` 将 HTTP 体映射到同名 gRPC。Prometheus 发现:`GET /prometheus_discovery/rtc`。
**详细接口与链路见第 4 节。**
---
## 4. 接口清单与各接口调用链路
对外暴露形态包括:**gRPC 方法名**(服务 `openim.rtc.RtcService`)、**HTTP**OpenIM API 网关)、以及 **WebSocket**(仅映射到 `SignalMessageAssemble`)。以下链路按代码真实调用顺序描述。
### 4.1 接口总表
| # | gRPC 方法 | HTTP 路径 | WebSocket |
|---|-----------|-----------|-----------|
| 1 | `SignalMessageAssemble` | `POST /rtc/signal_message_assemble` | `ReqIdentifier=1004``WSSendSignalMsg` |
| 2 | `SignalGetRoomByGroupID` | `POST /rtc/signal_get_room_by_group_id` | — |
| 3 | `SignalGetTokenByRoomID` | `POST /rtc/signal_get_token_by_room_id` | — |
| 4 | `SignalGetRooms` | `POST /rtc/signal_get_rooms` | — |
| 5 | `GetSignalInvitationInfo` | `POST /rtc/get_signal_invitation_info` | — |
| 6 | `GetSignalInvitationInfoStartApp` | `POST /rtc/get_signal_invitation_info_start_app` | — |
| 7 | `SignalSendCustomSignal` | `POST /rtc/signal_send_custom_signal` | — |
| 8 | `GetSignalInvitationRecords` | `POST /rtc/get_signal_invitation_records` | — |
| 9 | `DeleteSignalRecords` | `POST /rtc/delete_signal_records` | — |
说明:`SignalReq` 内嵌的 **`getTokenByRoomID`** 与独立 RPC **`SignalGetTokenByRoomID`** 在服务端均落到 `genToken` + 返回 `liveURL`,前者经 `SignalMessageAssemble` 分发,后者经 HTTP 直达同名 gRPC。
---
### 4.2 `SignalMessageAssemble`
**作用**:处理一路信令请求,返回 `SignalResp`;部分分支会写 Mongo、调 LiveKit、并通过 Msg 服务发 1601 通知。
**入口 A — WebSocket**
1. 客户端发送二进制帧 → `internal/msggateway/client.go``ReqIdentifier==1004` 分支。
2. `LongConnServer.SendSignalMessage``GrpcHandler.SendSignalMessage``internal/msggateway/message_handler.go`)。
3. `proto.Unmarshal``SignalMessageAssembleReq`;若失败则解 `SignalReq` 并填入 `assembleReq.SignalReq`
4. `RtcServiceClient.SignalMessageAssemble(ctx, assembleReq)`gRPC 至 **rtc 进程**)。
5. `internal/rpc/rtc/signal.go``rtcServer.SignalMessageAssemble` → `switch req.SignalReq.Payload``handleInvite` / `handleInviteInGroup` / `handleCancel` / `handleAccept` / `handleHungUp` / `handleReject` / `handleGetTokenByRoomID`
6. 返回 `SignalMessageAssembleResp` → 网关将 `SignalResp` `proto.Marshal``Resp.Data` 回客户端。
**入口 B — HTTP**
1. `POST /rtc/signal_message_assemble``internal/api/rtc.go``RtcApi.SignalMessageAssemble`。
2. `github.com/openimsdk/tools/a2r.Call`:解析 Gin 请求体 → 调用 `RtcServiceClient.SignalMessageAssemble`
3. 后续与步骤 56 相同(响应经 HTTP 返回,而非 WS `Resp`)。
**分支内典型下游(仅当对应 payload 触发时)**
| 子逻辑 | LiveKit | Mongo`controller.RtcDatabase` → `mgo/signal` | Msg`rpcli.MsgClient.SendMsg` |
|--------|---------|---------------------------------------------------|----------------------------------|
| `handleInvite` | `CreateRoom` | `CreateInvitation` | 对每个被叫 `sendSignalingNotification`1601 |
| `handleInviteInGroup` | `CreateRoom` | `CreateInvitation` | 同上(`SessionType` 为群) |
| `handleAccept` | — | — | 通知主叫 1601 |
| `handleReject` | — | `DeleteInvitation` / `RemoveInvitee` | 通知主叫 1601 |
| `handleCancel` | — | `DeleteInvitation` | 通知被叫 1601 |
| `handleHungUp` | `DeleteRoom` | `DeleteInvitation` | 通知对端 1601 |
| `handleGetTokenByRoomID` | — | — | — |
**若发生 `SendMsg`1601**,后续链路见 **第 7 节**Kafka → msg_transfer → push → 网关 `WSPushMsg` 2001
---
### 4.3 `SignalGetRoomByGroupID`
**作用**:按群 ID 查当前(或最近)邀请信息,返回 `InvitationInfo``roomID`
**HTTP 链路**
1. `POST /rtc/signal_get_room_by_group_id``RtcApi.SignalGetRoomByGroupID``a2r.Call` → gRPC。
2. `internal/rpc/rtc/signal.go``SignalGetRoomByGroupID` → `db.GetInvitationByGroupID` → Mongo `signal_invitation``mgo/signal.go`)。
3. `modelToInvitationInfo` 填响应返回。
**不经过**LiveKit、Msg、Kafka。
---
### 4.4 `SignalGetTokenByRoomID`(独立 RPC
**作用**:已有房间时,仅为指定用户签发 LiveKit JWT并返回 `liveURL``ExternalAddress`)。
**HTTP 链路**
1. `POST /rtc/signal_get_token_by_room_id``RtcApi.SignalGetTokenByRoomID``a2r.Call` → gRPC。
2. `internal/rpc/rtc/signal.go``SignalGetTokenByRoomID`(与 `handleGetTokenByRoomID` 同源逻辑)→ `genToken(roomID, userID)`
3. 返回 `SignalGetTokenByRoomIDResp`
**不经过**Mongo不校验邀请是否存在、Msg、Kafka。
---
### 4.5 `SignalGetRooms`
**作用**:批量 `roomID` 查询邀请信息列表。
**HTTP 链路**
1. `POST /rtc/signal_get_rooms``RtcApi.SignalGetRooms``a2r.Call` → gRPC。
2. `SignalGetRooms``db.GetInvitationsByRoomIDs` → Mongo。
3. 组装 `[]*SignalGetRoomByGroupIDResp` 返回。
**不经过**LiveKit、Msg、Kafka。
---
### 4.6 `GetSignalInvitationInfo`
**作用**:按 **roomID** 查邀请详情及离线推送字段。
**HTTP 链路**
1. `POST /rtc/get_signal_invitation_info``RtcApi.GetSignalInvitationInfo``a2r.Call` → gRPC。
2. `GetSignalInvitationInfo``db.GetInvitationByRoomID` → Mongo。
3. 填充 `InvitationInfo`、`OfflinePushInfo` 返回。
**不经过**LiveKit、Msg、Kafka。
---
### 4.7 `GetSignalInvitationInfoStartApp`
**作用**:按 **被叫 userID** 查其相关待处理邀请(冷启动拉铃场景)。
**HTTP 链路**
1. `POST /rtc/get_signal_invitation_info_start_app``RtcApi.GetSignalInvitationInfoStartApp``a2r.Call` → gRPC。
2. `GetSignalInvitationInfoStartApp``db.GetInvitationByInviteeUserID` → Mongo`invitee_user_id_list` 查询)。
3. 返回邀请与 `OfflinePushInfo`
**不经过**LiveKit、Msg、Kafka。
---
### 4.8 `SignalSendCustomSignal`
**作用**:向房间内除操作者外的参与者广播 **自定义信令**(系统消息 **1605**)。
**HTTP 链路**
1. `POST /rtc/signal_send_custom_signal``RtcApi.SignalSendCustomSignal``a2r.Call` → gRPC。
2. `SignalSendCustomSignal``db.GetInvitationByRoomID`(取邀请内 `InviteeUserIDList` + `InviterUserID`)。
3. `mcontext.GetOpUserID(ctx)` 排除发送者自己。
4. 对每个目标用户 `sendCustomSignalNotification``MsgClient.SendMsg``ContentType=CustomSignalNotification`JSON body
5. 若第 2 步查无邀请:打日志后返回空成功(不报错)。
**若发生 `SendMsg`**:后续同第 7 节1605 走消息总线与推送)。
---
### 4.9 `GetSignalInvitationRecords`
**作用**:分页查询通话/信令话单(`signal_record`)。
**HTTP 链路**
1. `POST /rtc/get_signal_invitation_records``RtcApi.GetSignalInvitationRecords``a2r.Call` → gRPC。
2. `GetSignalInvitationRecords``db.SearchRecords``sendID` / `recvID` / `sessionType` / 时间范围 / 分页)→ Mongo `signal_record`
3. 映射为 `[]*rtc.SignalRecord` 返回。
**不经过**LiveKit、Msg、Kafka。
---
### 4.10 `DeleteSignalRecords`
**作用**:按话单主键 `SID` 列表删除记录。
**HTTP 链路**
1. `POST /rtc/delete_signal_records``RtcApi.DeleteSignalRecords``a2r.Call` → gRPC。
2. `DeleteSignalRecords``db.DeleteRecords(sIDs)` → Mongo。
**不经过**LiveKit、Msg、Kafka。
---
## 5. `SignalMessageAssemble` 行为摘要payload 与副作用)
(实现:`internal/rpc/rtc/signal.go`
| 动作 | LiveKit | Mongo | 通知 |
|------|---------|-------|------|
| Invite | CreateRoom | CreateInvitation | 向被叫发 1601 |
| InviteInGroup | CreateRoom | CreateInvitation | 向被叫发 1601群 SessionType |
| Accept | — | — | 通知主叫 1601 |
| Reject | — | DeleteInvitation / RemoveInvitee | 通知主叫 |
| Cancel | — | DeleteInvitation | 通知被叫 |
| HungUp | **DeleteRoom** | DeleteInvitation | 通知对端 |
| GetTokenByRoomID嵌在 SignalReq | — | — | — |
Token`github.com/livekit/protocol/auth``VideoGrant``RoomJoin` + `Room` + `Identity`),有效期由配置决定。
---
## 6. Mongo
- 集合:`signal_invitation`、`signal_record``pkg/common/storage/database/name.go`)。
- 模型:`pkg/common/storage/model/signal.go`。
- DAO`pkg/common/storage/database/mgo/signal.go`。
- 控制器:`pkg/common/storage/controller/rtc.go`。
话单 `SignalRecord` 的写入需结合业务;`GetSignalInvitationRecords` 依赖该集合已有数据。
---
## 7. 信令进消息链路(`SendMsg` 之后)
适用于:`sendSignalingNotification`1601、`sendCustomSignalNotification`1605
1. `MsgClient.SendMsg``internal/rpc/msg/send.go`(按 `SessionType` 走单聊/群聊分支)。
2. `MsgToMQ` → Kafka **`toRedisTopic`**key单聊为 `GenConversationUniqueKeyForSingle`;群为 `GroupID`)。
3. `msg_transfer``internal/msgtransfer/online_history_msg_handler.go`)消费 → Redis seq → **`toMongoTopic`** → **`toPushTopic`**。
4. `push``internal/push/push_handler.go`)消费 `toPushTopic``Push2User` / `Push2Group` → 网关 RPC。
5. 网关 `Client.PushMessage``internal/msggateway/client.go`**`ReqIdentifier = WSPushMsg`2001**`Data` 为 `sdkws.PushMessages` 的 protobuf。
离线推送:`SignalingNotification` 可走离线;`RoomParticipantsConnected/Disconnected`1602/1603在 push 逻辑中默认不触发离线推。
---
## 8. `MsgData.Options` 与会话 ID缺省行为
`pkg/msgprocessor/options.go``Options.Is(key)` 在 **key 未设置时视为 true**
RTC 侧 `sendSignalingNotification` 使用 `make(map[string]bool)` 空 map`IsHistory` / `IsNotNotification` 等表现为 true信令在 transfer 中多走**落库 + 带 seq 后推送**路径。
网关下行使用 `GetConversationIDByMsg`:单聊信令默认挂在 **`si_*`** 的 `PushMessages.Msgs` 中(`IsNotification` 为 false 的前缀规则)。
---
## 9. 已知风险与排查
- **群通话信令**:当前构造通知时若未设置 `MsgData.GroupID``sendMsgGroupChat` 的 Kafka key 与 `Push2Group(ctx, groupID, ...)` 可能异常;建议在发群信令时写入与 `InvitationInfo.group_id` 一致的 `GroupID`
- **常量 16021604**:协议与 push 有特殊分支,但 `internal/rpc/rtc` 主路径主要发 1601/1605若产品需要房间成员/流状态通知,需在扩展路径发送。
---
## 10. 常量(节选)
| 值 | 含义 |
|----|------|
| 1601 | `SignalingNotification` |
| 1605 | `CustomSignalNotification` |
| 16021604 | 房间参与者/流变更等push 对 1602/1603 限制离线推) |
---
## 11. 端到端链路(简图)
```text
客户端 → [WS 1004 或 HTTP /rtc] → rtc RPC → LiveKit + Mongo
→ msg SendMsg → Kafka(toRedis) → msg_transfer → Kafka(toPush)
→ push → msg_gateway → WS 2001 (PushMessages)
客户端 ← LiveKit(media) + OpenIM(信令推送)
```

@ -153,10 +153,13 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbmsg.SendMsgR
}
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
if err := m.messageVerification(ctx, req); err != nil {
return nil, err
}
isSend := true
isNotification := msgprocessor.IsNotificationByMsg(req.MsgData)
log.ZInfo(ctx, "sendMsgSingleChat", "isNotification", isNotification, "msgdata", req.MsgData)
isSend := true
if !isNotification {
// 非通知类消息:执行发送权限校验 + 接收偏好校验(含 blacklist / MsgReceiveSetting / webhook / FriendVerify / globalOpt / convOpt
isSend, err = m.modifyMessageByUserMessageReceiveOpt(

@ -53,7 +53,6 @@ type MessageRevoked struct {
func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error {
switch data.MsgData.SessionType {
case constant.SingleChatType:
// 单聊发送权限校验已迁移至 modifyMessageByUserMessageReceiveOpt
return nil
case constant.ReadGroupChatType:
groupInfo, err := m.GroupLocalCache.GetGroupInfo(ctx, data.MsgData.GroupID)

@ -32,6 +32,7 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"go.mongodb.org/mongo-driver/mongo"
"google.golang.org/protobuf/proto"
)
@ -76,11 +77,11 @@ func (s *rtcServer) SignalMessageAssemble(ctx context.Context, req *rtc.SignalMe
resp.Payload = &rtc.SignalResp_GetTokenByRoomID{GetTokenByRoomID: r}
respErr = err
default:
log.ZError(ctx, "SignalMessageAssemble", respErr, "r", respErr.Error())
// Fix P0: 原代码在此调用 respErr.Error(),而 respErr 为 nil会直接 panic
return nil, errs.ErrArgs.WrapMsg("unknown signal payload type")
}
if respErr != nil {
log.ZError(ctx, "SignalMessageAssemble", respErr, "r", respErr.Error())
log.ZError(ctx, "SignalMessageAssemble", respErr, "err", respErr.Error())
return nil, respErr
}
return &rtc.SignalMessageAssembleResp{SignalResp: &resp}, nil
@ -93,9 +94,8 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq,
log.ZError(ctx, "handleInvite", errs.ErrArgs, "r", "invitation is nil")
return nil, errs.ErrArgs.WrapMsg("invitation is nil")
}
if inv.RoomID == "" {
inv.RoomID = newRoomID()
}
// Fix P3: RoomID 统一由服务端生成,忽略客户端传入的值(客户端不应决定 RoomID
inv.RoomID = newRoomID()
inv.InviterUserID = req.UserID
inv.InitiateTime = time.Now().UnixMilli()
@ -118,19 +118,38 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq,
token, err := s.genToken(inv.RoomID, req.UserID)
if err != nil {
log.ZError(ctx, "handleInvite", err, "r", err.Error())
// LiveKit Room 已创建,需要回滚
if _, delErr := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); delErr != nil {
log.ZWarn(ctx, "handleInvite: rollback DeleteRoom failed", delErr, "roomID", inv.RoomID)
}
return nil, err
}
// Fix P1/幂等: CreateInvitation 失败分两种情况:
// - 重复 key相同 roomID 重试)→ 认为幂等成功,直接返回
// - 其他错误 → 回滚 LiveKit Room返回错误
// Fix P0: 原代码对失败仅打 warn导致 DB 无记录、Room 泄漏、后续流程断裂
if err := s.db.CreateInvitation(ctx, invitationToModel(inv, req.OfflinePushInfo)); err != nil {
log.ZWarn(ctx, "CreateInvitation failed", err, "roomID", inv.RoomID)
if mongo.IsDuplicateKeyError(err) {
log.ZWarn(ctx, "handleInvite: duplicate invitation (idempotent retry)", err, "roomID", inv.RoomID)
} else {
if _, delErr := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); delErr != nil {
log.ZWarn(ctx, "handleInvite: rollback DeleteRoom failed", delErr, "roomID", inv.RoomID)
}
return nil, errs.WrapMsg(err, "CreateInvitation failed", "roomID", inv.RoomID)
}
}
content := marshalSignalReq(signalReq)
content, err := marshalSignalReq(signalReq)
if err != nil {
return nil, err
}
// Fix P1: 1v1 场景下,通知失败应返回错误(被叫收不到来电意味着主叫白等)
for _, inviteeID := range inv.InviteeUserIDList {
log.ZInfo(ctx, "sendSignalingNotification to invitee", "sendID", req.UserID, "recvID", inviteeID)
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.SingleChatType), req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification to invitee failed", err, "inviteeID", inviteeID)
log.ZError(ctx, "sendSignalingNotification to invitee failed", err, "inviteeID", inviteeID)
return nil, errs.WrapMsg(err, "failed to notify invitee", "inviteeID", inviteeID)
}
}
@ -148,9 +167,8 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi
if inv == nil {
return nil, errs.ErrArgs.WrapMsg("invitation is nil")
}
if inv.RoomID == "" {
inv.RoomID = newRoomID()
}
// Fix P3: RoomID 统一由服务端生成
inv.RoomID = newRoomID()
inv.InviterUserID = req.UserID
inv.InitiateTime = time.Now().UnixMilli()
@ -160,14 +178,27 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi
token, err := s.genToken(inv.RoomID, req.UserID)
if err != nil {
if _, delErr := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); delErr != nil {
log.ZWarn(ctx, "handleInviteInGroup: rollback DeleteRoom failed", delErr, "roomID", inv.RoomID)
}
return nil, err
}
// Fix P0: CreateInvitation 失败需要回滚 LiveKit Room
if err := s.db.CreateInvitation(ctx, invitationToModel(inv, req.OfflinePushInfo)); err != nil {
log.ZWarn(ctx, "CreateInvitation failed", err, "roomID", inv.RoomID)
if !mongo.IsDuplicateKeyError(err) {
if _, delErr := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); delErr != nil {
log.ZWarn(ctx, "handleInviteInGroup: rollback DeleteRoom failed", delErr, "roomID", inv.RoomID)
}
return nil, errs.WrapMsg(err, "CreateInvitation failed", "roomID", inv.RoomID)
}
log.ZWarn(ctx, "handleInviteInGroup: duplicate invitation (idempotent retry)", err, "roomID", inv.RoomID)
}
content := marshalSignalReq(signalReq)
content, err := marshalSignalReq(signalReq)
if err != nil {
return nil, err
}
for _, inviteeID := range inv.InviteeUserIDList {
allowed, err := s.isCallAllowed(ctx, req.UserID, inviteeID)
if err != nil {
@ -215,56 +246,90 @@ func (s *rtcServer) isCallAllowed(ctx context.Context, inviterID, inviteeID stri
}
// handleAccept processes a call acceptance.
// Fix P1(安全): 原代码完全信任客户端传入的 Invitation未从 DB 校验邀请真实存在。
// 攻击者可伪造任意 RoomID/InviterUserID 来获取 LiveKit Token 并加入房间。
func (s *rtcServer) handleAccept(ctx context.Context, req *rtc.SignalAcceptReq, signalReq *rtc.SignalReq) (*rtc.SignalAcceptResp, error) {
inv := req.Invitation
if inv == nil {
if req.Invitation == nil {
return nil, errs.ErrArgs.WrapMsg("invitation is nil")
}
token, err := s.genToken(inv.RoomID, req.UserID)
// 从 DB 获取权威邀请数据,验证邀请存在且 userID 在被邀请人列表中
dbInv, err := s.db.GetInvitationByRoomID(ctx, req.Invitation.RoomID)
if err != nil {
return nil, errs.WrapMsg(err, "invitation not found or expired", "roomID", req.Invitation.RoomID)
}
if !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) {
return nil, errs.ErrNoPermission.WrapMsg("user not in invitee list", "userID", req.UserID)
}
token, err := s.genToken(dbInv.RoomID, req.UserID)
if err != nil {
return nil, err
}
sessionType := int32(constant.SingleChatType)
if inv.GroupID != "" {
if dbInv.GroupID != "" {
sessionType = int32(constant.ReadGroupChatType)
}
content := marshalSignalReq(signalReq)
if err := s.sendSignalingNotification(ctx, req.UserID, inv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification accept to inviter failed", err, "inviterID", inv.InviterUserID)
content, err := marshalSignalReq(signalReq)
if err != nil {
return nil, err
}
// 使用 DB 中的 InviterUserID防止客户端伪造通知目标
if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification accept to inviter failed", err, "inviterID", dbInv.InviterUserID)
}
// Fix P2: 1v1 通话接受后删除邀请记录,避免冷启动时重复弹出已接通的来电
// TODO: 群通话可通过 RemoveInvitee 实现精细化状态管理
if dbInv.GroupID == "" {
if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil {
log.ZWarn(ctx, "handleAccept: DeleteInvitation failed (non-fatal)", err, "roomID", dbInv.RoomID)
}
}
return &rtc.SignalAcceptResp{
Token: token,
RoomID: inv.RoomID,
RoomID: dbInv.RoomID,
LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress,
}, nil
}
// handleReject processes a call rejection.
// Fix P1(安全): 从 DB 验证邀请存在,并使用 DB 中的 InviterUserID防止客户端伪造通知目标。
func (s *rtcServer) handleReject(ctx context.Context, req *rtc.SignalRejectReq, signalReq *rtc.SignalReq) (*rtc.SignalRejectResp, error) {
inv := req.Invitation
if inv == nil {
if req.Invitation == nil {
return nil, errs.ErrArgs.WrapMsg("invitation is nil")
}
dbInv, err := s.db.GetInvitationByRoomID(ctx, req.Invitation.RoomID)
if err != nil {
return nil, errs.WrapMsg(err, "invitation not found or expired", "roomID", req.Invitation.RoomID)
}
if !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) {
return nil, errs.ErrNoPermission.WrapMsg("user not in invitee list", "userID", req.UserID)
}
sessionType := int32(constant.SingleChatType)
if inv.GroupID != "" {
if dbInv.GroupID != "" {
sessionType = int32(constant.ReadGroupChatType)
}
content := marshalSignalReq(signalReq)
if err := s.sendSignalingNotification(ctx, req.UserID, inv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification reject to inviter failed", err, "inviterID", inv.InviterUserID)
content, err := marshalSignalReq(signalReq)
if err != nil {
return nil, err
}
if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification reject to inviter failed", err, "inviterID", dbInv.InviterUserID)
}
if inv.GroupID != "" {
if err := s.db.RemoveInvitee(ctx, inv.RoomID, req.UserID); err != nil {
log.ZWarn(ctx, "RemoveInvitee failed", err, "roomID", inv.RoomID, "userID", req.UserID)
if dbInv.GroupID != "" {
if err := s.db.RemoveInvitee(ctx, dbInv.RoomID, req.UserID); err != nil {
log.ZWarn(ctx, "RemoveInvitee failed", err, "roomID", dbInv.RoomID, "userID", req.UserID)
}
} else {
if err := s.db.DeleteInvitation(ctx, inv.RoomID); err != nil {
log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", inv.RoomID)
if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil {
log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", dbInv.RoomID)
}
}
@ -272,62 +337,94 @@ func (s *rtcServer) handleReject(ctx context.Context, req *rtc.SignalRejectReq,
}
// handleCancel processes a call cancellation.
// Fix P1(安全): 从 DB 验证操作者是邀请发起方,防止被叫方冒充取消通话。
func (s *rtcServer) handleCancel(ctx context.Context, req *rtc.SignalCancelReq, signalReq *rtc.SignalReq) (*rtc.SignalCancelResp, error) {
inv := req.Invitation
if inv == nil {
if req.Invitation == nil {
return nil, errs.ErrArgs.WrapMsg("invitation is nil")
}
dbInv, err := s.db.GetInvitationByRoomID(ctx, req.Invitation.RoomID)
if err != nil {
return nil, errs.WrapMsg(err, "invitation not found or expired", "roomID", req.Invitation.RoomID)
}
if req.UserID != dbInv.InviterUserID {
return nil, errs.ErrNoPermission.WrapMsg("only the inviter can cancel", "userID", req.UserID, "inviterUserID", dbInv.InviterUserID)
}
sessionType := int32(constant.SingleChatType)
if inv.GroupID != "" {
if dbInv.GroupID != "" {
sessionType = int32(constant.ReadGroupChatType)
}
content := marshalSignalReq(signalReq)
for _, inviteeID := range inv.InviteeUserIDList {
content, err := marshalSignalReq(signalReq)
if err != nil {
return nil, err
}
for _, inviteeID := range dbInv.InviteeUserIDList {
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, sessionType, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification cancel to invitee failed", err, "inviteeID", inviteeID)
}
}
if err := s.db.DeleteInvitation(ctx, inv.RoomID); err != nil {
log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", inv.RoomID)
if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil {
log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", dbInv.RoomID)
}
return &rtc.SignalCancelResp{}, nil
}
// handleHungUp processes a call hang-up.
// Fix P1(安全): 从 DB 验证操作者是通话参与者,防止任意用户挂断他人通话并删除 LiveKit Room。
func (s *rtcServer) handleHungUp(ctx context.Context, req *rtc.SignalHungUpReq, signalReq *rtc.SignalReq) (*rtc.SignalHungUpResp, error) {
inv := req.Invitation
if inv == nil {
if req.Invitation == nil {
return nil, errs.ErrArgs.WrapMsg("invitation is nil")
}
dbInv, err := s.db.GetInvitationByRoomID(ctx, req.Invitation.RoomID)
if err != nil {
return nil, errs.WrapMsg(err, "invitation not found or expired", "roomID", req.Invitation.RoomID)
}
if req.UserID != dbInv.InviterUserID && !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) {
return nil, errs.ErrNoPermission.WrapMsg("user is not a participant of this call", "userID", req.UserID)
}
sessionType := int32(constant.SingleChatType)
if inv.GroupID != "" {
if dbInv.GroupID != "" {
sessionType = int32(constant.ReadGroupChatType)
}
content := marshalSignalReq(signalReq)
for _, peerID := range hungUpPeerIDs(inv, req.UserID) {
content, err := marshalSignalReq(signalReq)
if err != nil {
return nil, err
}
// 使用 DB 中的参与者列表,不信任客户端传入的 InviteeUserIDList
for _, peerID := range hungUpPeerIDsFromDB(dbInv, req.UserID) {
if err := s.sendSignalingNotification(ctx, req.UserID, peerID, sessionType, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification hungUp to peer failed", err, "peerID", peerID)
}
}
// Terminate the LiveKit room
if _, err := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: inv.RoomID}); err != nil {
log.ZWarn(ctx, "LiveKit DeleteRoom failed", err, "roomID", inv.RoomID)
if _, err := s.roomClient.DeleteRoom(ctx, &livekit.DeleteRoomRequest{Room: dbInv.RoomID}); err != nil {
log.ZWarn(ctx, "LiveKit DeleteRoom failed", err, "roomID", dbInv.RoomID)
}
if err := s.db.DeleteInvitation(ctx, inv.RoomID); err != nil {
log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", inv.RoomID)
if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil {
log.ZWarn(ctx, "DeleteInvitation failed", err, "roomID", dbInv.RoomID)
}
return &rtc.SignalHungUpResp{}, nil
}
// handleGetTokenByRoomID returns a LiveKit token for an existing room.
// Fix P0(安全): 原代码无权限校验,任意已登录用户可获取任意房间的 Token可窃听他人通话。
func (s *rtcServer) handleGetTokenByRoomID(ctx context.Context, req *rtc.SignalGetTokenByRoomIDReq) (*rtc.SignalGetTokenByRoomIDResp, error) {
dbInv, err := s.db.GetInvitationByRoomID(ctx, req.RoomID)
if err != nil {
return nil, errs.WrapMsg(err, "room not found or expired", "roomID", req.RoomID)
}
if req.UserID != dbInv.InviterUserID && !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) {
return nil, errs.ErrNoPermission.WrapMsg("user is not a participant of this room", "userID", req.UserID)
}
token, err := s.genToken(req.RoomID, req.UserID)
if err != nil {
return nil, err
@ -351,7 +448,16 @@ func (s *rtcServer) SignalGetRoomByGroupID(ctx context.Context, req *rtc.SignalG
}
// SignalGetTokenByRoomID returns a token for joining a room directly (HTTP API path).
// Fix P0(安全): 同 handleGetTokenByRoomID添加参与者身份校验。
func (s *rtcServer) SignalGetTokenByRoomID(ctx context.Context, req *rtc.SignalGetTokenByRoomIDReq) (*rtc.SignalGetTokenByRoomIDResp, error) {
dbInv, err := s.db.GetInvitationByRoomID(ctx, req.RoomID)
if err != nil {
return nil, errs.WrapMsg(err, "room not found or expired", "roomID", req.RoomID)
}
if req.UserID != dbInv.InviterUserID && !datautil.Contain(req.UserID, dbInv.InviteeUserIDList...) {
return nil, errs.ErrNoPermission.WrapMsg("user is not a participant of this room", "userID", req.UserID)
}
token, err := s.genToken(req.RoomID, req.UserID)
if err != nil {
return nil, err
@ -421,10 +527,14 @@ func (s *rtcServer) SignalSendCustomSignal(ctx context.Context, req *rtc.SignalS
return &rtc.SignalSendCustomSignalResp{}, nil
}
opUserID := mcontext.GetOpUserID(ctx)
content, _ := json.Marshal(map[string]any{
// Fix P3: 处理 json.Marshal 错误
content, err := json.Marshal(map[string]any{
"roomID": req.RoomID,
"customInfo": req.CustomInfo,
})
if err != nil {
return nil, errs.WrapMsg(err, "marshal custom signal content failed")
}
recipients := make([]string, 0, len(inv.InviteeUserIDList)+1)
recipients = append(recipients, inv.InviteeUserIDList...)
recipients = append(recipients, inv.InviterUserID)
@ -492,6 +602,29 @@ func (s *rtcServer) genToken(roomID, userID string) (string, error) {
return at.ToJWT()
}
// signalingMsgOptions 返回信令通知消息应设置的 Options。
//
// Fix P2+P2(安全): 原代码传 make(map[string]bool) 空 map导致
// 1. IsNotificationByMsg 将信令消息误判为普通聊天消息,触发黑名单/好友关系等权限拦截
// 2. IsHistory/IsPersistent 默认为 true信令消息被写入历史记录占用存储
// 3. IsUnreadCount/IsConversationUpdate 默认 true污染未读数和会话列表
//
// 信令消息应走 Notification 通道(对话 ID 前缀 "n_"),绕过聊天消息权限校验,
// 且不写历史、不计未读、不更新会话。离线推送根据 offlinePushInfo 控制,此处不强制关闭。
func signalingMsgOptions() map[string]bool {
opts := make(map[string]bool, 8)
// IsNotNotification=false 表示"这是通知消息",让 IsNotificationByMsg 返回 true
// 从而跳过 modifyMessageByUserMessageReceiveOpt 中的黑名单/好友关系等校验
datautil.SetSwitchFromOptions(opts, constant.IsNotNotification, false)
datautil.SetSwitchFromOptions(opts, constant.IsHistory, false)
datautil.SetSwitchFromOptions(opts, constant.IsPersistent, false)
datautil.SetSwitchFromOptions(opts, constant.IsUnreadCount, false)
datautil.SetSwitchFromOptions(opts, constant.IsConversationUpdate, false)
datautil.SetSwitchFromOptions(opts, constant.IsSenderConversationUpdate, false)
datautil.SetSwitchFromOptions(opts, constant.IsSenderSync, false)
return opts
}
// sendSignalingNotification sends a SignalingNotification message to a user via the msg service.
func (s *rtcServer) sendSignalingNotification(ctx context.Context, sendID, recvID string, sessionType int32, offlinePush *sdkws.OfflinePushInfo, content []byte) error {
now := time.Now().UnixMilli()
@ -506,7 +639,7 @@ func (s *rtcServer) sendSignalingNotification(ctx context.Context, sendID, recvI
SendTime: now,
ServerMsgID: uuid.New().String(),
ClientMsgID: uuid.New().String(),
Options: make(map[string]bool),
Options: signalingMsgOptions(),
}
if offlinePush != nil {
msgData.OfflinePushInfo = offlinePush
@ -536,15 +669,20 @@ func (s *rtcServer) sendCustomSignalNotification(ctx context.Context, sendID, re
SendTime: now,
ServerMsgID: uuid.New().String(),
ClientMsgID: uuid.New().String(),
Options: make(map[string]bool),
Options: signalingMsgOptions(),
}
_, err := s.msgClient.MsgClient.SendMsg(ctx, &pbmsg.SendMsgReq{MsgData: msgData})
return err
}
func marshalSignalReq(req *rtc.SignalReq) []byte {
b, _ := proto.Marshal(req)
return b
// marshalSignalReq serializes a SignalReq to bytes.
// Fix P2: 原代码使用 _ 吞掉错误,序列化失败时返回 nil导致被叫收到空 Content 消息,来电通知丢失。
func marshalSignalReq(req *rtc.SignalReq) ([]byte, error) {
b, err := proto.Marshal(req)
if err != nil {
return nil, errs.WrapMsg(err, "marshal SignalReq failed")
}
return b, nil
}
// newRoomID generates a unique room ID.
@ -554,6 +692,7 @@ func newRoomID() string {
// invitationToModel converts a proto InvitationInfo to the database model.
func invitationToModel(inv *rtc.InvitationInfo, push *sdkws.OfflinePushInfo) *model.SignalInvitation {
now := time.Now()
m := &model.SignalInvitation{
RoomID: inv.RoomID,
InviterUserID: inv.InviterUserID,
@ -566,7 +705,9 @@ func invitationToModel(inv *rtc.InvitationInfo, push *sdkws.OfflinePushInfo) *mo
SessionType: inv.SessionType,
InitiateTime: inv.InitiateTime,
BusyLineUserIDList: inv.BusyLineUserIDList,
CreateTime: time.Now().UnixMilli(),
CreateTime: now.UnixMilli(),
// Fix P1(TTL): 根据 Timeout 设置过期时间,配合 MongoDB TTL 索引自动清理
ExpireAt: now.Add(time.Duration(inv.Timeout+30) * time.Second),
}
if push != nil {
m.OfflinePushTitle = push.Title
@ -596,8 +737,9 @@ func modelToInvitationInfo(m *model.SignalInvitation) *rtc.InvitationInfo {
}
}
// hungUpPeerIDs returns the IDs that should receive hang-up notification.
func hungUpPeerIDs(inv *rtc.InvitationInfo, callerID string) []string {
// hungUpPeerIDsFromDB returns IDs that should receive hang-up notification, based on authoritative DB data.
// Fix P1(安全): 原 hungUpPeerIDs 使用客户端传入的 inv改为使用从 DB 获取的记录。
func hungUpPeerIDsFromDB(inv *model.SignalInvitation, callerID string) []string {
if callerID == inv.InviterUserID {
return inv.InviteeUserIDList
}

@ -39,6 +39,12 @@ func NewSignalMongo(db *mongo.Database) (database.SignalDatabase, error) {
{
Keys: bson.D{{Key: "create_time", Value: -1}},
},
// Fix P1(TTL): expire_at 字段为 BSON DateMongoDB 后台每 60s 扫描一次并自动删除过期文档。
// 覆盖场景:被叫网络断开、主叫 App 被杀、任何异常中断导致没有 Cancel/Reject/HungUp 的情况。
{
Keys: bson.D{{Key: "expire_at", Value: 1}},
Options: options.Index().SetExpireAfterSeconds(0),
},
})
if err != nil {
return nil, err

@ -14,6 +14,8 @@
package model
import "time"
// SignalInvitation stores an ongoing or pending signal invitation, keyed by roomID.
// It is created when a call is initiated and can be queried when the callee starts the app.
type SignalInvitation struct {
@ -32,6 +34,9 @@ type SignalInvitation struct {
OfflinePushDesc string `bson:"offline_push_desc"`
OfflinePushEx string `bson:"offline_push_ex"`
CreateTime int64 `bson:"create_time"`
// ExpireAt 是 MongoDB BSON Date 类型,供 TTL 索引自动清理过期邀请(无人响应/异常中断场景)。
// 值 = 创建时间 + Timeout + 30s 缓冲,由 invitationToModel 负责填充。
ExpireAt time.Time `bson:"expire_at"`
}
// SignalRecord stores a completed call record used for history queries.

Loading…
Cancel
Save