diff --git a/config/openim-crontask.yml b/config/openim-crontask.yml index d2154d263..3839104a4 100644 --- a/config/openim-crontask.yml +++ b/config/openim-crontask.yml @@ -1,3 +1,3 @@ -chatRecordsClearTime: "0 2 * * *" +cronExecuteTime: "0 2 * * *" retainChatRecords: 365 fileExpireTime: 90 diff --git a/go.mod b/go.mod index f8437922c..49e5b50a4 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.69-alpha.22 - github.com/openimsdk/tools v0.0.49-alpha.30 + github.com/openimsdk/tools v0.0.49-alpha.39 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index e05652d99..a7dd1d632 100644 --- a/go.sum +++ b/go.sum @@ -272,8 +272,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ= github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.30 h1:iT2+1F8cJmlwKEris25YgK0seiJRUear+wTgc1bzcg8= -github.com/openimsdk/tools v0.0.49-alpha.30/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU= +github.com/openimsdk/tools v0.0.49-alpha.39 h1:bl5+q7xHsc/j1NnkN8/gYmn23RsNNbRizDY58d2EY1w= +github.com/openimsdk/tools v0.0.49-alpha.39/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index e952175ad..99c21d5bb 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -24,6 +24,7 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "go.mongodb.org/mongo-driver/mongo" "github.com/google/uuid" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -287,12 +288,37 @@ func (t *thirdServer) apiAddress(prefix, name string) string { func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { expireTime := time.UnixMilli(req.ExpireTime) models, err := t.s3dataBase.FindByExpires(ctx, expireTime) - if err != nil { - return nil, err + if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + return nil, errs.Wrap(err) + } + needDelObjectKeys := make([]string, 0) + for _, model := range models { + needDelObjectKeys = append(needDelObjectKeys, model.Key) + } + + needDelObjectKeys = datautil.Distinct(needDelObjectKeys) + for _, key := range needDelObjectKeys { + count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime) + if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + return nil, errs.Wrap(err) + } + if int(count) < 1 { + thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key) + if err != nil { + return nil, errs.Wrap(err) + } + t.s3dataBase.DeleteObject(ctx, thumbnailKey) + t.s3dataBase.DelS3Key(ctx, "minio", needDelObjectKeys...) + + t.s3dataBase.DeleteObject(ctx, key) + } } + for _, model := range models { - t.s3dataBase.DeleteObject(ctx, model.Key) - t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Key) + err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) + if err != nil { + return nil, errs.Wrap(err) + } } return &third.DeleteOutdatedDataResp{}, nil } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 28963dc63..dcdcf2f40 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -41,7 +41,7 @@ type CronTaskConfig struct { } func Start(ctx context.Context, config *CronTaskConfig) error { - log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.RetainChatRecords) + log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords) if config.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } @@ -68,29 +68,31 @@ func Start(ctx context.Context, config *CronTaskConfig) error { } log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) } - if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearFunc); err != nil { return errs.Wrap(err) } + + tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + if err != nil { + return err + } + thirdClient := third.NewThirdClient(tConn) + deleteFunc := func() { now := time.Now() deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) - tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) - if err != nil { - return - } - thirdClient := third.NewThirdClient(tConn) if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) return } log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) } - if _, err := crontab.AddFunc(string(config.CronTask.FileExpireTime), deleteFunc); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil { return errs.Wrap(err) } - log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime) + log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) crontab.Start() <-ctx.Done() return nil diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 7e5649987..3d49ff577 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -106,9 +106,9 @@ type API struct { } type CronTask struct { - ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"` - RetainChatRecords int `mapstructure:"retainChatRecords"` - FileExpireTime int `mapstructure:"fileExpireTime"` + CronExecuteTime string `mapstructure:"cronExecuteTime"` + RetainChatRecords int `mapstructure:"retainChatRecords"` + FileExpireTime int `mapstructure:"fileExpireTime"` } type OfflinePushConfig struct { diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index b1dce502b..f32d8803e 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -19,7 +19,7 @@ import ( "path/filepath" "time" - redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + redisCache "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -42,20 +42,25 @@ type S3Database interface { FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) DeleteObject(ctx context.Context, name string) error DeleteSpecifiedData(ctx context.Context, engine string, name string) error + FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) + DelS3Key(ctx context.Context, engine string, keys ...string) error + GetImageThumbnailKey(ctx context.Context, name string) (string, error) } func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database { return &s3Database{ - s3: cont.New(redis2.NewS3Cache(rdb, s3), s3), - cache: redis2.NewObjectCacheRedis(rdb, obj), - db: obj, + s3: cont.New(redisCache.NewS3Cache(rdb, s3), s3), + cache: redisCache.NewObjectCacheRedis(rdb, obj), + s3cache: redisCache.NewS3Cache(rdb, s3), + db: obj, } } type s3Database struct { - s3 *cont.Controller - cache cache.ObjectCache - db database.ObjectInfo + s3 *cont.Controller + cache cache.ObjectCache + s3cache cont.S3Cache + db database.ObjectInfo } func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) { @@ -125,3 +130,15 @@ func (s *s3Database) DeleteObject(ctx context.Context, name string) error { func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error { return s.db.Delete(ctx, engine, name) } + +func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { + return s.db.FindNotDelByS3(ctx, key, duration) +} + +func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error { + return s.s3cache.DelS3Key(ctx, engine, keys...) +} + +func (s *s3Database) GetImageThumbnailKey(ctx context.Context, name string) (string, error) { + return s.s3.GetImageThumbnailKey(ctx, name) +} diff --git a/pkg/common/storage/controller/third.go b/pkg/common/storage/controller/third.go index 344501466..a9c2ae403 100644 --- a/pkg/common/storage/controller/third.go +++ b/pkg/common/storage/controller/third.go @@ -16,9 +16,10 @@ package controller import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/tools/db/pagination" diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index a0abf7a7a..3ae5b8ec8 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -75,3 +75,9 @@ func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time) ([]*mod "create_time": bson.M{"$lt": duration}, }) } +func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { + return mongoutil.Count(ctx, o.coll, bson.M{ + "key": key, + "create_time": bson.M{"$gt": duration}, + }) +} diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 44329cbc4..4046da2f3 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -26,4 +26,5 @@ type ObjectInfo interface { Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name string) error FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) + FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) }