diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 55da122d2..39edb9024 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -36,6 +36,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/discovery" @@ -874,38 +875,40 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco if g.UserID == "" || g.ConversationID == "" || g.MaxSeq <= 0 { continue } - newMinSeq := g.MaxSeq + 1 + //newMinSeq := g.MaxSeq + 1 // 推进阅读方 min_seq。 - if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil { - log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err, - "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) - continue - } - if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID, - map[string]any{"min_seq": newMinSeq}); err != nil { - log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err, - "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) - continue - } + //if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil { + // log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err, + // "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) + // continue + //} + //if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID, + // map[string]any{"min_seq": newMinSeq}); err != nil { + // log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err, + // "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) + // continue + //} // 通知 g.UserID 客户端:会话变更 + 精确删除指定 seqs。 // 对端用户在 msg_burn_deadline 中有独立记录,cron 处理其分组时会自行通知, // 无需在此重复推进对端 min_seq 或发送额外通知。 - c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID}) - c.conversationNotificationSender.BurnMsgsDeleteNotification(ctx, g.UserID, g.UserID, g.ConversationID, g.Seqs) + //c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID}) - // 物理删除 msg 存储中的焚毁消息(best-effort,失败不中断流程)。 - if err := c.msgClient.DeleteMsgPhysicalBySeqs(ctx, g.ConversationID, g.Seqs); err != nil { - log.ZError(ctx, "ClearBurnExpiredMsgs DeleteMsgPhysicalBySeqs failed", err, - "conversationID", g.ConversationID, "seqs", g.Seqs) + // 删除焚毁消息并同步通知阅读方客户端(best-effort,失败不中断流程)。 + if err := c.msgClient.DeleteMsgs(ctx, g.UserID, g.ConversationID, g.Seqs, &msg.DeleteSyncOpt{ + IsSyncSelf: true, + }); err != nil { + log.ZError(ctx, "ClearBurnExpiredMsgs DeleteMsgs failed", err, + "userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs) } if err := c.msgBurnDeadlineDB.DeleteByUserConversationSeqs(ctx, g.UserID, g.ConversationID, g.Seqs); err != nil { log.ZError(ctx, "ClearBurnExpiredMsgs DeleteByUserConversationSeqs failed", err, "userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs) } + log.ZDebug(ctx, "ClearBurnExpiredMsgs advanced min_seq", "userID", g.UserID, - "conversationID", g.ConversationID, "minSeq", newMinSeq, "seqs", g.Seqs) + "conversationID", g.ConversationID, "seqs", g.Seqs) processed++ } return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil diff --git a/pkg/rpcli/msg.go b/pkg/rpcli/msg.go index 0bff800d3..b5c6ef4b6 100644 --- a/pkg/rpcli/msg.go +++ b/pkg/rpcli/msg.go @@ -114,6 +114,20 @@ func (x *MsgClient) SetUserConversationsMinSeq(ctx context.Context, conversation return ignoreResp(x.MsgClient.SetUserConversationsMinSeq(ctx, req)) } +// DeleteMsgs 按 seq 删除消息,行为与 msg RPC DeleteMsgs 一致。 +func (x *MsgClient) DeleteMsgs(ctx context.Context, userID, conversationID string, seqs []int64, deleteSyncOpt *msg.DeleteSyncOpt) error { + if len(seqs) == 0 { + return nil + } + req := &msg.DeleteMsgsReq{ + ConversationID: conversationID, + UserID: userID, + Seqs: seqs, + DeleteSyncOpt: deleteSyncOpt, + } + return ignoreResp(x.MsgClient.DeleteMsgs(ctx, req)) +} + // DeleteMsgPhysicalBySeqs 按 seq 物理删除会话内的消息(无鉴权)。 // 用于阅后即焚、系统级消息清理等场景。 func (x *MsgClient) DeleteMsgPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error { diff --git a/protocol b/protocol index 9afba4648..1d47a639e 160000 --- a/protocol +++ b/protocol @@ -1 +1 @@ -Subproject commit 9afba46486484563098e1e77b46cc94e0d85c9dd +Subproject commit 1d47a639ede5965901dfe0966b8369444c865e24