From 66368cdd2777b4f18a944b55bd9b802c9d64c708 Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Thu, 14 May 2026 12:13:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=85=E5=90=8E=E5=8D=B3?= =?UTF-8?q?=E7=84=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/rpc/conversation/conversation.go | 22 ++++++++++++++++--- internal/rpc/conversation/notification.go | 16 ++++++++++++++ internal/rpc/msg/as_read.go | 2 ++ .../storage/database/mgo/msg_burn_deadline.go | 4 ++++ .../storage/database/msg_burn_deadline.go | 2 ++ pkg/common/storage/model/msg_burn_deadline.go | 3 +++ pkg/rpcli/msg.go | 10 +++++++++ 7 files changed, 56 insertions(+), 3 deletions(-) diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index b65f77812..1b86e6b8b 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -833,9 +833,11 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c // ClearBurnExpiredMsgs 处理「阅后即焚」过期消息: // 1. 从 msg_burn_deadline 中拉取一批过期分组(按 user/conversation 聚合,含每组最大 seq)。 -// 2. 对每个分组把用户在该会话上的 min_seq 推进到 max(过期 seq) + 1。 -// 3. 同步更新 conversation 文档的 min_seq 字段并下发会话变更通知。 -// 4. 删除已处理的 deadline 记录。 +// 2. 对每个分组把用户在该会话上的 min_seq 推进到 max(过期 seq) + 1,更新 conversation 文档。 +// 3. 向阅读方发 ConversationChangeNotification + DeleteMsgsNotification(含精确 seqs)。 +// 4. 单聊场景(si_ 前缀):利用 deadline 记录中的 PeerID 直接推进对端 min_seq, +// 并向对端也发两条通知,保证双方客户端都删本地消息。 +// 5. 物理删除 msg 存储中的焚毁消息;清理 deadline 记录。 // // 单次最多处理 req.Limit 个分组;若返回的 count == limit,cron 可继续触发。 func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbconversation.ClearBurnExpiredMsgsReq) (*pbconversation.ClearBurnExpiredMsgsResp, error) { @@ -856,6 +858,8 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco continue } newMinSeq := g.MaxSeq + 1 + + // 推进阅读方 min_seq。 if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil { log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err, "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) @@ -867,7 +871,18 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) continue } + // 通知 g.UserID 客户端:会话变更 + 精确删除指定 seqs。 + // 对端用户在 msg_burn_deadline 中有独立记录,cron 处理其分组时会自行通知, + // 无需在此重复推进对端 min_seq 或发送额外通知。 c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID}) + c.conversationNotificationSender.BurnMsgsDeleteNotification(ctx, g.UserID, g.UserID, g.ConversationID, g.Seqs) + + // 物理删除 msg 存储中的焚毁消息(best-effort,失败不中断流程)。 + if err := c.msgClient.DeleteMsgPhysicalBySeqs(ctx, g.ConversationID, g.Seqs); err != nil { + log.ZError(ctx, "ClearBurnExpiredMsgs DeleteMsgPhysicalBySeqs failed", err, + "conversationID", g.ConversationID, "seqs", g.Seqs) + } + if err := c.msgBurnDeadlineDB.DeleteByUserConversationSeqs(ctx, g.UserID, g.ConversationID, g.Seqs); err != nil { log.ZError(ctx, "ClearBurnExpiredMsgs DeleteByUserConversationSeqs failed", err, "userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs) @@ -878,3 +893,4 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco } return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil } + diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index 370865c1a..84c91a267 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -73,3 +73,19 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) } + +// BurnMsgsDeleteNotification 通知 recvUserID 按 seqs 删除本地消息。 +// sendUserID 为触发焚烧的一方(阅读方),recvUserID 为需要执行删除的一方。 +// 双方各调用一次,保证单聊两端客户端都删除本地缓存。 +func (c *ConversationNotificationSender) BurnMsgsDeleteNotification( + ctx context.Context, + sendUserID, recvUserID, conversationID string, + seqs []int64, +) { + tips := &sdkws.DeleteMsgsTips{ + UserID: sendUserID, + ConversationID: conversationID, + Seqs: seqs, + } + c.Notification(ctx, sendUserID, recvUserID, constant.DeleteMsgsNotification, tips) +} diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 095b1500d..d4c1531e4 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -245,6 +245,7 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation. UserID: readerUserID, ConversationID: conv.ConversationID, Seq: seq, + PeerID: peerID, DeadlineMs: deadline, CreateTime: now, }, @@ -252,6 +253,7 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation. UserID: peerID, ConversationID: conv.ConversationID, Seq: seq, + PeerID: readerUserID, DeadlineMs: deadline, CreateTime: now, }, diff --git a/pkg/common/storage/database/mgo/msg_burn_deadline.go b/pkg/common/storage/database/mgo/msg_burn_deadline.go index cc9b6832f..3ae08a3c3 100644 --- a/pkg/common/storage/database/mgo/msg_burn_deadline.go +++ b/pkg/common/storage/database/mgo/msg_burn_deadline.go @@ -66,6 +66,7 @@ func (m *msgBurnDeadlineMgo) UpsertIfAbsent(ctx context.Context, items []*model. "user_id": item.UserID, "conversation_id": item.ConversationID, "seq": item.Seq, + "peer_id": item.PeerID, "deadline_ms": item.DeadlineMs, "create_time": item.CreateTime, } @@ -91,6 +92,7 @@ func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64, "user_id": "$user_id", "conversation_id": "$conversation_id", }, + "peer_id": bson.M{"$first": "$peer_id"}, "max_seq": bson.M{"$max": "$seq"}, "seqs": bson.M{"$push": "$seq"}, }}}, @@ -101,6 +103,7 @@ func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64, UserID string `bson:"user_id"` ConversationID string `bson:"conversation_id"` } `bson:"_id"` + PeerID string `bson:"peer_id"` MaxSeq int64 `bson:"max_seq"` Seqs []int64 `bson:"seqs"` } @@ -113,6 +116,7 @@ func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64, res = append(res, &database.ExpiredBurnGroup{ UserID: r.ID.UserID, ConversationID: r.ID.ConversationID, + PeerID: r.PeerID, MaxSeq: r.MaxSeq, Seqs: r.Seqs, }) diff --git a/pkg/common/storage/database/msg_burn_deadline.go b/pkg/common/storage/database/msg_burn_deadline.go index f5d7676ea..829d77b80 100644 --- a/pkg/common/storage/database/msg_burn_deadline.go +++ b/pkg/common/storage/database/msg_burn_deadline.go @@ -24,6 +24,8 @@ import ( type ExpiredBurnGroup struct { UserID string ConversationID string + // PeerID 单聊中的对端用户 ID,直接从 deadline 记录读取,无需额外查 conversation 表。 + PeerID string // MaxSeq 当前批次中最大的过期 seq;推进 min_seq 时使用 MaxSeq + 1。 MaxSeq int64 // Seqs 当前批次实际涉及的所有过期 seq,便于精确删除已处理的 deadline 记录。 diff --git a/pkg/common/storage/model/msg_burn_deadline.go b/pkg/common/storage/model/msg_burn_deadline.go index ef1055662..d44849f46 100644 --- a/pkg/common/storage/model/msg_burn_deadline.go +++ b/pkg/common/storage/model/msg_burn_deadline.go @@ -24,6 +24,9 @@ type MsgBurnDeadline struct { UserID string `bson:"user_id"` ConversationID string `bson:"conversation_id"` Seq int64 `bson:"seq"` + // PeerID 单聊中的对端用户 ID。 + // cron 处理时可直接获取对端,无需额外查询 conversation 表。 + PeerID string `bson:"peer_id"` // DeadlineMs 截止时间戳(毫秒);超过即可被 cron 收走推进 min_seq。 DeadlineMs int64 `bson:"deadline_ms"` // CreateTime 写入时刻(毫秒);用于排查/审计。 diff --git a/pkg/rpcli/msg.go b/pkg/rpcli/msg.go index d439d0c12..0bff800d3 100644 --- a/pkg/rpcli/msg.go +++ b/pkg/rpcli/msg.go @@ -113,3 +113,13 @@ func (x *MsgClient) SetUserConversationsMinSeq(ctx context.Context, conversation req := &msg.SetUserConversationsMinSeqReq{ConversationID: conversationID, UserIDs: userIDs, Seq: seq} return ignoreResp(x.MsgClient.SetUserConversationsMinSeq(ctx, req)) } + +// DeleteMsgPhysicalBySeqs 按 seq 物理删除会话内的消息(无鉴权)。 +// 用于阅后即焚、系统级消息清理等场景。 +func (x *MsgClient) DeleteMsgPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error { + if len(seqs) == 0 { + return nil + } + req := &msg.DeleteMsgPhysicalBySeqReq{ConversationID: conversationID, Seqs: seqs} + return ignoreResp(x.MsgClient.DeleteMsgPhysicalBySeq(ctx, req)) +}