diff --git a/internal/rpc/rtc/signal.go b/internal/rpc/rtc/signal.go index a46aa3f27..be4b8ea50 100644 --- a/internal/rpc/rtc/signal.go +++ b/internal/rpc/rtc/signal.go @@ -108,6 +108,17 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, } } + // 检测哪些被叫用户正忙(已在通话中),记录到 BusyLineUserIDList + busyUserIDs, err := s.db.GetBusyUserIDs(ctx, inv.InviteeUserIDList) + if err != nil { + log.ZWarn(ctx, "handleInvite: GetBusyUserIDs failed (non-fatal)", err) + } + busySet := make(map[string]struct{}, len(busyUserIDs)) + for _, uid := range busyUserIDs { + busySet[uid] = struct{}{} + } + inv.BusyLineUserIDList = busyUserIDs + // 从主叫用户资料获取铃声 URL,注入到邀请信息中,被叫方收到后播放主叫方铃声 if inviterInfo, err := s.userClient.GetUserInfo(ctx, req.UserID); err == nil && inviterInfo.CallRingtoneURL != "" { inv.CallerRingtoneURL = inviterInfo.CallRingtoneURL @@ -151,6 +162,10 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, } for _, inviteeID := range inv.InviteeUserIDList { + if _, busy := busySet[inviteeID]; busy { + log.ZInfo(ctx, "handleInvite: skip busy invitee", "inviteeID", inviteeID) + continue + } log.ZInfo(ctx, "sendSignalingNotification to invitee", "sendID", req.UserID, "recvID", inviteeID) if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.SingleChatType), req.OfflinePushInfo, content); err != nil { log.ZError(ctx, "sendSignalingNotification to invitee failed", err, "inviteeID", inviteeID) @@ -160,10 +175,11 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, log.ZDebug(ctx, "handleInvite", "token", token, "roomID", inv.RoomID, "liveURL", s.config.RpcConfig.LiveKit.ExternalAddress) return &rtc.SignalInviteResp{ - Token: token, - RoomID: inv.RoomID, - LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress, - CalleeRingtoneURL: calleeRingtoneURL, + Token: token, + RoomID: inv.RoomID, + LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress, + BusyLineUserIDList: busyUserIDs, + CalleeRingtoneURL: calleeRingtoneURL, }, nil } @@ -178,6 +194,17 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi inv.InviterUserID = req.UserID inv.InitiateTime = time.Now().UnixMilli() + // 检测哪些被叫用户正忙(已在通话中),记录到 BusyLineUserIDList + busyUserIDs, err := s.db.GetBusyUserIDs(ctx, inv.InviteeUserIDList) + if err != nil { + log.ZWarn(ctx, "handleInviteInGroup: GetBusyUserIDs failed (non-fatal)", err) + } + busySet := make(map[string]struct{}, len(busyUserIDs)) + for _, uid := range busyUserIDs { + busySet[uid] = struct{}{} + } + inv.BusyLineUserIDList = busyUserIDs + // 从主叫用户资料获取铃声 URL,注入到邀请信息中,被叫方收到后播放主叫方铃声 if inviterInfo, err := s.userClient.GetUserInfo(ctx, req.UserID); err == nil && inviterInfo.CallRingtoneURL != "" { inv.CallerRingtoneURL = inviterInfo.CallRingtoneURL @@ -227,16 +254,21 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi log.ZInfo(ctx, "handleInviteInGroup: skipping invitee (call setting blocked)", "inviteeID", inviteeID) continue } + if _, busy := busySet[inviteeID]; busy { + log.ZInfo(ctx, "handleInviteInGroup: skip busy invitee", "inviteeID", inviteeID) + continue + } if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.ReadGroupChatType), req.OfflinePushInfo, content); err != nil { log.ZWarn(ctx, "sendSignalingNotification to group invitee failed", err, "inviteeID", inviteeID) } } return &rtc.SignalInviteInGroupResp{ - Token: token, - RoomID: inv.RoomID, - LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress, - CalleeRingtoneURL: calleeRingtoneURL, + Token: token, + RoomID: inv.RoomID, + LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress, + BusyLineUserIDList: busyUserIDs, + CalleeRingtoneURL: calleeRingtoneURL, }, nil } @@ -297,12 +329,12 @@ func (s *rtcServer) handleAccept(ctx context.Context, req *rtc.SignalAcceptReq, log.ZWarn(ctx, "sendSignalingNotification accept to inviter failed", err, "inviterID", dbInv.InviterUserID) } - // TODO: 群通话可通过 RemoveInvitee 实现精细化状态管理 - if dbInv.GroupID == "" { - if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil { - log.ZWarn(ctx, "handleAccept: DeleteInvitation failed (non-fatal)", err, "roomID", dbInv.RoomID) - } - } + // 接受邀请后不删除 invitation:通话仍在进行,双方应被标记为忙线(BusyLineUserIDList)。 + // invitation 的清理由以下路径负责: + // - 主动挂断:handleHungUp → DeleteInvitation + // - 主叫取消:handleCancel → DeleteInvitation + // - 被叫拒绝:handleReject → DeleteInvitation / RemoveInvitee + // - 异常中断:MongoDB TTL 索引(expire_at 字段)自动清理 return &rtc.SignalAcceptResp{ Token: token, diff --git a/pkg/common/storage/controller/rtc.go b/pkg/common/storage/controller/rtc.go index 75b5d83f4..8e0e46557 100644 --- a/pkg/common/storage/controller/rtc.go +++ b/pkg/common/storage/controller/rtc.go @@ -31,6 +31,8 @@ type RtcDatabase interface { RemoveInvitee(ctx context.Context, roomID string, userID string) error GetInvitationByGroupID(ctx context.Context, groupID string) (*model.SignalInvitation, error) GetInvitationsByRoomIDs(ctx context.Context, roomIDs []string) ([]*model.SignalInvitation, error) + // GetBusyUserIDs returns the subset of userIDs that are currently in an active call. + GetBusyUserIDs(ctx context.Context, userIDs []string) ([]string, error) CreateRecord(ctx context.Context, record *model.SignalRecord) error SearchRecords(ctx context.Context, sendID, recvID string, sessionType int32, startTime, endTime int64, pagination pagination.Pagination) (int64, []*model.SignalRecord, error) @@ -73,6 +75,10 @@ func (r *rtcDatabase) GetInvitationsByRoomIDs(ctx context.Context, roomIDs []str return r.db.GetInvitationsByRoomIDs(ctx, roomIDs) } +func (r *rtcDatabase) GetBusyUserIDs(ctx context.Context, userIDs []string) ([]string, error) { + return r.db.GetBusyUserIDs(ctx, userIDs) +} + func (r *rtcDatabase) CreateRecord(ctx context.Context, record *model.SignalRecord) error { return r.db.CreateRecord(ctx, record) } diff --git a/pkg/common/storage/database/mgo/signal.go b/pkg/common/storage/database/mgo/signal.go index 4a84b2cc9..0eb7f8b0e 100644 --- a/pkg/common/storage/database/mgo/signal.go +++ b/pkg/common/storage/database/mgo/signal.go @@ -117,6 +117,44 @@ func (s *signalMgo) GetInvitationsByRoomIDs(ctx context.Context, roomIDs []strin return mongoutil.Find[*model.SignalInvitation](ctx, s.invColl, bson.M{"room_id": bson.M{"$in": roomIDs}}) } +func (s *signalMgo) GetBusyUserIDs(ctx context.Context, userIDs []string) ([]string, error) { + if len(userIDs) == 0 { + return nil, nil + } + filter := bson.M{ + "$or": bson.A{ + bson.M{"inviter_user_id": bson.M{"$in": userIDs}}, + bson.M{"invitee_user_id_list": bson.M{"$in": userIDs}}, + }, + } + invitations, err := mongoutil.Find[*model.SignalInvitation](ctx, s.invColl, filter, + options.Find().SetProjection(bson.M{"inviter_user_id": 1, "invitee_user_id_list": 1}), + ) + if err != nil { + return nil, err + } + requested := make(map[string]struct{}, len(userIDs)) + for _, uid := range userIDs { + requested[uid] = struct{}{} + } + busySet := make(map[string]struct{}) + for _, inv := range invitations { + if _, ok := requested[inv.InviterUserID]; ok { + busySet[inv.InviterUserID] = struct{}{} + } + for _, uid := range inv.InviteeUserIDList { + if _, ok := requested[uid]; ok { + busySet[uid] = struct{}{} + } + } + } + busy := make([]string, 0, len(busySet)) + for uid := range busySet { + busy = append(busy, uid) + } + return busy, nil +} + func (s *signalMgo) CreateRecord(ctx context.Context, record *model.SignalRecord) error { return mongoutil.InsertMany(ctx, s.recColl, []*model.SignalRecord{record}) } diff --git a/pkg/common/storage/database/signal.go b/pkg/common/storage/database/signal.go index 24b83dcbc..dbc24834b 100644 --- a/pkg/common/storage/database/signal.go +++ b/pkg/common/storage/database/signal.go @@ -38,6 +38,9 @@ type SignalDatabase interface { GetInvitationByGroupID(ctx context.Context, groupID string) (*model.SignalInvitation, error) // GetInvitationsByRoomIDs retrieves invitations for the given room IDs. GetInvitationsByRoomIDs(ctx context.Context, roomIDs []string) ([]*model.SignalInvitation, error) + // GetBusyUserIDs returns the subset of userIDs that are currently involved in an active call + // (either as inviter or as invitee in a pending invitation). + GetBusyUserIDs(ctx context.Context, userIDs []string) ([]string, error) // CreateRecord stores a completed call record. CreateRecord(ctx context.Context, record *model.SignalRecord) error