From 6f5575f583a47fb1ab0d07f9758241500b2c8630 Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Wed, 20 May 2026 17:54:49 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BE=A4=E9=98=85=E5=90=8E=E5=8D=B3=E7=84=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/rpc/msg/as_read.go | 20 +++++++++++++++---- .../storage/database/group_msg_burn_record.go | 6 +++--- .../database/mgo/group_msg_burn_record.go | 9 +++++++-- .../storage/model/group_msg_burn_record.go | 2 ++ 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index f8d074c2e..4f904a502 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -189,7 +189,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon for i := oldHasReadSeq + 1; i <= req.HasReadSeq; i++ { groupSeqs = append(groupSeqs, i) } - m.recordGroupBurnReadCount(ctx, conversation, groupSeqs) + m.recordGroupBurnReadCount(ctx, conversation, req.UserID, groupSeqs) } m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, seqs, hasReadSeq) @@ -276,9 +276,9 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation. } // recordGroupBurnReadCount 在群聊阅读时记录「阅后即焚」进度。 -// 每次已读触发 $inc read_count;首次写入时记录 member_count 与 burn_end_time。 +// 每次已读触发 $inc read_count;首次写入时记录 member_count、burn_end_time、send_id(发送者)。 // 仅在群的 MsgBurnDuration > 0 时生效;失败只记日志,不影响主流程。 -func (m *msgServer) recordGroupBurnReadCount(ctx context.Context, conv *conversation.Conversation, seqs []int64) { +func (m *msgServer) recordGroupBurnReadCount(ctx context.Context, conv *conversation.Conversation, readerUserID string, seqs []int64) { if len(seqs) == 0 || m.groupMsgBurnRecordDB == nil { return } @@ -290,10 +290,22 @@ func (m *msgServer) recordGroupBurnReadCount(ctx context.Context, conv *conversa if groupInfo.MsgBurnDuration <= 0 { return } + seqSenderID := make(map[int64]string, len(seqs)) + _, _, msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, readerUserID, conv.ConversationID, seqs) + if err != nil { + log.ZWarn(ctx, "recordGroupBurnReadCount GetMsgBySeqs failed", err, + "groupID", conv.GroupID, "conversationID", conv.ConversationID, "readerUserID", readerUserID, "seqs", seqs) + } else { + for _, md := range msgs { + if md != nil && md.Seq > 0 { + seqSenderID[md.Seq] = md.SendID + } + } + } now := time.Now().UnixMilli() burnEndTimeMs := now + int64(groupInfo.MsgBurnDuration)*1000 memberCount := int32(groupInfo.MemberCount) - if err := m.groupMsgBurnRecordDB.UpsertOnRead(ctx, conv.GroupID, seqs, memberCount, burnEndTimeMs); err != nil { + if err := m.groupMsgBurnRecordDB.UpsertOnRead(ctx, conv.GroupID, seqs, seqSenderID, memberCount, burnEndTimeMs); err != nil { log.ZError(ctx, "recordGroupBurnReadCount UpsertOnRead failed", err, "groupID", conv.GroupID, "seqs", seqs) } diff --git a/pkg/common/storage/database/group_msg_burn_record.go b/pkg/common/storage/database/group_msg_burn_record.go index 3f0885db6..a8ac6f130 100644 --- a/pkg/common/storage/database/group_msg_burn_record.go +++ b/pkg/common/storage/database/group_msg_burn_record.go @@ -34,9 +34,9 @@ type ExpiredGroupBurn struct { // 消费:conversation 服务 ClearGroupBurnExpiredMsgs cron 入口。 type GroupMsgBurnRecord interface { // UpsertOnRead 批量原子更新阅读记录: - // - 若 (group_id, seq) 不存在:插入 {member_count, burn_end_time, create_time, read_count=1} - // - 若已存在:仅对 read_count 执行 $inc,不覆盖首次写入的 burn_end_time - UpsertOnRead(ctx context.Context, groupID string, seqs []int64, memberCount int32, burnEndTimeMs int64) error + // - 若 (group_id, seq) 不存在:插入 {member_count, burn_end_time, create_time, send_id, read_count=1};send_id 来自 seqSenderID[seq],可为空。 + // - 若已存在:仅对 read_count 执行 $inc,不覆盖首次写入的 burn_end_time、send_id + UpsertOnRead(ctx context.Context, groupID string, seqs []int64, seqSenderID map[int64]string, memberCount int32, burnEndTimeMs int64) error // FindExpired 查询满足以下条件的记录并按 group_id 聚合: // burn_end_time <= nowMs AND read_count >= member_count diff --git a/pkg/common/storage/database/mgo/group_msg_burn_record.go b/pkg/common/storage/database/mgo/group_msg_burn_record.go index 23a8d2e21..7c44f1d26 100644 --- a/pkg/common/storage/database/mgo/group_msg_burn_record.go +++ b/pkg/common/storage/database/mgo/group_msg_burn_record.go @@ -52,15 +52,19 @@ type groupMsgBurnRecordMgo struct { } // UpsertOnRead 对每条 seq 执行 upsert: -// - 首次插入($setOnInsert)写入 member_count、burn_end_time、create_time,read_count 初始化为 1。 +// - 首次插入($setOnInsert)写入 member_count、burn_end_time、create_time、send_id,read_count 初始化为 1。 // - 已存在时仅对 read_count 执行 $inc 1。 -func (m *groupMsgBurnRecordMgo) UpsertOnRead(ctx context.Context, groupID string, seqs []int64, memberCount int32, burnEndTimeMs int64) error { +func (m *groupMsgBurnRecordMgo) UpsertOnRead(ctx context.Context, groupID string, seqs []int64, seqSenderID map[int64]string, memberCount int32, burnEndTimeMs int64) error { if len(seqs) == 0 { return nil } now := time.Now().UnixMilli() models := make([]mongo.WriteModel, 0, len(seqs)) for _, seq := range seqs { + senderID := "" + if seqSenderID != nil { + senderID = seqSenderID[seq] + } filter := bson.M{ "group_id": groupID, "seq": seq, @@ -70,6 +74,7 @@ func (m *groupMsgBurnRecordMgo) UpsertOnRead(ctx context.Context, groupID string "$setOnInsert": bson.M{ "group_id": groupID, "seq": seq, + "send_id": senderID, "member_count": memberCount, "burn_end_time": burnEndTimeMs, "create_time": now, diff --git a/pkg/common/storage/model/group_msg_burn_record.go b/pkg/common/storage/model/group_msg_burn_record.go index 193d0e43c..9755473c4 100644 --- a/pkg/common/storage/model/group_msg_burn_record.go +++ b/pkg/common/storage/model/group_msg_burn_record.go @@ -26,6 +26,8 @@ type GroupMsgBurnRecord struct { GroupID string `bson:"group_id"` // Seq 消息序列号 Seq int64 `bson:"seq"` + // SendID 发送该条群消息的用户 ID(首次有成员已读时写入,$setOnInsert) + SendID string `bson:"send_id"` // ReadCount 已阅读该消息的成员数量(原子累加) ReadCount int32 `bson:"read_count"` // MemberCount 创建记录时的群成员总数;用于判断是否全员已读