diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index e90c2288c..c17abbc30 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -82,7 +82,6 @@ func Start(ctx context.Context, index int, config *Config) error { client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) msgModel := redis.NewMsgCache(rdb) - seqModel := redis.NewSeqCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { return err @@ -92,7 +91,12 @@ func Start(ctx context.Context, index int, config *Config) error { return err } seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, seqConversationCache, &config.KafkaConfig) + seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB()) + if err != nil { + return err + } + seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) if err != nil { return err } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 7cffb23a9..de0f698ea 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -86,7 +86,6 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg return err } msgModel := redis.NewMsgCache(rdb) - seqModel := redis.NewSeqCache(rdb) conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) @@ -96,7 +95,12 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg return err } seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, seqConversationCache, &config.KafkaConfig) + seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB()) + if err != nil { + return err + } + seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) if err != nil { return err } diff --git a/pkg/common/storage/cache/cachekey/seq.go b/pkg/common/storage/cache/cachekey/seq.go index ded0286d8..b32e78300 100644 --- a/pkg/common/storage/cache/cachekey/seq.go +++ b/pkg/common/storage/cache/cachekey/seq.go @@ -1,38 +1,30 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package cachekey const ( - MaxSeq = "MAX_SEQ:" - MinSeq = "MIN_SEQ:" - ConversationUserMinSeq = "CON_USER_MIN_SEQ:" - HasReadSeq = "HAS_READ_SEQ:" + MallocSeq = "MALLOC_SEQ:" + MallocMinSeqLock = "MALLOC_MIN_SEQ:" + + SeqUserMaxSeq = "SEQ_USER_MAX:" + SeqUserMinSeq = "SEQ_USER_MIN:" + SeqUserReadSeq = "SEQ_USER_READ:" ) -func GetMaxSeqKey(conversationID string) string { - return MaxSeq + conversationID +func GetMallocSeqKey(conversationID string) string { + return MallocSeq + conversationID +} + +func GetMallocMinSeqKey(conversationID string) string { + return MallocMinSeqLock + conversationID } -func GetMinSeqKey(conversationID string) string { - return MinSeq + conversationID +func GetSeqUserMaxSeqKey(conversationID string, userID string) string { + return SeqUserMaxSeq + conversationID + ":" + userID } -func GetHasReadSeqKey(conversationID string, userID string) string { - return HasReadSeq + userID + ":" + conversationID +func GetSeqUserMinSeqKey(conversationID string, userID string) string { + return SeqUserMinSeq + conversationID + ":" + userID } -func GetConversationUserMinSeqKey(conversationID, userID string) string { - return ConversationUserMinSeq + conversationID + "u:" + userID +func GetSeqUserReadSeqKey(conversationID string, userID string) string { + return SeqUserReadSeq + conversationID + ":" + userID } diff --git a/pkg/common/storage/cache/cachekey/seq1.go b/pkg/common/storage/cache/cachekey/seq1.go deleted file mode 100644 index 04274db55..000000000 --- a/pkg/common/storage/cache/cachekey/seq1.go +++ /dev/null @@ -1,14 +0,0 @@ -package cachekey - -const ( - MallocSeq = "MALLOC_SEQ:" - MallocMinSeqLock = "MALLOC_MIN_SEQ:" -) - -func GetMallocSeqKey(conversationID string) string { - return MallocSeq + conversationID -} - -func GetMallocMinSeqKey(conversationID string) string { - return MallocMinSeqLock + conversationID -} diff --git a/pkg/common/storage/cache/redis/seq.go b/pkg/common/storage/cache/redis/seq.go deleted file mode 100644 index 8624cc78f..000000000 --- a/pkg/common/storage/cache/redis/seq.go +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redis - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/stringutil" - "github.com/redis/go-redis/v9" -) - -func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache { - return &seqCache{rdb: rdb} -} - -type seqCache struct { - rdb redis.UniversalClient -} - -func (c *seqCache) getMaxSeqKey(conversationID string) string { - return cachekey.GetMaxSeqKey(conversationID) -} - -func (c *seqCache) getMinSeqKey(conversationID string) string { - return cachekey.GetMinSeqKey(conversationID) -} - -func (c *seqCache) getHasReadSeqKey(conversationID string, userID string) string { - return cachekey.GetHasReadSeqKey(conversationID, userID) -} - -func (c *seqCache) getConversationUserMinSeqKey(conversationID, userID string) string { - return cachekey.GetConversationUserMinSeqKey(conversationID, userID) -} - -func (c *seqCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { - return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) -} - -func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) { - val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64() - if err != nil { - return 0, errs.Wrap(err) - } - return val, nil -} - -func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { - m = make(map[string]int64, len(items)) - for i, v := range items { - res, err := c.rdb.Get(ctx, getkey(v)).Result() - if err != nil && err != redis.Nil { - return nil, errs.Wrap(err) - } - val := stringutil.StringToInt64(res) - if val != 0 { - m[items[i]] = val - } - } - - return m, nil -} - -//func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { -// return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) -//} -// -//func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { -// return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey) -//} -// -//func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { -// return c.getSeq(ctx, conversationID, c.getMaxSeqKey) -//} -// -//func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { -// return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey) -//} -// -//func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { -// for conversationID, seq := range seqs { -// if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil { -// return errs.Wrap(err) -// } -// } -// return nil -//} -// -// -//func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { -// return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey) -//} -// -//func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { -// return c.getSeq(ctx, conversationID, c.getMinSeqKey) -//} -// -//func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { -// return c.setSeqs(ctx, seqs, c.getMinSeqKey) -//} - -func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { - for conversationID, seq := range seqs { - if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil { - return errs.Wrap(err) - } - } - return nil -} - -func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { - val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64() - if err != nil { - return 0, errs.Wrap(err) - } - return val, nil -} - -func (c *seqCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) { - return c.getSeqs(ctx, userIDs, func(userID string) string { - return c.getConversationUserMinSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { - return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) -} - -func (c *seqCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { - return c.setSeqs(ctx, seqs, func(userID string) string { - return c.getConversationUserMinSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) { - return c.setSeqs(ctx, seqs, func(conversationID string) string { - return c.getConversationUserMinSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err()) -} - -func (c *seqCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error { - return c.setSeqs(ctx, hasReadSeqs, func(userID string) string { - return c.getHasReadSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { - return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string { - return c.getHasReadSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { - return c.getSeqs(ctx, conversationIDs, func(conversationID string) string { - return c.getHasReadSeqKey(conversationID, userID) - }) -} - -func (c *seqCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { - val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64() - if err != nil { - return 0, err - } - return val, nil -} diff --git a/pkg/common/storage/cache/redis/seq_user.go b/pkg/common/storage/cache/redis/seq_user.go new file mode 100644 index 000000000..3c533cdd8 --- /dev/null +++ b/pkg/common/storage/cache/redis/seq_user.go @@ -0,0 +1,89 @@ +package redis + +import ( + "context" + "github.com/dtm-labs/rockscache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/tools/errs" + "github.com/redis/go-redis/v9" + "strconv" + "time" +) + +func NewSeqUserCacheRedis(rdb redis.UniversalClient, mgo database.SeqUser) cache.SeqUser { + return &seqUserCacheRedis{ + rdb: rdb, + mgo: mgo, + readSeqWriteRatio: 100, + expireTime: time.Hour * 24 * 7, + readExpireTime: time.Hour * 24 * 30, + rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()), + } +} + +type seqUserCacheRedis struct { + rdb redis.UniversalClient + mgo database.SeqUser + rocks *rockscache.Client + expireTime time.Duration + readExpireTime time.Duration + readSeqWriteRatio int64 +} + +func (s *seqUserCacheRedis) getSeqUserMaxSeqKey(conversationID string, userID string) string { + return cachekey.GetSeqUserMaxSeqKey(conversationID, userID) +} + +func (s *seqUserCacheRedis) getSeqUserMinSeqKey(conversationID string, userID string) string { + return cachekey.GetSeqUserMinSeqKey(conversationID, userID) +} + +func (s *seqUserCacheRedis) getSeqUserReadSeqKey(conversationID string, userID string) string { + return cachekey.GetSeqUserReadSeqKey(conversationID, userID) +} + +func (s *seqUserCacheRedis) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return getCache(ctx, s.rocks, s.getSeqUserMaxSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) { + return s.mgo.GetMaxSeq(ctx, conversationID, userID) + }) +} + +func (s *seqUserCacheRedis) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + if err := s.mgo.SetMaxSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID)) +} + +func (s *seqUserCacheRedis) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return getCache(ctx, s.rocks, s.getSeqUserMinSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) { + return s.mgo.GetMaxSeq(ctx, conversationID, userID) + }) +} + +func (s *seqUserCacheRedis) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + if err := s.mgo.SetMinSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMinSeqKey(conversationID, userID)) +} + +func (s *seqUserCacheRedis) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return getCache(ctx, s.rocks, s.getSeqUserReadSeqKey(conversationID, userID), s.readExpireTime, func(ctx context.Context) (int64, error) { + return s.mgo.GetMaxSeq(ctx, conversationID, userID) + }) +} + +func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + if seq%s.readSeqWriteRatio == 0 { + if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + } + if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { + return errs.Wrap(err) + } + return nil +} diff --git a/pkg/common/storage/cache/seq.go b/pkg/common/storage/cache/seq.go deleted file mode 100644 index 46e33c935..000000000 --- a/pkg/common/storage/cache/seq.go +++ /dev/null @@ -1,30 +0,0 @@ -package cache - -import ( - "context" -) - -type SeqCache interface { - //SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error - //GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - //GetMaxSeq(ctx context.Context, conversationID string) (int64, error) - //SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error - //SetMinSeqs(ctx context.Context, seqs map[string]int64) error - //GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - //GetMinSeq(ctx context.Context, conversationID string) (int64, error) - GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) - GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) - SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error - // seqs map: key userID value minSeq - SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) - // seqs map: key conversationID value minSeq - SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error - // has read seq - SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error - // k: user, v: seq - SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error - // k: conversation, v :seq - UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error - GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) - GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) -} diff --git a/pkg/common/storage/cache/seq_user.go b/pkg/common/storage/cache/seq_user.go new file mode 100644 index 000000000..9e68399a7 --- /dev/null +++ b/pkg/common/storage/cache/seq_user.go @@ -0,0 +1,12 @@ +package cache + +import "context" + +type SeqUser interface { + GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error + GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error + GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index de8c111b3..1e7b5c229 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -108,7 +108,7 @@ type CommonMsgDatabase interface { DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) } -func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cache.SeqCache, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { +func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) if err != nil { return nil, err @@ -128,7 +128,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cach return &commonMsgDatabase{ msgDocDatabase: msgDocModel, msg: msg, - seq: seq, + seqUser: seqUser, seqConversation: seqConversation, producer: producerToRedis, producerToMongo: producerToMongo, @@ -140,8 +140,8 @@ type commonMsgDatabase struct { msgDocDatabase database.Msg msgTable model.MsgDocModel msg cache.MsgCache - seq cache.SeqCache seqConversation cache.SeqConversationCache + seqUser cache.SeqUser producer *kafka.Producer producerToMongo *kafka.Producer producerToPush *kafka.Producer @@ -339,6 +339,15 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) } +func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { + for userID, seq := range userSeqMap { + if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil +} + func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { lenList := len(msgs) if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { @@ -368,7 +377,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa } else { prommetrics.MsgInsertRedisSuccessCounter.Inc() } - err = db.seq.SetHasReadSeqs(ctx, conversationID, userSeqMap) + err = db.setHasReadSeqs(ctx, conversationID, userSeqMap) if err != nil { log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc() @@ -496,7 +505,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin // "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group. // This ensures that their message retrieval starts from the point they joined. func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) { - userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) + userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } @@ -574,7 +583,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin } func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { - userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) + userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } @@ -672,12 +681,12 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) if len(seqs) > 0 { userMinSeq := seqs[len(seqs)-1] + 1 - currentUserMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) - if err != nil && errs.Unwrap(err) != redis.Nil { + currentUserMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) + if err != nil { return nil, err } if currentUserMinSeq < userMinSeq { - if err := db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { + if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { return nil, err } } @@ -832,40 +841,45 @@ func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int return nil } -func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { - return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) -} - -func (db *commonMsgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) { - return db.seq.GetConversationUserMinSeqs(ctx, conversationID, userIDs) -} - -func (db *commonMsgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { - return db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq) -} - -func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { - return db.seq.SetConversationUserMinSeqs(ctx, conversationID, seqs) -} - func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { - return db.seq.SetUserConversationsMinSeqs(ctx, userID, seqs) + for conversationID, seq := range seqs { + if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil } func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { - return db.seq.UserSetHasReadSeqs(ctx, userID, hasReadSeqs) + for conversationID, seq := range hasReadSeqs { + if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil } func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.seq.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq) + return db.seqUser.SetReadSeq(ctx, conversationID, userID, hasReadSeq) } func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { - return db.seq.GetHasReadSeqs(ctx, userID, conversationIDs) + cSeq := make(map[string]int64) + for _, conversationID := range conversationIDs { + if _, ok := cSeq[conversationID]; ok { + continue + } + seq, err := db.seqUser.GetReadSeq(ctx, conversationID, userID) + if err != nil { + return nil, err + } + cSeq[conversationID] = seq + } + return cSeq, nil } func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { - return db.seq.GetHasReadSeq(ctx, userID, conversationID) + return db.seqUser.GetReadSeq(ctx, conversationID, userID) } func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { diff --git a/pkg/common/storage/database/mgo/seq_conversation.go b/pkg/common/storage/database/mgo/seq_conversation.go index c9a3ac41a..7971b7e1a 100644 --- a/pkg/common/storage/database/mgo/seq_conversation.go +++ b/pkg/common/storage/database/mgo/seq_conversation.go @@ -28,6 +28,26 @@ type seqConversationMongo struct { coll *mongo.Collection } +func (s *seqConversationMongo) setSeq(ctx context.Context, conversationID string, seq int64, field string) error { + filter := map[string]any{ + "conversation_id": conversationID, + } + insert := bson.M{ + "conversation_id": conversationID, + "min_seq": 0, + "max_seq": 0, + } + delete(insert, field) + update := map[string]any{ + "$set": bson.M{ + field: seq, + }, + "$setOnInsert": insert, + } + opt := options.Update().SetUpsert(true) + return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt) +} + func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { if size < 0 { return 0, errors.New("size must be greater than 0") @@ -48,16 +68,8 @@ func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string return lastSeq - size, nil } -func (s *seqConversationMongo) MallocSeq(ctx context.Context, conversationID string, size int64) ([]int64, error) { - first, err := s.Malloc(ctx, conversationID, size) - if err != nil { - return nil, err - } - seqs := make([]int64, 0, size) - for i := int64(0); i < size; i++ { - seqs = append(seqs, first+i+1) - } - return seqs, nil +func (s *seqConversationMongo) SetMaxSeq(ctx context.Context, conversationID string, seq int64) error { + return s.setSeq(ctx, conversationID, seq, "max_seq") } func (s *seqConversationMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { @@ -83,7 +95,7 @@ func (s *seqConversationMongo) GetMinSeq(ctx context.Context, conversationID str } func (s *seqConversationMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { - return mongoutil.UpdateOne(ctx, s.coll, bson.M{"conversation_id": conversationID}, bson.M{"$set": bson.M{"min_seq": seq}}, false) + return s.setSeq(ctx, conversationID, seq, "min_seq") } func (s *seqConversationMongo) GetConversation(ctx context.Context, conversationID string) (*model.SeqConversation, error) { diff --git a/pkg/common/storage/database/mgo/seq_conversation_test.go b/pkg/common/storage/database/mgo/seq_conversation_test.go index e6466a1c6..5167314da 100644 --- a/pkg/common/storage/database/mgo/seq_conversation_test.go +++ b/pkg/common/storage/database/mgo/seq_conversation_test.go @@ -15,19 +15,23 @@ func Result[V any](val V, err error) V { return val } -func TestName(t *testing.T) { - cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) - tmp, err := NewSeqConversationMongo(cli.Database("openim_v3")) - if err != nil { - panic(err) - } - for i := 0; i < 10; i++ { - var size int64 = 100 - firstSeq, err := tmp.Malloc(context.Background(), "1", size) - if err != nil { - t.Log(err) - return - } - t.Logf("%d -> %d", firstSeq, firstSeq+size-1) - } +func Mongodb() *mongo.Database { + return Result( + mongo.Connect(context.Background(), + options.Client(). + ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100"). + SetConnectTimeout(5*time.Second)), + ).Database("openim_v3") +} + +func TestUserSeq(t *testing.T) { + uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo) + t.Log(uSeq.SetMinSeq(context.Background(), "1000", "2000", 4)) +} + +func TestConversationSeq(t *testing.T) { + cSeq := Result(NewSeqConversationMongo(Mongodb())).(*seqConversationMongo) + t.Log(cSeq.SetMaxSeq(context.Background(), "2000", 10)) + t.Log(cSeq.Malloc(context.Background(), "2000", 10)) + t.Log(cSeq.GetMaxSeq(context.Background(), "2000")) } diff --git a/pkg/common/storage/database/mgo/seq_user.go b/pkg/common/storage/database/mgo/seq_user.go index 4fa299a7a..5e0eb2022 100644 --- a/pkg/common/storage/database/mgo/seq_user.go +++ b/pkg/common/storage/database/mgo/seq_user.go @@ -2,6 +2,7 @@ package mgo import ( "context" + "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/tools/db/mongoutil" "go.mongodb.org/mongo-driver/bson" @@ -10,7 +11,7 @@ import ( ) func NewSeqUserMongo(db *mongo.Database) (database.SeqUser, error) { - coll := db.Collection(database.SeqConversationName) + coll := db.Collection(database.SeqUserName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "user_id", Value: 1}, @@ -27,45 +28,65 @@ type seqUserMongo struct { coll *mongo.Collection } -func (s *seqUserMongo) setSeq(ctx context.Context, userID string, conversationID string, seq int64, field string) error { +func (s *seqUserMongo) setSeq(ctx context.Context, conversationID string, userID string, seq int64, field string) error { filter := map[string]any{ "user_id": userID, "conversation_id": conversationID, } + insert := bson.M{ + "user_id": userID, + "conversation_id": conversationID, + "min_seq": 0, + "max_seq": 0, + "read_seq": 0, + } + delete(insert, field) update := map[string]any{ - "$set": map[string]any{"field": int64(0)}, + "$set": bson.M{ + field: seq, + }, + "$setOnInsert": insert, } opt := options.Update().SetUpsert(true) return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt) } -func (s *seqUserMongo) GetMaxSeq(ctx context.Context, userID string, conversationID string) (int64, error) { +func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID string, failed string) (int64, error) { + filter := map[string]any{ + "user_id": userID, + "conversation_id": conversationID, + } + opt := options.FindOne().SetProjection(bson.M{"_id": 0, failed: 1}) + seq, err := mongoutil.FindOne[int64](ctx, s.coll, filter, opt) + if err == nil { + return seq, nil + } else if errors.Is(err, mongo.ErrNoDocuments) { + return 0, nil + } else { + return 0, err + } +} - //TODO implement me - panic("implement me") +func (s *seqUserMongo) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return s.getSeq(ctx, conversationID, userID, "max_seq") } -func (s *seqUserMongo) SetMaxSeq(ctx context.Context, userID string, conversationID string, seq int64) error { - //TODO implement me - panic("implement me") +func (s *seqUserMongo) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.setSeq(ctx, conversationID, userID, seq, "max_seq") } -func (s *seqUserMongo) GetMinSeq(ctx context.Context, userID string, conversationID string) (int64, error) { - //TODO implement me - panic("implement me") +func (s *seqUserMongo) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return s.getSeq(ctx, conversationID, userID, "min_seq") } -func (s *seqUserMongo) SetMinSeq(ctx context.Context, userID string, conversationID string, seq int64) error { - //TODO implement me - panic("implement me") +func (s *seqUserMongo) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.setSeq(ctx, conversationID, userID, seq, "min_seq") } -func (s *seqUserMongo) GetReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { - //TODO implement me - panic("implement me") +func (s *seqUserMongo) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return s.getSeq(ctx, conversationID, userID, "read_seq") } -func (s *seqUserMongo) SetReadSeq(ctx context.Context, userID string, conversationID string, seq int64) error { - //TODO implement me - panic("implement me") +func (s *seqUserMongo) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.setSeq(ctx, conversationID, userID, seq, "read_seq") } diff --git a/pkg/common/storage/database/seq.go b/pkg/common/storage/database/seq.go index fdbc2f8f3..cf93b795f 100644 --- a/pkg/common/storage/database/seq.go +++ b/pkg/common/storage/database/seq.go @@ -5,6 +5,7 @@ import "context" type SeqConversation interface { Malloc(ctx context.Context, conversationID string, size int64) (int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + SetMaxSeq(ctx context.Context, conversationID string, seq int64) error GetMinSeq(ctx context.Context, conversationID string) (int64, error) SetMinSeq(ctx context.Context, conversationID string, seq int64) error } diff --git a/pkg/common/storage/database/seq_user.go b/pkg/common/storage/database/seq_user.go index 3e9b29ec2..d32d614dd 100644 --- a/pkg/common/storage/database/seq_user.go +++ b/pkg/common/storage/database/seq_user.go @@ -3,10 +3,10 @@ package database import "context" type SeqUser interface { - GetMaxSeq(ctx context.Context, userID string, conversationID string) (int64, error) - SetMaxSeq(ctx context.Context, userID string, conversationID string, seq int64) error - GetMinSeq(ctx context.Context, userID string, conversationID string) (int64, error) - SetMinSeq(ctx context.Context, userID string, conversationID string, seq int64) error - GetReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) - SetReadSeq(ctx context.Context, userID string, conversationID string, seq int64) error + GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error + GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error + GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) + SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error } diff --git a/tools/seq/internal/main.go b/tools/seq/internal/main.go index 54a7d477d..5200ad5fe 100644 --- a/tools/seq/internal/main.go +++ b/tools/seq/internal/main.go @@ -1,30 +1,38 @@ package internal import ( + "bytes" "context" "errors" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" - "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "gopkg.in/yaml.v3" "os" + "os/signal" "path/filepath" "strconv" "strings" + "sync" + "sync/atomic" + "syscall" "time" ) +const ( + MaxSeq = "MAX_SEQ:" + MinSeq = "MIN_SEQ:" + ConversationUserMinSeq = "CON_USER_MIN_SEQ:" + HasReadSeq = "HAS_READ_SEQ:" +) + const ( batchSize = 100 dataVersionCollection = "data_version" @@ -44,48 +52,6 @@ func readConfig[T any](dir string, name string) (*T, error) { return &conf, nil } -func redisKey(rdb redis.UniversalClient, prefix string, del time.Duration, fn func(ctx context.Context, key string, delKey map[string]struct{}) error) error { - var ( - cursor uint64 - keys []string - err error - ) - ctx := context.Background() - for { - keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result() - if err != nil { - return err - } - delKey := make(map[string]struct{}) - if len(keys) > 0 { - for _, key := range keys { - if err := fn(ctx, key, delKey); err != nil { - return err - } - } - } - if len(delKey) > 0 { - delKeys := datautil.Keys(delKey) - if del < time.Second { - if err := rdb.Del(ctx, datautil.Keys(delKey)...).Err(); err != nil { - return err - } - } else { - pipe := rdb.Pipeline() - for _, key := range delKeys { - pipe.Expire(ctx, key, del) - } - if _, err := pipe.Exec(ctx); err != nil { - return err - } - } - } - if cursor == 0 { - return nil - } - } -} - func Main(conf string, del time.Duration) error { redisConfig, err := readConfig[config.Redis](conf, cmd.RedisConfigFileName) if err != nil { @@ -117,71 +83,218 @@ func Main(conf string, del time.Duration) error { if _, err := mgo.NewSeqConversationMongo(mgocli.GetDB()); err != nil { return err } - coll := mgocli.GetDB().Collection(database.SeqConversationName) - const prefix = cachekey.MaxSeq - fmt.Println("start to convert seq conversation") - err = redisKey(rdb, prefix, del, func(ctx context.Context, key string, delKey map[string]struct{}) error { - conversationId := strings.TrimPrefix(key, prefix) - delKey[key] = struct{}{} - maxValue, err := rdb.Get(ctx, key).Result() - if err != nil { - return err - } - seq, err := strconv.Atoi(maxValue) - if err != nil { - return fmt.Errorf("invalid max seq %s", maxValue) + cSeq, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) + if err != nil { + return err + } + uSeq, err := mgo.NewSeqUserMongo(mgocli.GetDB()) + if err != nil { + return err + } + uSpitHasReadSeq := func(id string) (conversationID string, userID string, err error) { + // HasReadSeq + userID + ":" + conversationID + arr := strings.Split(id, ":") + if len(arr) != 2 || arr[0] == "" || arr[1] == "" { + return "", "", fmt.Errorf("invalid has read seq id %s", id) } - if seq == 0 { - return nil + userID = arr[0] + conversationID = arr[1] + return + } + uSpitConversationUserMinSeq := func(id string) (conversationID string, userID string, err error) { + // ConversationUserMinSeq + conversationID + "u:" + userID + arr := strings.Split(id, "u:") + if len(arr) != 2 || arr[0] == "" || arr[1] == "" { + return "", "", fmt.Errorf("invalid has read seq id %s", id) } - if seq < 0 { - return fmt.Errorf("invalid max seq %s", maxValue) + conversationID = arr[0] + userID = arr[1] + return + } + + ts := []*taskSeq{ + { + Prefix: MaxSeq, + GetSeq: cSeq.GetMaxSeq, + SetSeq: cSeq.SetMinSeq, + }, + { + Prefix: MinSeq, + GetSeq: cSeq.GetMinSeq, + SetSeq: cSeq.SetMinSeq, + }, + { + Prefix: HasReadSeq, + GetSeq: func(ctx context.Context, id string) (int64, error) { + conversationID, userID, err := uSpitHasReadSeq(id) + if err != nil { + return 0, err + } + return uSeq.GetReadSeq(ctx, conversationID, userID) + }, + SetSeq: func(ctx context.Context, id string, seq int64) error { + conversationID, userID, err := uSpitHasReadSeq(id) + if err != nil { + return err + } + return uSeq.SetReadSeq(ctx, conversationID, userID, seq) + }, + }, + { + Prefix: ConversationUserMinSeq, + GetSeq: func(ctx context.Context, id string) (int64, error) { + conversationID, userID, err := uSpitConversationUserMinSeq(id) + if err != nil { + return 0, err + } + return uSeq.GetMinSeq(ctx, conversationID, userID) + }, + SetSeq: func(ctx context.Context, id string, seq int64) error { + conversationID, userID, err := uSpitConversationUserMinSeq(id) + if err != nil { + return err + } + return uSeq.SetMinSeq(ctx, conversationID, userID, seq) + }, + }, + } + + cancel() + ctx = context.Background() + + var wg sync.WaitGroup + wg.Add(len(ts)) + + for i := range ts { + go func(task *taskSeq) { + defer wg.Done() + err := seqRedisToMongo(ctx, rdb, task.GetSeq, task.SetSeq, task.Prefix, del, &task.Count) + task.End = time.Now() + task.Error = err + }(ts[i]) + } + start := time.Now() + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + var buf bytes.Buffer + + printTaskInfo := func(now time.Time) { + buf.Reset() + buf.WriteString(now.Format(time.DateTime)) + buf.WriteString(" \n") + for i := range ts { + task := ts[i] + if task.Error == nil { + if task.End.IsZero() { + buf.WriteString(fmt.Sprintf("[%s] converting %s* count %d", now.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count))) + } else { + buf.WriteString(fmt.Sprintf("[%s] success %s* count %d", task.End.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count))) + } + } else { + buf.WriteString(fmt.Sprintf("[%s] failed %s* count %d error %s", task.End.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count), task.Error)) + } + buf.WriteString("\n") } - var ( - minSeq int64 - maxSeq = int64(seq) - ) - minKey := cachekey.MinSeq + conversationId - delKey[minKey] = struct{}{} - minValue, err := rdb.Get(ctx, minKey).Result() - if err == nil { - seq, err := strconv.Atoi(minValue) - if err != nil { - return fmt.Errorf("invalid min seq %s", minValue) + fmt.Println(buf.String()) + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case s := <-sigs: + return fmt.Errorf("exit by signal %s", s) + case <-done: + errs := make([]error, 0, len(ts)) + for i := range ts { + task := ts[i] + if task.Error != nil { + errs = append(errs, fmt.Errorf("seq %s failed %w", task.Prefix, task.Error)) + } } - if seq < 0 { - return fmt.Errorf("invalid min seq %s", minValue) + if len(errs) > 0 { + return errors.Join(errs...) } - minSeq = int64(seq) - } else if !errors.Is(err, redis.Nil) { - return err - } - if maxSeq < minSeq { - return fmt.Errorf("invalid max seq %d < min seq %d", maxSeq, minSeq) - } - res, err := mongoutil.FindOne[*model.SeqConversation](ctx, coll, bson.M{"conversation_id": conversationId}, nil) - if err == nil { - if res.MaxSeq < int64(seq) { - _, err = coll.UpdateOne(ctx, bson.M{"conversation_id": conversationId}, bson.M{"$set": bson.M{"max_seq": maxSeq, "min_seq": minSeq}}) + printTaskInfo(time.Now()) + if err := SetVersion(versionColl, seqKey, seqVersion); err != nil { + return fmt.Errorf("set mongodb seq version %w", err) } + return nil + case now := <-ticker.C: + printTaskInfo(now) + } + } +} + +type taskSeq struct { + Prefix string + Count int64 + Error error + End time.Time + GetSeq func(ctx context.Context, id string) (int64, error) + SetSeq func(ctx context.Context, id string, seq int64) error +} + +func seqRedisToMongo(ctx context.Context, rdb redis.UniversalClient, getSeq func(ctx context.Context, id string) (int64, error), setSeq func(ctx context.Context, id string, seq int64) error, prefix string, delAfter time.Duration, count *int64) error { + var ( + cursor uint64 + keys []string + err error + ) + for { + keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result() + if err != nil { return err - } else if errors.Is(err, mongo.ErrNoDocuments) { - res = &model.SeqConversation{ - ConversationID: conversationId, - MaxSeq: maxSeq, - MinSeq: minSeq, + } + if len(keys) > 0 { + for _, key := range keys { + seqStr, err := rdb.Get(ctx, key).Result() + if err != nil { + return fmt.Errorf("redis get %s failed %w", key, err) + } + seq, err := strconv.Atoi(seqStr) + if err != nil { + return fmt.Errorf("invalid %s seq %s", key, seqStr) + } + if seq < 0 { + return fmt.Errorf("invalid %s seq %s", key, seqStr) + } + id := strings.TrimPrefix(key, prefix) + redisSeq := int64(seq) + mongoSeq, err := getSeq(ctx, id) + if err != nil { + return fmt.Errorf("get mongo seq %s failed %w", key, err) + } + if mongoSeq < redisSeq { + if err := setSeq(ctx, id, redisSeq); err != nil { + return fmt.Errorf("set mongo seq %s failed %w", key, err) + } + } + if delAfter > 0 { + if err := rdb.Expire(ctx, key, delAfter).Err(); err != nil { + return fmt.Errorf("redis expire key %s failed %w", key, err) + } + } else { + if err := rdb.Del(ctx, key).Err(); err != nil { + return fmt.Errorf("redis del key %s failed %w", key, err) + } + } + atomic.AddInt64(count, 1) } - _, err := coll.InsertOne(ctx, res) - return err - } else { - return err } - }) - if err != nil { - return err + if cursor == 0 { + return nil + } } - fmt.Println("convert seq conversation success") - return SetVersion(versionColl, seqKey, seqVersion) } func CheckVersion(coll *mongo.Collection, key string, currentVersion int) (converted bool, err error) { diff --git a/tools/seq/main.go b/tools/seq/main.go index 6bb4b7657..399e6d934 100644 --- a/tools/seq/main.go +++ b/tools/seq/main.go @@ -12,10 +12,11 @@ func main() { config string second int ) - flag.StringVar(&config, "c", "", "config directory") + flag.StringVar(&config, "c", "/Users/chao/Desktop/project/open-im-server/config", "config directory") flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion") flag.Parse() if err := internal.Main(config, time.Duration(second)*time.Second); err != nil { fmt.Println("seq task", err) } + fmt.Println("seq task success!") }