diff --git a/pkg/common/storage/database/mgo/seq_conversation_test.go b/pkg/common/storage/database/mgo/seq_conversation_test.go index abae2d1b1..e6466a1c6 100644 --- a/pkg/common/storage/database/mgo/seq_conversation_test.go +++ b/pkg/common/storage/database/mgo/seq_conversation_test.go @@ -15,15 +15,9 @@ func Result[V any](val V, err error) V { return val } -func Check(err error) { - if err != nil { - panic(err) - } -} - func TestName(t *testing.T) { cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) - tmp, err := NewSeqMongo(cli.Database("openim_v3")) + tmp, err := NewSeqConversationMongo(cli.Database("openim_v3")) if err != nil { panic(err) } diff --git a/start-config.yml b/start-config.yml index a9c412b33..21436d7a9 100644 --- a/start-config.yml +++ b/start-config.yml @@ -14,4 +14,5 @@ serviceBinaries: toolBinaries: - check-free-memory - check-component + - seq maxFileDescriptors: 10000 diff --git a/tools/seq/internal/main.go b/tools/seq/internal/main.go index 5cfd9fd45..54a7d477d 100644 --- a/tools/seq/internal/main.go +++ b/tools/seq/internal/main.go @@ -12,9 +12,11 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "gopkg.in/yaml.v3" "os" "path/filepath" @@ -23,7 +25,12 @@ import ( "time" ) -const batchSize = 5 +const ( + batchSize = 100 + dataVersionCollection = "data_version" + seqKey = "seq" + seqVersion = 38 +) func readConfig[T any](dir string, name string) (*T, error) { data, err := os.ReadFile(filepath.Join(dir, name)) @@ -37,7 +44,7 @@ func readConfig[T any](dir string, name string) (*T, error) { return &conf, nil } -func redisKey(rdb redis.UniversalClient, prefix string, fn func(ctx context.Context, key string, delKey map[string]struct{}) error) error { +func redisKey(rdb redis.UniversalClient, prefix string, del time.Duration, fn func(ctx context.Context, key string, delKey map[string]struct{}) error) error { var ( cursor uint64 keys []string @@ -58,9 +65,20 @@ func redisKey(rdb redis.UniversalClient, prefix string, fn func(ctx context.Cont } } if len(delKey) > 0 { - //if err := rdb.Del(ctx, datautil.Keys(delKey)...).Err(); err != nil { - // return err - //} + delKeys := datautil.Keys(delKey) + if del < time.Second { + if err := rdb.Del(ctx, datautil.Keys(delKey)...).Err(); err != nil { + return err + } + } else { + pipe := rdb.Pipeline() + for _, key := range delKeys { + pipe.Expire(ctx, key, del) + } + if _, err := pipe.Exec(ctx); err != nil { + return err + } + } } if cursor == 0 { return nil @@ -68,7 +86,7 @@ func redisKey(rdb redis.UniversalClient, prefix string, fn func(ctx context.Cont } } -func Main(conf string) error { +func Main(conf string, del time.Duration) error { redisConfig, err := readConfig[config.Redis](conf, cmd.RedisConfigFileName) if err != nil { return err @@ -87,12 +105,22 @@ func Main(conf string) error { if err != nil { return err } + versionColl := mgocli.GetDB().Collection(dataVersionCollection) + converted, err := CheckVersion(versionColl, seqKey, seqVersion) + if err != nil { + return err + } + if converted { + fmt.Println("[seq] seq data has been converted") + return nil + } if _, err := mgo.NewSeqConversationMongo(mgocli.GetDB()); err != nil { return err } coll := mgocli.GetDB().Collection(database.SeqConversationName) const prefix = cachekey.MaxSeq - return redisKey(rdb, prefix, func(ctx context.Context, key string, delKey map[string]struct{}) error { + fmt.Println("start to convert seq conversation") + err = redisKey(rdb, prefix, del, func(ctx context.Context, key string, delKey map[string]struct{}) error { conversationId := strings.TrimPrefix(key, prefix) delKey[key] = struct{}{} maxValue, err := rdb.Get(ctx, key).Result() @@ -149,4 +177,42 @@ func Main(conf string) error { return err } }) + if err != nil { + return err + } + fmt.Println("convert seq conversation success") + return SetVersion(versionColl, seqKey, seqVersion) +} + +func CheckVersion(coll *mongo.Collection, key string, currentVersion int) (converted bool, err error) { + type VersionTable struct { + Key string `bson:"key"` + Value string `bson:"value"` + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + res, err := mongoutil.FindOne[VersionTable](ctx, coll, bson.M{"key": key}) + if err == nil { + ver, err := strconv.Atoi(res.Value) + if err != nil { + return false, fmt.Errorf("version %s parse error %w", res.Value, err) + } + if ver >= currentVersion { + return true, nil + } + return false, nil + } else if errors.Is(err, mongo.ErrNoDocuments) { + return false, nil + } else { + return false, err + } +} + +func SetVersion(coll *mongo.Collection, key string, version int) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + option := options.Update().SetUpsert(true) + filter := bson.M{"key": key, "value": strconv.Itoa(version)} + update := bson.M{"$set": bson.M{"key": key, "value": strconv.Itoa(version)}} + return mongoutil.UpdateOne(ctx, coll, filter, update, false, option) } diff --git a/tools/seq/main.go b/tools/seq/main.go index ca5a043e7..6bb4b7657 100644 --- a/tools/seq/main.go +++ b/tools/seq/main.go @@ -4,13 +4,18 @@ import ( "flag" "fmt" "github.com/openimsdk/open-im-server/v3/tools/seq/internal" + "time" ) func main() { - var config string - flag.StringVar(&config, "redis", "/Users/chao/Desktop/project/open-im-server/config", "config directory") + var ( + config string + second int + ) + flag.StringVar(&config, "c", "", "config directory") + flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion") flag.Parse() - if err := internal.Main(config); err != nil { + if err := internal.Main(config, time.Duration(second)*time.Second); err != nil { fmt.Println("seq task", err) } }