diff --git a/config/openim-crontask.yml b/config/openim-crontask.yml index 9bbccfd25..3839104a4 100644 --- a/config/openim-crontask.yml +++ b/config/openim-crontask.yml @@ -1,2 +1,3 @@ -chatRecordsClearTime: "0 2 * * *" +cronExecuteTime: "0 2 * * *" retainChatRecords: 365 +fileExpireTime: 90 diff --git a/go.mod b/go.mod index 2111208f2..6302453cf 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.17 + github.com/openimsdk/protocol v0.0.69-alpha.22 github.com/openimsdk/tools v0.0.49-alpha.45 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 @@ -175,5 +175,3 @@ require ( golang.org/x/crypto v0.21.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) - -//replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol diff --git a/go.sum b/go.sum index 5bc9f6d98..f452bcd87 100644 --- a/go.sum +++ b/go.sum @@ -200,6 +200,8 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= +github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kelindar/bitmap v1.5.2 h1:XwX7CTvJtetQZ64zrOkApoZZHBJRkjE23NfqUALA/HE= @@ -220,6 +222,12 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= +github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4= +github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA= +github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ= +github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/likexian/gokit v0.25.13 h1:p2Uw3+6fGG53CwdU2Dz0T6bOycdb2+bAFAa3ymwWVkM= github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4= github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w= @@ -262,10 +270,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M= -github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.45 h1:XIzCoef4myybOiIlGuRY9FTtGBisZFC4Uy4PhG0ZWQ0= -github.com/openimsdk/tools v0.0.49-alpha.45/go.mod h1:HtSRjPTL8PsuZ+PhR5noqzrYBF0sdwW3/O/sWVucWg8= +github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ= +github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/tools v0.0.49-alpha.39 h1:bl5+q7xHsc/j1NnkN8/gYmn23RsNNbRizDY58d2EY1w= +github.com/openimsdk/tools v0.0.49-alpha.39/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -519,8 +527,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= -gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/api/friend.go b/internal/api/friend.go index 11d7375fa..9266fe75c 100644 --- a/internal/api/friend.go +++ b/internal/api/friend.go @@ -57,7 +57,6 @@ func (o *FriendApi) GetFriendList(c *gin.Context) { func (o *FriendApi) GetDesignatedFriends(c *gin.Context) { a2r.Call(relation.FriendClient.GetDesignatedFriends, o.Client, c) - //a2r.Call(relation.FriendClient.GetDesignatedFriends, o.Client, c, a2r.NewNilReplaceOption(relation.FriendClient.GetDesignatedFriends)) } func (o *FriendApi) SetFriendRemark(c *gin.Context) { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 65d04f381..c17abbc30 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -82,12 +82,21 @@ func Start(ctx context.Context, index int, config *Config) error { client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) msgModel := redis.NewMsgCache(rdb) - seqModel := redis.NewSeqCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { return err } - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig) + seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) + if err != nil { + return err + } + seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) + seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB()) + if err != nil { + return err + } + seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) if err != nil { return err } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index f1fb28fff..de0f698ea 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -86,12 +86,21 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg return err } msgModel := redis.NewMsgCache(rdb) - seqModel := redis.NewSeqCache(rdb) conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig) + seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) + if err != nil { + return err + } + seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) + seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB()) + if err != nil { + return err + } + seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) if err != nil { return err } diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 4cb1b81d0..54cbc30c1 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -19,13 +19,17 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "path" "strconv" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "go.mongodb.org/mongo-driver/mongo" + "github.com/google/uuid" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -283,6 +287,52 @@ func (t *thirdServer) apiAddress(prefix, name string) string { return prefix + name } +func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { + var conf config.Third + expireTime := time.UnixMilli(req.ExpireTime) + findPagination := &sdkws.RequestPagination{ + PageNumber: 1, + ShowNumber: 1000, + } + for { + total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination) + if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + return nil, errs.Wrap(err) + } + needDelObjectKeys := make([]string, 0) + for _, model := range models { + needDelObjectKeys = append(needDelObjectKeys, model.Key) + } + + needDelObjectKeys = datautil.Distinct(needDelObjectKeys) + for _, key := range needDelObjectKeys { + count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime) + if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + return nil, errs.Wrap(err) + } + if int(count) < 1 { + thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key) + if err != nil { + return nil, errs.Wrap(err) + } + t.s3dataBase.DeleteObject(ctx, thumbnailKey) + t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...) + t.s3dataBase.DeleteObject(ctx, key) + } + } + for _, model := range models { + err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) + if err != nil { + return nil, errs.Wrap(err) + } + } + if total < int64(findPagination.ShowNumber) { + break + } + } + return &third.DeleteOutdatedDataResp{}, nil +} + type FormDataMate struct { Name string `json:"name"` Size int64 `json:"size"` diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 211b360b7..2fea232b4 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -60,6 +60,11 @@ type userServer struct { webhookClient *webhook.Client } +func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) { + //TODO implement me + panic("implement me") +} + type Config struct { RpcConfig config.User RedisConfig config.Redis diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index bf037b694..dcdcf2f40 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -17,15 +17,17 @@ package tools import ( "context" "fmt" + "os" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "os" - "time" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -39,7 +41,7 @@ type CronTaskConfig struct { } func Start(ctx context.Context, config *CronTaskConfig) error { - log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.RetainChatRecords) + log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords) if config.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } @@ -66,10 +68,31 @@ func Start(ctx context.Context, config *CronTaskConfig) error { } log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) } - if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearFunc); err != nil { + return errs.Wrap(err) + } + + tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + if err != nil { + return err + } + thirdClient := third.NewThirdClient(tConn) + + deleteFunc := func() { + now := time.Now() + deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) + ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) + log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { + log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) + return + } + log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) + } + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil { return errs.Wrap(err) } - log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime) + log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) crontab.Start() <-ctx.Done() return nil diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index 6ea6a29ed..f4deb9fb1 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -15,7 +15,7 @@ package apistruct import ( - sdkws "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/protocol/sdkws" ) // SendMsg defines the structure for sending messages with various metadata. diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 536e4d547..a6c380e9f 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -15,14 +15,15 @@ package config import ( + "strings" + "time" + "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/s3/oss" - "strings" - "time" ) type CacheConfig struct { @@ -108,8 +109,9 @@ type API struct { } type CronTask struct { - ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"` - RetainChatRecords int `mapstructure:"retainChatRecords"` + CronExecuteTime string `mapstructure:"cronExecuteTime"` + RetainChatRecords int `mapstructure:"retainChatRecords"` + FileExpireTime int `mapstructure:"fileExpireTime"` } type OfflinePushConfig struct { diff --git a/pkg/common/storage/cache/cachekey/seq.go b/pkg/common/storage/cache/cachekey/seq.go index 3f0ce98a4..b32e78300 100644 --- a/pkg/common/storage/cache/cachekey/seq.go +++ b/pkg/common/storage/cache/cachekey/seq.go @@ -1,38 +1,30 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package cachekey const ( - maxSeq = "MAX_SEQ:" - minSeq = "MIN_SEQ:" - conversationUserMinSeq = "CON_USER_MIN_SEQ:" - hasReadSeq = "HAS_READ_SEQ:" + MallocSeq = "MALLOC_SEQ:" + MallocMinSeqLock = "MALLOC_MIN_SEQ:" + + SeqUserMaxSeq = "SEQ_USER_MAX:" + SeqUserMinSeq = "SEQ_USER_MIN:" + SeqUserReadSeq = "SEQ_USER_READ:" ) -func GetMaxSeqKey(conversationID string) string { - return maxSeq + conversationID +func GetMallocSeqKey(conversationID string) string { + return MallocSeq + conversationID +} + +func GetMallocMinSeqKey(conversationID string) string { + return MallocMinSeqLock + conversationID } -func GetMinSeqKey(conversationID string) string { - return minSeq + conversationID +func GetSeqUserMaxSeqKey(conversationID string, userID string) string { + return SeqUserMaxSeq + conversationID + ":" + userID } -func GetHasReadSeqKey(conversationID string, userID string) string { - return hasReadSeq + userID + ":" + conversationID +func GetSeqUserMinSeqKey(conversationID string, userID string) string { + return SeqUserMinSeq + conversationID + ":" + userID } -func GetConversationUserMinSeqKey(conversationID, userID string) string { - return conversationUserMinSeq + conversationID + "u:" + userID +func GetSeqUserReadSeqKey(conversationID string, userID string) string { + return SeqUserReadSeq + conversationID + ":" + userID } diff --git a/pkg/common/storage/cache/redis/seq.go b/pkg/common/storage/cache/redis/seq.go deleted file mode 100644 index 09ad5b609..000000000 --- a/pkg/common/storage/cache/redis/seq.go +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redis - -import ( - "context" - "errors" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/stringutil" - "github.com/redis/go-redis/v9" - "sync" -) - -func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache { - return &seqCache{rdb: rdb} -} - -type seqCache struct { - rdb redis.UniversalClient -} - -func (c *seqCache) getMaxSeqKey(conversationID string) string { - return cachekey.GetMaxSeqKey(conversationID) -} - -func (c *seqCache) getMinSeqKey(conversationID string) string { - return cachekey.GetMinSeqKey(conversationID) -} - -func (c *seqCache) getHasReadSeqKey(conversationID string, userID string) string { - return cachekey.GetHasReadSeqKey(conversationID, userID) -} - -func (c *seqCache) getConversationUserMinSeqKey(conversationID, userID string) string { - return cachekey.GetConversationUserMinSeqKey(conversationID, userID) -} - -func (c *seqCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { - return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) -} - -func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) { - val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64() - if err != nil { - return 0, errs.Wrap(err) - } - return val, nil -} - -func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { - m = make(map[string]int64, len(items)) - var ( - reverseMap = make(map[string]string, len(items)) - keys = make([]string, len(items)) - lock sync.Mutex - ) - - for i, v := range items { - keys[i] = getkey(v) - reverseMap[getkey(v)] = v - } - - manager := NewRedisShardManager(c.rdb) - if err = manager.ProcessKeysBySlot(ctx, keys, func(ctx context.Context, _ int64, keys []string) error { - res, err := c.rdb.MGet(ctx, keys...).Result() - if err != nil && !errors.Is(err, redis.Nil) { - return errs.Wrap(err) - } - - // len(res) <= len(items) - for i := range res { - strRes, ok := res[i].(string) - if !ok { - continue - } - val := stringutil.StringToInt64(strRes) - if val != 0 { - lock.Lock() - m[reverseMap[keys[i]]] = val - lock.Unlock() - } - } - return nil - }); err != nil { - return nil, err - } - - return m, nil -} - -func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { - return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) -} - -func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { - return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey) -} - -func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { - return c.getSeq(ctx, conversationID, c.getMaxSeqKey) -} - -func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { - return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey) -} - -func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { - for conversationID, seq := range seqs { - if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil { - return errs.Wrap(err) - } - } - return nil -} - -func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { - return c.setSeqs(ctx, seqs, c.getMinSeqKey) -} - -func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey) -} - -func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { - return c.getSeq(ctx, conversationID, c.getMinSeqKey) -} - -func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { - val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64() - if err != nil { - return 0, errs.Wrap(err) - } - return val, nil -} - -func (c *seqCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) { - return c.getSeqs(ctx, userIDs, func(userID string) string { - return c.getConversationUserMinSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { - return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) -} - -func (c *seqCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { - return c.setSeqs(ctx, seqs, func(userID string) string { - return c.getConversationUserMinSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) { - return c.setSeqs(ctx, seqs, func(conversationID string) string { - return c.getConversationUserMinSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err()) -} - -func (c *seqCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error { - return c.setSeqs(ctx, hasReadSeqs, func(userID string) string { - return c.getHasReadSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { - return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string { - return c.getHasReadSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { - return c.getSeqs(ctx, conversationIDs, func(conversationID string) string { - return c.getHasReadSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { - val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64() - if err != nil { - return 0, err - } - return val, nil -} diff --git a/pkg/common/storage/cache/redis/seq_conversation.go b/pkg/common/storage/cache/redis/seq_conversation.go new file mode 100644 index 000000000..034462fd1 --- /dev/null +++ b/pkg/common/storage/cache/redis/seq_conversation.go @@ -0,0 +1,255 @@ +package redis + +import ( + "context" + "fmt" + "github.com/dtm-labs/rockscache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/redis/go-redis/v9" + "time" +) + +func NewSeqConversationCacheRedis(rdb redis.UniversalClient, mgo database.SeqConversation) cache.SeqConversationCache { + return &seqConversationCacheRedis{ + rdb: rdb, + mgo: mgo, + lockTime: time.Second * 3, + dataTime: time.Hour * 24 * 365, + minSeqExpireTime: time.Hour, + rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()), + } +} + +type seqConversationCacheRedis struct { + rdb redis.UniversalClient + mgo database.SeqConversation + rocks *rockscache.Client + lockTime time.Duration + dataTime time.Duration + minSeqExpireTime time.Duration +} + +func (s *seqConversationCacheRedis) getMinSeqKey(conversationID string) string { + return cachekey.GetMallocMinSeqKey(conversationID) +} + +func (s *seqConversationCacheRedis) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { + if err := s.mgo.SetMinSeq(ctx, conversationID, seq); err != nil { + return err + } + if err := s.rocks.TagAsDeleted2(ctx, s.getMinSeqKey(conversationID)); err != nil { + return errs.Wrap(err) + } + return nil +} + +func (s *seqConversationCacheRedis) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { + return getCache(ctx, s.rocks, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) { + return s.mgo.GetMinSeq(ctx, conversationID) + }) +} + +func (s *seqConversationCacheRedis) getSeqMallocKey(conversationID string) string { + return cachekey.GetMallocSeqKey(conversationID) +} + +func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) { + if lastSeq < currSeq { + return 0, errs.New("lastSeq must be greater than currSeq") + } + // 0: success + // 1: success the lock has expired, but has not been locked by anyone else + // 2: already locked, but not by yourself + script := ` +local key = KEYS[1] +local lockValue = ARGV[1] +local dataSecond = ARGV[2] +local curr_seq = tonumber(ARGV[3]) +local last_seq = tonumber(ARGV[4]) +if redis.call("EXISTS", key) == 0 then + redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq) + redis.call("EXPIRE", key, dataSecond) + return 1 +end +if redis.call("HGET", key, "LOCK") ~= lockValue then + return 2 +end +redis.call("HDEL", key, "LOCK") +redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq) +redis.call("EXPIRE", key, dataSecond) +return 0 +` + result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq).Int64() + if err != nil { + return 0, errs.Wrap(err) + } + return result, nil +} + +// malloc size=0 is to get the current seq size>0 is to allocate seq +func (s *seqConversationCacheRedis) malloc(ctx context.Context, key string, size int64) ([]int64, error) { + // 0: success + // 1: need to obtain and lock + // 2: already locked + // 3: exceeded the maximum value and locked + script := ` +local key = KEYS[1] +local size = tonumber(ARGV[1]) +local lockSecond = ARGV[2] +local dataSecond = ARGV[3] +local result = {} +if redis.call("EXISTS", key) == 0 then + local lockValue = math.random(0, 999999999) + redis.call("HSET", key, "LOCK", lockValue) + redis.call("EXPIRE", key, lockSecond) + table.insert(result, 1) + table.insert(result, lockValue) + return result +end +if redis.call("HEXISTS", key, "LOCK") == 1 then + table.insert(result, 2) + return result +end +local curr_seq = tonumber(redis.call("HGET", key, "CURR")) +local last_seq = tonumber(redis.call("HGET", key, "LAST")) +if size == 0 then + redis.call("EXPIRE", key, dataSecond) + table.insert(result, 0) + table.insert(result, curr_seq) + table.insert(result, last_seq) + return result +end +local max_seq = curr_seq + size +if max_seq > last_seq then + local lockValue = math.random(0, 999999999) + redis.call("HSET", key, "LOCK", lockValue) + redis.call("HSET", key, "CURR", last_seq) + redis.call("EXPIRE", key, lockSecond) + table.insert(result, 3) + table.insert(result, curr_seq) + table.insert(result, last_seq) + table.insert(result, lockValue) + return result +end +redis.call("HSET", key, "CURR", max_seq) +redis.call("EXPIRE", key, dataSecond) +table.insert(result, 0) +table.insert(result, curr_seq) +table.insert(result, last_seq) +return result +` + result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second)).Int64Slice() + if err != nil { + return nil, errs.Wrap(err) + } + return result, nil +} + +func (s *seqConversationCacheRedis) wait(ctx context.Context) error { + timer := time.NewTimer(time.Second / 4) + defer timer.Stop() + select { + case <-timer.C: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) { + for i := 0; i < 10; i++ { + state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq) + if err != nil { + log.ZError(ctx, "set seq cache failed", err, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq, "count", i+1) + if err := s.wait(ctx); err != nil { + return + } + continue + } + switch state { + case 0: // ideal state + case 1: + log.ZWarn(ctx, "set seq cache lock not found", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq) + case 2: + log.ZWarn(ctx, "set seq cache lock to be held by someone else", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq) + default: + log.ZError(ctx, "set seq cache lock unknown state", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq) + } + return + } + log.ZError(ctx, "set seq cache retrying still failed", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq) +} + +func (s *seqConversationCacheRedis) getMallocSize(conversationID string, size int64) int64 { + if size == 0 { + return 0 + } + var basicSize int64 + if msgprocessor.IsGroupConversationID(conversationID) { + basicSize = 100 + } else { + basicSize = 50 + } + basicSize += size + return basicSize +} + +func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { + if size < 0 { + return 0, errs.New("size must be greater than 0") + } + key := s.getSeqMallocKey(conversationID) + for i := 0; i < 10; i++ { + states, err := s.malloc(ctx, key, size) + if err != nil { + return 0, err + } + switch states[0] { + case 0: // success + return states[1], nil + case 1: // not found + mallocSize := s.getMallocSize(conversationID, size) + seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize) + if err != nil { + return 0, err + } + s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize) + return seq, nil + case 2: // locked + if err := s.wait(ctx); err != nil { + return 0, err + } + continue + case 3: // exceeded cache max value + currSeq := states[1] + lastSeq := states[2] + mallocSize := s.getMallocSize(conversationID, size) + seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize) + if err != nil { + return 0, err + } + if lastSeq == seq { + s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize) + return currSeq, nil + } else { + log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq) + s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize) + return seq, nil + } + default: + log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size) + return 0, errs.New(fmt.Sprintf("unknown state: %d", states[0])) + } + } + log.ZError(ctx, "malloc seq retrying still failed", nil, "conversationID", conversationID, "size", size) + return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size) +} + +func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { + return s.Malloc(ctx, conversationID, 0) +} diff --git a/pkg/common/storage/cache/redis/seq_conversation_test.go b/pkg/common/storage/cache/redis/seq_conversation_test.go new file mode 100644 index 000000000..1a40624b8 --- /dev/null +++ b/pkg/common/storage/cache/redis/seq_conversation_test.go @@ -0,0 +1,109 @@ +package redis + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" +) + +func newTestSeq() *seqConversationCacheRedis { + mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)) + if err != nil { + panic(err) + } + model, err := mgo.NewSeqConversationMongo(mgocli.Database("openim_v3")) + if err != nil { + panic(err) + } + opt := &redis.Options{ + Addr: "172.16.8.48:16379", + Password: "openIM123", + DB: 1, + } + rdb := redis.NewClient(opt) + if err := rdb.Ping(context.Background()).Err(); err != nil { + panic(err) + } + return NewSeqConversationCacheRedis(rdb, model).(*seqConversationCacheRedis) +} + +func TestSeq(t *testing.T) { + ts := newTestSeq() + var ( + wg sync.WaitGroup + speed atomic.Int64 + ) + + const count = 128 + wg.Add(count) + for i := 0; i < count; i++ { + index := i + 1 + go func() { + defer wg.Done() + var size int64 = 10 + cID := strconv.Itoa(index * 1) + for i := 1; ; i++ { + //first, err := ts.mgo.Malloc(context.Background(), cID, size) // mongo + first, err := ts.Malloc(context.Background(), cID, size) // redis + if err != nil { + t.Logf("[%d-%d] %s %s", index, i, cID, err) + return + } + speed.Add(size) + _ = first + //t.Logf("[%d] %d -> %d", i, first+1, first+size) + } + }() + } + + done := make(chan struct{}) + + go func() { + wg.Wait() + close(done) + }() + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-done: + ticker.Stop() + return + case <-ticker.C: + value := speed.Swap(0) + t.Logf("speed: %d/s", value) + } + } +} + +func TestDel(t *testing.T) { + ts := newTestSeq() + for i := 1; i < 100; i++ { + var size int64 = 100 + first, err := ts.Malloc(context.Background(), "100", size) + if err != nil { + t.Logf("[%d] %s", i, err) + return + } + t.Logf("[%d] %d -> %d", i, first+1, first+size) + time.Sleep(time.Second) + } +} + +func TestSeqMalloc(t *testing.T) { + ts := newTestSeq() + t.Log(ts.GetMaxSeq(context.Background(), "100")) +} + +func TestMinSeq(t *testing.T) { + ts := newTestSeq() + t.Log(ts.GetMinSeq(context.Background(), "10000000")) +} diff --git a/pkg/common/storage/cache/redis/seq_user.go b/pkg/common/storage/cache/redis/seq_user.go new file mode 100644 index 000000000..3c533cdd8 --- /dev/null +++ b/pkg/common/storage/cache/redis/seq_user.go @@ -0,0 +1,89 @@ +package redis + +import ( + "context" + "github.com/dtm-labs/rockscache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/tools/errs" + "github.com/redis/go-redis/v9" + "strconv" + "time" +) + +func NewSeqUserCacheRedis(rdb redis.UniversalClient, mgo database.SeqUser) cache.SeqUser { + return &seqUserCacheRedis{ + rdb: rdb, + mgo: mgo, + readSeqWriteRatio: 100, + expireTime: time.Hour * 24 * 7, + readExpireTime: time.Hour * 24 * 30, + rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()), + } +} + +type seqUserCacheRedis struct { + rdb redis.UniversalClient + mgo database.SeqUser + rocks *rockscache.Client + expireTime time.Duration + readExpireTime time.Duration + readSeqWriteRatio int64 +} + +func (s *seqUserCacheRedis) getSeqUserMaxSeqKey(conversationID string, userID string) string { + return cachekey.GetSeqUserMaxSeqKey(conversationID, userID) +} + +func (s *seqUserCacheRedis) getSeqUserMinSeqKey(conversationID string, userID string) string { + return cachekey.GetSeqUserMinSeqKey(conversationID, userID) +} + +func (s *seqUserCacheRedis) getSeqUserReadSeqKey(conversationID string, userID string) string { + return cachekey.GetSeqUserReadSeqKey(conversationID, userID) +} + +func (s *seqUserCacheRedis) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return getCache(ctx, s.rocks, s.getSeqUserMaxSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) { + return s.mgo.GetMaxSeq(ctx, conversationID, userID) + }) +} + +func (s *seqUserCacheRedis) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + if err := s.mgo.SetMaxSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID)) +} + +func (s *seqUserCacheRedis) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return getCache(ctx, s.rocks, s.getSeqUserMinSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) { + return s.mgo.GetMaxSeq(ctx, conversationID, userID) + }) +} + +func (s *seqUserCacheRedis) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + if err := s.mgo.SetMinSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMinSeqKey(conversationID, userID)) +} + +func (s *seqUserCacheRedis) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return getCache(ctx, s.rocks, s.getSeqUserReadSeqKey(conversationID, userID), s.readExpireTime, func(ctx context.Context) (int64, error) { + return s.mgo.GetMaxSeq(ctx, conversationID, userID) + }) +} + +func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + if seq%s.readSeqWriteRatio == 0 { + if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + } + if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { + return errs.Wrap(err) + } + return nil +} diff --git a/pkg/common/storage/cache/seq.go b/pkg/common/storage/cache/seq.go deleted file mode 100644 index 091b318c8..000000000 --- a/pkg/common/storage/cache/seq.go +++ /dev/null @@ -1,30 +0,0 @@ -package cache - -import ( - "context" -) - -type SeqCache interface { - SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error - GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - GetMaxSeq(ctx context.Context, conversationID string) (int64, error) - SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error - SetMinSeqs(ctx context.Context, seqs map[string]int64) error - GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - GetMinSeq(ctx context.Context, conversationID string) (int64, error) - GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) - GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) - SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error - // seqs map: key userID value minSeq - SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) - // seqs map: key conversationID value minSeq - SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error - // has read seq - SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error - // k: user, v: seq - SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error - // k: conversation, v :seq - UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error - GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) - GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) -} diff --git a/pkg/common/storage/cache/seq_conversation.go b/pkg/common/storage/cache/seq_conversation.go new file mode 100644 index 000000000..5d38537a9 --- /dev/null +++ b/pkg/common/storage/cache/seq_conversation.go @@ -0,0 +1,10 @@ +package cache + +import "context" + +type SeqConversationCache interface { + Malloc(ctx context.Context, conversationID string, size int64) (int64, error) + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, seq int64) error + GetMinSeq(ctx context.Context, conversationID string) (int64, error) +} diff --git a/pkg/common/storage/cache/seq_user.go b/pkg/common/storage/cache/seq_user.go new file mode 100644 index 000000000..9e68399a7 --- /dev/null +++ b/pkg/common/storage/cache/seq_user.go @@ -0,0 +1,12 @@ +package cache + +import "context" + +type SeqUser interface { + GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error + GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error + GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 8eb9e8e6f..1e7b5c229 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -69,26 +69,26 @@ type CommonMsgDatabase interface { DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error - SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error + //SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) - SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error + //SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error SetMinSeqs(ctx context.Context, seqs map[string]int64) error - GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - GetMinSeq(ctx context.Context, conversationID string) (int64, error) - GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) - GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) - SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error - SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) + //GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + //GetMinSeq(ctx context.Context, conversationID string) (int64, error) + //GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) + //GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) + //SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error + //SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error - GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) - GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) + //GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) + //GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) @@ -108,7 +108,7 @@ type CommonMsgDatabase interface { DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) } -func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { +func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) if err != nil { return nil, err @@ -128,29 +128,20 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cach return &commonMsgDatabase{ msgDocDatabase: msgDocModel, msg: msg, - seq: seq, + seqUser: seqUser, + seqConversation: seqConversation, producer: producerToRedis, producerToMongo: producerToMongo, producerToPush: producerToPush, }, nil } -//func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *tools.CronTaskConfig) (CommonMsgDatabase, error) { -// msgDocModel, err := database.NewMsgMongo(database) -// if err != nil { -// return nil, err -// } -// //todo MsgCacheTimeout -// msg := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) -// seq := cache.NewSeqCache(rdb) -// return NewCommonMsgDatabase(msgDocModel, msg, seq, &config.KafkaConfig) -//} - type commonMsgDatabase struct { msgDocDatabase database.Msg msgTable model.MsgDocModel msg cache.MsgCache - seq cache.SeqCache + seqConversation cache.SeqConversationCache + seqUser cache.SeqUser producer *kafka.Producer producerToMongo *kafka.Producer producerToPush *kafka.Producer @@ -348,12 +339,16 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) } -func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { - currentMaxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { - log.ZError(ctx, "storage.seq.GetMaxSeq", err) - return 0, false, err +func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { + for userID, seq := range userSeqMap { + if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil { + return err + } } + return nil +} + +func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { lenList := len(msgs) if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() @@ -361,9 +356,12 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa if lenList < 1 { return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() } - if errs.Unwrap(err) == redis.Nil { - isNew = true + currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) + if err != nil { + log.ZError(ctx, "storage.seq.Malloc", err) + return 0, false, err } + isNew = currentMaxSeq == 0 lastMaxSeq := currentMaxSeq userSeqMap := make(map[string]int64) for _, m := range msgs { @@ -379,14 +377,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa } else { prommetrics.MsgInsertRedisSuccessCounter.Inc() } - - err = db.seq.SetMaxSeq(ctx, conversationID, currentMaxSeq) - if err != nil { - log.ZError(ctx, "storage.seq.SetMaxSeq error", err, "conversationID", conversationID) - prommetrics.SeqSetFailedCounter.Inc() - } - - err = db.seq.SetHasReadSeqs(ctx, conversationID, userSeqMap) + err = db.setHasReadSeqs(ctx, conversationID, userSeqMap) if err != nil { log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc() @@ -514,12 +505,12 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin // "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group. // This ensures that their message retrieval starts from the point they joined. func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) { - userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) + userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } - minSeq, err := db.seq.GetMinSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) + if err != nil { return 0, 0, nil, err } if userMinSeq > minSeq { @@ -530,8 +521,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end) return 0, 0, nil, nil } - maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) + if err != nil { return 0, 0, nil, err } log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq) @@ -571,11 +562,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin var successMsgs []*sdkws.MsgData log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs) - if err != nil { - if err != redis.Nil { - - log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) - } + if err != nil && !errors.Is(err, redis.Nil) { + log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) } successMsgs = append(successMsgs, cachedMsgs...) log.ZDebug(ctx, "get msgs from cache", "cachedMsgs", cachedMsgs) @@ -595,16 +583,16 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin } func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { - userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) + userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } - minSeq, err := db.seq.GetMinSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) + if err != nil { return 0, 0, nil, err } - maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) + if err != nil { return 0, 0, nil, err } if userMinSeq < minSeq { @@ -648,7 +636,7 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont if minSeq == 0 { return nil } - return db.seq.SetMinSeq(ctx, conversationID, minSeq) + return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) } func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { @@ -693,12 +681,12 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) if len(seqs) > 0 { userMinSeq := seqs[len(seqs)-1] + 1 - currentUserMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) - if err != nil && errs.Unwrap(err) != redis.Nil { + currentUserMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) + if err != nil { return nil, err } if currentUserMinSeq < userMinSeq { - if err := db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { + if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { return nil, err } } @@ -802,7 +790,7 @@ func (db *commonMsgDatabase) DeleteMsgsBySeqs(ctx context.Context, conversationI func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) { for _, conversationID := range conversationIDs { - maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) + maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { if err == redis.Nil { log.ZDebug(ctx, "max seq is nil", "conversationID", conversationID) @@ -811,74 +799,87 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u } continue } - if err := db.seq.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { + if err := db.seqConversation.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { log.ZError(ctx, "set min seq failed", err, "conversationID", conversationID, "minSeq", maxSeq+1) } } } -func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { - return db.seq.SetMaxSeq(ctx, conversationID, maxSeq) -} +//func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { +// return db.seq.SetMaxSeq(ctx, conversationID, maxSeq) +//} func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return db.seq.GetMaxSeqs(ctx, conversationIDs) + result := make(map[string]int64) + for _, conversationID := range conversationIDs { + if result[conversationID] != 0 { + continue + } + seq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) + if err != nil { + return nil, err + } + result[conversationID] = seq + } + return result, nil } func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { - return db.seq.GetMaxSeq(ctx, conversationID) + return db.seqConversation.GetMaxSeq(ctx, conversationID) } func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { - return db.seq.SetMinSeq(ctx, conversationID, minSeq) + return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) } func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { - return db.seq.SetMinSeqs(ctx, seqs) -} - -func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return db.seq.GetMinSeqs(ctx, conversationIDs) -} - -func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { - return db.seq.GetMinSeq(ctx, conversationID) -} - -func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { - return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) -} - -func (db *commonMsgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) { - return db.seq.GetConversationUserMinSeqs(ctx, conversationID, userIDs) -} - -func (db *commonMsgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { - return db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq) -} - -func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { - return db.seq.SetConversationUserMinSeqs(ctx, conversationID, seqs) + for conversationID, seq := range seqs { + if err := db.seqConversation.SetMinSeq(ctx, conversationID, seq); err != nil { + return err + } + } + return nil } func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { - return db.seq.SetUserConversationsMinSeqs(ctx, userID, seqs) + for conversationID, seq := range seqs { + if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil } func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { - return db.seq.UserSetHasReadSeqs(ctx, userID, hasReadSeqs) + for conversationID, seq := range hasReadSeqs { + if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil } func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.seq.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq) + return db.seqUser.SetReadSeq(ctx, conversationID, userID, hasReadSeq) } func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { - return db.seq.GetHasReadSeqs(ctx, userID, conversationIDs) + cSeq := make(map[string]int64) + for _, conversationID := range conversationIDs { + if _, ok := cSeq[conversationID]; ok { + continue + } + seq, err := db.seqUser.GetReadSeq(ctx, conversationID, userID) + if err != nil { + return nil, err + } + cSeq[conversationID] = seq + } + return cSeq, nil } func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { - return db.seq.GetHasReadSeq(ctx, userID, conversationID) + return db.seqUser.GetReadSeq(ctx, conversationID, userID) } func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { @@ -894,11 +895,11 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context if err != nil { return } - minSeqCache, err = db.seq.GetMinSeq(ctx, conversationID) + minSeqCache, err = db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { return } - maxSeqCache, err = db.seq.GetMaxSeq(ctx, conversationID) + maxSeqCache, err = db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { return } @@ -1010,33 +1011,8 @@ func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, d } } -//func (db *commonMsgDatabase) ClearMsg(ctx context.Context, ts int64) (err error) { -// var ( -// docNum int -// msgNum int -// start = time.Now() -// ) -// for { -// msgs, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, 100) -// if err != nil { -// return err -// } -// if len(msgs) == 0 { -// return nil -// } -// for _, msg := range msgs { -// num, err := db.deleteOneMsg(ctx, ts, msg) -// if err != nil { -// return err -// } -// docNum++ -// msgNum += num -// } -// } -//} - func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error { - dbSeq, err := db.seq.GetMinSeq(ctx, conversationID) + dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { if errors.Is(errs.Unwrap(err), redis.Nil) { return nil @@ -1046,5 +1022,5 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin if dbSeq >= seq { return nil } - return db.seq.SetMinSeq(ctx, conversationID, seq) + return db.seqConversation.SetMinSeq(ctx, conversationID, seq) } diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index b0ad61203..592659536 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -16,13 +16,15 @@ package controller import ( "context" - redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "path/filepath" "time" + redisCache "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/s3" "github.com/openimsdk/tools/s3/cont" "github.com/redis/go-redis/v9" @@ -38,20 +40,28 @@ type S3Database interface { SetObject(ctx context.Context, info *model.Object) error StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) + FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) + DeleteObject(ctx context.Context, name string) error + DeleteSpecifiedData(ctx context.Context, engine string, name string) error + FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) + DelS3Key(ctx context.Context, engine string, keys ...string) error + GetImageThumbnailKey(ctx context.Context, name string) (string, error) } func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database { return &s3Database{ - s3: cont.New(redis2.NewS3Cache(rdb, s3), s3), - cache: redis2.NewObjectCacheRedis(rdb, obj), - db: obj, + s3: cont.New(redisCache.NewS3Cache(rdb, s3), s3), + cache: redisCache.NewObjectCacheRedis(rdb, obj), + s3cache: redisCache.NewS3Cache(rdb, s3), + db: obj, } } type s3Database struct { - s3 *cont.Controller - cache cache.ObjectCache - db database.ObjectInfo + s3 *cont.Controller + cache cache.ObjectCache + s3cache cont.S3Cache + db database.ObjectInfo } func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) { @@ -111,3 +121,26 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { return s.s3.FormData(ctx, name, size, contentType, duration) } +func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { + + return s.db.FindByExpires(ctx, duration, pagination) +} + +func (s *s3Database) DeleteObject(ctx context.Context, name string) error { + return s.s3.DeleteObject(ctx, name) +} +func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error { + return s.db.Delete(ctx, engine, name) +} + +func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { + return s.db.FindNotDelByS3(ctx, key, duration) +} + +func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error { + return s.s3cache.DelS3Key(ctx, engine, keys...) +} + +func (s *s3Database) GetImageThumbnailKey(ctx context.Context, name string) (string, error) { + return s.s3.GetImageThumbnailKey(ctx, name) +} diff --git a/pkg/common/storage/controller/third.go b/pkg/common/storage/controller/third.go index 344501466..a9c2ae403 100644 --- a/pkg/common/storage/controller/third.go +++ b/pkg/common/storage/controller/third.go @@ -16,9 +16,10 @@ package controller import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/tools/db/pagination" diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index df4d10ec4..4242fbb53 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -16,10 +16,13 @@ package mgo import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/errs" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -68,3 +71,14 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model. func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) } +func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { + return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{ + "create_time": bson.M{"$lt": duration}, + }, pagination) +} +func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { + return mongoutil.Count(ctx, o.coll, bson.M{ + "key": key, + "create_time": bson.M{"$gt": duration}, + }) +} diff --git a/pkg/common/storage/database/mgo/seq_conversation.go b/pkg/common/storage/database/mgo/seq_conversation.go new file mode 100644 index 000000000..7971b7e1a --- /dev/null +++ b/pkg/common/storage/database/mgo/seq_conversation.go @@ -0,0 +1,103 @@ +package mgo + +import ( + "context" + "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewSeqConversationMongo(db *mongo.Database) (database.SeqConversation, error) { + coll := db.Collection(database.SeqConversationName) + _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "conversation_id", Value: 1}, + }, + }) + if err != nil { + return nil, err + } + return &seqConversationMongo{coll: coll}, nil +} + +type seqConversationMongo struct { + coll *mongo.Collection +} + +func (s *seqConversationMongo) setSeq(ctx context.Context, conversationID string, seq int64, field string) error { + filter := map[string]any{ + "conversation_id": conversationID, + } + insert := bson.M{ + "conversation_id": conversationID, + "min_seq": 0, + "max_seq": 0, + } + delete(insert, field) + update := map[string]any{ + "$set": bson.M{ + field: seq, + }, + "$setOnInsert": insert, + } + opt := options.Update().SetUpsert(true) + return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt) +} + +func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { + if size < 0 { + return 0, errors.New("size must be greater than 0") + } + if size == 0 { + return s.GetMaxSeq(ctx, conversationID) + } + filter := map[string]any{"conversation_id": conversationID} + update := map[string]any{ + "$inc": map[string]any{"max_seq": size}, + "$set": map[string]any{"min_seq": int64(0)}, + } + opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1}) + lastSeq, err := mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt) + if err != nil { + return 0, err + } + return lastSeq - size, nil +} + +func (s *seqConversationMongo) SetMaxSeq(ctx context.Context, conversationID string, seq int64) error { + return s.setSeq(ctx, conversationID, seq, "max_seq") +} + +func (s *seqConversationMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { + seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "max_seq": 1})) + if err == nil { + return seq, nil + } else if IsNotFound(err) { + return 0, nil + } else { + return 0, err + } +} + +func (s *seqConversationMongo) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { + seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "min_seq": 1})) + if err == nil { + return seq, nil + } else if IsNotFound(err) { + return 0, nil + } else { + return 0, err + } +} + +func (s *seqConversationMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { + return s.setSeq(ctx, conversationID, seq, "min_seq") +} + +func (s *seqConversationMongo) GetConversation(ctx context.Context, conversationID string) (*model.SeqConversation, error) { + return mongoutil.FindOne[*model.SeqConversation](ctx, s.coll, bson.M{"conversation_id": conversationID}) +} diff --git a/pkg/common/storage/database/mgo/seq_conversation_test.go b/pkg/common/storage/database/mgo/seq_conversation_test.go new file mode 100644 index 000000000..5167314da --- /dev/null +++ b/pkg/common/storage/database/mgo/seq_conversation_test.go @@ -0,0 +1,37 @@ +package mgo + +import ( + "context" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "testing" + "time" +) + +func Result[V any](val V, err error) V { + if err != nil { + panic(err) + } + return val +} + +func Mongodb() *mongo.Database { + return Result( + mongo.Connect(context.Background(), + options.Client(). + ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100"). + SetConnectTimeout(5*time.Second)), + ).Database("openim_v3") +} + +func TestUserSeq(t *testing.T) { + uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo) + t.Log(uSeq.SetMinSeq(context.Background(), "1000", "2000", 4)) +} + +func TestConversationSeq(t *testing.T) { + cSeq := Result(NewSeqConversationMongo(Mongodb())).(*seqConversationMongo) + t.Log(cSeq.SetMaxSeq(context.Background(), "2000", 10)) + t.Log(cSeq.Malloc(context.Background(), "2000", 10)) + t.Log(cSeq.GetMaxSeq(context.Background(), "2000")) +} diff --git a/pkg/common/storage/database/mgo/seq_user.go b/pkg/common/storage/database/mgo/seq_user.go new file mode 100644 index 000000000..5e0eb2022 --- /dev/null +++ b/pkg/common/storage/database/mgo/seq_user.go @@ -0,0 +1,92 @@ +package mgo + +import ( + "context" + "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/tools/db/mongoutil" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewSeqUserMongo(db *mongo.Database) (database.SeqUser, error) { + coll := db.Collection(database.SeqUserName) + _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "user_id", Value: 1}, + {Key: "conversation_id", Value: 1}, + }, + }) + if err != nil { + return nil, err + } + return &seqUserMongo{coll: coll}, nil +} + +type seqUserMongo struct { + coll *mongo.Collection +} + +func (s *seqUserMongo) setSeq(ctx context.Context, conversationID string, userID string, seq int64, field string) error { + filter := map[string]any{ + "user_id": userID, + "conversation_id": conversationID, + } + insert := bson.M{ + "user_id": userID, + "conversation_id": conversationID, + "min_seq": 0, + "max_seq": 0, + "read_seq": 0, + } + delete(insert, field) + update := map[string]any{ + "$set": bson.M{ + field: seq, + }, + "$setOnInsert": insert, + } + opt := options.Update().SetUpsert(true) + return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt) +} + +func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID string, failed string) (int64, error) { + filter := map[string]any{ + "user_id": userID, + "conversation_id": conversationID, + } + opt := options.FindOne().SetProjection(bson.M{"_id": 0, failed: 1}) + seq, err := mongoutil.FindOne[int64](ctx, s.coll, filter, opt) + if err == nil { + return seq, nil + } else if errors.Is(err, mongo.ErrNoDocuments) { + return 0, nil + } else { + return 0, err + } +} + +func (s *seqUserMongo) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return s.getSeq(ctx, conversationID, userID, "max_seq") +} + +func (s *seqUserMongo) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.setSeq(ctx, conversationID, userID, seq, "max_seq") +} + +func (s *seqUserMongo) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return s.getSeq(ctx, conversationID, userID, "min_seq") +} + +func (s *seqUserMongo) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.setSeq(ctx, conversationID, userID, seq, "min_seq") +} + +func (s *seqUserMongo) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return s.getSeq(ctx, conversationID, userID, "read_seq") +} + +func (s *seqUserMongo) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.setSeq(ctx, conversationID, userID, seq, "read_seq") +} diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index 986f22a1a..f47b7a6ee 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -14,4 +14,6 @@ const ( LogName = "log" ObjectName = "s3" UserName = "user" + SeqConversationName = "seq" + SeqUserName = "seq_user" ) diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 554f71f35..8292006a0 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -16,11 +16,16 @@ package database import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/pagination" ) type ObjectInfo interface { SetObject(ctx context.Context, obj *model.Object) error Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name string) error + FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) + FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) } diff --git a/pkg/common/storage/database/seq.go b/pkg/common/storage/database/seq.go new file mode 100644 index 000000000..cf93b795f --- /dev/null +++ b/pkg/common/storage/database/seq.go @@ -0,0 +1,11 @@ +package database + +import "context" + +type SeqConversation interface { + Malloc(ctx context.Context, conversationID string, size int64) (int64, error) + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + SetMaxSeq(ctx context.Context, conversationID string, seq int64) error + GetMinSeq(ctx context.Context, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, seq int64) error +} diff --git a/pkg/common/storage/database/seq_user.go b/pkg/common/storage/database/seq_user.go new file mode 100644 index 000000000..d32d614dd --- /dev/null +++ b/pkg/common/storage/database/seq_user.go @@ -0,0 +1,12 @@ +package database + +import "context" + +type SeqUser interface { + GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error + GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error + GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error +} diff --git a/pkg/common/storage/model/seq.go b/pkg/common/storage/model/seq.go new file mode 100644 index 000000000..1dc75eff1 --- /dev/null +++ b/pkg/common/storage/model/seq.go @@ -0,0 +1,7 @@ +package model + +type SeqConversation struct { + ConversationID string `bson:"conversation_id"` + MaxSeq int64 `bson:"max_seq"` + MinSeq int64 `bson:"min_seq"` +} diff --git a/pkg/common/storage/model/seq_user.go b/pkg/common/storage/model/seq_user.go new file mode 100644 index 000000000..845996bb8 --- /dev/null +++ b/pkg/common/storage/model/seq_user.go @@ -0,0 +1,9 @@ +package model + +type SeqUser struct { + UserID string `bson:"user_id"` + ConversationID string `bson:"conversation_id"` + MinSeq int64 `bson:"min_seq"` + MaxSeq int64 `bson:"max_seq"` + ReadSeq int64 `bson:"read_seq"` +} diff --git a/pkg/msgprocessor/conversation.go b/pkg/msgprocessor/conversation.go index b369269cc..f8140cc7d 100644 --- a/pkg/msgprocessor/conversation.go +++ b/pkg/msgprocessor/conversation.go @@ -24,6 +24,10 @@ import ( "google.golang.org/protobuf/proto" ) +func IsGroupConversationID(conversationID string) bool { + return strings.HasPrefix(conversationID, "g_") || strings.HasPrefix(conversationID, "sg_") +} + func GetNotificationConversationIDByMsg(msg *sdkws.MsgData) string { switch msg.SessionType { case constant.SingleChatType: diff --git a/pkg/rpcclient/third.go b/pkg/rpcclient/third.go index 4c71dff6a..7cdc60d52 100644 --- a/pkg/rpcclient/third.go +++ b/pkg/rpcclient/third.go @@ -41,3 +41,7 @@ func NewThird(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl } return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl} } +func (t *Third) DeleteOutdatedData(ctx context.Context, expires int64) error { + _, err := t.Client.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expires}) + return err +} diff --git a/start-config.yml b/start-config.yml index a9c412b33..21436d7a9 100644 --- a/start-config.yml +++ b/start-config.yml @@ -14,4 +14,5 @@ serviceBinaries: toolBinaries: - check-free-memory - check-component + - seq maxFileDescriptors: 10000 diff --git a/tools/seq/internal/main.go b/tools/seq/internal/main.go new file mode 100644 index 000000000..5200ad5fe --- /dev/null +++ b/tools/seq/internal/main.go @@ -0,0 +1,331 @@ +package internal + +import ( + "bytes" + "context" + "errors" + "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/redisutil" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "gopkg.in/yaml.v3" + "os" + "os/signal" + "path/filepath" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" +) + +const ( + MaxSeq = "MAX_SEQ:" + MinSeq = "MIN_SEQ:" + ConversationUserMinSeq = "CON_USER_MIN_SEQ:" + HasReadSeq = "HAS_READ_SEQ:" +) + +const ( + batchSize = 100 + dataVersionCollection = "data_version" + seqKey = "seq" + seqVersion = 38 +) + +func readConfig[T any](dir string, name string) (*T, error) { + data, err := os.ReadFile(filepath.Join(dir, name)) + if err != nil { + return nil, err + } + var conf T + if err := yaml.Unmarshal(data, &conf); err != nil { + return nil, err + } + return &conf, nil +} + +func Main(conf string, del time.Duration) error { + redisConfig, err := readConfig[config.Redis](conf, cmd.RedisConfigFileName) + if err != nil { + return err + } + mongodbConfig, err := readConfig[config.Mongo](conf, cmd.MongodbConfigFileName) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + rdb, err := redisutil.NewRedisClient(ctx, redisConfig.Build()) + if err != nil { + return err + } + mgocli, err := mongoutil.NewMongoDB(ctx, mongodbConfig.Build()) + if err != nil { + return err + } + versionColl := mgocli.GetDB().Collection(dataVersionCollection) + converted, err := CheckVersion(versionColl, seqKey, seqVersion) + if err != nil { + return err + } + if converted { + fmt.Println("[seq] seq data has been converted") + return nil + } + if _, err := mgo.NewSeqConversationMongo(mgocli.GetDB()); err != nil { + return err + } + cSeq, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) + if err != nil { + return err + } + uSeq, err := mgo.NewSeqUserMongo(mgocli.GetDB()) + if err != nil { + return err + } + uSpitHasReadSeq := func(id string) (conversationID string, userID string, err error) { + // HasReadSeq + userID + ":" + conversationID + arr := strings.Split(id, ":") + if len(arr) != 2 || arr[0] == "" || arr[1] == "" { + return "", "", fmt.Errorf("invalid has read seq id %s", id) + } + userID = arr[0] + conversationID = arr[1] + return + } + uSpitConversationUserMinSeq := func(id string) (conversationID string, userID string, err error) { + // ConversationUserMinSeq + conversationID + "u:" + userID + arr := strings.Split(id, "u:") + if len(arr) != 2 || arr[0] == "" || arr[1] == "" { + return "", "", fmt.Errorf("invalid has read seq id %s", id) + } + conversationID = arr[0] + userID = arr[1] + return + } + + ts := []*taskSeq{ + { + Prefix: MaxSeq, + GetSeq: cSeq.GetMaxSeq, + SetSeq: cSeq.SetMinSeq, + }, + { + Prefix: MinSeq, + GetSeq: cSeq.GetMinSeq, + SetSeq: cSeq.SetMinSeq, + }, + { + Prefix: HasReadSeq, + GetSeq: func(ctx context.Context, id string) (int64, error) { + conversationID, userID, err := uSpitHasReadSeq(id) + if err != nil { + return 0, err + } + return uSeq.GetReadSeq(ctx, conversationID, userID) + }, + SetSeq: func(ctx context.Context, id string, seq int64) error { + conversationID, userID, err := uSpitHasReadSeq(id) + if err != nil { + return err + } + return uSeq.SetReadSeq(ctx, conversationID, userID, seq) + }, + }, + { + Prefix: ConversationUserMinSeq, + GetSeq: func(ctx context.Context, id string) (int64, error) { + conversationID, userID, err := uSpitConversationUserMinSeq(id) + if err != nil { + return 0, err + } + return uSeq.GetMinSeq(ctx, conversationID, userID) + }, + SetSeq: func(ctx context.Context, id string, seq int64) error { + conversationID, userID, err := uSpitConversationUserMinSeq(id) + if err != nil { + return err + } + return uSeq.SetMinSeq(ctx, conversationID, userID, seq) + }, + }, + } + + cancel() + ctx = context.Background() + + var wg sync.WaitGroup + wg.Add(len(ts)) + + for i := range ts { + go func(task *taskSeq) { + defer wg.Done() + err := seqRedisToMongo(ctx, rdb, task.GetSeq, task.SetSeq, task.Prefix, del, &task.Count) + task.End = time.Now() + task.Error = err + }(ts[i]) + } + start := time.Now() + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + var buf bytes.Buffer + + printTaskInfo := func(now time.Time) { + buf.Reset() + buf.WriteString(now.Format(time.DateTime)) + buf.WriteString(" \n") + for i := range ts { + task := ts[i] + if task.Error == nil { + if task.End.IsZero() { + buf.WriteString(fmt.Sprintf("[%s] converting %s* count %d", now.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count))) + } else { + buf.WriteString(fmt.Sprintf("[%s] success %s* count %d", task.End.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count))) + } + } else { + buf.WriteString(fmt.Sprintf("[%s] failed %s* count %d error %s", task.End.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count), task.Error)) + } + buf.WriteString("\n") + } + fmt.Println(buf.String()) + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case s := <-sigs: + return fmt.Errorf("exit by signal %s", s) + case <-done: + errs := make([]error, 0, len(ts)) + for i := range ts { + task := ts[i] + if task.Error != nil { + errs = append(errs, fmt.Errorf("seq %s failed %w", task.Prefix, task.Error)) + } + } + if len(errs) > 0 { + return errors.Join(errs...) + } + printTaskInfo(time.Now()) + if err := SetVersion(versionColl, seqKey, seqVersion); err != nil { + return fmt.Errorf("set mongodb seq version %w", err) + } + return nil + case now := <-ticker.C: + printTaskInfo(now) + } + } +} + +type taskSeq struct { + Prefix string + Count int64 + Error error + End time.Time + GetSeq func(ctx context.Context, id string) (int64, error) + SetSeq func(ctx context.Context, id string, seq int64) error +} + +func seqRedisToMongo(ctx context.Context, rdb redis.UniversalClient, getSeq func(ctx context.Context, id string) (int64, error), setSeq func(ctx context.Context, id string, seq int64) error, prefix string, delAfter time.Duration, count *int64) error { + var ( + cursor uint64 + keys []string + err error + ) + for { + keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result() + if err != nil { + return err + } + if len(keys) > 0 { + for _, key := range keys { + seqStr, err := rdb.Get(ctx, key).Result() + if err != nil { + return fmt.Errorf("redis get %s failed %w", key, err) + } + seq, err := strconv.Atoi(seqStr) + if err != nil { + return fmt.Errorf("invalid %s seq %s", key, seqStr) + } + if seq < 0 { + return fmt.Errorf("invalid %s seq %s", key, seqStr) + } + id := strings.TrimPrefix(key, prefix) + redisSeq := int64(seq) + mongoSeq, err := getSeq(ctx, id) + if err != nil { + return fmt.Errorf("get mongo seq %s failed %w", key, err) + } + if mongoSeq < redisSeq { + if err := setSeq(ctx, id, redisSeq); err != nil { + return fmt.Errorf("set mongo seq %s failed %w", key, err) + } + } + if delAfter > 0 { + if err := rdb.Expire(ctx, key, delAfter).Err(); err != nil { + return fmt.Errorf("redis expire key %s failed %w", key, err) + } + } else { + if err := rdb.Del(ctx, key).Err(); err != nil { + return fmt.Errorf("redis del key %s failed %w", key, err) + } + } + atomic.AddInt64(count, 1) + } + } + if cursor == 0 { + return nil + } + } +} + +func CheckVersion(coll *mongo.Collection, key string, currentVersion int) (converted bool, err error) { + type VersionTable struct { + Key string `bson:"key"` + Value string `bson:"value"` + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + res, err := mongoutil.FindOne[VersionTable](ctx, coll, bson.M{"key": key}) + if err == nil { + ver, err := strconv.Atoi(res.Value) + if err != nil { + return false, fmt.Errorf("version %s parse error %w", res.Value, err) + } + if ver >= currentVersion { + return true, nil + } + return false, nil + } else if errors.Is(err, mongo.ErrNoDocuments) { + return false, nil + } else { + return false, err + } +} + +func SetVersion(coll *mongo.Collection, key string, version int) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + option := options.Update().SetUpsert(true) + filter := bson.M{"key": key, "value": strconv.Itoa(version)} + update := bson.M{"$set": bson.M{"key": key, "value": strconv.Itoa(version)}} + return mongoutil.UpdateOne(ctx, coll, filter, update, false, option) +} diff --git a/tools/seq/main.go b/tools/seq/main.go new file mode 100644 index 000000000..399e6d934 --- /dev/null +++ b/tools/seq/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "flag" + "fmt" + "github.com/openimsdk/open-im-server/v3/tools/seq/internal" + "time" +) + +func main() { + var ( + config string + second int + ) + flag.StringVar(&config, "c", "/Users/chao/Desktop/project/open-im-server/config", "config directory") + flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion") + flag.Parse() + if err := internal.Main(config, time.Duration(second)*time.Second); err != nil { + fmt.Println("seq task", err) + } + fmt.Println("seq task success!") +}