From 2b2a75f19e47daee990d329757ab0a3d3e84fe2f Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 28 Jun 2024 15:29:03 +0800 Subject: [PATCH 1/2] implement minio expire delete. --- config/openim-crontask.yml | 1 + go.mod | 2 +- go.sum | 4 ++-- internal/rpc/third/s3.go | 12 ++++++++++- internal/tools/cron_task.go | 25 +++++++++++++++++++++-- pkg/common/config/config.go | 6 ++++-- pkg/common/storage/controller/s3.go | 9 ++++++-- pkg/common/storage/database/mgo/object.go | 7 +++++++ pkg/common/storage/database/object.go | 3 +++ pkg/rpcclient/third.go | 4 ++++ 10 files changed, 63 insertions(+), 10 deletions(-) diff --git a/config/openim-crontask.yml b/config/openim-crontask.yml index 9bbccfd25..d2154d263 100644 --- a/config/openim-crontask.yml +++ b/config/openim-crontask.yml @@ -1,2 +1,3 @@ chatRecordsClearTime: "0 2 * * *" retainChatRecords: 365 +fileExpireTime: 90 diff --git a/go.mod b/go.mod index 2614e0f32..a1a51b4fa 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.17 + github.com/openimsdk/protocol v0.0.69-alpha.22 github.com/openimsdk/tools v0.0.49-alpha.25 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index fe4f0c390..e67b30206 100644 --- a/go.sum +++ b/go.sum @@ -270,8 +270,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M= -github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ= +github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI= github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 4cb1b81d0..19a519d2e 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -19,11 +19,12 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "path" "strconv" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/google/uuid" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/protocol/third" @@ -283,6 +284,15 @@ func (t *thirdServer) apiAddress(prefix, name string) string { return prefix + name } +func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { + expireTime := time.UnixMilli(req.ExpireTime) + err := t.s3dataBase.DeleteByExpires(ctx, expireTime) + if err != nil { + return nil, err + } + return &third.DeleteOutdatedDataResp{}, nil +} + type FormDataMate struct { Name string `json:"name"` Size int64 `json:"size"` diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index bf037b694..28963dc63 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -17,15 +17,17 @@ package tools import ( "context" "fmt" + "os" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "os" - "time" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -69,6 +71,25 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil { return errs.Wrap(err) } + deleteFunc := func() { + now := time.Now() + deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) + ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) + log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + if err != nil { + return + } + thirdClient := third.NewThirdClient(tConn) + if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { + log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) + return + } + log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) + } + if _, err := crontab.AddFunc(string(config.CronTask.FileExpireTime), deleteFunc); err != nil { + return errs.Wrap(err) + } log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime) crontab.Start() <-ctx.Done() diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 6260dc00f..7e5649987 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -15,14 +15,15 @@ package config import ( + "strings" + "time" + "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/s3/oss" - "strings" - "time" ) type CacheConfig struct { @@ -107,6 +108,7 @@ type API struct { type CronTask struct { ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"` RetainChatRecords int `mapstructure:"retainChatRecords"` + FileExpireTime int `mapstructure:"fileExpireTime"` } type OfflinePushConfig struct { diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index b0ad61203..5d83e2677 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -16,11 +16,12 @@ package controller import ( "context" + "path/filepath" + "time" + redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "path/filepath" - "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/tools/s3" @@ -38,6 +39,7 @@ type S3Database interface { SetObject(ctx context.Context, info *model.Object) error StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) + DeleteByExpires(ctx context.Context, duration time.Time) error } func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database { @@ -111,3 +113,6 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { return s.s3.FormData(ctx, name, size, contentType, duration) } +func (s *s3Database) DeleteByExpires(ctx context.Context, duration time.Time) error { + return s.db.DeleteByExpires(ctx, duration) +} diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index df4d10ec4..689c869d5 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -16,6 +16,8 @@ package mgo import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -68,3 +70,8 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model. func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) } +func (o *S3Mongo) DeleteByExpires(ctx context.Context, duration time.Time) error { + return mongoutil.DeleteMany(ctx, o.coll, bson.M{ + "create_time": bson.M{"$lt": duration}, + }) +} diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 554f71f35..3752993d8 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -16,6 +16,8 @@ package database import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" ) @@ -23,4 +25,5 @@ type ObjectInfo interface { SetObject(ctx context.Context, obj *model.Object) error Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name string) error + DeleteByExpires(ctx context.Context, duration time.Time) error } diff --git a/pkg/rpcclient/third.go b/pkg/rpcclient/third.go index 4c71dff6a..7cdc60d52 100644 --- a/pkg/rpcclient/third.go +++ b/pkg/rpcclient/third.go @@ -41,3 +41,7 @@ func NewThird(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl } return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl} } +func (t *Third) DeleteOutdatedData(ctx context.Context, expires int64) error { + _, err := t.Client.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expires}) + return err +} From dac8fba11f325d03c481f9cf0528664b8d3944b2 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 28 Jun 2024 17:59:57 +0800 Subject: [PATCH 2/2] implement minio expire delete logic. --- go.mod | 2 +- go.sum | 4 ++-- internal/rpc/third/s3.go | 6 +++++- pkg/common/storage/controller/s3.go | 15 ++++++++++++--- pkg/common/storage/database/mgo/object.go | 4 ++-- pkg/common/storage/database/object.go | 2 +- 6 files changed, 23 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index a1a51b4fa..f8437922c 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.69-alpha.22 - github.com/openimsdk/tools v0.0.49-alpha.25 + github.com/openimsdk/tools v0.0.49-alpha.30 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index e67b30206..e05652d99 100644 --- a/go.sum +++ b/go.sum @@ -272,8 +272,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ= github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI= -github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= +github.com/openimsdk/tools v0.0.49-alpha.30 h1:iT2+1F8cJmlwKEris25YgK0seiJRUear+wTgc1bzcg8= +github.com/openimsdk/tools v0.0.49-alpha.30/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 19a519d2e..e952175ad 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -286,10 +286,14 @@ func (t *thirdServer) apiAddress(prefix, name string) string { func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { expireTime := time.UnixMilli(req.ExpireTime) - err := t.s3dataBase.DeleteByExpires(ctx, expireTime) + models, err := t.s3dataBase.FindByExpires(ctx, expireTime) if err != nil { return nil, err } + for _, model := range models { + t.s3dataBase.DeleteObject(ctx, model.Key) + t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Key) + } return &third.DeleteOutdatedDataResp{}, nil } diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 5d83e2677..b1dce502b 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -39,7 +39,9 @@ type S3Database interface { SetObject(ctx context.Context, info *model.Object) error StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) - DeleteByExpires(ctx context.Context, duration time.Time) error + FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) + DeleteObject(ctx context.Context, name string) error + DeleteSpecifiedData(ctx context.Context, engine string, name string) error } func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database { @@ -113,6 +115,13 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { return s.s3.FormData(ctx, name, size, contentType, duration) } -func (s *s3Database) DeleteByExpires(ctx context.Context, duration time.Time) error { - return s.db.DeleteByExpires(ctx, duration) +func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) { + return s.db.FindByExpires(ctx, duration) +} + +func (s *s3Database) DeleteObject(ctx context.Context, name string) error { + return s.s3.DeleteObject(ctx, name) +} +func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error { + return s.db.Delete(ctx, engine, name) } diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 689c869d5..a0abf7a7a 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -70,8 +70,8 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model. func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) } -func (o *S3Mongo) DeleteByExpires(ctx context.Context, duration time.Time) error { - return mongoutil.DeleteMany(ctx, o.coll, bson.M{ +func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) { + return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{ "create_time": bson.M{"$lt": duration}, }) } diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 3752993d8..44329cbc4 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -25,5 +25,5 @@ type ObjectInfo interface { SetObject(ctx context.Context, obj *model.Object) error Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name string) error - DeleteByExpires(ctx context.Context, duration time.Time) error + FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) }