diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 65d04f381..e90c2288c 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -87,7 +87,12 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig) + seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) + if err != nil { + return err + } + seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, seqConversationCache, &config.KafkaConfig) if err != nil { return err } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index f1fb28fff..7cffb23a9 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -91,7 +91,12 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig) + seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) + if err != nil { + return err + } + seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, seqConversationCache, &config.KafkaConfig) if err != nil { return err } diff --git a/pkg/common/storage/cache/cachekey/seq.go b/pkg/common/storage/cache/cachekey/seq.go index 3f0ce98a4..ded0286d8 100644 --- a/pkg/common/storage/cache/cachekey/seq.go +++ b/pkg/common/storage/cache/cachekey/seq.go @@ -15,24 +15,24 @@ package cachekey const ( - maxSeq = "MAX_SEQ:" - minSeq = "MIN_SEQ:" - conversationUserMinSeq = "CON_USER_MIN_SEQ:" - hasReadSeq = "HAS_READ_SEQ:" + MaxSeq = "MAX_SEQ:" + MinSeq = "MIN_SEQ:" + ConversationUserMinSeq = "CON_USER_MIN_SEQ:" + HasReadSeq = "HAS_READ_SEQ:" ) func GetMaxSeqKey(conversationID string) string { - return maxSeq + conversationID + return MaxSeq + conversationID } func GetMinSeqKey(conversationID string) string { - return minSeq + conversationID + return MinSeq + conversationID } func GetHasReadSeqKey(conversationID string, userID string) string { - return hasReadSeq + userID + ":" + conversationID + return HasReadSeq + userID + ":" + conversationID } func GetConversationUserMinSeqKey(conversationID, userID string) string { - return conversationUserMinSeq + conversationID + "u:" + userID + return ConversationUserMinSeq + conversationID + "u:" + userID } diff --git a/pkg/common/storage/cache/cachekey/seq1.go b/pkg/common/storage/cache/cachekey/seq1.go index df3086d32..04274db55 100644 --- a/pkg/common/storage/cache/cachekey/seq1.go +++ b/pkg/common/storage/cache/cachekey/seq1.go @@ -2,7 +2,6 @@ package cachekey const ( MallocSeq = "MALLOC_SEQ:" - MallocSeqLock = "MALLOC_SEQ_LOCK:" MallocMinSeqLock = "MALLOC_MIN_SEQ:" ) @@ -10,10 +9,6 @@ func GetMallocSeqKey(conversationID string) string { return MallocSeq + conversationID } -func GetMallocSeqLockKey(conversationID string) string { - return MallocSeqLock + conversationID -} - func GetMallocMinSeqKey(conversationID string) string { return MallocMinSeqLock + conversationID } diff --git a/pkg/common/storage/cache/redis/seq.md b/pkg/common/storage/cache/redis/seq.md deleted file mode 100644 index 9aa66a326..000000000 --- a/pkg/common/storage/cache/redis/seq.md +++ /dev/null @@ -1,36 +0,0 @@ - -### mongo -```go -type Seq struct { - ConversationID string `bson:"conversation_id"` - MaxSeq int64 `bson:"max_seq"` - MinSeq int64 `bson:"min_seq"` -} -``` - -```go -type Seq interface { - Malloc(ctx context.Context, conversationID string, size int64) (int64, error) - GetMaxSeq(ctx context.Context, conversationID string) (int64, error) - GetMinSeq(ctx context.Context, conversationID string) (int64, error) - SetMinSeq(ctx context.Context, conversationID string, seq int64) error -} -``` - -1. Malloc 申请seq数量,返回的已有seq的最大值,消息用的第一个seq是返回值+1 -2. GetMaxSeq 获取申请的seq的最大值,在发消息的seq小于这个值 -3. GetMinSeq 获取最小的seq,用于拉取历史消息 -4. SetMinSeq 设置最小的seq,用于拉取历史消息 - -### redis -```go -type RedisSeq struct { - Curr int64 // 当前的最大seq - Last int64 // mongodb中申请的最大seq - Lock *int64 // 锁,用于在mongodb中申请seq -} -``` - -1. Malloc 申请seq数量,返回的已有seq的最大值,消息用的第一个seq是返回值+1,如果redis中申请数量够用,直接返回,并自增对应数量。如果redis中申请数量不够用,加锁,从mongodb中申请seq。 -2. GetMaxSeq 获取已发消息的最大seq就是Curr的值。如果redis中缓存不存在就通过mongodb获取最大seq。存储在redis中。其中Curr和Last都是这个seq值。 -3. GetMinSeq, SetMinSeq用之前rockscache的方案。 \ No newline at end of file diff --git a/pkg/common/storage/cache/redis/seq1.go.txt b/pkg/common/storage/cache/redis/seq1.go.txt deleted file mode 100644 index 9ccb83e66..000000000 --- a/pkg/common/storage/cache/redis/seq1.go.txt +++ /dev/null @@ -1,246 +0,0 @@ -package redis - -import ( - "context" - "errors" - "fmt" - "github.com/dtm-labs/rockscache" - "github.com/google/uuid" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/tools/errs" - "github.com/redis/go-redis/v9" - "strconv" - "time" -) - -var errLock = errors.New("lock failed") - -type MallocSeq interface { - Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error) - GetMaxSeq(ctx context.Context, conversationID string) (int64, error) - GetMinSeq(ctx context.Context, conversationID string) (int64, error) - SetMinSeq(ctx context.Context, conversationID string, seq int64) error -} - -func NewSeqCache1(rdb redis.UniversalClient, mgo database.Seq) MallocSeq { - opt := rockscache.NewDefaultOptions() - opt.EmptyExpire = time.Second * 3 - opt.Delay = time.Second / 2 - return &seqCache1{ - rdb: rdb, - mgo: mgo, - rocks: rockscache.NewClient(rdb, opt), - lockExpire: time.Minute * 1, - seqExpire: time.Hour * 24 * 7, - minSeqExpire: time.Hour * 1, - groupMinNum: 1000, - userMinNum: 100, - } -} - -type seqCache1 struct { - rdb redis.UniversalClient - rocks *rockscache.Client - mgo database.Seq - lockExpire time.Duration - seqExpire time.Duration - minSeqExpire time.Duration - groupMinNum int64 - userMinNum int64 -} - -/* -1 -2 -3 -4 -5 -6 -7 -8 -9 -10 -*/ - -func (s *seqCache1) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { - for i := 0; i < 10; i++ { - res, err := s.rdb.LIndex(ctx, cachekey.GetMallocSeqKey(conversationID), 0).Int64() - if err == nil { - return res, nil - } else if !errors.Is(err, redis.Nil) { - return 0, errs.Wrap(err) - } - - if err := s.mallocSeq(ctx, conversationID, 0, nil); err != nil { - return 0, err - } - } - return 0, errs.New("get max seq timeout") -} - -func (s *seqCache1) unlock(ctx context.Context, key string, owner string) error { - script := ` -local value = redis.call("GET", KEYS[1]) -if value == false then - return 0 -end -if value == ARGV[1] then - redis.call("DEL", KEYS[1]) - return 1 -end -return 2 -` - state, err := s.rdb.Eval(ctx, script, []string{key}, owner).Int() - if err != nil { - return errs.Wrap(err) - } - switch state { - case 0: - return errs.Wrap(redis.Nil) - case 1: - return nil - case 2: - return errs.New("not the lock holder") - default: - return errs.New(fmt.Sprintf("unknown state: %d", state)) - } -} - -func (s *seqCache1) initMallocSeq(ctx context.Context, conversationID string, size int64) ([]int64, error) { - owner := uuid.New().String() - ok, err := s.rdb.SetNX(ctx, cachekey.GetMallocSeqLockKey(conversationID), owner, s.lockExpire).Result() - if err != nil { - return nil, err - } - - seq, err := s.mgo.Malloc(ctx, conversationID, size) - if err != nil { - return nil, err - } - seqs := make([]int64, 0, size) - for i := seq - size + 1; i <= seq; i++ { - seqs = append(seqs, i) - } - return seqs, nil -} - -func (s *seqCache1) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { - return getCache[int64](ctx, s.rocks, cachekey.GetMallocMinSeqKey(conversationID), s.minSeqExpire, func(ctx context.Context) (int64, error) { - return s.mgo.GetMinSeq(ctx, conversationID) - }) -} - -func (s *seqCache1) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { - if err := s.mgo.SetMinSeq(ctx, conversationID, seq); err != nil { - return err - } - return s.deleteMinSeqCache(ctx, conversationID) -} - -func (s *seqCache1) Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error) { - if size <= 0 { - return nil, errs.Wrap(errors.New("size must be greater than 0")) - } - seqKey := cachekey.GetMallocSeqKey(conversationID) - lockKey := cachekey.GetMallocSeqLockKey(conversationID) - for i := 0; i < 10; i++ { - seqs, err := s.lpop(ctx, seqKey, lockKey, size) - if err != nil { - return nil, err - } - if len(seqs) < int(size) { - if err := s.mallocSeq(ctx, conversationID, size, &seqs); err != nil { - return nil, err - } - } - if len(seqs) >= int(size) { - return seqs, nil - } - } - return nil, errs.ErrInternalServer.WrapMsg("malloc seq failed") -} - -func (s *seqCache1) push(ctx context.Context, seqKey string, seqs []int64) error { - script := ` -redis.call("DEL", KEYS[1]) -for i = 2, #ARGV do - redis.call("RPUSH", KEYS[1], ARGV[i]) -end -redis.call("EXPIRE", KEYS[1], ARGV[1]) -return 1 -` - argv := make([]any, 0, 1+len(seqs)) - argv = append(argv, s.seqExpire.Seconds()) - for _, seq := range seqs { - argv = append(argv, seq) - } - err := s.rdb.Eval(ctx, script, []string{seqKey}, argv...).Err() - return errs.Wrap(err) -} - -func (s *seqCache1) lpop(ctx context.Context, seqKey, lockKey string, size int64) ([]int64, error) { - script := ` -local result = redis.call("LRANGE", KEYS[1], 0, ARGV[1]-1) -if #result == 0 then - return result -end -redis.call("LTRIM", KEYS[1], #result, -1) -if redis.call("LLEN", KEYS[1]) == 0 then - redis.call("DEL", KEYS[2]) -end -return result -` - res, err := s.rdb.Eval(ctx, script, []string{seqKey, lockKey}, size).Int64Slice() - if err != nil { - return nil, errs.Wrap(err) - } - return res, nil -} - -func (s *seqCache1) getMongoStepSize(conversationID string, size int64) int64 { - var num int64 - if msgprocessor.IsGroupConversationID(conversationID) { - num = s.groupMinNum - } else { - num = s.userMinNum - } - if size > num { - num += size - } - return num -} - -func (s *seqCache1) mallocSeq(ctx context.Context, conversationID string, size int64, seqs *[]int64) error { - var delMinSeqKey bool - _, err := getCache[string](ctx, s.rocks, cachekey.GetMallocSeqLockKey(conversationID), s.lockExpire, func(ctx context.Context) (string, error) { - res, err := s.mgo.Malloc(ctx, conversationID, s.getMongoStepSize(conversationID, size)) - if err != nil { - return "", err - } - delMinSeqKey = res[0] == 1 - if seqs != nil && size > 0 { - if len(*seqs) > 0 && (*seqs)[len(*seqs)-1]+1 == res[0] { - n := size - int64(len(*seqs)) - *seqs = append(*seqs, res[:n]...) - res = res[n:] - } else { - *seqs = res[:size] - res = res[size:] - } - } - if err := s.push(ctx, cachekey.GetMallocSeqKey(conversationID), res); err != nil { - return "", err - } - return strconv.Itoa(int(time.Now().UnixMicro())), nil - }) - if delMinSeqKey { - s.deleteMinSeqCache(ctx, conversationID) - } - return err -} - -func (s *seqCache1) deleteMinSeqCache(ctx context.Context, conversationID string) error { - return s.rocks.TagAsDeleted2(ctx, cachekey.GetMallocMinSeqKey(conversationID)) -} diff --git a/pkg/common/storage/cache/redis/seq1.go b/pkg/common/storage/cache/redis/seq_conversation.go similarity index 68% rename from pkg/common/storage/cache/redis/seq1.go rename to pkg/common/storage/cache/redis/seq_conversation.go index 316dea487..034462fd1 100644 --- a/pkg/common/storage/cache/redis/seq1.go +++ b/pkg/common/storage/cache/redis/seq_conversation.go @@ -3,74 +3,68 @@ package redis import ( "context" "fmt" + "github.com/dtm-labs/rockscache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" "time" ) -type RedisHash struct { - NextSeq int64 - LastSeq int64 +func NewSeqConversationCacheRedis(rdb redis.UniversalClient, mgo database.SeqConversation) cache.SeqConversationCache { + return &seqConversationCacheRedis{ + rdb: rdb, + mgo: mgo, + lockTime: time.Second * 3, + dataTime: time.Hour * 24 * 365, + minSeqExpireTime: time.Hour, + rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()), + } } -func NewTestSeq() *SeqMalloc { - mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)) - if err != nil { - panic(err) - } - model, err := mgo.NewSeqMongo(mgocli.Database("openim_v3")) - if err != nil { - panic(err) - } - opt := &redis.Options{ - Addr: "172.16.8.48:16379", - Password: "openIM123", - DB: 1, - } - rdb := redis.NewClient(opt) - if err := rdb.Ping(context.Background()).Err(); err != nil { - panic(err) - } - return &SeqMalloc{ - rdb: rdb, - mgo: model, - //lockTime: time.Second * 30, - lockTime: time.Second * 60 * 60 * 24 * 1, - dataTime: time.Second * 60 * 60 * 24 * 7, - } +type seqConversationCacheRedis struct { + rdb redis.UniversalClient + mgo database.SeqConversation + rocks *rockscache.Client + lockTime time.Duration + dataTime time.Duration + minSeqExpireTime time.Duration +} + +func (s *seqConversationCacheRedis) getMinSeqKey(conversationID string) string { + return cachekey.GetMallocMinSeqKey(conversationID) } -type RedisSeq struct { - Curr int64 - Last int64 - Lock *int64 +func (s *seqConversationCacheRedis) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { + if err := s.mgo.SetMinSeq(ctx, conversationID, seq); err != nil { + return err + } + if err := s.rocks.TagAsDeleted2(ctx, s.getMinSeqKey(conversationID)); err != nil { + return errs.Wrap(err) + } + return nil } -type SeqMalloc struct { - rdb redis.UniversalClient - mgo database.Seq - lockTime time.Duration - dataTime time.Duration +func (s *seqConversationCacheRedis) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { + return getCache(ctx, s.rocks, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) { + return s.mgo.GetMinSeq(ctx, conversationID) + }) } -func (s *SeqMalloc) getSeqMallocKey(conversationID string) string { +func (s *seqConversationCacheRedis) getSeqMallocKey(conversationID string) string { return cachekey.GetMallocSeqKey(conversationID) } -func (s *SeqMalloc) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) { +func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) { if lastSeq < currSeq { return 0, errs.New("lastSeq must be greater than currSeq") } - // 0: 成功 - // 1: 成功 锁过期,但未被其他人锁 - // 2: 已经被锁,但是锁的不是自己 + // 0: success + // 1: success the lock has expired, but has not been locked by anyone else + // 2: already locked, but not by yourself script := ` local key = KEYS[1] local lockValue = ARGV[1] @@ -97,12 +91,12 @@ return 0 return result, nil } -// malloc size=0为获取当前seq size>0为分配seq -func (s *SeqMalloc) malloc(ctx context.Context, key string, size int64) ([]int64, error) { - // 0: 成功 - // 1: 需要获取,并加锁 - // 2: 已经被锁 - // 3: 超过最大值,并加锁 +// malloc size=0 is to get the current seq size>0 is to allocate seq +func (s *seqConversationCacheRedis) malloc(ctx context.Context, key string, size int64) ([]int64, error) { + // 0: success + // 1: need to obtain and lock + // 2: already locked + // 3: exceeded the maximum value and locked script := ` local key = KEYS[1] local size = tonumber(ARGV[1]) @@ -156,7 +150,7 @@ return result return result, nil } -func (s *SeqMalloc) wait(ctx context.Context) error { +func (s *seqConversationCacheRedis) wait(ctx context.Context) error { timer := time.NewTimer(time.Second / 4) defer timer.Stop() select { @@ -167,7 +161,7 @@ func (s *SeqMalloc) wait(ctx context.Context) error { } } -func (s *SeqMalloc) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) { +func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) { for i := 0; i < 10; i++ { state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq) if err != nil { @@ -191,13 +185,13 @@ func (s *SeqMalloc) setSeqRetry(ctx context.Context, key string, owner int64, cu log.ZError(ctx, "set seq cache retrying still failed", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq) } -func (s *SeqMalloc) getMallocSize(conversationID string, size int64) int64 { +func (s *seqConversationCacheRedis) getMallocSize(conversationID string, size int64) int64 { if size == 0 { return 0 } var basicSize int64 if msgprocessor.IsGroupConversationID(conversationID) { - basicSize = 200 + basicSize = 100 } else { basicSize = 50 } @@ -205,7 +199,7 @@ func (s *SeqMalloc) getMallocSize(conversationID string, size int64) int64 { return basicSize } -func (s *SeqMalloc) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { +func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { if size < 0 { return 0, errs.New("size must be greater than 0") } @@ -227,7 +221,6 @@ func (s *SeqMalloc) Malloc(ctx context.Context, conversationID string, size int6 s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize) return seq, nil case 2: // locked - fmt.Println("locked----->", "conversationID", conversationID, "size", size) if err := s.wait(ctx); err != nil { return 0, err } @@ -257,6 +250,6 @@ func (s *SeqMalloc) Malloc(ctx context.Context, conversationID string, size int6 return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size) } -func (s *SeqMalloc) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { +func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { return s.Malloc(ctx, conversationID, 0) } diff --git a/pkg/common/storage/cache/redis/seq_conversation_test.go b/pkg/common/storage/cache/redis/seq_conversation_test.go new file mode 100644 index 000000000..1a40624b8 --- /dev/null +++ b/pkg/common/storage/cache/redis/seq_conversation_test.go @@ -0,0 +1,109 @@ +package redis + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" +) + +func newTestSeq() *seqConversationCacheRedis { + mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)) + if err != nil { + panic(err) + } + model, err := mgo.NewSeqConversationMongo(mgocli.Database("openim_v3")) + if err != nil { + panic(err) + } + opt := &redis.Options{ + Addr: "172.16.8.48:16379", + Password: "openIM123", + DB: 1, + } + rdb := redis.NewClient(opt) + if err := rdb.Ping(context.Background()).Err(); err != nil { + panic(err) + } + return NewSeqConversationCacheRedis(rdb, model).(*seqConversationCacheRedis) +} + +func TestSeq(t *testing.T) { + ts := newTestSeq() + var ( + wg sync.WaitGroup + speed atomic.Int64 + ) + + const count = 128 + wg.Add(count) + for i := 0; i < count; i++ { + index := i + 1 + go func() { + defer wg.Done() + var size int64 = 10 + cID := strconv.Itoa(index * 1) + for i := 1; ; i++ { + //first, err := ts.mgo.Malloc(context.Background(), cID, size) // mongo + first, err := ts.Malloc(context.Background(), cID, size) // redis + if err != nil { + t.Logf("[%d-%d] %s %s", index, i, cID, err) + return + } + speed.Add(size) + _ = first + //t.Logf("[%d] %d -> %d", i, first+1, first+size) + } + }() + } + + done := make(chan struct{}) + + go func() { + wg.Wait() + close(done) + }() + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-done: + ticker.Stop() + return + case <-ticker.C: + value := speed.Swap(0) + t.Logf("speed: %d/s", value) + } + } +} + +func TestDel(t *testing.T) { + ts := newTestSeq() + for i := 1; i < 100; i++ { + var size int64 = 100 + first, err := ts.Malloc(context.Background(), "100", size) + if err != nil { + t.Logf("[%d] %s", i, err) + return + } + t.Logf("[%d] %d -> %d", i, first+1, first+size) + time.Sleep(time.Second) + } +} + +func TestSeqMalloc(t *testing.T) { + ts := newTestSeq() + t.Log(ts.GetMaxSeq(context.Background(), "100")) +} + +func TestMinSeq(t *testing.T) { + ts := newTestSeq() + t.Log(ts.GetMinSeq(context.Background(), "10000000")) +} diff --git a/pkg/common/storage/cache/redis/seq_test.go b/pkg/common/storage/cache/redis/seq_test.go deleted file mode 100644 index 6fff6019a..000000000 --- a/pkg/common/storage/cache/redis/seq_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package redis - -import ( - "context" - "strconv" - "sync" - "sync/atomic" - "testing" - "time" -) - -func TestSeq(t *testing.T) { - ts := NewTestSeq() - var ( - wg sync.WaitGroup - speed atomic.Int64 - ) - - const count = 256 - wg.Add(count) - for i := 0; i < count; i++ { - index := i + 1 - go func() { - defer wg.Done() - var size int64 = 1 - cID := strconv.Itoa(index * 100) - for i := 1; ; i++ { - first, err := ts.mgo.Malloc(context.Background(), cID, size) // mongo - //first, err := ts.Malloc(context.Background(), cID, size) // redis - if err != nil { - t.Logf("[%d-%d] %s %s", index, i, cID, err) - return - } - speed.Add(size) - _ = first - //t.Logf("[%d] %d -> %d", i, first+1, first+size) - } - }() - } - - done := make(chan struct{}) - - go func() { - wg.Wait() - close(done) - }() - - ticker := time.NewTicker(time.Second) - - for { - select { - case <-done: - ticker.Stop() - return - case <-ticker.C: - value := speed.Swap(0) - t.Logf("speed: %d/s", value) - } - } - - //for i := 1; i < 1000000; i++ { - // var size int64 = 100 - // first, err := ts.Malloc(context.Background(), "1", size) - // if err != nil { - // t.Logf("[%d] %s", i, err) - // return - // } - // t.Logf("[%d] %d -> %d", i, first+1, first+size) - // time.Sleep(time.Second / 4) - //} -} - -func TestDel(t *testing.T) { - ts := NewTestSeq() - t.Log(ts.GetMaxSeq(context.Background(), "1")) - -} diff --git a/pkg/common/storage/cache/seq.go b/pkg/common/storage/cache/seq.go index 091b318c8..46e33c935 100644 --- a/pkg/common/storage/cache/seq.go +++ b/pkg/common/storage/cache/seq.go @@ -5,13 +5,13 @@ import ( ) type SeqCache interface { - SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error - GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - GetMaxSeq(ctx context.Context, conversationID string) (int64, error) - SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error - SetMinSeqs(ctx context.Context, seqs map[string]int64) error - GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - GetMinSeq(ctx context.Context, conversationID string) (int64, error) + //SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error + //GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + //GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + //SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error + //SetMinSeqs(ctx context.Context, seqs map[string]int64) error + //GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + //GetMinSeq(ctx context.Context, conversationID string) (int64, error) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error diff --git a/pkg/common/storage/cache/seq_conversation.go b/pkg/common/storage/cache/seq_conversation.go new file mode 100644 index 000000000..5d38537a9 --- /dev/null +++ b/pkg/common/storage/cache/seq_conversation.go @@ -0,0 +1,10 @@ +package cache + +import "context" + +type SeqConversationCache interface { + Malloc(ctx context.Context, conversationID string, size int64) (int64, error) + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, seq int64) error + GetMinSeq(ctx context.Context, conversationID string) (int64, error) +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index c5c5462aa..3a29f6786 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -108,7 +108,7 @@ type CommonMsgDatabase interface { DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) } -func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { +func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cache.SeqCache, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) if err != nil { return nil, err @@ -129,28 +129,19 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cach msgDocDatabase: msgDocModel, msg: msg, seq: seq, + seqConversation: seqConversation, producer: producerToRedis, producerToMongo: producerToMongo, producerToPush: producerToPush, }, nil } -//func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *tools.CronTaskConfig) (CommonMsgDatabase, error) { -// msgDocModel, err := database.NewMsgMongo(database) -// if err != nil { -// return nil, err -// } -// //todo MsgCacheTimeout -// msg := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) -// seq := cache.NewSeqCache(rdb) -// return NewCommonMsgDatabase(msgDocModel, msg, seq, &config.KafkaConfig) -//} - type commonMsgDatabase struct { msgDocDatabase database.Msg msgTable model.MsgDocModel msg cache.MsgCache seq cache.SeqCache + seqConversation cache.SeqConversationCache producer *kafka.Producer producerToMongo *kafka.Producer producerToPush *kafka.Producer @@ -349,10 +340,9 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver } func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { - // TODO set SEQ - currentMaxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { - log.ZError(ctx, "storage.seq.GetMaxSeq", err) + currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) + if err != nil { + log.ZError(ctx, "storage.seq.Malloc", err) return 0, false, err } lenList := len(msgs) @@ -362,9 +352,6 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa if lenList < 1 { return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() } - if errs.Unwrap(err) == redis.Nil { - isNew = true - } lastMaxSeq := currentMaxSeq userSeqMap := make(map[string]int64) for _, m := range msgs { @@ -380,13 +367,6 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa } else { prommetrics.MsgInsertRedisSuccessCounter.Inc() } - - err = db.seq.SetMaxSeq(ctx, conversationID, currentMaxSeq) - if err != nil { - log.ZError(ctx, "storage.seq.SetMaxSeq error", err, "conversationID", conversationID) - prommetrics.SeqSetFailedCounter.Inc() - } - err = db.seq.SetHasReadSeqs(ctx, conversationID, userSeqMap) if err != nil { log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) @@ -519,8 +499,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } - minSeq, err := db.seq.GetMinSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) + if err != nil { return 0, 0, nil, err } if userMinSeq > minSeq { @@ -531,8 +511,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end) return 0, 0, nil, nil } - maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) + if err != nil { return 0, 0, nil, err } log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq) @@ -572,11 +552,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin var successMsgs []*sdkws.MsgData log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs) - if err != nil { - if err != redis.Nil { - - log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) - } + if err != nil && !errors.Is(err, redis.Nil) { + log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) } successMsgs = append(successMsgs, cachedMsgs...) log.ZDebug(ctx, "get msgs from cache", "cachedMsgs", cachedMsgs) @@ -600,12 +577,12 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } - minSeq, err := db.seq.GetMinSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) + if err != nil { return 0, 0, nil, err } - maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) + if err != nil { return 0, 0, nil, err } if userMinSeq < minSeq { @@ -649,7 +626,7 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont if minSeq == 0 { return nil } - return db.seq.SetMinSeq(ctx, conversationID, minSeq) + return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) } func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { @@ -803,7 +780,7 @@ func (db *commonMsgDatabase) DeleteMsgsBySeqs(ctx context.Context, conversationI func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) { for _, conversationID := range conversationIDs { - maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) + maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { if err == redis.Nil { log.ZDebug(ctx, "max seq is nil", "conversationID", conversationID) @@ -812,7 +789,7 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u } continue } - if err := db.seq.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { + if err := db.seqConversation.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { log.ZError(ctx, "set min seq failed", err, "conversationID", conversationID, "minSeq", maxSeq+1) } } @@ -823,29 +800,37 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u //} func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return db.seq.GetMaxSeqs(ctx, conversationIDs) + result := make(map[string]int64) + for _, conversationID := range conversationIDs { + if result[conversationID] != 0 { + continue + } + seq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) + if err != nil { + return nil, err + } + result[conversationID] = seq + } + return result, nil } func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { - return db.seq.GetMaxSeq(ctx, conversationID) + return db.seqConversation.GetMaxSeq(ctx, conversationID) } func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { - return db.seq.SetMinSeq(ctx, conversationID, minSeq) + return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) } func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { - return db.seq.SetMinSeqs(ctx, seqs) + for conversationID, seq := range seqs { + if err := db.seqConversation.SetMinSeq(ctx, conversationID, seq); err != nil { + return err + } + } + return nil } -//func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { -// return db.seq.GetMinSeqs(ctx, conversationIDs) -//} -// -//func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { -// return db.seq.GetMinSeq(ctx, conversationID) -//} - func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) } @@ -895,11 +880,11 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context if err != nil { return } - minSeqCache, err = db.seq.GetMinSeq(ctx, conversationID) + minSeqCache, err = db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { return } - maxSeqCache, err = db.seq.GetMaxSeq(ctx, conversationID) + maxSeqCache, err = db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { return } @@ -1011,33 +996,8 @@ func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, d } } -//func (db *commonMsgDatabase) ClearMsg(ctx context.Context, ts int64) (err error) { -// var ( -// docNum int -// msgNum int -// start = time.Now() -// ) -// for { -// msgs, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, 100) -// if err != nil { -// return err -// } -// if len(msgs) == 0 { -// return nil -// } -// for _, msg := range msgs { -// num, err := db.deleteOneMsg(ctx, ts, msg) -// if err != nil { -// return err -// } -// docNum++ -// msgNum += num -// } -// } -//} - func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error { - dbSeq, err := db.seq.GetMinSeq(ctx, conversationID) + dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { if errors.Is(errs.Unwrap(err), redis.Nil) { return nil @@ -1047,5 +1007,5 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin if dbSeq >= seq { return nil } - return db.seq.SetMinSeq(ctx, conversationID, seq) + return db.seqConversation.SetMinSeq(ctx, conversationID, seq) } diff --git a/pkg/common/storage/database/mgo/seq.go b/pkg/common/storage/database/mgo/seq_conversation.go similarity index 50% rename from pkg/common/storage/database/mgo/seq.go rename to pkg/common/storage/database/mgo/seq_conversation.go index 9b8c86ffe..c9a3ac41a 100644 --- a/pkg/common/storage/database/mgo/seq.go +++ b/pkg/common/storage/database/mgo/seq_conversation.go @@ -3,6 +3,7 @@ package mgo import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/mongoutil" "go.mongodb.org/mongo-driver/bson" @@ -10,16 +11,24 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -func NewSeqMongo(db *mongo.Database) (*SeqMongo, error) { - coll := db.Collection("seq") - return &SeqMongo{coll: coll}, nil +func NewSeqConversationMongo(db *mongo.Database) (database.SeqConversation, error) { + coll := db.Collection(database.SeqConversationName) + _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "conversation_id", Value: 1}, + }, + }) + if err != nil { + return nil, err + } + return &seqConversationMongo{coll: coll}, nil } -type SeqMongo struct { +type seqConversationMongo struct { coll *mongo.Collection } -func (s *SeqMongo) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { +func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { if size < 0 { return 0, errors.New("size must be greater than 0") } @@ -29,7 +38,7 @@ func (s *SeqMongo) Malloc(ctx context.Context, conversationID string, size int64 filter := map[string]any{"conversation_id": conversationID} update := map[string]any{ "$inc": map[string]any{"max_seq": size}, - "$set": map[string]any{"min_seq": 1}, + "$set": map[string]any{"min_seq": int64(0)}, } opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1}) lastSeq, err := mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt) @@ -39,7 +48,19 @@ func (s *SeqMongo) Malloc(ctx context.Context, conversationID string, size int64 return lastSeq - size, nil } -func (s *SeqMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { +func (s *seqConversationMongo) MallocSeq(ctx context.Context, conversationID string, size int64) ([]int64, error) { + first, err := s.Malloc(ctx, conversationID, size) + if err != nil { + return nil, err + } + seqs := make([]int64, 0, size) + for i := int64(0); i < size; i++ { + seqs = append(seqs, first+i+1) + } + return seqs, nil +} + +func (s *seqConversationMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "max_seq": 1})) if err == nil { return seq, nil @@ -50,7 +71,7 @@ func (s *SeqMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, } } -func (s *SeqMongo) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { +func (s *seqConversationMongo) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "min_seq": 1})) if err == nil { return seq, nil @@ -61,10 +82,10 @@ func (s *SeqMongo) GetMinSeq(ctx context.Context, conversationID string) (int64, } } -func (s *SeqMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { +func (s *seqConversationMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { return mongoutil.UpdateOne(ctx, s.coll, bson.M{"conversation_id": conversationID}, bson.M{"$set": bson.M{"min_seq": seq}}, false) } -func (s *SeqMongo) GetConversation(ctx context.Context, conversationID string) (*model.Seq, error) { - return mongoutil.FindOne[*model.Seq](ctx, s.coll, bson.M{"conversation_id": conversationID}) +func (s *seqConversationMongo) GetConversation(ctx context.Context, conversationID string) (*model.SeqConversation, error) { + return mongoutil.FindOne[*model.SeqConversation](ctx, s.coll, bson.M{"conversation_id": conversationID}) } diff --git a/pkg/common/storage/database/mgo/seq_test.go b/pkg/common/storage/database/mgo/seq_conversation_test.go similarity index 100% rename from pkg/common/storage/database/mgo/seq_test.go rename to pkg/common/storage/database/mgo/seq_conversation_test.go diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index f496d4b41..68fa5af02 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -14,5 +14,5 @@ const ( LogName = "log" ObjectName = "s3" UserName = "user" - SeqName = "seq" + SeqConversationName = "seq" ) diff --git a/pkg/common/storage/database/seq.go b/pkg/common/storage/database/seq.go index 20fae3bb1..fdbc2f8f3 100644 --- a/pkg/common/storage/database/seq.go +++ b/pkg/common/storage/database/seq.go @@ -2,7 +2,7 @@ package database import "context" -type Seq interface { +type SeqConversation interface { Malloc(ctx context.Context, conversationID string, size int64) (int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) GetMinSeq(ctx context.Context, conversationID string) (int64, error) diff --git a/pkg/common/storage/model/seq.go b/pkg/common/storage/model/seq.go index 3db009167..1dc75eff1 100644 --- a/pkg/common/storage/model/seq.go +++ b/pkg/common/storage/model/seq.go @@ -1,6 +1,6 @@ package model -type Seq struct { +type SeqConversation struct { ConversationID string `bson:"conversation_id"` MaxSeq int64 `bson:"max_seq"` MinSeq int64 `bson:"min_seq"` diff --git a/tools/seq/internal/main.go b/tools/seq/internal/main.go new file mode 100644 index 000000000..5cfd9fd45 --- /dev/null +++ b/tools/seq/internal/main.go @@ -0,0 +1,152 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/redisutil" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "gopkg.in/yaml.v3" + "os" + "path/filepath" + "strconv" + "strings" + "time" +) + +const batchSize = 5 + +func readConfig[T any](dir string, name string) (*T, error) { + data, err := os.ReadFile(filepath.Join(dir, name)) + if err != nil { + return nil, err + } + var conf T + if err := yaml.Unmarshal(data, &conf); err != nil { + return nil, err + } + return &conf, nil +} + +func redisKey(rdb redis.UniversalClient, prefix string, fn func(ctx context.Context, key string, delKey map[string]struct{}) error) error { + var ( + cursor uint64 + keys []string + err error + ) + ctx := context.Background() + for { + keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result() + if err != nil { + return err + } + delKey := make(map[string]struct{}) + if len(keys) > 0 { + for _, key := range keys { + if err := fn(ctx, key, delKey); err != nil { + return err + } + } + } + if len(delKey) > 0 { + //if err := rdb.Del(ctx, datautil.Keys(delKey)...).Err(); err != nil { + // return err + //} + } + if cursor == 0 { + return nil + } + } +} + +func Main(conf string) error { + redisConfig, err := readConfig[config.Redis](conf, cmd.RedisConfigFileName) + if err != nil { + return err + } + mongodbConfig, err := readConfig[config.Mongo](conf, cmd.MongodbConfigFileName) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + rdb, err := redisutil.NewRedisClient(ctx, redisConfig.Build()) + if err != nil { + return err + } + mgocli, err := mongoutil.NewMongoDB(ctx, mongodbConfig.Build()) + if err != nil { + return err + } + if _, err := mgo.NewSeqConversationMongo(mgocli.GetDB()); err != nil { + return err + } + coll := mgocli.GetDB().Collection(database.SeqConversationName) + const prefix = cachekey.MaxSeq + return redisKey(rdb, prefix, func(ctx context.Context, key string, delKey map[string]struct{}) error { + conversationId := strings.TrimPrefix(key, prefix) + delKey[key] = struct{}{} + maxValue, err := rdb.Get(ctx, key).Result() + if err != nil { + return err + } + seq, err := strconv.Atoi(maxValue) + if err != nil { + return fmt.Errorf("invalid max seq %s", maxValue) + } + if seq == 0 { + return nil + } + if seq < 0 { + return fmt.Errorf("invalid max seq %s", maxValue) + } + var ( + minSeq int64 + maxSeq = int64(seq) + ) + minKey := cachekey.MinSeq + conversationId + delKey[minKey] = struct{}{} + minValue, err := rdb.Get(ctx, minKey).Result() + if err == nil { + seq, err := strconv.Atoi(minValue) + if err != nil { + return fmt.Errorf("invalid min seq %s", minValue) + } + if seq < 0 { + return fmt.Errorf("invalid min seq %s", minValue) + } + minSeq = int64(seq) + } else if !errors.Is(err, redis.Nil) { + return err + } + if maxSeq < minSeq { + return fmt.Errorf("invalid max seq %d < min seq %d", maxSeq, minSeq) + } + res, err := mongoutil.FindOne[*model.SeqConversation](ctx, coll, bson.M{"conversation_id": conversationId}, nil) + if err == nil { + if res.MaxSeq < int64(seq) { + _, err = coll.UpdateOne(ctx, bson.M{"conversation_id": conversationId}, bson.M{"$set": bson.M{"max_seq": maxSeq, "min_seq": minSeq}}) + } + return err + } else if errors.Is(err, mongo.ErrNoDocuments) { + res = &model.SeqConversation{ + ConversationID: conversationId, + MaxSeq: maxSeq, + MinSeq: minSeq, + } + _, err := coll.InsertOne(ctx, res) + return err + } else { + return err + } + }) +} diff --git a/tools/seq/main.go b/tools/seq/main.go new file mode 100644 index 000000000..ca5a043e7 --- /dev/null +++ b/tools/seq/main.go @@ -0,0 +1,16 @@ +package main + +import ( + "flag" + "fmt" + "github.com/openimsdk/open-im-server/v3/tools/seq/internal" +) + +func main() { + var config string + flag.StringVar(&config, "redis", "/Users/chao/Desktop/project/open-im-server/config", "config directory") + flag.Parse() + if err := internal.Main(config); err != nil { + fmt.Println("seq task", err) + } +}