diff --git a/config/openim-crontask.yml b/config/openim-crontask.yml index c05bd2485..449b5c311 100644 --- a/config/openim-crontask.yml +++ b/config/openim-crontask.yml @@ -1,3 +1,3 @@ cronExecuteTime: 0 2 * * * retainChatRecords: 365 -fileExpireTime: 90 +fileExpireTime: 180 diff --git a/go.mod b/go.mod index 96c229166..8bd544d73 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.55 - github.com/openimsdk/tools v0.0.50-alpha.32 + github.com/openimsdk/tools v0.0.50-alpha.34 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 36f3d1615..e36ea8c6a 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.55 h1:9PPWPHvkFk3neBSbNr+IoOdKIFjxTvEqUfMK/TEq1+8= github.com/openimsdk/protocol v0.0.72-alpha.55/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.50-alpha.32 h1:JEsUFHFnaYg230TG+Ke3SUnaA2h44t4kABAzEdv5VZw= -github.com/openimsdk/tools v0.0.50-alpha.32/go.mod h1:r5U6RbxcR4xhKb2fhTmKGC9Yt5LcErHBVt3lhXQIHSo= +github.com/openimsdk/tools v0.0.50-alpha.34 h1:qIWFHMMnzAw3CbsubEsJXECnUZG5V0BmLLz2XzCIwr8= +github.com/openimsdk/tools v0.0.50-alpha.34/go.mod h1:r5U6RbxcR4xhKb2fhTmKGC9Yt5LcErHBVt3lhXQIHSo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index fb6a1157e..f9625d57f 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -291,46 +291,72 @@ func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteO var conf config.Third expireTime := time.UnixMilli(req.ExpireTime) var deltotal int + excuteNum := 5 + findPagination := &sdkws.RequestPagination{ PageNumber: 1, ShowNumber: 1000, } - for { + + for i := 0; i < excuteNum; i++ { + // Find all expired data in S3 database total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination) if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { return nil, errs.Wrap(err) } - needDelObjectKeys := make([]string, 0) + needDelObjectKeys := make([]string, len(models)) for _, model := range models { needDelObjectKeys = append(needDelObjectKeys, model.Key) } + // Remove duplicate keys, have the same key use in different models needDelObjectKeys = datautil.Distinct(needDelObjectKeys) + for _, key := range needDelObjectKeys { - count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime) + // Find all models by key + keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key) if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { return nil, errs.Wrap(err) } - if int(count) < 1 && t.minio != nil { + + // check keyModels, if all keyModels. + needDelKey := true // Default can delete + for _, model := range keyModels { + // If group is empty or CreateTime is after expireTime, can't delete this key + if model.Group == "" || model.CreateTime.After(expireTime) { + needDelKey = false + break + } + } + + // If this object is not referenced by not expire data, delete it + if needDelKey && t.minio != nil { thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key) t.s3dataBase.DeleteObject(ctx, thumbnailKey) - t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...) t.s3dataBase.DeleteObject(ctx, key) + + t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key) } } + for _, model := range models { + // Delete all expired data row in S3 database err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) if err != nil { return nil, errs.Wrap(err) } } + if total < int64(findPagination.ShowNumber) { break } + deltotal += int(total) } + log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", deltotal) + return &third.DeleteOutdatedDataResp{}, nil } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index dbb4e34f6..83982ca68 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -24,6 +24,7 @@ import ( kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" @@ -58,10 +59,10 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return err } - // thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) - // if err != nil { - // return err - // } + thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + if err != nil { + return err + } conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) if err != nil { @@ -70,7 +71,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { msgClient := msg.NewMsgClient(msgConn) conversationClient := pbconversation.NewConversationClient(conversationConn) - // thirdClient := third.NewThirdClient(thirdConn) + thirdClient := third.NewThirdClient(thirdConn) crontab := cron.New() @@ -92,6 +93,13 @@ func Start(ctx context.Context, config *CronTaskConfig) error { } // scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature. + // lose seq-user, but conversation still exists. + // seq是总长度 seq-user是用户能获取的范围 + // 如果退群了 seq-user就固定 拉取不到最新的 + + // 群聊控制 seq 用户控制 seq-user + + // 改动后需要写进去Conversation msgDestructFunc := func() { now := time.Now() ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) @@ -114,21 +122,21 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return errs.Wrap(err) } - // // scheduled delete outdated file Objects and their datas in specific time. - // deleteObjectFunc := func() { - // now := time.Now() - // deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) - // ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) - // log.ZDebug(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) - // if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { - // log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) - // return - // } - // log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) - // } - // if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { - // return errs.Wrap(err) - // } + // scheduled delete outdated file Objects and their datas in specific time. + deleteObjectFunc := func() { + now := time.Now() + deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) + ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) + log.ZDebug(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { + log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) + return + } + log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) + } + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { + return errs.Wrap(err) + } log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) crontab.Start() diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 9b56661a5..a31bac277 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -43,7 +43,7 @@ type S3Database interface { FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) DeleteObject(ctx context.Context, name string) error DeleteSpecifiedData(ctx context.Context, engine string, name string) error - FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) + FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) DelS3Key(ctx context.Context, engine string, keys ...string) error } @@ -132,8 +132,8 @@ func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, nam return s.db.Delete(ctx, engine, name) } -func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { - return s.db.FindNotDelByS3(ctx, key, duration) +func (s *s3Database) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) { + return s.db.FindModelsByKey(ctx, key) } func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error { diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 4242fbb53..3096ca2fa 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -31,6 +31,8 @@ import ( func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) { coll := db.Collection(database.ObjectName) + + // Create index for name _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "name", Value: 1}, @@ -40,6 +42,27 @@ func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) { if err != nil { return nil, errs.Wrap(err) } + + // Create index for create_time + _, err = coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "create_time", Value: 1}, + }, + }) + if err != nil { + return nil, errs.Wrap(err) + } + + // Create index for key + _, err = coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "key", Value: 1}, + }, + }) + if err != nil { + return nil, errs.Wrap(err) + } + return &S3Mongo{coll: coll}, nil } @@ -71,14 +94,18 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model. func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) } + +// Find Expires object func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{ "create_time": bson.M{"$lt": duration}, + "group": bson.M{"$ne": ""}, }, pagination) } -func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { - return mongoutil.Count(ctx, o.coll, bson.M{ - "key": key, - "create_time": bson.M{"$gt": duration}, + +// Find object by key +func (o *S3Mongo) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) { + return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{ + "key": key, }) } diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 8292006a0..0486ee00f 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -27,5 +27,5 @@ type ObjectInfo interface { Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name string) error FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) - FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) + FindModelsByKey(ctx context.Context, key string)( objects []*model.Object,err error) }