删除账户,默认18个月

pull/3727/head
hawklin2017 3 weeks ago
parent fb9eb73ef2
commit aa2eaee960

@ -2,7 +2,10 @@ package user
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/protocol/constant"
@ -80,10 +83,49 @@ func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUse
if err := s.online.SetUserOnline(ctx, status.UserID, status.Online, status.Offline); err != nil {
return nil, err
}
s.updateOfflineRecord(ctx, status.UserID, len(status.Offline) > 0, len(status.Online) > 0)
}
return &pbuser.SetUserOnlineStatusResp{}, nil
}
// updateOfflineRecord 根据用户当前在线状态维护 user_offline_record 集合:
// - 若某平台刚断开且用户已全平台离线 → upsert 离线记录(仅首次写入,保留最早离线时刻)
// - 若某平台刚上线且用户当前有在线平台 → 删除离线记录(停止计时)
func (s *userServer) updateOfflineRecord(ctx context.Context, userID string, hasOffline, hasOnline bool) {
if !hasOffline && !hasOnline {
return
}
platformIDs, err := s.online.GetOnline(ctx, userID)
if err != nil {
log.ZWarn(ctx, "updateOfflineRecord: GetOnline failed", err, "userID", userID)
return
}
if len(platformIDs) == 0 {
// 所有平台已离线,写入离线记录(含预计算的删除截止时间)
offlineTime := time.Now()
deadline := s.calcDeleteDeadline(ctx, userID, offlineTime)
if err := s.userOfflineRecord.Upsert(ctx, userID, offlineTime, deadline); err != nil {
log.ZWarn(ctx, "updateOfflineRecord: Upsert failed", err, "userID", userID)
}
} else if hasOnline {
// 用户重新上线,删除离线记录,停止计时
if err := s.userOfflineRecord.Delete(ctx, userID); err != nil {
log.ZWarn(ctx, "updateOfflineRecord: Delete failed", err, "userID", userID)
}
}
}
// calcDeleteDeadline 查询用户的 delete_account_interval 并计算删除截止时间。
// 若查询失败或 interval 为 0则使用系统默认值18 个月)。
func (s *userServer) calcDeleteDeadline(ctx context.Context, userID string, from time.Time) time.Time {
interval := int32(model.DefaultDeleteAccountIntervalSec)
users, err := s.db.Find(ctx, []string{userID})
if err == nil && len(users) > 0 && users[0].DeleteAccountInterval > 0 {
interval = users[0].DeleteAccountInterval
}
return from.Add(time.Duration(interval) * time.Second)
}
func (s *userServer) GetAllOnlineUsers(ctx context.Context, req *pbuser.GetAllOnlineUsersReq) (*pbuser.GetAllOnlineUsersResp, error) {
resMap, nextCursor, err := s.online.GetAllOnlineUsers(ctx, req.Cursor)
if err != nil {

@ -29,6 +29,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
@ -71,6 +72,7 @@ type userServer struct {
groupClient *rpcli.GroupClient
relationClient *rpcli.RelationClient
globalBlackDB controller.UserGlobalBlackDatabase
userOfflineRecord database.UserOfflineRecord
}
type Config struct {
@ -122,6 +124,10 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
if err != nil {
return err
}
userOfflineRecordDB, err := mgo.NewUserOfflineRecordMongo(mgocli.GetDB())
if err != nil {
return err
}
localcache.InitLocalCache(&config.LocalCacheConfig)
u := &userServer{
online: redis.NewUserOnline(rdb),
@ -132,9 +138,10 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
groupClient: rpcli.NewGroupClient(groupConn),
relationClient: rpcli.NewRelationClient(friendConn),
globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo),
groupClient: rpcli.NewGroupClient(groupConn),
relationClient: rpcli.NewRelationClient(friendConn),
globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo),
userOfflineRecord: userOfflineRecordDB,
}
pbuser.RegisterUserServer(server, u)
return u.db.InitOnce(context.Background(), users)
@ -482,6 +489,18 @@ func (s *userServer) SetDeleteAccountInterval(ctx context.Context, req *pbuser.S
"deleteAccountInterval", req.DeleteAccountInterval)
return nil, err
}
// 若用户当前处于离线状态user_offline_record 中有记录),将 offline_time 与
// delete_user_deadline 刷新为当前时刻及新截止时间,使倒计时从本次设置时刻重新起算。
now := time.Now()
interval := req.DeleteAccountInterval
if interval == 0 {
interval = tablerelation.DefaultDeleteAccountIntervalSec
}
newDeadline := now.Add(time.Duration(interval) * time.Second)
if err := s.userOfflineRecord.RefreshOfflineTime(ctx, req.UserID, now, newDeadline); err != nil {
log.ZWarn(ctx, "SetDeleteAccountInterval: RefreshOfflineTime failed", err,
"userID", req.UserID)
}
return &pbuser.SetDeleteAccountIntervalResp{}, nil
}

@ -0,0 +1,23 @@
package tools
import (
"context"
"github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/log"
)
// fetchChatAdminToken 通过 IM auth-rpc GetAdminToken 获取管理员 token。
// 使用 config.Share.Secret 和第一个 IMAdminUserID 作为凭据。
func (c *cronServer) fetchChatAdminToken(ctx context.Context) (string, error) {
userID := c.config.Share.IMAdminUserID[0]
resp, err := c.authClient.GetAdminToken(ctx, &auth.GetAdminTokenReq{
Secret: c.config.Share.Secret,
UserID: userID,
})
if err != nil {
return "", err
}
log.ZDebug(ctx, "fetchChatAdminToken: ok", "userID", userID)
return resp.Token, nil
}

@ -19,9 +19,13 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
@ -34,9 +38,10 @@ import (
)
type CronTaskConfig struct {
CronTask config.CronTask
Share config.Share
Discovery config.Discovery
CronTask config.CronTask
Share config.Share
Discovery config.Discovery
MongodbConfig config.Mongo
}
func Start(ctx context.Context, config *CronTaskConfig) error {
@ -55,24 +60,40 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err != nil {
return err
}
thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
if err != nil {
return err
}
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
if err != nil {
return err
}
authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth)
if err != nil {
return err
}
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return errs.WrapMsg(err, "crontask: connect mongodb failed")
}
db := mgocli.GetDB()
userOfflineRecordDB, err := mgo.NewUserOfflineRecordMongo(db)
if err != nil {
return errs.WrapMsg(err, "crontask: init user_offline_record collection failed")
}
srv := &cronServer{
ctx: ctx,
config: config,
cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn),
thirdClient: third.NewThirdClient(thirdConn),
ctx: ctx,
config: config,
cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn),
thirdClient: third.NewThirdClient(thirdConn),
authClient: rpcli.NewAuthClient(authConn),
userOfflineRecordDB: userOfflineRecordDB,
chatAPIAddress: config.CronTask.ChatAPI.Address,
}
if err := srv.registerClearS3(); err != nil {
@ -87,6 +108,9 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err := srv.registerClearBurnExpiredMsgs(); err != nil {
return err
}
if err := srv.registerDeleteExpiredOfflineUsers(); err != nil {
return err
}
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
srv.cron.Start()
<-ctx.Done()
@ -94,12 +118,15 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
}
type cronServer struct {
ctx context.Context
config *CronTaskConfig
cron *cron.Cron
msgClient msg.MsgClient
conversationClient pbconversation.ConversationClient
thirdClient third.ThirdClient
ctx context.Context
config *CronTaskConfig
cron *cron.Cron
msgClient msg.MsgClient
conversationClient pbconversation.ConversationClient
thirdClient third.ThirdClient
authClient *rpcli.AuthClient
userOfflineRecordDB database.UserOfflineRecord
chatAPIAddress string
}
func (c *cronServer) registerClearS3() error {
@ -129,3 +156,15 @@ func (c *cronServer) registerClearBurnExpiredMsgs() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearBurnExpiredMsgs)
return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task")
}
// registerDeleteExpiredOfflineUsers 注册每小时执行一次的用户自动删除任务。
// 固定使用 "@hourly" 表达式,与其他任务使用的 CronExecuteTime 独立。
// chatAPI.address 未配置时跳过注册。
func (c *cronServer) registerDeleteExpiredOfflineUsers() error {
if c.chatAPIAddress == "" {
log.ZInfo(c.ctx, "disable auto delete expired offline users: chatAPI.address not configured")
return nil
}
_, err := c.cron.AddFunc("@hourly", c.deleteExpiredOfflineUsers)
return errs.WrapMsg(err, "failed to register delete expired offline users cron task")
}

@ -0,0 +1,95 @@
package tools
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
)
const deleteExpiredUserBatchLimit = 100
// chatHTTPClient 带超时,防止 chat 服务无响应时 cron worker 永久挂起。
var chatHTTPClient = &http.Client{Timeout: 3 * time.Second}
// deleteExpiredOfflineUsers 是 cron "@hourly" 触发的入口。
// 批量查询离线时长超过 delete_account_interval 的用户并依次调用 chat /account/del 删除。
func (c *cronServer) deleteExpiredOfflineUsers() {
now := time.Now()
operationID := fmt.Sprintf("cron_del_expired_user_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZInfo(ctx, "deleteExpiredOfflineUsers: start", "time", now)
users, err := c.userOfflineRecordDB.FindExpiredUsers(ctx, now, deleteExpiredUserBatchLimit)
if err != nil {
log.ZError(ctx, "deleteExpiredOfflineUsers: FindExpiredUsers failed", err)
return
}
if len(users) == 0 {
log.ZDebug(ctx, "deleteExpiredOfflineUsers: no expired users found")
return
}
log.ZInfo(ctx, "deleteExpiredOfflineUsers: found expired users", "count", len(users))
adminToken, err := c.fetchChatAdminToken(ctx)
if err != nil {
log.ZError(ctx, "deleteExpiredOfflineUsers: fetchChatAdminToken failed", err)
return
}
for i, u := range users {
subCtx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
c.deleteExpiredUser(subCtx, adminToken, u.UserID)
}
log.ZInfo(ctx, "deleteExpiredOfflineUsers: done", "count", len(users), "elapsed", time.Since(now))
}
// deleteExpiredUser 通过 chat HTTP API POST /account/del 删除单个过期用户。
// chat 服务端会处理:强制登出、删除好友/群组关系、清理 chat 账号数据等。
// adminToken 为当次批次开始时通过 admin-api /account/login 获取的管理员 token。
func (c *cronServer) deleteExpiredUser(ctx context.Context, adminToken, userID string) {
log.ZInfo(ctx, "deleteExpiredUser: start", "userID", userID)
operationID := mcontext.GetOperationID(ctx)
body, _ := json.Marshal(map[string]any{"userIDs": []string{userID}})
url := c.chatAPIAddress + "/account/del"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
log.ZError(ctx, "deleteExpiredUser: build request failed", err, "userID", userID)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("token", adminToken)
req.Header.Set("operationID", operationID)
resp, err := chatHTTPClient.Do(req)
if err != nil {
log.ZError(ctx, "deleteExpiredUser: HTTP call failed", err, "userID", userID, "url", url)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
var result map[string]any
_ = json.NewDecoder(resp.Body).Decode(&result)
log.ZError(ctx, "deleteExpiredUser: chat API returned error",
fmt.Errorf("status %d", resp.StatusCode),
"userID", userID, "response", result)
return
}
// chat /account/del 已处理好友/群组/IM用户删除仅清理 user_offline_record 防止重复触发
if err := c.userOfflineRecordDB.Delete(ctx, userID); err != nil {
log.ZWarn(ctx, "deleteExpiredUser: Delete offline record failed", err, "userID", userID)
}
log.ZInfo(ctx, "deleteExpiredUser: done", "userID", userID)
}

@ -33,10 +33,16 @@ type CronTaskCmd struct {
func NewCronTaskCmd() *CronTaskCmd {
var cronTaskConfig tools.CronTaskConfig
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
// ChatAPI 配置内嵌在 openim-crontask.yml 的 chatAPI 字段中,无需单独文件。
// 示例:
// chatAPI:
// address: "http://127.0.0.1:10008"
// adminToken 由 crontask 通过 IM auth-rpc GetAdminToken 自动获取,无需额外配置。
ret.configMap = map[string]any{
OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
ShareFileName: &cronTaskConfig.Share,
DiscoveryConfigFilename: &cronTaskConfig.Discovery,
MongodbConfigFileName: &cronTaskConfig.MongodbConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)

@ -119,6 +119,8 @@ type CronTask struct {
RetainChatRecords int `mapstructure:"retainChatRecords"`
FileExpireTime int `mapstructure:"fileExpireTime"`
DeleteObjectType []string `mapstructure:"deleteObjectType"`
// ChatAPI 是 chat HTTP API 服务的访问配置,用于调用 /account/del 等需要管理员权限的接口。
ChatAPI ChatAPI `mapstructure:"chatAPI"`
}
type OfflinePushConfig struct {
@ -404,6 +406,13 @@ type Share struct {
RPCMaxBodySize MaxRequestBody `mapstructure:"rpcMaxBodySize"`
}
// ChatAPI 是 chat HTTP API 服务的访问配置。
// Address 为 chat-api 根地址(如 http://127.0.0.1:10008用于 POST /account/del。
// token 由 crontask 通过 IM auth-rpc GetAdminToken 自动获取,无需手动填写。
type ChatAPI struct {
Address string `mapstructure:"address"`
}
type MaxRequestBody struct {
RequestMaxBodySize int `mapstructure:"requestMaxBodySize"`
ResponseMaxBodySize int `mapstructure:"responseMaxBodySize"`

@ -69,6 +69,9 @@ type UserDatabase interface {
SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error)
// Delete permanently removes users and invalidates their cache entries.
Delete(ctx context.Context, userIDs []string) error
// CRUD user command
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error
DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error
@ -222,6 +225,18 @@ func (u *userDatabase) SortQuery(ctx context.Context, userIDName map[string]stri
return u.userDB.SortQuery(ctx, userIDName, asc)
}
func (u *userDatabase) Delete(ctx context.Context, userIDs []string) error {
if len(userIDs) == 0 {
return nil
}
return u.tx.Transaction(ctx, func(ctx context.Context) error {
if err := u.userDB.Delete(ctx, userIDs); err != nil {
return err
}
return u.cache.DelUsersInfo(userIDs...).ChainExecDel(ctx)
})
}
func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error {
return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex)
}

@ -0,0 +1,92 @@
package mgo
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewUserOfflineRecordMongo(db *mongo.Database) (database.UserOfflineRecord, error) {
coll := db.Collection(database.UserOfflineRecordName)
indexes := []mongo.IndexModel{
{
Keys: bson.D{{Key: "user_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{Key: "delete_user_deadline", Value: 1}},
},
}
if _, err := coll.Indexes().CreateMany(context.Background(), indexes); err != nil {
return nil, errs.Wrap(err)
}
return &userOfflineRecordMgo{coll: coll}, nil
}
type userOfflineRecordMgo struct {
coll *mongo.Collection
}
// Upsert 写入用户的离线记录;若记录已存在则不覆盖($setOnInsert
// 保留最早一次的全离线时刻作为计时起点。
// deadline = offlineTime + delete_account_interval供范围查询快速定位过期账号。
func (u *userOfflineRecordMgo) Upsert(ctx context.Context, userID string, offlineTime, deadline time.Time) error {
filter := bson.M{"user_id": userID}
update := bson.M{
"$setOnInsert": bson.M{
"user_id": userID,
"offline_time": offlineTime,
"delete_user_deadline": deadline,
},
}
opt := options.Update().SetUpsert(true)
_, err := u.coll.UpdateOne(ctx, filter, update, opt)
return errs.Wrap(err)
}
// RefreshOfflineTime 将离线记录的 offline_time 与 delete_user_deadline 同时覆盖写为新值($set
// 仅更新已存在的记录;用户在线时(无记录)不做任何操作。
// 适用场景:用户修改 delete_account_interval让倒计时从设置时刻重新起算。
func (u *userOfflineRecordMgo) RefreshOfflineTime(ctx context.Context, userID string, newOfflineTime, newDeadline time.Time) error {
filter := bson.M{"user_id": userID}
update := bson.M{"$set": bson.M{
"offline_time": newOfflineTime,
"delete_user_deadline": newDeadline,
}}
_, err := u.coll.UpdateOne(ctx, filter, update)
return errs.Wrap(err)
}
// Delete 删除用户的离线记录(用户重新上线时调用,停止计时)。
func (u *userOfflineRecordMgo) Delete(ctx context.Context, userID string) error {
_, err := u.coll.DeleteOne(ctx, bson.M{"user_id": userID})
return errs.Wrap(err)
}
// FindExpiredUsers 返回 delete_user_deadline <= now 的用户。
// 通过 $lookup 联表 user 集合获取完整 *model.User$unwind 同时起到过滤孤儿记录的作用
// (若 user 文档已不存在,$unwind 会将其丢弃,避免对无效账号重复触发删除)。
func (u *userOfflineRecordMgo) FindExpiredUsers(ctx context.Context, now time.Time, limit int) ([]*model.User, error) {
pipeline := bson.A{
bson.M{"$match": bson.M{
"delete_user_deadline": bson.M{"$lte": now},
}},
bson.M{"$limit": limit},
bson.M{"$lookup": bson.M{
"from": database.UserName,
"localField": "user_id",
"foreignField": "user_id",
"as": "u",
}},
bson.M{"$unwind": "$u"},
bson.M{"$replaceRoot": bson.M{"newRoot": "$u"}},
}
return mongoutil.Aggregate[*model.User](ctx, u.coll, pipeline)
}

@ -24,4 +24,5 @@ const (
SignalRecordName = "signal_record"
SpamReportName = "spam_report"
MsgBurnDeadlineName = "msg_burn_deadline"
UserOfflineRecordName = "user_offline_record"
)

@ -0,0 +1,28 @@
package database
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
// UserOfflineRecord 管理 user_offline_record 集合。
// 集合中的每条记录代表一个当前处于全平台离线状态的用户及其首次全离线时刻。
type UserOfflineRecord interface {
// Upsert 写入用户的离线记录;若记录已存在则不覆盖(保留最早的离线时刻)。
// deadline = offlineTime + delete_account_interval供 FindExpiredUsers 快速过滤。
Upsert(ctx context.Context, userID string, offlineTime, deadline time.Time) error
// RefreshOfflineTime 将已存在的离线记录的 offline_time 与 delete_user_deadline
// 同时刷新,使删除倒计时从 newOfflineTime 重新起算。
// 若记录不存在(用户在线)则无操作。
RefreshOfflineTime(ctx context.Context, userID string, newOfflineTime, newDeadline time.Time) error
// Delete 删除用户的离线记录(用户重新上线时调用)。
Delete(ctx context.Context, userID string) error
// FindExpiredUsers 返回 delete_user_deadline <= now 的用户($lookup user 集合获取完整信息)。
// limit 限制单次返回条数,防止单批处理量过大。
FindExpiredUsers(ctx context.Context, now time.Time, limit int) ([]*model.User, error)
}

@ -0,0 +1,14 @@
package model
import "time"
// UserOfflineRecord 记录用户全平台离线的时刻及账号自动删除截止时间。
// 用户上线时删除记录;用户全部平台离线时 upsert 记录。
// crontask 每小时扫描此集合,删除 DeleteUserDeadline <= now 的账号。
type UserOfflineRecord struct {
UserID string `bson:"user_id"`
OfflineTime time.Time `bson:"offline_time"`
// DeleteUserDeadline = OfflineTime + delete_account_interval
// 用户修改 delete_account_interval 时同步刷新此字段。
DeleteUserDeadline time.Time `bson:"delete_user_deadline"`
}
Loading…
Cancel
Save