diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 99c21d5bb..c419cfc8c 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -23,6 +23,8 @@ import ( "strconv" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/common" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "go.mongodb.org/mongo-driver/mongo" @@ -286,39 +288,48 @@ func (t *thirdServer) apiAddress(prefix, name string) string { } func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { + var conf config.Third expireTime := time.UnixMilli(req.ExpireTime) - models, err := t.s3dataBase.FindByExpires(ctx, expireTime) - if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { - return nil, errs.Wrap(err) + findPagination := &common.FindPagination{ + PageNumber: 1, + ShowNumber: 1000, } - needDelObjectKeys := make([]string, 0) - for _, model := range models { - needDelObjectKeys = append(needDelObjectKeys, model.Key) - } - - needDelObjectKeys = datautil.Distinct(needDelObjectKeys) - for _, key := range needDelObjectKeys { - count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime) + for { + total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination) if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { return nil, errs.Wrap(err) } - if int(count) < 1 { - thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key) + needDelObjectKeys := make([]string, 0) + for _, model := range models { + needDelObjectKeys = append(needDelObjectKeys, model.Key) + } + + needDelObjectKeys = datautil.Distinct(needDelObjectKeys) + for _, key := range needDelObjectKeys { + count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime) + if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + return nil, errs.Wrap(err) + } + if int(count) < 1 { + thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key) + if err != nil { + return nil, errs.Wrap(err) + } + t.s3dataBase.DeleteObject(ctx, thumbnailKey) + t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...) + t.s3dataBase.DeleteObject(ctx, key) + } + } + for _, model := range models { + err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) if err != nil { return nil, errs.Wrap(err) } - t.s3dataBase.DeleteObject(ctx, thumbnailKey) - t.s3dataBase.DelS3Key(ctx, "minio", needDelObjectKeys...) - - t.s3dataBase.DeleteObject(ctx, key) } - } - - for _, model := range models { - err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) - if err != nil { - return nil, errs.Wrap(err) + if total < int64(findPagination.ShowNumber) { + break } + findPagination.PageNumber++ } return &third.DeleteOutdatedDataResp{}, nil } diff --git a/pkg/common/storage/common/types.go b/pkg/common/storage/common/types.go index 759121158..a19de205e 100644 --- a/pkg/common/storage/common/types.go +++ b/pkg/common/storage/common/types.go @@ -24,3 +24,15 @@ type GroupSimpleUserID struct { Hash uint64 MemberNum uint32 } + +type FindPagination struct { + PageNumber int32 + ShowNumber int32 +} + +func (f *FindPagination) GetPageNumber() int32 { + return f.PageNumber +} +func (f *FindPagination) GetShowNumber() int32 { + return f.ShowNumber +} diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index f32d8803e..592659536 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -24,6 +24,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/s3" "github.com/openimsdk/tools/s3/cont" "github.com/redis/go-redis/v9" @@ -39,7 +40,7 @@ type S3Database interface { SetObject(ctx context.Context, info *model.Object) error StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) - FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) + FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) DeleteObject(ctx context.Context, name string) error DeleteSpecifiedData(ctx context.Context, engine string, name string) error FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) @@ -120,8 +121,9 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { return s.s3.FormData(ctx, name, size, contentType, duration) } -func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) { - return s.db.FindByExpires(ctx, duration) +func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { + + return s.db.FindByExpires(ctx, duration, pagination) } func (s *s3Database) DeleteObject(ctx context.Context, name string) error { diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 3ae5b8ec8..4242fbb53 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -22,6 +22,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/errs" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -70,10 +71,10 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model. func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) } -func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) { - return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{ +func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { + return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{ "create_time": bson.M{"$lt": duration}, - }) + }, pagination) } func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { return mongoutil.Count(ctx, o.coll, bson.M{ diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 4046da2f3..8292006a0 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -19,12 +19,13 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/pagination" ) type ObjectInfo interface { SetObject(ctx context.Context, obj *model.Object) error Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name string) error - FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) + FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) }