fix: improve crontask deletion of outdated data.

pull/2901/head
Monet Lee 10 months ago
parent c353f557b7
commit 27fa932040

@ -1,3 +1,3 @@
cronExecuteTime: 0 2 * * * cronExecuteTime: 0 2 * * *
retainChatRecords: 365 retainChatRecords: 365
fileExpireTime: 90 fileExpireTime: 180

@ -15,7 +15,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.55 github.com/openimsdk/protocol v0.0.72-alpha.55
github.com/openimsdk/tools v0.0.50-alpha.32 github.com/openimsdk/tools v0.0.50-alpha.34
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0

@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.55 h1:9PPWPHvkFk3neBSbNr+IoOdKIFjxTvEqUfMK/TEq1+8= github.com/openimsdk/protocol v0.0.72-alpha.55 h1:9PPWPHvkFk3neBSbNr+IoOdKIFjxTvEqUfMK/TEq1+8=
github.com/openimsdk/protocol v0.0.72-alpha.55/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.72-alpha.55/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.32 h1:JEsUFHFnaYg230TG+Ke3SUnaA2h44t4kABAzEdv5VZw= github.com/openimsdk/tools v0.0.50-alpha.34 h1:qIWFHMMnzAw3CbsubEsJXECnUZG5V0BmLLz2XzCIwr8=
github.com/openimsdk/tools v0.0.50-alpha.32/go.mod h1:r5U6RbxcR4xhKb2fhTmKGC9Yt5LcErHBVt3lhXQIHSo= github.com/openimsdk/tools v0.0.50-alpha.34/go.mod h1:r5U6RbxcR4xhKb2fhTmKGC9Yt5LcErHBVt3lhXQIHSo=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@ -291,46 +291,72 @@ func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteO
var conf config.Third var conf config.Third
expireTime := time.UnixMilli(req.ExpireTime) expireTime := time.UnixMilli(req.ExpireTime)
var deltotal int var deltotal int
excuteNum := 5
findPagination := &sdkws.RequestPagination{ findPagination := &sdkws.RequestPagination{
PageNumber: 1, PageNumber: 1,
ShowNumber: 1000, ShowNumber: 1000,
} }
for {
for i := 0; i < excuteNum; i++ {
// Find all expired data in S3 database
total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination) total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
needDelObjectKeys := make([]string, 0) needDelObjectKeys := make([]string, len(models))
for _, model := range models { for _, model := range models {
needDelObjectKeys = append(needDelObjectKeys, model.Key) needDelObjectKeys = append(needDelObjectKeys, model.Key)
} }
// Remove duplicate keys; the same key may be used by several different models
needDelObjectKeys = datautil.Distinct(needDelObjectKeys) needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
for _, key := range needDelObjectKeys { for _, key := range needDelObjectKeys {
count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime) // Find all models by key
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
if int(count) < 1 && t.minio != nil {
// Check every model that references this key; the key may be deleted only if none of them blocks deletion.
needDelKey := true // Default can delete
for _, model := range keyModels {
// If group is empty or CreateTime is after expireTime, can't delete this key
if model.Group == "" || model.CreateTime.After(expireTime) {
needDelKey = false
break
}
}
// If this object is not referenced by any unexpired data, delete it
if needDelKey && t.minio != nil {
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key) thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)
t.s3dataBase.DeleteObject(ctx, thumbnailKey) t.s3dataBase.DeleteObject(ctx, thumbnailKey)
t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...)
t.s3dataBase.DeleteObject(ctx, key) t.s3dataBase.DeleteObject(ctx, key)
t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
} }
} }
for _, model := range models { for _, model := range models {
// Delete all expired data rows in the S3 database
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
} }
if total < int64(findPagination.ShowNumber) { if total < int64(findPagination.ShowNumber) {
break break
} }
deltotal += int(total) deltotal += int(total)
} }
log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", deltotal) log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", deltotal)
return &third.DeleteOutdatedDataResp{}, nil return &third.DeleteOutdatedDataResp{}, nil
} }

@ -24,6 +24,7 @@ import (
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
pbconversation "github.com/openimsdk/protocol/conversation" pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/mw"
@ -58,10 +59,10 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
return err return err
} }
// thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
// if err != nil { if err != nil {
// return err return err
// } }
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
if err != nil { if err != nil {
@ -70,7 +71,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
msgClient := msg.NewMsgClient(msgConn) msgClient := msg.NewMsgClient(msgConn)
conversationClient := pbconversation.NewConversationClient(conversationConn) conversationClient := pbconversation.NewConversationClient(conversationConn)
// thirdClient := third.NewThirdClient(thirdConn) thirdClient := third.NewThirdClient(thirdConn)
crontab := cron.New() crontab := cron.New()
@ -92,6 +93,13 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
} }
// scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature. // scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature.
// lose seq-user, but conversation still exists.
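// seq is the total length; seq-user is the range the user is able to fetch
// if the user has left the group, seq-user is fixed and the latest messages can no longer be pulled
// the group chat controls seq; the user controls seq-user
// after a change, it must be written into the Conversation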
msgDestructFunc := func() { msgDestructFunc := func() {
now := time.Now() now := time.Now()
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
@ -114,21 +122,21 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
return errs.Wrap(err) return errs.Wrap(err)
} }
// // scheduled delete outdated file Objects and their datas in specific time. // scheduled delete outdated file Objects and their datas in specific time.
// deleteObjectFunc := func() { deleteObjectFunc := func() {
// now := time.Now() now := time.Now()
// deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
// ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
// log.ZDebug(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) log.ZDebug(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
// if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil {
// log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
// return return
// } }
// log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
// } }
// if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil {
// return errs.Wrap(err) return errs.Wrap(err)
// } }
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
crontab.Start() crontab.Start()

@ -43,7 +43,7 @@ type S3Database interface {
FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
DeleteObject(ctx context.Context, name string) error DeleteObject(ctx context.Context, name string) error
DeleteSpecifiedData(ctx context.Context, engine string, name string) error DeleteSpecifiedData(ctx context.Context, engine string, name string) error
FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error)
DelS3Key(ctx context.Context, engine string, keys ...string) error DelS3Key(ctx context.Context, engine string, keys ...string) error
} }
@ -132,8 +132,8 @@ func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, nam
return s.db.Delete(ctx, engine, name) return s.db.Delete(ctx, engine, name)
} }
func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { func (s *s3Database) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) {
return s.db.FindNotDelByS3(ctx, key, duration) return s.db.FindModelsByKey(ctx, key)
} }
func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error { func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error {

@ -31,6 +31,8 @@ import (
func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) { func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) {
coll := db.Collection(database.ObjectName) coll := db.Collection(database.ObjectName)
// Create index for name
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{ Keys: bson.D{
{Key: "name", Value: 1}, {Key: "name", Value: 1},
@ -40,6 +42,27 @@ func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) {
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
// Create index for create_time
_, err = coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "create_time", Value: 1},
},
})
if err != nil {
return nil, errs.Wrap(err)
}
// Create index for key
_, err = coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "key", Value: 1},
},
})
if err != nil {
return nil, errs.Wrap(err)
}
return &S3Mongo{coll: coll}, nil return &S3Mongo{coll: coll}, nil
} }
@ -71,14 +94,18 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model.
func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
} }
// FindByExpires finds expired objects
func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{ return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{
"create_time": bson.M{"$lt": duration}, "create_time": bson.M{"$lt": duration},
"group": bson.M{"$ne": ""},
}, pagination) }, pagination)
} }
func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
return mongoutil.Count(ctx, o.coll, bson.M{ // Find object by key
"key": key, func (o *S3Mongo) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) {
"create_time": bson.M{"$gt": duration}, return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{
"key": key,
}) })
} }

@ -27,5 +27,5 @@ type ObjectInfo interface {
Take(ctx context.Context, engine string, name string) (*model.Object, error) Take(ctx context.Context, engine string, name string) (*model.Object, error)
Delete(ctx context.Context, engine string, name string) error Delete(ctx context.Context, engine string, name string) error
FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) FindModelsByKey(ctx context.Context, key string)( objects []*model.Object,err error)
} }

Loading…
Cancel
Save