群消息阅后即焚

pull/3727/head
hawklin2017 2 weeks ago
parent d28dd965f2
commit cd68d00ff3

@ -295,6 +295,35 @@ func (o *GroupApi) GetEditSetting(c *gin.Context) {
})
}
// SetMsgBurnDuration 设置群消息阅后即焚时长burnDuration=0 表示关闭。
func (o *GroupApi) SetMsgBurnDuration(c *gin.Context) {
var req struct {
GroupID string `json:"groupID"`
BurnDuration int32 `json:"burnDuration"`
}
if err := c.ShouldBindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WrapMsg(err.Error()))
return
}
if req.GroupID == "" {
apiresp.GinError(c, errs.ErrArgs.WrapMsg("groupID is empty"))
return
}
if req.BurnDuration < 0 {
apiresp.GinError(c, errs.ErrArgs.WrapMsg("burnDuration must be >= 0"))
return
}
resp, err := o.Client.SetGroupInfoEx(c.Request.Context(), &group.SetGroupInfoExReq{
GroupID: req.GroupID,
MsgBurnDuration: wrapperspb.Int32(req.BurnDuration),
})
if err != nil {
apiresp.GinError(c, err)
return
}
apiresp.GinSuccess(c, resp)
}
func (o *GroupApi) JoinGroup(c *gin.Context) {
a2r.Call(c, group.GroupClient.JoinGroup, o.Client)
}

@ -241,6 +241,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
groupRouterGroup.POST("/get_pin_setting", g.GetPinSetting)
groupRouterGroup.POST("/set_edit_setting", g.SetEditSetting)
groupRouterGroup.POST("/get_edit_setting", g.GetEditSetting)
groupRouterGroup.POST("/set_msg_burn_duration", g.SetMsgBurnDuration)
groupRouterGroup.POST("/join_group", g.JoinGroup)
groupRouterGroup.POST("/quit_group", g.QuitGroup)
groupRouterGroup.POST("/group_application_response", g.ApplicationGroupResponse)

@ -49,6 +49,7 @@ type conversationServer struct {
pbconversation.UnimplementedConversationServer
conversationDatabase controller.ConversationDatabase
msgBurnDeadlineDB database.MsgBurnDeadline
groupMsgBurnRecordDB database.GroupMsgBurnRecord
conversationNotificationSender *ConversationNotificationSender
config *Config
@ -85,6 +86,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
groupMsgBurnRecordDB, err := mgo.NewGroupMsgBurnRecordMongo(mgocli.GetDB())
if err != nil {
return err
}
userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User)
if err != nil {
return err
@ -103,10 +108,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient),
conversationDatabase: controller.NewConversationDatabase(conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, redis.GetRocksCacheOptions(), conversationDB), mgocli.GetTx()),
msgBurnDeadlineDB: msgBurnDeadlineDB,
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
msgBurnDeadlineDB: msgBurnDeadlineDB,
groupMsgBurnRecordDB: groupMsgBurnRecordDB,
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
})
return nil
}
@ -878,3 +884,70 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco
}
return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil
}
// ClearGroupBurnExpiredMsgs 处理群消息「阅后即焚」到期记录:
// 1. 查询满足 read_count >= member_count 且 burn_end_time 过期的记录(按 group_id 聚合)。
// 2. 对每个群,获取所有成员 ID批量推进他们在群会话上的 min_seq。
// 3. 更新每个成员的会话 min_seq 并下发 ConversationChangeNotification。
// 4. 删除已处理的 group_msg_burn_record 记录。
func (c *conversationServer) ClearGroupBurnExpiredMsgs(ctx context.Context, req *pbconversation.ClearGroupBurnExpiredMsgsReq) (*pbconversation.ClearGroupBurnExpiredMsgsResp, error) {
if c.groupMsgBurnRecordDB == nil {
return &pbconversation.ClearGroupBurnExpiredMsgsResp{Count: 0}, nil
}
limit := int(req.Limit)
if limit <= 0 {
limit = 100
}
groups, err := c.groupMsgBurnRecordDB.FindExpired(ctx, req.Timestamp, limit)
if err != nil {
return nil, err
}
var processed int32
for _, g := range groups {
if g.GroupID == "" || g.MaxSeq <= 0 {
continue
}
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, g.GroupID)
newMinSeq := g.MaxSeq + 1
// 获取群所有成员 ID
memberIDs, err := c.groupClient.GetGroupMemberUserIDs(ctx, g.GroupID)
if err != nil {
log.ZError(ctx, "ClearGroupBurnExpiredMsgs GetGroupMemberUserIDs failed", err,
"groupID", g.GroupID)
continue
}
if len(memberIDs) == 0 {
continue
}
// 批量推进所有成员的 min_seqseq 层)
if err := c.msgClient.SetUserConversationMin(ctx, conversationID, memberIDs, newMinSeq); err != nil {
log.ZError(ctx, "ClearGroupBurnExpiredMsgs SetUserConversationMin failed", err,
"groupID", g.GroupID, "conversationID", conversationID, "minSeq", newMinSeq)
continue
}
// 更新每个成员会话文档中的 min_seq 并发送通知
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, memberIDs, conversationID,
map[string]any{"min_seq": newMinSeq}); err != nil {
log.ZError(ctx, "ClearGroupBurnExpiredMsgs UpdateUsersConversationField failed", err,
"groupID", g.GroupID, "conversationID", conversationID, "minSeq", newMinSeq)
continue
}
for _, memberID := range memberIDs {
c.conversationNotificationSender.ConversationChangeNotification(ctx, memberID, []string{conversationID})
}
// 删除已处理记录
if err := c.groupMsgBurnRecordDB.DeleteByGroupSeqs(ctx, g.GroupID, g.Seqs); err != nil {
log.ZError(ctx, "ClearGroupBurnExpiredMsgs DeleteByGroupSeqs failed", err,
"groupID", g.GroupID, "seqs", g.Seqs)
}
log.ZDebug(ctx, "ClearGroupBurnExpiredMsgs advanced min_seq for group",
"groupID", g.GroupID, "conversationID", conversationID,
"minSeq", newMinSeq, "memberCount", len(memberIDs), "seqs", g.Seqs)
processed++
}
return &pbconversation.ClearGroupBurnExpiredMsgsResp{Count: processed}, nil
}

@ -128,6 +128,10 @@ func UpdateGroupInfoExMap(ctx context.Context, group *pbgroup.SetGroupInfoExReq)
m["allow_edit_group_info"] = group.AllowEditGroupInfo.Value
normalFlag = true
}
if group.MsgBurnDuration != nil {
m["msg_burn_duration"] = group.MsgBurnDuration.Value
normalFlag = true
}
return m, normalFlag, groupNameFlag, notificationFlag, nil
}

@ -175,6 +175,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq)
} else if conversation.ConversationType == constant.ReadGroupChatType ||
conversation.ConversationType == constant.NotificationChatType {
var oldHasReadSeq int64 = hasReadSeq
if req.HasReadSeq > hasReadSeq {
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
if err != nil {
@ -182,6 +183,14 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
}
hasReadSeq = req.HasReadSeq
}
// 计算本次新增已读的 seq 范围,用于阅后即焚计数
if conversation.ConversationType == constant.ReadGroupChatType && req.HasReadSeq > oldHasReadSeq {
var groupSeqs []int64
for i := oldHasReadSeq + 1; i <= req.HasReadSeq; i++ {
groupSeqs = append(groupSeqs, i)
}
m.recordGroupBurnReadCount(ctx, conversation, groupSeqs)
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID,
req.UserID, seqs, hasReadSeq)
}
@ -264,6 +273,30 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.
}
}
// recordGroupBurnReadCount 在群聊阅读时记录「阅后即焚」进度。
// 每次已读触发 $inc read_count首次写入时记录 member_count 与 burn_end_time。
// 仅在群的 MsgBurnDuration > 0 时生效;失败只记日志,不影响主流程。
func (m *msgServer) recordGroupBurnReadCount(ctx context.Context, conv *conversation.Conversation, seqs []int64) {
if len(seqs) == 0 || m.groupMsgBurnRecordDB == nil {
return
}
groupInfo, err := m.GroupLocalCache.GetGroupInfo(ctx, conv.GroupID)
if err != nil {
log.ZWarn(ctx, "recordGroupBurnReadCount GetGroupInfo failed", err, "groupID", conv.GroupID)
return
}
if groupInfo.MsgBurnDuration <= 0 {
return
}
now := time.Now().UnixMilli()
burnEndTimeMs := now + int64(groupInfo.MsgBurnDuration)*1000
memberCount := int32(groupInfo.MemberCount)
if err := m.groupMsgBurnRecordDB.UpsertOnRead(ctx, conv.GroupID, seqs, memberCount, burnEndTimeMs); err != nil {
log.ZError(ctx, "recordGroupBurnReadCount UpsertOnRead failed", err,
"groupID", conv.GroupID, "seqs", seqs)
}
}
func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) {
tips := &sdkws.MarkAsReadTips{
MarkAsReadUserID: sendID,

@ -74,6 +74,7 @@ type msgServer struct {
globalBlackDB controller.UserGlobalBlackDatabase
userMuteDB controller.UserMuteDatabase
msgBurnDeadlineDB database.MsgBurnDeadline
groupMsgBurnRecordDB database.GroupMsgBurnRecord
}
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
@ -138,6 +139,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
groupMsgBurnRecordDB, err := mgo.NewGroupMsgBurnRecordMongo(mgocli.GetDB())
if err != nil {
return err
}
userMuteMgo, err := mgo.NewUserMuteMongo(mgocli.GetDB())
if err != nil {
return err
@ -156,6 +161,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo),
userMuteDB: controller.NewUserMuteDatabase(userMuteMgo),
msgBurnDeadlineDB: msgBurnDeadlineDB,
groupMsgBurnRecordDB: groupMsgBurnRecordDB,
}
s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg))

@ -53,3 +53,33 @@ func (c *cronServer) clearBurnExpiredMsgs() {
}
log.ZDebug(ctx, "clear burn expired msgs cron completed", "cost", time.Since(now), "count", count)
}
// clearGroupBurnExpiredMsgs 群消息阅后即焚 cron 入口:循环调用 conversation 服务的
// ClearGroupBurnExpiredMsgs每次至多处理 burnLimit 个 group 分组,
// 直至本轮没有到期分组或达到防御性的最大循环次数。
func (c *cronServer) clearGroupBurnExpiredMsgs() {
now := time.Now()
operationID := fmt.Sprintf("cron_group_burn_msg_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "clear group burn expired msgs cron start")
const (
maxLoop = 10000
burnLimit = 100
)
var count int
for i := 1; i <= maxLoop; i++ {
resp, err := c.conversationClient.ClearGroupBurnExpiredMsgs(ctx, &pbconversation.ClearGroupBurnExpiredMsgsReq{
Timestamp: now.UnixMilli(),
Limit: burnLimit,
})
if err != nil {
log.ZError(ctx, "ClearGroupBurnExpiredMsgs failed.", err)
return
}
count += int(resp.Count)
if resp.Count < burnLimit {
break
}
}
log.ZDebug(ctx, "clear group burn expired msgs cron completed", "cost", time.Since(now), "count", count)
}

@ -108,6 +108,9 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err := srv.registerClearBurnExpiredMsgs(); err != nil {
return err
}
if err := srv.registerClearGroupBurnExpiredMsgs(); err != nil {
return err
}
if err := srv.registerDeleteExpiredOfflineUsers(); err != nil {
return err
}
@ -157,6 +160,11 @@ func (c *cronServer) registerClearBurnExpiredMsgs() error {
return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task")
}
func (c *cronServer) registerClearGroupBurnExpiredMsgs() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearGroupBurnExpiredMsgs)
return errs.WrapMsg(err, "failed to register clear group burn expired msgs cron task")
}
// registerDeleteExpiredOfflineUsers 注册每小时执行一次的用户自动删除任务。
// 固定使用 "@hourly" 表达式,与其他任务使用的 CronExecuteTime 独立。
// chatAPI.address 未配置时跳过注册。

@ -45,6 +45,7 @@ func Db2PbGroupInfo(m *model.Group, ownerUserID string, memberCount uint32) *sdk
AllowPinMsg: m.AllowPinMsg,
AllowAddMember: m.AllowAddMember,
AllowEditGroupInfo: m.AllowEditGroupInfo,
MsgBurnDuration: m.MsgBurnDuration,
}
}

@ -0,0 +1,48 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package database
import "context"
const GroupMsgBurnRecordName = "group_msg_burn_record"
// ExpiredGroupBurn 表示一个群中某批到期消息的聚合结果。
type ExpiredGroupBurn struct {
// GroupID 群组 ID
GroupID string
// MaxSeq 该批中最大的 seq推进 min_seq 时使用 MaxSeq + 1
MaxSeq int64
// Seqs 该批所有到期的 seq 列表,用于精确删除已处理记录
Seqs []int64
}
// GroupMsgBurnRecord 持久化群消息「阅后即焚」的阅读计数与截止时间。
//
// 写入msg 服务 MarkConversationAsRead 群聊分支。
// 消费conversation 服务 ClearGroupBurnExpiredMsgs cron 入口。
type GroupMsgBurnRecord interface {
// UpsertOnRead 批量原子更新阅读记录:
// - 若 (group_id, seq) 不存在:插入 {member_count, burn_end_time, create_time, read_count=1}
// - 若已存在:仅对 read_count 执行 $inc不覆盖首次写入的 burn_end_time
UpsertOnRead(ctx context.Context, groupID string, seqs []int64, memberCount int32, burnEndTimeMs int64) error
// FindExpired 查询满足以下条件的记录并按 group_id 聚合:
// burn_end_time <= nowMs AND read_count >= member_count
// limit 限制返回的 group 数量。
FindExpired(ctx context.Context, nowMs int64, limit int) ([]*ExpiredGroupBurn, error)
// DeleteByGroupSeqs 删除指定群下一批 seq 的记录,在成功推进 min_seq 后调用。
DeleteByGroupSeqs(ctx context.Context, groupID string, seqs []int64) error
}

@ -0,0 +1,139 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mgo
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// NewGroupMsgBurnRecordMongo 初始化 group_msg_burn_record 集合及索引。
func NewGroupMsgBurnRecordMongo(db *mongo.Database) (database.GroupMsgBurnRecord, error) {
coll := db.Collection(database.GroupMsgBurnRecordName)
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{
{Key: "group_id", Value: 1},
{Key: "seq", Value: 1},
},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{Key: "burn_end_time", Value: 1}},
},
})
if err != nil {
return nil, errs.Wrap(err)
}
return &groupMsgBurnRecordMgo{coll: coll}, nil
}
type groupMsgBurnRecordMgo struct {
coll *mongo.Collection
}
// UpsertOnRead 对每条 seq 执行 upsert
// - 首次插入($setOnInsert写入 member_count、burn_end_time、create_timeread_count 初始化为 1。
// - 已存在时仅对 read_count 执行 $inc 1。
func (m *groupMsgBurnRecordMgo) UpsertOnRead(ctx context.Context, groupID string, seqs []int64, memberCount int32, burnEndTimeMs int64) error {
if len(seqs) == 0 {
return nil
}
now := time.Now().UnixMilli()
models := make([]mongo.WriteModel, 0, len(seqs))
for _, seq := range seqs {
filter := bson.M{
"group_id": groupID,
"seq": seq,
}
update := bson.M{
"$inc": bson.M{"read_count": int32(1)},
"$setOnInsert": bson.M{
"group_id": groupID,
"seq": seq,
"member_count": memberCount,
"burn_end_time": burnEndTimeMs,
"create_time": now,
},
}
models = append(models,
mongo.NewUpdateOneModel().
SetFilter(filter).
SetUpdate(update).
SetUpsert(true),
)
}
_, err := m.coll.BulkWrite(ctx, models, options.BulkWrite().SetOrdered(false))
return errs.Wrap(err)
}
// FindExpired 查询 burn_end_time <= nowMs 且 read_count >= member_count 的记录,
// 按 group_id 聚合后返回每组的最大 seq 与所有 seq 列表。
func (m *groupMsgBurnRecordMgo) FindExpired(ctx context.Context, nowMs int64, limit int) ([]*database.ExpiredGroupBurn, error) {
if limit <= 0 {
return nil, nil
}
pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: bson.M{
"burn_end_time": bson.M{"$lte": nowMs},
"$expr": bson.M{"$gte": bson.A{"$read_count", "$member_count"}},
}}},
bson.D{{Key: "$group", Value: bson.M{
"_id": "$group_id",
"max_seq": bson.M{"$max": "$seq"},
"seqs": bson.M{"$push": "$seq"},
}}},
bson.D{{Key: "$limit", Value: int64(limit)}},
}
type aggRow struct {
GroupID string `bson:"_id"`
MaxSeq int64 `bson:"max_seq"`
Seqs []int64 `bson:"seqs"`
}
rows, err := mongoutil.Aggregate[*aggRow](ctx, m.coll, pipeline)
if err != nil {
return nil, err
}
res := make([]*database.ExpiredGroupBurn, 0, len(rows))
for _, r := range rows {
res = append(res, &database.ExpiredGroupBurn{
GroupID: r.GroupID,
MaxSeq: r.MaxSeq,
Seqs: r.Seqs,
})
}
return res, nil
}
// DeleteByGroupSeqs 删除指定群下一批 seq 的记录。
func (m *groupMsgBurnRecordMgo) DeleteByGroupSeqs(ctx context.Context, groupID string, seqs []int64) error {
if len(seqs) == 0 {
return nil
}
filter := bson.M{
"group_id": groupID,
"seq": bson.M{"$in": seqs},
}
_, err := m.coll.DeleteMany(ctx, filter)
return errs.Wrap(err)
}

@ -49,4 +49,6 @@ type Group struct {
AllowAddMember int32 `bson:"allow_add_member"`
// AllowEditGroupInfo 0=全员可编辑群资料 1=仅群主/管理员可编辑群资料
AllowEditGroupInfo int32 `bson:"allow_edit_group_info"`
// MsgBurnDuration 群消息阅后即焚时长0 表示未开启
MsgBurnDuration int32 `bson:"msg_burn_duration"`
}

@ -0,0 +1,37 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
// GroupMsgBurnRecord 记录群消息「阅后即焚」的阅读进度。
//
// 写入时机:某成员首次通过 MarkConversationAsRead 标记某 seq 为已读时创建记录,
// 后续成员已读时通过原子 $inc 累加 ReadCount。
//
// 删除时机cron 发现 ReadCount >= MemberCount 且 BurnEndTime <= now 时触发删除,
// 同步推进所有成员的 min_seq 并发送会话变更通知。
type GroupMsgBurnRecord struct {
// GroupID 群组 ID
GroupID string `bson:"group_id"`
// Seq 消息序列号
Seq int64 `bson:"seq"`
// ReadCount 已阅读该消息的成员数量(原子累加)
ReadCount int32 `bson:"read_count"`
// MemberCount 创建记录时的群成员总数;用于判断是否全员已读
MemberCount int32 `bson:"member_count"`
// BurnEndTime 消息焚毁截止时间戳(毫秒);首次阅读时写入,不再覆盖
BurnEndTime int64 `bson:"burn_end_time"`
// CreateTime 记录创建时间戳(毫秒)
CreateTime int64 `bson:"create_time"`
}
Loading…
Cancel
Save