diff --git a/go.mod b/go.mod index a1a51b4fa..f8437922c 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.69-alpha.22 - github.com/openimsdk/tools v0.0.49-alpha.25 + github.com/openimsdk/tools v0.0.49-alpha.30 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index e67b30206..e05652d99 100644 --- a/go.sum +++ b/go.sum @@ -272,8 +272,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ= github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI= -github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= +github.com/openimsdk/tools v0.0.49-alpha.30 h1:iT2+1F8cJmlwKEris25YgK0seiJRUear+wTgc1bzcg8= +github.com/openimsdk/tools v0.0.49-alpha.30/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 19a519d2e..e952175ad 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -286,10 +286,14 @@ func (t *thirdServer) apiAddress(prefix, name string) string { func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { expireTime := time.UnixMilli(req.ExpireTime) - err := t.s3dataBase.DeleteByExpires(ctx, expireTime) + models, err := t.s3dataBase.FindByExpires(ctx, expireTime) if err != nil { return nil, err } + for _, model := range models { + t.s3dataBase.DeleteObject(ctx, model.Key) + t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Key) + } return &third.DeleteOutdatedDataResp{}, nil } diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 5d83e2677..b1dce502b 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -39,7 +39,9 @@ type S3Database interface { SetObject(ctx context.Context, info *model.Object) error StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) - DeleteByExpires(ctx context.Context, duration time.Time) error + FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) + DeleteObject(ctx context.Context, name string) error + DeleteSpecifiedData(ctx context.Context, engine string, name string) error } func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database { @@ -113,6 +115,13 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { return s.s3.FormData(ctx, name, size, contentType, duration) } -func (s *s3Database) DeleteByExpires(ctx context.Context, duration time.Time) error { - return s.db.DeleteByExpires(ctx, duration) +func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) { + return s.db.FindByExpires(ctx, duration) +} + +func (s *s3Database) DeleteObject(ctx context.Context, name string) error { + return s.s3.DeleteObject(ctx, name) +} +func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error { + return s.db.Delete(ctx, engine, name) } diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 689c869d5..a0abf7a7a 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -70,8 +70,8 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model. func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) } -func (o *S3Mongo) DeleteByExpires(ctx context.Context, duration time.Time) error { - return mongoutil.DeleteMany(ctx, o.coll, bson.M{ +func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) { + return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{ "create_time": bson.M{"$lt": duration}, }) } diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 3752993d8..44329cbc4 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -25,5 +25,5 @@ type ObjectInfo interface { SetObject(ctx context.Context, obj *model.Object) error Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name string) error - DeleteByExpires(ctx context.Context, duration time.Time) error + FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) }