pull/2393/head
withchao 1 year ago
parent 8e3890ed88
commit 9b043feca7

@ -82,7 +82,6 @@ func Start(ctx context.Context, index int, config *Config) error {
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := redis.NewMsgCache(rdb)
seqModel := redis.NewSeqCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
@ -92,7 +91,12 @@ func Start(ctx context.Context, index int, config *Config) error {
return err
}
seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, seqConversationCache, &config.KafkaConfig)
seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB())
if err != nil {
return err
}
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig)
if err != nil {
return err
}

@ -86,7 +86,6 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
return err
}
msgModel := redis.NewMsgCache(rdb)
seqModel := redis.NewSeqCache(rdb)
conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
@ -96,7 +95,12 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
return err
}
seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, seqConversationCache, &config.KafkaConfig)
seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB())
if err != nil {
return err
}
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig)
if err != nil {
return err
}

@ -1,38 +1,30 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cachekey
const (
MaxSeq = "MAX_SEQ:"
MinSeq = "MIN_SEQ:"
ConversationUserMinSeq = "CON_USER_MIN_SEQ:"
HasReadSeq = "HAS_READ_SEQ:"
MallocSeq = "MALLOC_SEQ:"
MallocMinSeqLock = "MALLOC_MIN_SEQ:"
SeqUserMaxSeq = "SEQ_USER_MAX:"
SeqUserMinSeq = "SEQ_USER_MIN:"
SeqUserReadSeq = "SEQ_USER_READ:"
)
func GetMaxSeqKey(conversationID string) string {
return MaxSeq + conversationID
func GetMallocSeqKey(conversationID string) string {
return MallocSeq + conversationID
}
func GetMallocMinSeqKey(conversationID string) string {
return MallocMinSeqLock + conversationID
}
func GetMinSeqKey(conversationID string) string {
return MinSeq + conversationID
func GetSeqUserMaxSeqKey(conversationID string, userID string) string {
return SeqUserMaxSeq + conversationID + ":" + userID
}
func GetHasReadSeqKey(conversationID string, userID string) string {
return HasReadSeq + userID + ":" + conversationID
func GetSeqUserMinSeqKey(conversationID string, userID string) string {
return SeqUserMinSeq + conversationID + ":" + userID
}
func GetConversationUserMinSeqKey(conversationID, userID string) string {
return ConversationUserMinSeq + conversationID + "u:" + userID
func GetSeqUserReadSeqKey(conversationID string, userID string) string {
return SeqUserReadSeq + conversationID + ":" + userID
}

@ -1,14 +0,0 @@
package cachekey
const (
MallocSeq = "MALLOC_SEQ:"
MallocMinSeqLock = "MALLOC_MIN_SEQ:"
)
func GetMallocSeqKey(conversationID string) string {
return MallocSeq + conversationID
}
func GetMallocMinSeqKey(conversationID string) string {
return MallocMinSeqLock + conversationID
}

@ -1,183 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
)
func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache {
return &seqCache{rdb: rdb}
}
type seqCache struct {
rdb redis.UniversalClient
}
func (c *seqCache) getMaxSeqKey(conversationID string) string {
return cachekey.GetMaxSeqKey(conversationID)
}
func (c *seqCache) getMinSeqKey(conversationID string) string {
return cachekey.GetMinSeqKey(conversationID)
}
func (c *seqCache) getHasReadSeqKey(conversationID string, userID string) string {
return cachekey.GetHasReadSeqKey(conversationID, userID)
}
func (c *seqCache) getConversationUserMinSeqKey(conversationID, userID string) string {
return cachekey.GetConversationUserMinSeqKey(conversationID, userID)
}
func (c *seqCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
m = make(map[string]int64, len(items))
for i, v := range items {
res, err := c.rdb.Get(ctx, getkey(v)).Result()
if err != nil && err != redis.Nil {
return nil, errs.Wrap(err)
}
val := stringutil.StringToInt64(res)
if val != 0 {
m[items[i]] = val
}
}
return m, nil
}
//func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
// return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
//}
//
//func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
// return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey)
//}
//
//func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
// return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
//}
//
//func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
// return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
//}
//
//func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
// for conversationID, seq := range seqs {
// if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
// return errs.Wrap(err)
// }
// }
// return nil
//}
//
//
//func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
// return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
//}
//
//func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
// return c.getSeq(ctx, conversationID, c.getMinSeqKey)
//}
//
//func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
// return c.setSeqs(ctx, seqs, c.getMinSeqKey)
//}
func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
for conversationID, seq := range seqs {
if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, userIDs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
}
func (c *seqCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(conversationID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *seqCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, err
}
return val, nil
}

@ -0,0 +1,89 @@
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
"strconv"
"time"
)
func NewSeqUserCacheRedis(rdb redis.UniversalClient, mgo database.SeqUser) cache.SeqUser {
return &seqUserCacheRedis{
rdb: rdb,
mgo: mgo,
readSeqWriteRatio: 100,
expireTime: time.Hour * 24 * 7,
readExpireTime: time.Hour * 24 * 30,
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
}
}
type seqUserCacheRedis struct {
rdb redis.UniversalClient
mgo database.SeqUser
rocks *rockscache.Client
expireTime time.Duration
readExpireTime time.Duration
readSeqWriteRatio int64
}
func (s *seqUserCacheRedis) getSeqUserMaxSeqKey(conversationID string, userID string) string {
return cachekey.GetSeqUserMaxSeqKey(conversationID, userID)
}
func (s *seqUserCacheRedis) getSeqUserMinSeqKey(conversationID string, userID string) string {
return cachekey.GetSeqUserMinSeqKey(conversationID, userID)
}
func (s *seqUserCacheRedis) getSeqUserReadSeqKey(conversationID string, userID string) string {
return cachekey.GetSeqUserReadSeqKey(conversationID, userID)
}
func (s *seqUserCacheRedis) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMaxSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if err := s.mgo.SetMaxSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID))
}
func (s *seqUserCacheRedis) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMinSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if err := s.mgo.SetMinSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMinSeqKey(conversationID, userID))
}
func (s *seqUserCacheRedis) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserReadSeqKey(conversationID, userID), s.readExpireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
return nil
}

@ -1,30 +0,0 @@
package cache
import (
"context"
)
type SeqCache interface {
//SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
//GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
//GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
//SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
//SetMinSeqs(ctx context.Context, seqs map[string]int64) error
//GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
//GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
// seqs map: key userID value minSeq
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
// seqs map: key conversationID value minSeq
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
// has read seq
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
// k: user, v: seq
SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error
// k: conversation, v :seq
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
}

@ -0,0 +1,12 @@
package cache
import "context"
type SeqUser interface {
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
}

@ -108,7 +108,7 @@ type CommonMsgDatabase interface {
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
}
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cache.SeqCache, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
if err != nil {
return nil, err
@ -128,7 +128,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cach
return &commonMsgDatabase{
msgDocDatabase: msgDocModel,
msg: msg,
seq: seq,
seqUser: seqUser,
seqConversation: seqConversation,
producer: producerToRedis,
producerToMongo: producerToMongo,
@ -140,8 +140,8 @@ type commonMsgDatabase struct {
msgDocDatabase database.Msg
msgTable model.MsgDocModel
msg cache.MsgCache
seq cache.SeqCache
seqConversation cache.SeqConversationCache
seqUser cache.SeqUser
producer *kafka.Producer
producerToMongo *kafka.Producer
producerToPush *kafka.Producer
@ -339,6 +339,15 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
}
func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
for userID, seq := range userSeqMap {
if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
return nil
}
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
lenList := len(msgs)
if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
@ -368,7 +377,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
} else {
prommetrics.MsgInsertRedisSuccessCounter.Inc()
}
err = db.seq.SetHasReadSeqs(ctx, conversationID, userSeqMap)
err = db.setHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil {
log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc()
@ -496,7 +505,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
// "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group.
// This ensures that their message retrieval starts from the point they joined.
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)
userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err
}
@ -574,7 +583,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
}
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)
userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err
}
@ -672,12 +681,12 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
if len(seqs) > 0 {
userMinSeq := seqs[len(seqs)-1] + 1
currentUserMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
currentUserMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID)
if err != nil {
return nil, err
}
if currentUserMinSeq < userMinSeq {
if err := db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err
}
}
@ -832,40 +841,45 @@ func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int
return nil
}
func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)
}
func (db *commonMsgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) {
return db.seq.GetConversationUserMinSeqs(ctx, conversationID, userIDs)
}
func (db *commonMsgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq)
}
func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return db.seq.SetConversationUserMinSeqs(ctx, conversationID, seqs)
}
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
return db.seq.SetUserConversationsMinSeqs(ctx, userID, seqs)
for conversationID, seq := range seqs {
if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
return nil
}
func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return db.seq.UserSetHasReadSeqs(ctx, userID, hasReadSeqs)
for conversationID, seq := range hasReadSeqs {
if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
return nil
}
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seq.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq)
return db.seqUser.SetReadSeq(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.seq.GetHasReadSeqs(ctx, userID, conversationIDs)
cSeq := make(map[string]int64)
for _, conversationID := range conversationIDs {
if _, ok := cSeq[conversationID]; ok {
continue
}
seq, err := db.seqUser.GetReadSeq(ctx, conversationID, userID)
if err != nil {
return nil, err
}
cSeq[conversationID] = seq
}
return cSeq, nil
}
func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
return db.seq.GetHasReadSeq(ctx, userID, conversationID)
return db.seqUser.GetReadSeq(ctx, conversationID, userID)
}
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {

@ -28,6 +28,26 @@ type seqConversationMongo struct {
coll *mongo.Collection
}
func (s *seqConversationMongo) setSeq(ctx context.Context, conversationID string, seq int64, field string) error {
filter := map[string]any{
"conversation_id": conversationID,
}
insert := bson.M{
"conversation_id": conversationID,
"min_seq": 0,
"max_seq": 0,
}
delete(insert, field)
update := map[string]any{
"$set": bson.M{
field: seq,
},
"$setOnInsert": insert,
}
opt := options.Update().SetUpsert(true)
return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt)
}
func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) {
if size < 0 {
return 0, errors.New("size must be greater than 0")
@ -48,16 +68,8 @@ func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string
return lastSeq - size, nil
}
func (s *seqConversationMongo) MallocSeq(ctx context.Context, conversationID string, size int64) ([]int64, error) {
first, err := s.Malloc(ctx, conversationID, size)
if err != nil {
return nil, err
}
seqs := make([]int64, 0, size)
for i := int64(0); i < size; i++ {
seqs = append(seqs, first+i+1)
}
return seqs, nil
func (s *seqConversationMongo) SetMaxSeq(ctx context.Context, conversationID string, seq int64) error {
return s.setSeq(ctx, conversationID, seq, "max_seq")
}
func (s *seqConversationMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
@ -83,7 +95,7 @@ func (s *seqConversationMongo) GetMinSeq(ctx context.Context, conversationID str
}
func (s *seqConversationMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
return mongoutil.UpdateOne(ctx, s.coll, bson.M{"conversation_id": conversationID}, bson.M{"$set": bson.M{"min_seq": seq}}, false)
return s.setSeq(ctx, conversationID, seq, "min_seq")
}
func (s *seqConversationMongo) GetConversation(ctx context.Context, conversationID string) (*model.SeqConversation, error) {

@ -15,19 +15,23 @@ func Result[V any](val V, err error) V {
return val
}
func TestName(t *testing.T) {
cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
tmp, err := NewSeqConversationMongo(cli.Database("openim_v3"))
if err != nil {
panic(err)
}
for i := 0; i < 10; i++ {
var size int64 = 100
firstSeq, err := tmp.Malloc(context.Background(), "1", size)
if err != nil {
t.Log(err)
return
}
t.Logf("%d -> %d", firstSeq, firstSeq+size-1)
}
func Mongodb() *mongo.Database {
return Result(
mongo.Connect(context.Background(),
options.Client().
ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").
SetConnectTimeout(5*time.Second)),
).Database("openim_v3")
}
func TestUserSeq(t *testing.T) {
uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo)
t.Log(uSeq.SetMinSeq(context.Background(), "1000", "2000", 4))
}
func TestConversationSeq(t *testing.T) {
cSeq := Result(NewSeqConversationMongo(Mongodb())).(*seqConversationMongo)
t.Log(cSeq.SetMaxSeq(context.Background(), "2000", 10))
t.Log(cSeq.Malloc(context.Background(), "2000", 10))
t.Log(cSeq.GetMaxSeq(context.Background(), "2000"))
}

@ -2,6 +2,7 @@ package mgo
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/db/mongoutil"
"go.mongodb.org/mongo-driver/bson"
@ -10,7 +11,7 @@ import (
)
func NewSeqUserMongo(db *mongo.Database) (database.SeqUser, error) {
coll := db.Collection(database.SeqConversationName)
coll := db.Collection(database.SeqUserName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "user_id", Value: 1},
@ -27,45 +28,65 @@ type seqUserMongo struct {
coll *mongo.Collection
}
func (s *seqUserMongo) setSeq(ctx context.Context, userID string, conversationID string, seq int64, field string) error {
func (s *seqUserMongo) setSeq(ctx context.Context, conversationID string, userID string, seq int64, field string) error {
filter := map[string]any{
"user_id": userID,
"conversation_id": conversationID,
}
insert := bson.M{
"user_id": userID,
"conversation_id": conversationID,
"min_seq": 0,
"max_seq": 0,
"read_seq": 0,
}
delete(insert, field)
update := map[string]any{
"$set": map[string]any{"field": int64(0)},
"$set": bson.M{
field: seq,
},
"$setOnInsert": insert,
}
opt := options.Update().SetUpsert(true)
return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt)
}
func (s *seqUserMongo) GetMaxSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID string, failed string) (int64, error) {
filter := map[string]any{
"user_id": userID,
"conversation_id": conversationID,
}
opt := options.FindOne().SetProjection(bson.M{"_id": 0, failed: 1})
seq, err := mongoutil.FindOne[int64](ctx, s.coll, filter, opt)
if err == nil {
return seq, nil
} else if errors.Is(err, mongo.ErrNoDocuments) {
return 0, nil
} else {
return 0, err
}
}
//TODO implement me
panic("implement me")
func (s *seqUserMongo) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "max_seq")
}
func (s *seqUserMongo) SetMaxSeq(ctx context.Context, userID string, conversationID string, seq int64) error {
//TODO implement me
panic("implement me")
func (s *seqUserMongo) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "max_seq")
}
func (s *seqUserMongo) GetMinSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
//TODO implement me
panic("implement me")
func (s *seqUserMongo) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "min_seq")
}
func (s *seqUserMongo) SetMinSeq(ctx context.Context, userID string, conversationID string, seq int64) error {
//TODO implement me
panic("implement me")
func (s *seqUserMongo) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "min_seq")
}
func (s *seqUserMongo) GetReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
//TODO implement me
panic("implement me")
func (s *seqUserMongo) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "read_seq")
}
func (s *seqUserMongo) SetReadSeq(ctx context.Context, userID string, conversationID string, seq int64) error {
//TODO implement me
panic("implement me")
func (s *seqUserMongo) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "read_seq")
}

@ -5,6 +5,7 @@ import "context"
type SeqConversation interface {
Malloc(ctx context.Context, conversationID string, size int64) (int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
}

@ -3,10 +3,10 @@ package database
import "context"
type SeqUser interface {
GetMaxSeq(ctx context.Context, userID string, conversationID string) (int64, error)
SetMaxSeq(ctx context.Context, userID string, conversationID string, seq int64) error
GetMinSeq(ctx context.Context, userID string, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, userID string, conversationID string, seq int64) error
GetReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
SetReadSeq(ctx context.Context, userID string, conversationID string, seq int64) error
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
}

@ -1,30 +1,38 @@
package internal
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"gopkg.in/yaml.v3"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
const (
MaxSeq = "MAX_SEQ:"
MinSeq = "MIN_SEQ:"
ConversationUserMinSeq = "CON_USER_MIN_SEQ:"
HasReadSeq = "HAS_READ_SEQ:"
)
const (
batchSize = 100
dataVersionCollection = "data_version"
@ -44,48 +52,6 @@ func readConfig[T any](dir string, name string) (*T, error) {
return &conf, nil
}
func redisKey(rdb redis.UniversalClient, prefix string, del time.Duration, fn func(ctx context.Context, key string, delKey map[string]struct{}) error) error {
var (
cursor uint64
keys []string
err error
)
ctx := context.Background()
for {
keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result()
if err != nil {
return err
}
delKey := make(map[string]struct{})
if len(keys) > 0 {
for _, key := range keys {
if err := fn(ctx, key, delKey); err != nil {
return err
}
}
}
if len(delKey) > 0 {
delKeys := datautil.Keys(delKey)
if del < time.Second {
if err := rdb.Del(ctx, datautil.Keys(delKey)...).Err(); err != nil {
return err
}
} else {
pipe := rdb.Pipeline()
for _, key := range delKeys {
pipe.Expire(ctx, key, del)
}
if _, err := pipe.Exec(ctx); err != nil {
return err
}
}
}
if cursor == 0 {
return nil
}
}
}
func Main(conf string, del time.Duration) error {
redisConfig, err := readConfig[config.Redis](conf, cmd.RedisConfigFileName)
if err != nil {
@ -117,71 +83,218 @@ func Main(conf string, del time.Duration) error {
if _, err := mgo.NewSeqConversationMongo(mgocli.GetDB()); err != nil {
return err
}
coll := mgocli.GetDB().Collection(database.SeqConversationName)
const prefix = cachekey.MaxSeq
fmt.Println("start to convert seq conversation")
err = redisKey(rdb, prefix, del, func(ctx context.Context, key string, delKey map[string]struct{}) error {
conversationId := strings.TrimPrefix(key, prefix)
delKey[key] = struct{}{}
maxValue, err := rdb.Get(ctx, key).Result()
cSeq, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
}
uSeq, err := mgo.NewSeqUserMongo(mgocli.GetDB())
if err != nil {
return err
}
uSpitHasReadSeq := func(id string) (conversationID string, userID string, err error) {
// HasReadSeq + userID + ":" + conversationID
arr := strings.Split(id, ":")
if len(arr) != 2 || arr[0] == "" || arr[1] == "" {
return "", "", fmt.Errorf("invalid has read seq id %s", id)
}
userID = arr[0]
conversationID = arr[1]
return
}
uSpitConversationUserMinSeq := func(id string) (conversationID string, userID string, err error) {
// ConversationUserMinSeq + conversationID + "u:" + userID
arr := strings.Split(id, "u:")
if len(arr) != 2 || arr[0] == "" || arr[1] == "" {
return "", "", fmt.Errorf("invalid has read seq id %s", id)
}
conversationID = arr[0]
userID = arr[1]
return
}
ts := []*taskSeq{
{
Prefix: MaxSeq,
GetSeq: cSeq.GetMaxSeq,
SetSeq: cSeq.SetMinSeq,
},
{
Prefix: MinSeq,
GetSeq: cSeq.GetMinSeq,
SetSeq: cSeq.SetMinSeq,
},
{
Prefix: HasReadSeq,
GetSeq: func(ctx context.Context, id string) (int64, error) {
conversationID, userID, err := uSpitHasReadSeq(id)
if err != nil {
return 0, err
}
return uSeq.GetReadSeq(ctx, conversationID, userID)
},
SetSeq: func(ctx context.Context, id string, seq int64) error {
conversationID, userID, err := uSpitHasReadSeq(id)
if err != nil {
return err
}
seq, err := strconv.Atoi(maxValue)
return uSeq.SetReadSeq(ctx, conversationID, userID, seq)
},
},
{
Prefix: ConversationUserMinSeq,
GetSeq: func(ctx context.Context, id string) (int64, error) {
conversationID, userID, err := uSpitConversationUserMinSeq(id)
if err != nil {
return fmt.Errorf("invalid max seq %s", maxValue)
return 0, err
}
return uSeq.GetMinSeq(ctx, conversationID, userID)
},
SetSeq: func(ctx context.Context, id string, seq int64) error {
conversationID, userID, err := uSpitConversationUserMinSeq(id)
if err != nil {
return err
}
return uSeq.SetMinSeq(ctx, conversationID, userID, seq)
},
},
}
cancel()
ctx = context.Background()
var wg sync.WaitGroup
wg.Add(len(ts))
for i := range ts {
go func(task *taskSeq) {
defer wg.Done()
err := seqRedisToMongo(ctx, rdb, task.GetSeq, task.SetSeq, task.Prefix, del, &task.Count)
task.End = time.Now()
task.Error = err
}(ts[i])
}
start := time.Now()
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var buf bytes.Buffer
printTaskInfo := func(now time.Time) {
buf.Reset()
buf.WriteString(now.Format(time.DateTime))
buf.WriteString(" \n")
for i := range ts {
task := ts[i]
if task.Error == nil {
if task.End.IsZero() {
buf.WriteString(fmt.Sprintf("[%s] converting %s* count %d", now.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count)))
} else {
buf.WriteString(fmt.Sprintf("[%s] success %s* count %d", task.End.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count)))
}
} else {
buf.WriteString(fmt.Sprintf("[%s] failed %s* count %d error %s", task.End.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count), task.Error))
}
buf.WriteString("\n")
}
fmt.Println(buf.String())
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case s := <-sigs:
return fmt.Errorf("exit by signal %s", s)
case <-done:
errs := make([]error, 0, len(ts))
for i := range ts {
task := ts[i]
if task.Error != nil {
errs = append(errs, fmt.Errorf("seq %s failed %w", task.Prefix, task.Error))
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
printTaskInfo(time.Now())
if err := SetVersion(versionColl, seqKey, seqVersion); err != nil {
return fmt.Errorf("set mongodb seq version %w", err)
}
if seq == 0 {
return nil
case now := <-ticker.C:
printTaskInfo(now)
}
if seq < 0 {
return fmt.Errorf("invalid max seq %s", maxValue)
}
}
type taskSeq struct {
Prefix string
Count int64
Error error
End time.Time
GetSeq func(ctx context.Context, id string) (int64, error)
SetSeq func(ctx context.Context, id string, seq int64) error
}
func seqRedisToMongo(ctx context.Context, rdb redis.UniversalClient, getSeq func(ctx context.Context, id string) (int64, error), setSeq func(ctx context.Context, id string, seq int64) error, prefix string, delAfter time.Duration, count *int64) error {
var (
minSeq int64
maxSeq = int64(seq)
cursor uint64
keys []string
err error
)
minKey := cachekey.MinSeq + conversationId
delKey[minKey] = struct{}{}
minValue, err := rdb.Get(ctx, minKey).Result()
if err == nil {
seq, err := strconv.Atoi(minValue)
for {
keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result()
if err != nil {
return err
}
if len(keys) > 0 {
for _, key := range keys {
seqStr, err := rdb.Get(ctx, key).Result()
if err != nil {
return fmt.Errorf("redis get %s failed %w", key, err)
}
seq, err := strconv.Atoi(seqStr)
if err != nil {
return fmt.Errorf("invalid min seq %s", minValue)
return fmt.Errorf("invalid %s seq %s", key, seqStr)
}
if seq < 0 {
return fmt.Errorf("invalid min seq %s", minValue)
return fmt.Errorf("invalid %s seq %s", key, seqStr)
}
minSeq = int64(seq)
} else if !errors.Is(err, redis.Nil) {
return err
id := strings.TrimPrefix(key, prefix)
redisSeq := int64(seq)
mongoSeq, err := getSeq(ctx, id)
if err != nil {
return fmt.Errorf("get mongo seq %s failed %w", key, err)
}
if maxSeq < minSeq {
return fmt.Errorf("invalid max seq %d < min seq %d", maxSeq, minSeq)
if mongoSeq < redisSeq {
if err := setSeq(ctx, id, redisSeq); err != nil {
return fmt.Errorf("set mongo seq %s failed %w", key, err)
}
res, err := mongoutil.FindOne[*model.SeqConversation](ctx, coll, bson.M{"conversation_id": conversationId}, nil)
if err == nil {
if res.MaxSeq < int64(seq) {
_, err = coll.UpdateOne(ctx, bson.M{"conversation_id": conversationId}, bson.M{"$set": bson.M{"max_seq": maxSeq, "min_seq": minSeq}})
}
return err
} else if errors.Is(err, mongo.ErrNoDocuments) {
res = &model.SeqConversation{
ConversationID: conversationId,
MaxSeq: maxSeq,
MinSeq: minSeq,
if delAfter > 0 {
if err := rdb.Expire(ctx, key, delAfter).Err(); err != nil {
return fmt.Errorf("redis expire key %s failed %w", key, err)
}
_, err := coll.InsertOne(ctx, res)
return err
} else {
return err
if err := rdb.Del(ctx, key).Err(); err != nil {
return fmt.Errorf("redis del key %s failed %w", key, err)
}
}
atomic.AddInt64(count, 1)
}
}
if cursor == 0 {
return nil
}
})
if err != nil {
return err
}
fmt.Println("convert seq conversation success")
return SetVersion(versionColl, seqKey, seqVersion)
}
func CheckVersion(coll *mongo.Collection, key string, currentVersion int) (converted bool, err error) {

@ -12,10 +12,11 @@ func main() {
config string
second int
)
flag.StringVar(&config, "c", "", "config directory")
flag.StringVar(&config, "c", "/Users/chao/Desktop/project/open-im-server/config", "config directory")
flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion")
flag.Parse()
if err := internal.Main(config, time.Duration(second)*time.Second); err != nil {
fmt.Println("seq task", err)
}
fmt.Println("seq task success!")
}

Loading…
Cancel
Save