阅后即焚,接受者隐藏消息

pull/3727/head
hawklin2017 3 weeks ago
parent f85a6a0bd1
commit b8c4be2aaa

@ -23,6 +23,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -47,6 +48,7 @@ import (
type conversationServer struct {
pbconversation.UnimplementedConversationServer
conversationDatabase controller.ConversationDatabase
msgBurnDeadlineDB database.MsgBurnDeadline
conversationNotificationSender *ConversationNotificationSender
config *Config
@ -79,6 +81,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
msgBurnDeadlineDB, err := mgo.NewMsgBurnDeadlineMongo(mgocli.GetDB())
if err != nil {
return err
}
userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User)
if err != nil {
return err
@ -97,9 +103,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient),
conversationDatabase: controller.NewConversationDatabase(conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, redis.GetRocksCacheOptions(), conversationDB), mgocli.GetTx()),
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
msgBurnDeadlineDB: msgBurnDeadlineDB,
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
})
return nil
}
@ -823,3 +830,51 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
return nil
}
// ClearBurnExpiredMsgs 处理「阅后即焚」过期消息:
// 1. 从 msg_burn_deadline 中拉取一批过期分组(按 user/conversation 聚合,含每组最大 seq
// 2. 对每个分组把用户在该会话上的 min_seq 推进到 max(过期 seq) + 1。
// 3. 同步更新 conversation 文档的 min_seq 字段并下发会话变更通知。
// 4. 删除已处理的 deadline 记录。
//
// 单次最多处理 req.Limit 个分组;若返回的 count == limitcron 可继续触发。
func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbconversation.ClearBurnExpiredMsgsReq) (*pbconversation.ClearBurnExpiredMsgsResp, error) {
if c.msgBurnDeadlineDB == nil {
return &pbconversation.ClearBurnExpiredMsgsResp{Count: 0}, nil
}
limit := int(req.Limit)
if limit <= 0 {
limit = 100
}
groups, err := c.msgBurnDeadlineDB.FindExpiredGroups(ctx, req.Timestamp, limit)
if err != nil {
return nil, err
}
var processed int32
for _, g := range groups {
if g.UserID == "" || g.ConversationID == "" || g.MaxSeq <= 0 {
continue
}
newMinSeq := g.MaxSeq + 1
if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
continue
}
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID,
map[string]any{"min_seq": newMinSeq}); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
continue
}
c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID})
if err := c.msgBurnDeadlineDB.DeleteByUserConversationSeqs(ctx, g.UserID, g.ConversationID, g.Seqs); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs DeleteByUserConversationSeqs failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs)
}
log.ZDebug(ctx, "ClearBurnExpiredMsgs advanced min_seq", "userID", g.UserID,
"conversationID", g.ConversationID, "minSeq", newMinSeq, "seqs", g.Seqs)
processed++
}
return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil
}

@ -17,9 +17,12 @@ package msg
import (
"context"
"errors"
"time"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
@ -126,6 +129,7 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR
ContentType: conversation.ConversationType,
}
m.webhookAfterSingleMsgRead(ctx, &m.config.WebhooksConfig.AfterSingleMsgRead, reqCallback)
m.recordBurnDeadlines(ctx, conversation, req.UserID, req.Seqs)
m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq)
return &msg.MarkMsgsAsReadResp{}, nil
@ -166,6 +170,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
}
hasReadSeq = req.HasReadSeq
}
m.recordBurnDeadlines(ctx, conversation, req.UserID, seqs)
m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq)
} else if conversation.ConversationType == constant.ReadGroupChatType ||
@ -201,6 +206,52 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
return &msg.MarkConversationAsReadResp{}, nil
}
// recordBurnDeadlines 在「单聊」场景下,根据对端的 MsgBurnDuration 为本次已读的每条消息
// 记录一份「阅后即焚」截止时间到 mongo。后续由 cron 推进 min_seq 让消息从该用户视图消失。
//
// 设计要点:
// 1. 仅单聊:群聊有自己的不同语义(多接收方 + 仅 watermark 已读),暂不在此实现。
// 2. 仅当对端 MsgBurnDuration > 0 时才记录;为 0 表示对端未开启该功能。
// 3. 同一 (UserID, ConversationID, Seq) 已存在则不覆盖:保证以「首次阅读时刻」为基准,
// 避免多端重复 MarkAsRead 导致 deadline 被往后推。
// 4. 失败仅记录日志,不影响主流程的已读语义。
func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.Conversation, readerUserID string, seqs []int64) {
if len(seqs) == 0 {
return
}
if conv.ConversationType != constant.SingleChatType {
return
}
peerID := m.conversationAndGetRecvID(conv, readerUserID)
if peerID == "" || peerID == readerUserID {
return
}
peerInfo, err := m.UserLocalCache.GetUserInfo(ctx, peerID)
if err != nil {
log.ZWarn(ctx, "recordBurnDeadlines GetUserInfo failed", err, "peerID", peerID)
return
}
if peerInfo == nil || peerInfo.MsgBurnDuration <= 0 {
return
}
now := time.Now().UnixMilli()
deadline := now + int64(peerInfo.MsgBurnDuration)*1000
items := make([]*model.MsgBurnDeadline, 0, len(seqs))
for _, seq := range seqs {
items = append(items, &model.MsgBurnDeadline{
UserID: readerUserID,
ConversationID: conv.ConversationID,
Seq: seq,
DeadlineMs: deadline,
CreateTime: now,
})
}
if err := m.msgBurnDeadlineDB.UpsertIfAbsent(ctx, items); err != nil {
log.ZError(ctx, "recordBurnDeadlines UpsertIfAbsent failed", err,
"userID", readerUserID, "conversationID", conv.ConversationID, "seqs", seqs)
}
}
func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) {
tips := &sdkws.MarkAsReadTips{
MarkAsReadUserID: sendID,

@ -72,6 +72,7 @@ type msgServer struct {
conversationClient *rpcli.ConversationClient
spamReportDB database.SpamReport
globalBlackDB controller.UserGlobalBlackDatabase
msgBurnDeadlineDB database.MsgBurnDeadline
}
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
@ -132,6 +133,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
msgBurnDeadlineDB, err := mgo.NewMsgBurnDeadlineMongo(mgocli.GetDB())
if err != nil {
return err
}
s := &msgServer{
MsgDatabase: msgDatabase,
RegisterCenter: client,
@ -144,6 +149,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
conversationClient: conversationClient,
spamReportDB: spamReportDB,
globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo),
msgBurnDeadlineDB: msgBurnDeadlineDB,
}
s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg))

@ -0,0 +1,55 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"fmt"
"os"
"time"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
)
// clearBurnExpiredMsgs 阅后即焚 cron 入口:循环调用 conversation 服务的
// ClearBurnExpiredMsgs每次至多处理 burnLimit 个 (user, conversation) 分组,
// 直至本轮没有新的过期分组或达到防御性的最大循环次数。
func (c *cronServer) clearBurnExpiredMsgs() {
now := time.Now()
operationID := fmt.Sprintf("cron_burn_msg_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "clear burn expired msgs cron start")
const (
maxLoop = 10000
burnLimit = 100
)
var count int
for i := 1; i <= maxLoop; i++ {
resp, err := c.conversationClient.ClearBurnExpiredMsgs(ctx, &pbconversation.ClearBurnExpiredMsgsReq{
Timestamp: now.UnixMilli(),
Limit: burnLimit,
})
if err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs failed.", err)
return
}
count += int(resp.Count)
if resp.Count < burnLimit {
break
}
}
log.ZDebug(ctx, "clear burn expired msgs cron completed", "cost", time.Since(now), "count", count)
}

@ -84,6 +84,9 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err := srv.registerClearUserMsg(); err != nil {
return err
}
if err := srv.registerClearBurnExpiredMsgs(); err != nil {
return err
}
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
srv.cron.Start()
<-ctx.Done()
@ -121,3 +124,8 @@ func (c *cronServer) registerClearUserMsg() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg)
return errs.WrapMsg(err, "failed to register clear user msg cron task")
}
func (c *cronServer) registerClearBurnExpiredMsgs() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearBurnExpiredMsgs)
return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task")
}

@ -0,0 +1,134 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewMsgBurnDeadlineMongo(db *mongo.Database) (database.MsgBurnDeadline, error) {
coll := db.Collection(database.MsgBurnDeadlineName)
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{
{Key: "user_id", Value: 1},
{Key: "conversation_id", Value: 1},
{Key: "seq", Value: 1},
},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{Key: "deadline_ms", Value: 1}},
},
})
if err != nil {
return nil, errs.Wrap(err)
}
return &msgBurnDeadlineMgo{coll: coll}, nil
}
type msgBurnDeadlineMgo struct {
coll *mongo.Collection
}
func (m *msgBurnDeadlineMgo) UpsertIfAbsent(ctx context.Context, items []*model.MsgBurnDeadline) error {
if len(items) == 0 {
return nil
}
models := make([]mongo.WriteModel, 0, len(items))
for _, item := range items {
filter := bson.M{
"user_id": item.UserID,
"conversation_id": item.ConversationID,
"seq": item.Seq,
}
setOnInsert := bson.M{
"user_id": item.UserID,
"conversation_id": item.ConversationID,
"seq": item.Seq,
"deadline_ms": item.DeadlineMs,
"create_time": item.CreateTime,
}
models = append(models,
mongo.NewUpdateOneModel().
SetFilter(filter).
SetUpdate(bson.M{"$setOnInsert": setOnInsert}).
SetUpsert(true),
)
}
_, err := m.coll.BulkWrite(ctx, models, options.BulkWrite().SetOrdered(false))
return errs.Wrap(err)
}
func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64, limit int) ([]*database.ExpiredBurnGroup, error) {
if limit <= 0 {
return nil, nil
}
pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: bson.M{"deadline_ms": bson.M{"$lte": nowMs}}}},
bson.D{{Key: "$group", Value: bson.M{
"_id": bson.M{
"user_id": "$user_id",
"conversation_id": "$conversation_id",
},
"max_seq": bson.M{"$max": "$seq"},
"seqs": bson.M{"$push": "$seq"},
}}},
bson.D{{Key: "$limit", Value: int64(limit)}},
}
type aggRow struct {
ID struct {
UserID string `bson:"user_id"`
ConversationID string `bson:"conversation_id"`
} `bson:"_id"`
MaxSeq int64 `bson:"max_seq"`
Seqs []int64 `bson:"seqs"`
}
rows, err := mongoutil.Aggregate[*aggRow](ctx, m.coll, pipeline)
if err != nil {
return nil, err
}
res := make([]*database.ExpiredBurnGroup, 0, len(rows))
for _, r := range rows {
res = append(res, &database.ExpiredBurnGroup{
UserID: r.ID.UserID,
ConversationID: r.ID.ConversationID,
MaxSeq: r.MaxSeq,
Seqs: r.Seqs,
})
}
return res, nil
}
func (m *msgBurnDeadlineMgo) DeleteByUserConversationSeqs(ctx context.Context, userID, conversationID string, seqs []int64) error {
if len(seqs) == 0 {
return nil
}
filter := bson.M{
"user_id": userID,
"conversation_id": conversationID,
"seq": bson.M{"$in": seqs},
}
_, err := m.coll.DeleteMany(ctx, filter)
return errs.Wrap(err)
}

@ -0,0 +1,48 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
// ExpiredBurnGroup 表示某个 (UserID, ConversationID) 上需要被推进 min_seq 的 seq 集合。
type ExpiredBurnGroup struct {
UserID string
ConversationID string
// MaxSeq 当前批次中最大的过期 seq推进 min_seq 时使用 MaxSeq + 1。
MaxSeq int64
// Seqs 当前批次实际涉及的所有过期 seq便于精确删除已处理的 deadline 记录。
Seqs []int64
}
// MsgBurnDeadline 持久化每条消息对每个用户的「阅后即焚截止时间」。
// 写入位置msg 服务 MarkMsgsAsRead / MarkConversationAsRead单聊
// 消费位置conversation 服务 ClearBurnExpiredMsgs cron 入口。
type MsgBurnDeadline interface {
// UpsertIfAbsent 仅在 (UserID, ConversationID, Seq) 不存在时插入;
// 已存在则不覆盖,保证「首次阅读时刻」决定 deadline。
UpsertIfAbsent(ctx context.Context, items []*model.MsgBurnDeadline) error
// FindExpiredGroups 查询 deadline_ms <= nowMs 的记录,按 (UserID, ConversationID)
// 聚合并返回每组的最大 seq 与所涉及的 seq 列表。limit 限制返回的 group 数量。
FindExpiredGroups(ctx context.Context, nowMs int64, limit int) ([]*ExpiredBurnGroup, error)
// DeleteByUserConversationSeqs 删除某 (UserID, ConversationID) 下指定 seq 列表的 deadline 记录。
// 一般在成功推进 min_seq 后调用。
DeleteByUserConversationSeqs(ctx context.Context, userID, conversationID string, seqs []int64) error
}

@ -23,4 +23,5 @@ const (
SignalInvitationName = "signal_invitation"
SignalRecordName = "signal_record"
SpamReportName = "spam_report"
MsgBurnDeadlineName = "msg_burn_deadline"
)

@ -0,0 +1,31 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
// MsgBurnDeadline 单条消息的「阅后即焚截止时间」记录。
// 在 MarkMsgsAsRead / MarkConversationAsRead单聊场景时按
// (UserID, ConversationID, Seq) 维度写入。读取时间锁定后续不再覆盖。
//
// 当当前时间 > DeadlineMs 时cron 会把该用户在该会话上的 min_seq
// 推进到 max(已过期 seq) + 1从而让这些消息从对该用户的拉取结果中消失。
type MsgBurnDeadline struct {
UserID string `bson:"user_id"`
ConversationID string `bson:"conversation_id"`
Seq int64 `bson:"seq"`
// DeadlineMs 截止时间戳(毫秒);超过即可被 cron 收走推进 min_seq。
DeadlineMs int64 `bson:"deadline_ms"`
// CreateTime 写入时刻(毫秒);用于排查/审计。
CreateTime int64 `bson:"create_time"`
}
Loading…
Cancel
Save