feat: implement scheduled delete outdated object in minio.

pull/2393/head
Monet Lee 1 year ago
parent dac8fba11f
commit 330611796c

@ -1,3 +1,3 @@
chatRecordsClearTime: "0 2 * * *" cronExecuteTime: "0 2 * * *"
retainChatRecords: 365 retainChatRecords: 365
fileExpireTime: 90 fileExpireTime: 90

@ -14,7 +14,7 @@ require (
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.22 github.com/openimsdk/protocol v0.0.69-alpha.22
github.com/openimsdk/tools v0.0.49-alpha.30 github.com/openimsdk/tools v0.0.49-alpha.39
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0

@ -272,8 +272,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ= github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ=
github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.30 h1:iT2+1F8cJmlwKEris25YgK0seiJRUear+wTgc1bzcg8= github.com/openimsdk/tools v0.0.49-alpha.39 h1:bl5+q7xHsc/j1NnkN8/gYmn23RsNNbRizDY58d2EY1w=
github.com/openimsdk/tools v0.0.49-alpha.30/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU= github.com/openimsdk/tools v0.0.49-alpha.39/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@ -24,6 +24,7 @@ import (
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/mongo"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
@ -287,12 +288,37 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
expireTime := time.UnixMilli(req.ExpireTime) expireTime := time.UnixMilli(req.ExpireTime)
models, err := t.s3dataBase.FindByExpires(ctx, expireTime) models, err := t.s3dataBase.FindByExpires(ctx, expireTime)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
}
needDelObjectKeys := make([]string, 0)
for _, model := range models {
needDelObjectKeys = append(needDelObjectKeys, model.Key)
}
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
for _, key := range needDelObjectKeys {
count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
}
if int(count) < 1 {
thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key)
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err)
} }
t.s3dataBase.DeleteObject(ctx, thumbnailKey)
t.s3dataBase.DelS3Key(ctx, "minio", needDelObjectKeys...)
t.s3dataBase.DeleteObject(ctx, key)
}
}
for _, model := range models { for _, model := range models {
t.s3dataBase.DeleteObject(ctx, model.Key) err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Key) if err != nil {
return nil, errs.Wrap(err)
}
} }
return &third.DeleteOutdatedDataResp{}, nil return &third.DeleteOutdatedDataResp{}, nil
} }

@ -41,7 +41,7 @@ type CronTaskConfig struct {
} }
func Start(ctx context.Context, config *CronTaskConfig) error { func Start(ctx context.Context, config *CronTaskConfig) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.RetainChatRecords) log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords)
if config.CronTask.RetainChatRecords < 1 { if config.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap() return errs.New("msg destruct time must be greater than 1").Wrap()
} }
@ -68,29 +68,31 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
} }
log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
} }
if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil { if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearFunc); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
if err != nil {
return err
}
thirdClient := third.NewThirdClient(tConn)
deleteFunc := func() { deleteFunc := func() {
now := time.Now() now := time.Now()
deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
if err != nil {
return
}
thirdClient := third.NewThirdClient(tConn)
if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
return return
} }
log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
} }
if _, err := crontab.AddFunc(string(config.CronTask.FileExpireTime), deleteFunc); err != nil { if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime) log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
crontab.Start() crontab.Start()
<-ctx.Done() <-ctx.Done()
return nil return nil

@ -106,7 +106,7 @@ type API struct {
} }
type CronTask struct { type CronTask struct {
ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"` CronExecuteTime string `mapstructure:"cronExecuteTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"` RetainChatRecords int `mapstructure:"retainChatRecords"`
FileExpireTime int `mapstructure:"fileExpireTime"` FileExpireTime int `mapstructure:"fileExpireTime"`
} }

@ -19,7 +19,7 @@ import (
"path/filepath" "path/filepath"
"time" "time"
redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" redisCache "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -42,12 +42,16 @@ type S3Database interface {
FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error)
DeleteObject(ctx context.Context, name string) error DeleteObject(ctx context.Context, name string) error
DeleteSpecifiedData(ctx context.Context, engine string, name string) error DeleteSpecifiedData(ctx context.Context, engine string, name string) error
FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
DelS3Key(ctx context.Context, engine string, keys ...string) error
GetImageThumbnailKey(ctx context.Context, name string) (string, error)
} }
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database { func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
return &s3Database{ return &s3Database{
s3: cont.New(redis2.NewS3Cache(rdb, s3), s3), s3: cont.New(redisCache.NewS3Cache(rdb, s3), s3),
cache: redis2.NewObjectCacheRedis(rdb, obj), cache: redisCache.NewObjectCacheRedis(rdb, obj),
s3cache: redisCache.NewS3Cache(rdb, s3),
db: obj, db: obj,
} }
} }
@ -55,6 +59,7 @@ func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.Obje
type s3Database struct { type s3Database struct {
s3 *cont.Controller s3 *cont.Controller
cache cache.ObjectCache cache cache.ObjectCache
s3cache cont.S3Cache
db database.ObjectInfo db database.ObjectInfo
} }
@ -125,3 +130,15 @@ func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error { func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error {
return s.db.Delete(ctx, engine, name) return s.db.Delete(ctx, engine, name)
} }
func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
return s.db.FindNotDelByS3(ctx, key, duration)
}
func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error {
return s.s3cache.DelS3Key(ctx, engine, keys...)
}
func (s *s3Database) GetImageThumbnailKey(ctx context.Context, name string) (string, error) {
return s.s3.GetImageThumbnailKey(ctx, name)
}

@ -16,9 +16,10 @@ package controller
import ( import (
"context" "context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/pagination"

@ -75,3 +75,9 @@ func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time) ([]*mod
"create_time": bson.M{"$lt": duration}, "create_time": bson.M{"$lt": duration},
}) })
} }
func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
return mongoutil.Count(ctx, o.coll, bson.M{
"key": key,
"create_time": bson.M{"$gt": duration},
})
}

@ -26,4 +26,5 @@ type ObjectInfo interface {
Take(ctx context.Context, engine string, name string) (*model.Object, error) Take(ctx context.Context, engine string, name string) (*model.Object, error)
Delete(ctx context.Context, engine string, name string) error Delete(ctx context.Context, engine string, name string) error
FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error)
FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
} }

Loading…
Cancel
Save