diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index d5345f6ec..b65f77812 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -23,6 +23,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -47,6 +48,7 @@ import ( type conversationServer struct { pbconversation.UnimplementedConversationServer conversationDatabase controller.ConversationDatabase + msgBurnDeadlineDB database.MsgBurnDeadline conversationNotificationSender *ConversationNotificationSender config *Config @@ -79,6 +81,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + msgBurnDeadlineDB, err := mgo.NewMsgBurnDeadlineMongo(mgocli.GetDB()) + if err != nil { + return err + } userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return err @@ -97,9 +103,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient), conversationDatabase: controller.NewConversationDatabase(conversationDB, redis.NewConversationRedis(rdb, &config.LocalCacheConfig, redis.GetRocksCacheOptions(), conversationDB), mgocli.GetTx()), - userClient: rpcli.NewUserClient(userConn), - groupClient: rpcli.NewGroupClient(groupConn), - msgClient: msgClient, + msgBurnDeadlineDB: msgBurnDeadlineDB, + userClient: rpcli.NewUserClient(userConn), + groupClient: rpcli.NewGroupClient(groupConn), + msgClient: msgClient, }) return nil } @@ -823,3 +830,51 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) return nil } + +// ClearBurnExpiredMsgs 处理「阅后即焚」过期消息: +// 1. 从 msg_burn_deadline 中拉取一批过期分组(按 user/conversation 聚合,含每组最大 seq)。 +// 2. 对每个分组把用户在该会话上的 min_seq 推进到 max(过期 seq) + 1。 +// 3. 同步更新 conversation 文档的 min_seq 字段并下发会话变更通知。 +// 4. 删除已处理的 deadline 记录。 +// +// 单次最多处理 req.Limit 个分组;若返回的 count == limit,cron 可继续触发。 +func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbconversation.ClearBurnExpiredMsgsReq) (*pbconversation.ClearBurnExpiredMsgsResp, error) { + if c.msgBurnDeadlineDB == nil { + return &pbconversation.ClearBurnExpiredMsgsResp{Count: 0}, nil + } + limit := int(req.Limit) + if limit <= 0 { + limit = 100 + } + groups, err := c.msgBurnDeadlineDB.FindExpiredGroups(ctx, req.Timestamp, limit) + if err != nil { + return nil, err + } + var processed int32 + for _, g := range groups { + if g.UserID == "" || g.ConversationID == "" || g.MaxSeq <= 0 { + continue + } + newMinSeq := g.MaxSeq + 1 + if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil { + log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err, + "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) + continue + } + if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID, + map[string]any{"min_seq": newMinSeq}); err != nil { + log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err, + "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) + continue + } + c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID}) + if err := c.msgBurnDeadlineDB.DeleteByUserConversationSeqs(ctx, g.UserID, g.ConversationID, g.Seqs); err != nil { + log.ZError(ctx, "ClearBurnExpiredMsgs DeleteByUserConversationSeqs failed", err, + "userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs) + } + log.ZDebug(ctx, "ClearBurnExpiredMsgs advanced min_seq", "userID", g.UserID, + "conversationID", g.ConversationID, "minSeq", newMinSeq, "seqs", g.Seqs) + processed++ + } + return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil +} diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index de1879438..ff3014d95 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -17,9 +17,12 @@ package msg import ( "context" "errors" + "time" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" @@ -126,6 +129,7 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR ContentType: conversation.ConversationType, } m.webhookAfterSingleMsgRead(ctx, &m.config.WebhooksConfig.AfterSingleMsgRead, reqCallback) + m.recordBurnDeadlines(ctx, conversation, req.UserID, req.Seqs) m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq) return &msg.MarkMsgsAsReadResp{}, nil @@ -166,6 +170,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon } hasReadSeq = req.HasReadSeq } + m.recordBurnDeadlines(ctx, conversation, req.UserID, seqs) m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq) } else if conversation.ConversationType == constant.ReadGroupChatType || @@ -201,6 +206,52 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon return &msg.MarkConversationAsReadResp{}, nil } +// recordBurnDeadlines 在「单聊」场景下,根据对端的 MsgBurnDuration 为本次已读的每条消息 +// 记录一份「阅后即焚」截止时间到 mongo。后续由 cron 推进 min_seq 让消息从该用户视图消失。 +// +// 设计要点: +// 1. 仅单聊:群聊有自己的不同语义(多接收方 + 仅 watermark 已读),暂不在此实现。 +// 2. 仅当对端 MsgBurnDuration > 0 时才记录;为 0 表示对端未开启该功能。 +// 3. 同一 (UserID, ConversationID, Seq) 已存在则不覆盖:保证以「首次阅读时刻」为基准, +// 避免多端重复 MarkAsRead 导致 deadline 被往后推。 +// 4. 失败仅记录日志,不影响主流程的已读语义。 +func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.Conversation, readerUserID string, seqs []int64) { + if len(seqs) == 0 { + return + } + if conv.ConversationType != constant.SingleChatType { + return + } + peerID := m.conversationAndGetRecvID(conv, readerUserID) + if peerID == "" || peerID == readerUserID { + return + } + peerInfo, err := m.UserLocalCache.GetUserInfo(ctx, peerID) + if err != nil { + log.ZWarn(ctx, "recordBurnDeadlines GetUserInfo failed", err, "peerID", peerID) + return + } + if peerInfo == nil || peerInfo.MsgBurnDuration <= 0 { + return + } + now := time.Now().UnixMilli() + deadline := now + int64(peerInfo.MsgBurnDuration)*1000 + items := make([]*model.MsgBurnDeadline, 0, len(seqs)) + for _, seq := range seqs { + items = append(items, &model.MsgBurnDeadline{ + UserID: readerUserID, + ConversationID: conv.ConversationID, + Seq: seq, + DeadlineMs: deadline, + CreateTime: now, + }) + } + if err := m.msgBurnDeadlineDB.UpsertIfAbsent(ctx, items); err != nil { + log.ZError(ctx, "recordBurnDeadlines UpsertIfAbsent failed", err, + "userID", readerUserID, "conversationID", conv.ConversationID, "seqs", seqs) + } +} + func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) { tips := &sdkws.MarkAsReadTips{ MarkAsReadUserID: sendID, diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index de00217ea..3408a32b6 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -72,6 +72,7 @@ type msgServer struct { conversationClient *rpcli.ConversationClient spamReportDB database.SpamReport globalBlackDB controller.UserGlobalBlackDatabase + msgBurnDeadlineDB database.MsgBurnDeadline } func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { @@ -132,6 +133,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + msgBurnDeadlineDB, err := mgo.NewMsgBurnDeadlineMongo(mgocli.GetDB()) + if err != nil { + return err + } s := &msgServer{ MsgDatabase: msgDatabase, RegisterCenter: client, @@ -144,6 +149,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg conversationClient: conversationClient, spamReportDB: spamReportDB, globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo), + msgBurnDeadlineDB: msgBurnDeadlineDB, } s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg)) diff --git a/internal/tools/burn_msg.go b/internal/tools/burn_msg.go new file mode 100644 index 000000000..597acccda --- /dev/null +++ b/internal/tools/burn_msg.go @@ -0,0 +1,55 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tools + +import ( + "fmt" + "os" + "time" + + pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" +) + +// clearBurnExpiredMsgs 阅后即焚 cron 入口:循环调用 conversation 服务的 +// ClearBurnExpiredMsgs,每次至多处理 burnLimit 个 (user, conversation) 分组, +// 直至本轮没有新的过期分组或达到防御性的最大循环次数。 +func (c *cronServer) clearBurnExpiredMsgs() { + now := time.Now() + operationID := fmt.Sprintf("cron_burn_msg_%d_%d", os.Getpid(), now.UnixMilli()) + ctx := mcontext.SetOperationID(c.ctx, operationID) + log.ZDebug(ctx, "clear burn expired msgs cron start") + const ( + maxLoop = 10000 + burnLimit = 100 + ) + var count int + for i := 1; i <= maxLoop; i++ { + resp, err := c.conversationClient.ClearBurnExpiredMsgs(ctx, &pbconversation.ClearBurnExpiredMsgsReq{ + Timestamp: now.UnixMilli(), + Limit: burnLimit, + }) + if err != nil { + log.ZError(ctx, "ClearBurnExpiredMsgs failed.", err) + return + } + count += int(resp.Count) + if resp.Count < burnLimit { + break + } + } + log.ZDebug(ctx, "clear burn expired msgs cron completed", "cost", time.Since(now), "count", count) +} diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 689244dd1..1e28ffafc 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -84,6 +84,9 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if err := srv.registerClearUserMsg(); err != nil { return err } + if err := srv.registerClearBurnExpiredMsgs(); err != nil { + return err + } log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) srv.cron.Start() <-ctx.Done() @@ -121,3 +124,8 @@ func (c *cronServer) registerClearUserMsg() error { _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg) return errs.WrapMsg(err, "failed to register clear user msg cron task") } + +func (c *cronServer) registerClearBurnExpiredMsgs() error { + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearBurnExpiredMsgs) + return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task") +} diff --git a/pkg/common/storage/database/mgo/msg_burn_deadline.go b/pkg/common/storage/database/mgo/msg_burn_deadline.go new file mode 100644 index 000000000..cc9b6832f --- /dev/null +++ b/pkg/common/storage/database/mgo/msg_burn_deadline.go @@ -0,0 +1,134 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mgo + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/errs" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewMsgBurnDeadlineMongo(db *mongo.Database) (database.MsgBurnDeadline, error) { + coll := db.Collection(database.MsgBurnDeadlineName) + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "user_id", Value: 1}, + {Key: "conversation_id", Value: 1}, + {Key: "seq", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{{Key: "deadline_ms", Value: 1}}, + }, + }) + if err != nil { + return nil, errs.Wrap(err) + } + return &msgBurnDeadlineMgo{coll: coll}, nil +} + +type msgBurnDeadlineMgo struct { + coll *mongo.Collection +} + +func (m *msgBurnDeadlineMgo) UpsertIfAbsent(ctx context.Context, items []*model.MsgBurnDeadline) error { + if len(items) == 0 { + return nil + } + models := make([]mongo.WriteModel, 0, len(items)) + for _, item := range items { + filter := bson.M{ + "user_id": item.UserID, + "conversation_id": item.ConversationID, + "seq": item.Seq, + } + setOnInsert := bson.M{ + "user_id": item.UserID, + "conversation_id": item.ConversationID, + "seq": item.Seq, + "deadline_ms": item.DeadlineMs, + "create_time": item.CreateTime, + } + models = append(models, + mongo.NewUpdateOneModel(). + SetFilter(filter). + SetUpdate(bson.M{"$setOnInsert": setOnInsert}). + SetUpsert(true), + ) + } + _, err := m.coll.BulkWrite(ctx, models, options.BulkWrite().SetOrdered(false)) + return errs.Wrap(err) +} + +func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64, limit int) ([]*database.ExpiredBurnGroup, error) { + if limit <= 0 { + return nil, nil + } + pipeline := mongo.Pipeline{ + bson.D{{Key: "$match", Value: bson.M{"deadline_ms": bson.M{"$lte": nowMs}}}}, + bson.D{{Key: "$group", Value: bson.M{ + "_id": bson.M{ + "user_id": "$user_id", + "conversation_id": "$conversation_id", + }, + "max_seq": bson.M{"$max": "$seq"}, + "seqs": bson.M{"$push": "$seq"}, + }}}, + bson.D{{Key: "$limit", Value: int64(limit)}}, + } + type aggRow struct { + ID struct { + UserID string `bson:"user_id"` + ConversationID string `bson:"conversation_id"` + } `bson:"_id"` + MaxSeq int64 `bson:"max_seq"` + Seqs []int64 `bson:"seqs"` + } + rows, err := mongoutil.Aggregate[*aggRow](ctx, m.coll, pipeline) + if err != nil { + return nil, err + } + res := make([]*database.ExpiredBurnGroup, 0, len(rows)) + for _, r := range rows { + res = append(res, &database.ExpiredBurnGroup{ + UserID: r.ID.UserID, + ConversationID: r.ID.ConversationID, + MaxSeq: r.MaxSeq, + Seqs: r.Seqs, + }) + } + return res, nil +} + +func (m *msgBurnDeadlineMgo) DeleteByUserConversationSeqs(ctx context.Context, userID, conversationID string, seqs []int64) error { + if len(seqs) == 0 { + return nil + } + filter := bson.M{ + "user_id": userID, + "conversation_id": conversationID, + "seq": bson.M{"$in": seqs}, + } + _, err := m.coll.DeleteMany(ctx, filter) + return errs.Wrap(err) +} diff --git a/pkg/common/storage/database/msg_burn_deadline.go b/pkg/common/storage/database/msg_burn_deadline.go new file mode 100644 index 000000000..f5d7676ea --- /dev/null +++ b/pkg/common/storage/database/msg_burn_deadline.go @@ -0,0 +1,48 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package database + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" +) + +// ExpiredBurnGroup 表示某个 (UserID, ConversationID) 上需要被推进 min_seq 的 seq 集合。 +type ExpiredBurnGroup struct { + UserID string + ConversationID string + // MaxSeq 当前批次中最大的过期 seq;推进 min_seq 时使用 MaxSeq + 1。 + MaxSeq int64 + // Seqs 当前批次实际涉及的所有过期 seq,便于精确删除已处理的 deadline 记录。 + Seqs []int64 +} + +// MsgBurnDeadline 持久化每条消息对每个用户的「阅后即焚截止时间」。 +// 写入位置:msg 服务 MarkMsgsAsRead / MarkConversationAsRead(单聊)。 +// 消费位置:conversation 服务 ClearBurnExpiredMsgs cron 入口。 +type MsgBurnDeadline interface { + // UpsertIfAbsent 仅在 (UserID, ConversationID, Seq) 不存在时插入; + // 已存在则不覆盖,保证「首次阅读时刻」决定 deadline。 + UpsertIfAbsent(ctx context.Context, items []*model.MsgBurnDeadline) error + + // FindExpiredGroups 查询 deadline_ms <= nowMs 的记录,按 (UserID, ConversationID) + // 聚合并返回每组的最大 seq 与所涉及的 seq 列表。limit 限制返回的 group 数量。 + FindExpiredGroups(ctx context.Context, nowMs int64, limit int) ([]*ExpiredBurnGroup, error) + + // DeleteByUserConversationSeqs 删除某 (UserID, ConversationID) 下指定 seq 列表的 deadline 记录。 + // 一般在成功推进 min_seq 后调用。 + DeleteByUserConversationSeqs(ctx context.Context, userID, conversationID string, seqs []int64) error +} diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index 0fd9b3b2e..08f8aa6c3 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -23,4 +23,5 @@ const ( SignalInvitationName = "signal_invitation" SignalRecordName = "signal_record" SpamReportName = "spam_report" + MsgBurnDeadlineName = "msg_burn_deadline" ) diff --git a/pkg/common/storage/model/msg_burn_deadline.go b/pkg/common/storage/model/msg_burn_deadline.go new file mode 100644 index 000000000..ef1055662 --- /dev/null +++ b/pkg/common/storage/model/msg_burn_deadline.go @@ -0,0 +1,31 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +// MsgBurnDeadline 单条消息的「阅后即焚截止时间」记录。 +// 在 MarkMsgsAsRead / MarkConversationAsRead(单聊场景)时按 +// (UserID, ConversationID, Seq) 维度写入。读取时间锁定后续不再覆盖。 +// +// 当当前时间 > DeadlineMs 时,cron 会把该用户在该会话上的 min_seq +// 推进到 max(已过期 seq) + 1,从而让这些消息从对该用户的拉取结果中消失。 +type MsgBurnDeadline struct { + UserID string `bson:"user_id"` + ConversationID string `bson:"conversation_id"` + Seq int64 `bson:"seq"` + // DeadlineMs 截止时间戳(毫秒);超过即可被 cron 收走推进 min_seq。 + DeadlineMs int64 `bson:"deadline_ms"` + // CreateTime 写入时刻(毫秒);用于排查/审计。 + CreateTime int64 `bson:"create_time"` +}