diff --git a/internal/rpc/user/online.go b/internal/rpc/user/online.go index 0e5365ed9..5d8051e2d 100644 --- a/internal/rpc/user/online.go +++ b/internal/rpc/user/online.go @@ -2,7 +2,10 @@ package user import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/protocol/constant" @@ -80,10 +83,49 @@ func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUse if err := s.online.SetUserOnline(ctx, status.UserID, status.Online, status.Offline); err != nil { return nil, err } + s.updateOfflineRecord(ctx, status.UserID, len(status.Offline) > 0, len(status.Online) > 0) } return &pbuser.SetUserOnlineStatusResp{}, nil } +// updateOfflineRecord 根据用户当前在线状态维护 user_offline_record 集合: +// - 若某平台刚断开且用户已全平台离线 → upsert 离线记录(仅首次写入,保留最早离线时刻) +// - 若某平台刚上线且用户当前有在线平台 → 删除离线记录(停止计时) +func (s *userServer) updateOfflineRecord(ctx context.Context, userID string, hasOffline, hasOnline bool) { + if !hasOffline && !hasOnline { + return + } + platformIDs, err := s.online.GetOnline(ctx, userID) + if err != nil { + log.ZWarn(ctx, "updateOfflineRecord: GetOnline failed", err, "userID", userID) + return + } + if len(platformIDs) == 0 { + // 所有平台已离线,写入离线记录(含预计算的删除截止时间) + offlineTime := time.Now() + deadline := s.calcDeleteDeadline(ctx, userID, offlineTime) + if err := s.userOfflineRecord.Upsert(ctx, userID, offlineTime, deadline); err != nil { + log.ZWarn(ctx, "updateOfflineRecord: Upsert failed", err, "userID", userID) + } + } else if hasOnline { + // 用户重新上线,删除离线记录,停止计时 + if err := s.userOfflineRecord.Delete(ctx, userID); err != nil { + log.ZWarn(ctx, "updateOfflineRecord: Delete failed", err, "userID", userID) + } + } +} + +// calcDeleteDeadline 查询用户的 delete_account_interval 并计算删除截止时间。 +// 若查询失败或 interval 为 0,则使用系统默认值(18 个月)。 +func (s *userServer) calcDeleteDeadline(ctx context.Context, userID string, from time.Time) time.Time { + interval := int32(model.DefaultDeleteAccountIntervalSec) + users, err := s.db.Find(ctx, []string{userID}) + if err == nil && len(users) > 0 && users[0].DeleteAccountInterval > 0 { + interval = users[0].DeleteAccountInterval + } + return from.Add(time.Duration(interval) * time.Second) +} + func (s *userServer) GetAllOnlineUsers(ctx context.Context, req *pbuser.GetAllOnlineUsersReq) (*pbuser.GetAllOnlineUsersResp, error) { resMap, nextCursor, err := s.online.GetAllOnlineUsers(ctx, req.Cursor) if err != nil { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index abd64eaf0..607d586e8 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -29,6 +29,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" @@ -71,6 +72,7 @@ type userServer struct { groupClient *rpcli.GroupClient relationClient *rpcli.RelationClient globalBlackDB controller.UserGlobalBlackDatabase + userOfflineRecord database.UserOfflineRecord } type Config struct { @@ -122,6 +124,10 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi if err != nil { return err } + userOfflineRecordDB, err := mgo.NewUserOfflineRecordMongo(mgocli.GetDB()) + if err != nil { + return err + } localcache.InitLocalCache(&config.LocalCacheConfig) u := &userServer{ online: redis.NewUserOnline(rdb), @@ -132,9 +138,10 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), - groupClient: rpcli.NewGroupClient(groupConn), - relationClient: rpcli.NewRelationClient(friendConn), - globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo), + groupClient: rpcli.NewGroupClient(groupConn), + relationClient: rpcli.NewRelationClient(friendConn), + globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo), + userOfflineRecord: userOfflineRecordDB, } pbuser.RegisterUserServer(server, u) return u.db.InitOnce(context.Background(), users) @@ -482,6 +489,18 @@ func (s *userServer) SetDeleteAccountInterval(ctx context.Context, req *pbuser.S "deleteAccountInterval", req.DeleteAccountInterval) return nil, err } + // 若用户当前处于离线状态(user_offline_record 中有记录),将 offline_time 与 + // delete_user_deadline 刷新为当前时刻及新截止时间,使倒计时从本次设置时刻重新起算。 + now := time.Now() + interval := req.DeleteAccountInterval + if interval == 0 { + interval = tablerelation.DefaultDeleteAccountIntervalSec + } + newDeadline := now.Add(time.Duration(interval) * time.Second) + if err := s.userOfflineRecord.RefreshOfflineTime(ctx, req.UserID, now, newDeadline); err != nil { + log.ZWarn(ctx, "SetDeleteAccountInterval: RefreshOfflineTime failed", err, + "userID", req.UserID) + } return &pbuser.SetDeleteAccountIntervalResp{}, nil } diff --git a/internal/tools/chat_admin_token.go b/internal/tools/chat_admin_token.go new file mode 100644 index 000000000..93fe01baf --- /dev/null +++ b/internal/tools/chat_admin_token.go @@ -0,0 +1,23 @@ +package tools + +import ( + "context" + + "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/tools/log" +) + +// fetchChatAdminToken 通过 IM auth-rpc GetAdminToken 获取管理员 token。 +// 使用 config.Share.Secret 和第一个 IMAdminUserID 作为凭据。 +func (c *cronServer) fetchChatAdminToken(ctx context.Context) (string, error) { + userID := c.config.Share.IMAdminUserID[0] + resp, err := c.authClient.GetAdminToken(ctx, &auth.GetAdminTokenReq{ + Secret: c.config.Share.Secret, + UserID: userID, + }) + if err != nil { + return "", err + } + log.ZDebug(ctx, "fetchChatAdminToken: ok", "userID", userID) + return resp.Token, nil +} diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 1e28ffafc..8dcb0195a 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -19,9 +19,13 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" @@ -34,9 +38,10 @@ import ( ) type CronTaskConfig struct { - CronTask config.CronTask - Share config.Share - Discovery config.Discovery + CronTask config.CronTask + Share config.Share + Discovery config.Discovery + MongodbConfig config.Mongo } func Start(ctx context.Context, config *CronTaskConfig) error { @@ -55,24 +60,40 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if err != nil { return err } - thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) if err != nil { return err } - conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) if err != nil { return err } + authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth) + if err != nil { + return err + } + + mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + if err != nil { + return errs.WrapMsg(err, "crontask: connect mongodb failed") + } + db := mgocli.GetDB() + + userOfflineRecordDB, err := mgo.NewUserOfflineRecordMongo(db) + if err != nil { + return errs.WrapMsg(err, "crontask: init user_offline_record collection failed") + } srv := &cronServer{ - ctx: ctx, - config: config, - cron: cron.New(), - msgClient: msg.NewMsgClient(msgConn), - conversationClient: pbconversation.NewConversationClient(conversationConn), - thirdClient: third.NewThirdClient(thirdConn), + ctx: ctx, + config: config, + cron: cron.New(), + msgClient: msg.NewMsgClient(msgConn), + conversationClient: pbconversation.NewConversationClient(conversationConn), + thirdClient: third.NewThirdClient(thirdConn), + authClient: rpcli.NewAuthClient(authConn), + userOfflineRecordDB: userOfflineRecordDB, + chatAPIAddress: config.CronTask.ChatAPI.Address, } if err := srv.registerClearS3(); err != nil { @@ -87,6 +108,9 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if err := srv.registerClearBurnExpiredMsgs(); err != nil { return err } + if err := srv.registerDeleteExpiredOfflineUsers(); err != nil { + return err + } log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) srv.cron.Start() <-ctx.Done() @@ -94,12 +118,15 @@ func Start(ctx context.Context, config *CronTaskConfig) error { } type cronServer struct { - ctx context.Context - config *CronTaskConfig - cron *cron.Cron - msgClient msg.MsgClient - conversationClient pbconversation.ConversationClient - thirdClient third.ThirdClient + ctx context.Context + config *CronTaskConfig + cron *cron.Cron + msgClient msg.MsgClient + conversationClient pbconversation.ConversationClient + thirdClient third.ThirdClient + authClient *rpcli.AuthClient + userOfflineRecordDB database.UserOfflineRecord + chatAPIAddress string } func (c *cronServer) registerClearS3() error { @@ -129,3 +156,15 @@ func (c *cronServer) registerClearBurnExpiredMsgs() error { _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearBurnExpiredMsgs) return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task") } + +// registerDeleteExpiredOfflineUsers 注册每小时执行一次的用户自动删除任务。 +// 固定使用 "@hourly" 表达式,与其他任务使用的 CronExecuteTime 独立。 +// chatAPI.address 未配置时跳过注册。 +func (c *cronServer) registerDeleteExpiredOfflineUsers() error { + if c.chatAPIAddress == "" { + log.ZInfo(c.ctx, "disable auto delete expired offline users: chatAPI.address not configured") + return nil + } + _, err := c.cron.AddFunc("@hourly", c.deleteExpiredOfflineUsers) + return errs.WrapMsg(err, "failed to register delete expired offline users cron task") +} diff --git a/internal/tools/delete_expired_user.go b/internal/tools/delete_expired_user.go new file mode 100644 index 000000000..bb303213d --- /dev/null +++ b/internal/tools/delete_expired_user.go @@ -0,0 +1,95 @@ +package tools + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "time" + + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" +) + +const deleteExpiredUserBatchLimit = 100 + +// chatHTTPClient 带超时,防止 chat 服务无响应时 cron worker 永久挂起。 +var chatHTTPClient = &http.Client{Timeout: 3 * time.Second} + +// deleteExpiredOfflineUsers 是 cron "@hourly" 触发的入口。 +// 批量查询离线时长超过 delete_account_interval 的用户并依次调用 chat /account/del 删除。 +func (c *cronServer) deleteExpiredOfflineUsers() { + now := time.Now() + operationID := fmt.Sprintf("cron_del_expired_user_%d_%d", os.Getpid(), now.UnixMilli()) + ctx := mcontext.SetOperationID(c.ctx, operationID) + log.ZInfo(ctx, "deleteExpiredOfflineUsers: start", "time", now) + + users, err := c.userOfflineRecordDB.FindExpiredUsers(ctx, now, deleteExpiredUserBatchLimit) + if err != nil { + log.ZError(ctx, "deleteExpiredOfflineUsers: FindExpiredUsers failed", err) + return + } + if len(users) == 0 { + log.ZDebug(ctx, "deleteExpiredOfflineUsers: no expired users found") + return + } + log.ZInfo(ctx, "deleteExpiredOfflineUsers: found expired users", "count", len(users)) + + adminToken, err := c.fetchChatAdminToken(ctx) + if err != nil { + log.ZError(ctx, "deleteExpiredOfflineUsers: fetchChatAdminToken failed", err) + return + } + + for i, u := range users { + subCtx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i)) + c.deleteExpiredUser(subCtx, adminToken, u.UserID) + } + log.ZInfo(ctx, "deleteExpiredOfflineUsers: done", "count", len(users), "elapsed", time.Since(now)) +} + +// deleteExpiredUser 通过 chat HTTP API POST /account/del 删除单个过期用户。 +// chat 服务端会处理:强制登出、删除好友/群组关系、清理 chat 账号数据等。 +// adminToken 为当次批次开始时通过 admin-api /account/login 获取的管理员 token。 +func (c *cronServer) deleteExpiredUser(ctx context.Context, adminToken, userID string) { + log.ZInfo(ctx, "deleteExpiredUser: start", "userID", userID) + + operationID := mcontext.GetOperationID(ctx) + + body, _ := json.Marshal(map[string]any{"userIDs": []string{userID}}) + url := c.chatAPIAddress + "/account/del" + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + log.ZError(ctx, "deleteExpiredUser: build request failed", err, "userID", userID) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("token", adminToken) + req.Header.Set("operationID", operationID) + + resp, err := chatHTTPClient.Do(req) + if err != nil { + log.ZError(ctx, "deleteExpiredUser: HTTP call failed", err, "userID", userID, "url", url) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + var result map[string]any + _ = json.NewDecoder(resp.Body).Decode(&result) + log.ZError(ctx, "deleteExpiredUser: chat API returned error", + fmt.Errorf("status %d", resp.StatusCode), + "userID", userID, "response", result) + return + } + + // chat /account/del 已处理好友/群组/IM用户删除;仅清理 user_offline_record 防止重复触发 + if err := c.userOfflineRecordDB.Delete(ctx, userID); err != nil { + log.ZWarn(ctx, "deleteExpiredUser: Delete offline record failed", err, "userID", userID) + } + + log.ZInfo(ctx, "deleteExpiredUser: done", "userID", userID) +} diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index d6c5e472e..cb62e6f72 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -33,10 +33,16 @@ type CronTaskCmd struct { func NewCronTaskCmd() *CronTaskCmd { var cronTaskConfig tools.CronTaskConfig ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig} + // ChatAPI 配置内嵌在 openim-crontask.yml 的 chatAPI 字段中,无需单独文件。 + // 示例: + // chatAPI: + // address: "http://127.0.0.1:10008" + // adminToken 由 crontask 通过 IM auth-rpc GetAdminToken 自动获取,无需额外配置。 ret.configMap = map[string]any{ OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask, ShareFileName: &cronTaskConfig.Share, DiscoveryConfigFilename: &cronTaskConfig.Discovery, + MongodbConfigFileName: &cronTaskConfig.MongodbConfig, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 52ddb7cac..0805e1767 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -119,6 +119,8 @@ type CronTask struct { RetainChatRecords int `mapstructure:"retainChatRecords"` FileExpireTime int `mapstructure:"fileExpireTime"` DeleteObjectType []string `mapstructure:"deleteObjectType"` + // ChatAPI 是 chat HTTP API 服务的访问配置,用于调用 /account/del 等需要管理员权限的接口。 + ChatAPI ChatAPI `mapstructure:"chatAPI"` } type OfflinePushConfig struct { @@ -404,6 +406,13 @@ type Share struct { RPCMaxBodySize MaxRequestBody `mapstructure:"rpcMaxBodySize"` } +// ChatAPI 是 chat HTTP API 服务的访问配置。 +// Address 为 chat-api 根地址(如 http://127.0.0.1:10008),用于 POST /account/del。 +// token 由 crontask 通过 IM auth-rpc GetAdminToken 自动获取,无需手动填写。 +type ChatAPI struct { + Address string `mapstructure:"address"` +} + type MaxRequestBody struct { RequestMaxBodySize int `mapstructure:"requestMaxBodySize"` ResponseMaxBodySize int `mapstructure:"responseMaxBodySize"` diff --git a/pkg/common/storage/controller/user.go b/pkg/common/storage/controller/user.go index d1ff44101..4568eac5a 100644 --- a/pkg/common/storage/controller/user.go +++ b/pkg/common/storage/controller/user.go @@ -69,6 +69,9 @@ type UserDatabase interface { SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error) + // Delete permanently removes users and invalidates their cache entries. + Delete(ctx context.Context, userIDs []string) error + // CRUD user command AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error @@ -222,6 +225,18 @@ func (u *userDatabase) SortQuery(ctx context.Context, userIDName map[string]stri return u.userDB.SortQuery(ctx, userIDName, asc) } +func (u *userDatabase) Delete(ctx context.Context, userIDs []string) error { + if len(userIDs) == 0 { + return nil + } + return u.tx.Transaction(ctx, func(ctx context.Context) error { + if err := u.userDB.Delete(ctx, userIDs); err != nil { + return err + } + return u.cache.DelUsersInfo(userIDs...).ChainExecDel(ctx) + }) +} + func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error { return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex) } diff --git a/pkg/common/storage/database/mgo/user_offline_record.go b/pkg/common/storage/database/mgo/user_offline_record.go new file mode 100644 index 000000000..f470fac69 --- /dev/null +++ b/pkg/common/storage/database/mgo/user_offline_record.go @@ -0,0 +1,92 @@ +package mgo + +import ( + "context" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/errs" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewUserOfflineRecordMongo(db *mongo.Database) (database.UserOfflineRecord, error) { + coll := db.Collection(database.UserOfflineRecordName) + indexes := []mongo.IndexModel{ + { + Keys: bson.D{{Key: "user_id", Value: 1}}, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{{Key: "delete_user_deadline", Value: 1}}, + }, + } + if _, err := coll.Indexes().CreateMany(context.Background(), indexes); err != nil { + return nil, errs.Wrap(err) + } + return &userOfflineRecordMgo{coll: coll}, nil +} + +type userOfflineRecordMgo struct { + coll *mongo.Collection +} + +// Upsert 写入用户的离线记录;若记录已存在则不覆盖($setOnInsert), +// 保留最早一次的全离线时刻作为计时起点。 +// deadline = offlineTime + delete_account_interval,供范围查询快速定位过期账号。 +func (u *userOfflineRecordMgo) Upsert(ctx context.Context, userID string, offlineTime, deadline time.Time) error { + filter := bson.M{"user_id": userID} + update := bson.M{ + "$setOnInsert": bson.M{ + "user_id": userID, + "offline_time": offlineTime, + "delete_user_deadline": deadline, + }, + } + opt := options.Update().SetUpsert(true) + _, err := u.coll.UpdateOne(ctx, filter, update, opt) + return errs.Wrap(err) +} + +// RefreshOfflineTime 将离线记录的 offline_time 与 delete_user_deadline 同时覆盖写为新值($set), +// 仅更新已存在的记录;用户在线时(无记录)不做任何操作。 +// 适用场景:用户修改 delete_account_interval,让倒计时从设置时刻重新起算。 +func (u *userOfflineRecordMgo) RefreshOfflineTime(ctx context.Context, userID string, newOfflineTime, newDeadline time.Time) error { + filter := bson.M{"user_id": userID} + update := bson.M{"$set": bson.M{ + "offline_time": newOfflineTime, + "delete_user_deadline": newDeadline, + }} + _, err := u.coll.UpdateOne(ctx, filter, update) + return errs.Wrap(err) +} + +// Delete 删除用户的离线记录(用户重新上线时调用,停止计时)。 +func (u *userOfflineRecordMgo) Delete(ctx context.Context, userID string) error { + _, err := u.coll.DeleteOne(ctx, bson.M{"user_id": userID}) + return errs.Wrap(err) +} + +// FindExpiredUsers 返回 delete_user_deadline <= now 的用户。 +// 通过 $lookup 联表 user 集合获取完整 *model.User,$unwind 同时起到过滤孤儿记录的作用 +// (若 user 文档已不存在,$unwind 会将其丢弃,避免对无效账号重复触发删除)。 +func (u *userOfflineRecordMgo) FindExpiredUsers(ctx context.Context, now time.Time, limit int) ([]*model.User, error) { + pipeline := bson.A{ + bson.M{"$match": bson.M{ + "delete_user_deadline": bson.M{"$lte": now}, + }}, + bson.M{"$limit": limit}, + bson.M{"$lookup": bson.M{ + "from": database.UserName, + "localField": "user_id", + "foreignField": "user_id", + "as": "u", + }}, + bson.M{"$unwind": "$u"}, + bson.M{"$replaceRoot": bson.M{"newRoot": "$u"}}, + } + return mongoutil.Aggregate[*model.User](ctx, u.coll, pipeline) +} diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index 08f8aa6c3..7d74142ce 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -24,4 +24,5 @@ const ( SignalRecordName = "signal_record" SpamReportName = "spam_report" MsgBurnDeadlineName = "msg_burn_deadline" + UserOfflineRecordName = "user_offline_record" ) diff --git a/pkg/common/storage/database/user_offline_record.go b/pkg/common/storage/database/user_offline_record.go new file mode 100644 index 000000000..5ed1416aa --- /dev/null +++ b/pkg/common/storage/database/user_offline_record.go @@ -0,0 +1,28 @@ +package database + +import ( + "context" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" +) + +// UserOfflineRecord 管理 user_offline_record 集合。 +// 集合中的每条记录代表一个当前处于全平台离线状态的用户及其首次全离线时刻。 +type UserOfflineRecord interface { + // Upsert 写入用户的离线记录;若记录已存在则不覆盖(保留最早的离线时刻)。 + // deadline = offlineTime + delete_account_interval,供 FindExpiredUsers 快速过滤。 + Upsert(ctx context.Context, userID string, offlineTime, deadline time.Time) error + + // RefreshOfflineTime 将已存在的离线记录的 offline_time 与 delete_user_deadline + // 同时刷新,使删除倒计时从 newOfflineTime 重新起算。 + // 若记录不存在(用户在线)则无操作。 + RefreshOfflineTime(ctx context.Context, userID string, newOfflineTime, newDeadline time.Time) error + + // Delete 删除用户的离线记录(用户重新上线时调用)。 + Delete(ctx context.Context, userID string) error + + // FindExpiredUsers 返回 delete_user_deadline <= now 的用户($lookup user 集合获取完整信息)。 + // limit 限制单次返回条数,防止单批处理量过大。 + FindExpiredUsers(ctx context.Context, now time.Time, limit int) ([]*model.User, error) +} diff --git a/pkg/common/storage/model/user_offline_record.go b/pkg/common/storage/model/user_offline_record.go new file mode 100644 index 000000000..a09026ed2 --- /dev/null +++ b/pkg/common/storage/model/user_offline_record.go @@ -0,0 +1,14 @@ +package model + +import "time" + +// UserOfflineRecord 记录用户全平台离线的时刻及账号自动删除截止时间。 +// 用户上线时删除记录;用户全部平台离线时 upsert 记录。 +// crontask 每小时扫描此集合,删除 DeleteUserDeadline <= now 的账号。 +type UserOfflineRecord struct { + UserID string `bson:"user_id"` + OfflineTime time.Time `bson:"offline_time"` + // DeleteUserDeadline = OfflineTime + delete_account_interval(秒) + // 用户修改 delete_account_interval 时同步刷新此字段。 + DeleteUserDeadline time.Time `bson:"delete_user_deadline"` +}