pull/2393/head
withchao 1 year ago
parent d55d416f68
commit a2a28b43c5

@ -87,7 +87,12 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil { if err != nil {
return err return err
} }
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
}
seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, seqConversationCache, &config.KafkaConfig)
if err != nil { if err != nil {
return err return err
} }

@ -91,7 +91,12 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
}
seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, seqConversationCache, &config.KafkaConfig)
if err != nil { if err != nil {
return err return err
} }

@ -15,24 +15,24 @@
package cachekey package cachekey
const ( const (
maxSeq = "MAX_SEQ:" MaxSeq = "MAX_SEQ:"
minSeq = "MIN_SEQ:" MinSeq = "MIN_SEQ:"
conversationUserMinSeq = "CON_USER_MIN_SEQ:" ConversationUserMinSeq = "CON_USER_MIN_SEQ:"
hasReadSeq = "HAS_READ_SEQ:" HasReadSeq = "HAS_READ_SEQ:"
) )
func GetMaxSeqKey(conversationID string) string { func GetMaxSeqKey(conversationID string) string {
return maxSeq + conversationID return MaxSeq + conversationID
} }
func GetMinSeqKey(conversationID string) string { func GetMinSeqKey(conversationID string) string {
return minSeq + conversationID return MinSeq + conversationID
} }
func GetHasReadSeqKey(conversationID string, userID string) string { func GetHasReadSeqKey(conversationID string, userID string) string {
return hasReadSeq + userID + ":" + conversationID return HasReadSeq + userID + ":" + conversationID
} }
func GetConversationUserMinSeqKey(conversationID, userID string) string { func GetConversationUserMinSeqKey(conversationID, userID string) string {
return conversationUserMinSeq + conversationID + "u:" + userID return ConversationUserMinSeq + conversationID + "u:" + userID
} }

@ -2,7 +2,6 @@ package cachekey
const ( const (
MallocSeq = "MALLOC_SEQ:" MallocSeq = "MALLOC_SEQ:"
MallocSeqLock = "MALLOC_SEQ_LOCK:"
MallocMinSeqLock = "MALLOC_MIN_SEQ:" MallocMinSeqLock = "MALLOC_MIN_SEQ:"
) )
@ -10,10 +9,6 @@ func GetMallocSeqKey(conversationID string) string {
return MallocSeq + conversationID return MallocSeq + conversationID
} }
func GetMallocSeqLockKey(conversationID string) string {
return MallocSeqLock + conversationID
}
func GetMallocMinSeqKey(conversationID string) string { func GetMallocMinSeqKey(conversationID string) string {
return MallocMinSeqLock + conversationID return MallocMinSeqLock + conversationID
} }

@ -1,36 +0,0 @@
### mongo
```go
type Seq struct {
ConversationID string `bson:"conversation_id"`
MaxSeq int64 `bson:"max_seq"`
MinSeq int64 `bson:"min_seq"`
}
```
```go
type Seq interface {
Malloc(ctx context.Context, conversationID string, size int64) (int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
}
```
1. Malloc 申请seq数量返回的已有seq的最大值消息用的第一个seq是返回值+1
2. GetMaxSeq 获取申请的seq的最大值在发消息的seq小于这个值
3. GetMinSeq 获取最小的seq用于拉取历史消息
4. SetMinSeq 设置最小的seq用于拉取历史消息
### redis
```go
type RedisSeq struct {
Curr int64 // 当前的最大seq
Last int64 // mongodb中申请的最大seq
Lock *int64 // 锁用于在mongodb中申请seq
}
```
1. Malloc 申请seq数量返回的已有seq的最大值消息用的第一个seq是返回值+1如果redis中申请数量够用直接返回并自增对应数量。如果redis中申请数量不够用加锁从mongodb中申请seq。
2. GetMaxSeq 获取已发消息的最大seq就是Curr的值。如果redis中缓存不存在就通过mongodb获取最大seq。存储在redis中。其中Curr和Last都是这个seq值。
3. GetMinSeq, SetMinSeq用之前rockscache的方案。

@ -1,246 +0,0 @@
package redis
import (
"context"
"errors"
"fmt"
"github.com/dtm-labs/rockscache"
"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
"strconv"
"time"
)
var errLock = errors.New("lock failed")
type MallocSeq interface {
Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
}
func NewSeqCache1(rdb redis.UniversalClient, mgo database.Seq) MallocSeq {
opt := rockscache.NewDefaultOptions()
opt.EmptyExpire = time.Second * 3
opt.Delay = time.Second / 2
return &seqCache1{
rdb: rdb,
mgo: mgo,
rocks: rockscache.NewClient(rdb, opt),
lockExpire: time.Minute * 1,
seqExpire: time.Hour * 24 * 7,
minSeqExpire: time.Hour * 1,
groupMinNum: 1000,
userMinNum: 100,
}
}
type seqCache1 struct {
rdb redis.UniversalClient
rocks *rockscache.Client
mgo database.Seq
lockExpire time.Duration
seqExpire time.Duration
minSeqExpire time.Duration
groupMinNum int64
userMinNum int64
}
/*
1
2
3
4
5
6
7
8
9
10
*/
func (s *seqCache1) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
for i := 0; i < 10; i++ {
res, err := s.rdb.LIndex(ctx, cachekey.GetMallocSeqKey(conversationID), 0).Int64()
if err == nil {
return res, nil
} else if !errors.Is(err, redis.Nil) {
return 0, errs.Wrap(err)
}
if err := s.mallocSeq(ctx, conversationID, 0, nil); err != nil {
return 0, err
}
}
return 0, errs.New("get max seq timeout")
}
func (s *seqCache1) unlock(ctx context.Context, key string, owner string) error {
script := `
local value = redis.call("GET", KEYS[1])
if value == false then
return 0
end
if value == ARGV[1] then
redis.call("DEL", KEYS[1])
return 1
end
return 2
`
state, err := s.rdb.Eval(ctx, script, []string{key}, owner).Int()
if err != nil {
return errs.Wrap(err)
}
switch state {
case 0:
return errs.Wrap(redis.Nil)
case 1:
return nil
case 2:
return errs.New("not the lock holder")
default:
return errs.New(fmt.Sprintf("unknown state: %d", state))
}
}
func (s *seqCache1) initMallocSeq(ctx context.Context, conversationID string, size int64) ([]int64, error) {
owner := uuid.New().String()
ok, err := s.rdb.SetNX(ctx, cachekey.GetMallocSeqLockKey(conversationID), owner, s.lockExpire).Result()
if err != nil {
return nil, err
}
seq, err := s.mgo.Malloc(ctx, conversationID, size)
if err != nil {
return nil, err
}
seqs := make([]int64, 0, size)
for i := seq - size + 1; i <= seq; i++ {
seqs = append(seqs, i)
}
return seqs, nil
}
func (s *seqCache1) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return getCache[int64](ctx, s.rocks, cachekey.GetMallocMinSeqKey(conversationID), s.minSeqExpire, func(ctx context.Context) (int64, error) {
return s.mgo.GetMinSeq(ctx, conversationID)
})
}
func (s *seqCache1) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
if err := s.mgo.SetMinSeq(ctx, conversationID, seq); err != nil {
return err
}
return s.deleteMinSeqCache(ctx, conversationID)
}
func (s *seqCache1) Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error) {
if size <= 0 {
return nil, errs.Wrap(errors.New("size must be greater than 0"))
}
seqKey := cachekey.GetMallocSeqKey(conversationID)
lockKey := cachekey.GetMallocSeqLockKey(conversationID)
for i := 0; i < 10; i++ {
seqs, err := s.lpop(ctx, seqKey, lockKey, size)
if err != nil {
return nil, err
}
if len(seqs) < int(size) {
if err := s.mallocSeq(ctx, conversationID, size, &seqs); err != nil {
return nil, err
}
}
if len(seqs) >= int(size) {
return seqs, nil
}
}
return nil, errs.ErrInternalServer.WrapMsg("malloc seq failed")
}
func (s *seqCache1) push(ctx context.Context, seqKey string, seqs []int64) error {
script := `
redis.call("DEL", KEYS[1])
for i = 2, #ARGV do
redis.call("RPUSH", KEYS[1], ARGV[i])
end
redis.call("EXPIRE", KEYS[1], ARGV[1])
return 1
`
argv := make([]any, 0, 1+len(seqs))
argv = append(argv, s.seqExpire.Seconds())
for _, seq := range seqs {
argv = append(argv, seq)
}
err := s.rdb.Eval(ctx, script, []string{seqKey}, argv...).Err()
return errs.Wrap(err)
}
func (s *seqCache1) lpop(ctx context.Context, seqKey, lockKey string, size int64) ([]int64, error) {
script := `
local result = redis.call("LRANGE", KEYS[1], 0, ARGV[1]-1)
if #result == 0 then
return result
end
redis.call("LTRIM", KEYS[1], #result, -1)
if redis.call("LLEN", KEYS[1]) == 0 then
redis.call("DEL", KEYS[2])
end
return result
`
res, err := s.rdb.Eval(ctx, script, []string{seqKey, lockKey}, size).Int64Slice()
if err != nil {
return nil, errs.Wrap(err)
}
return res, nil
}
func (s *seqCache1) getMongoStepSize(conversationID string, size int64) int64 {
var num int64
if msgprocessor.IsGroupConversationID(conversationID) {
num = s.groupMinNum
} else {
num = s.userMinNum
}
if size > num {
num += size
}
return num
}
func (s *seqCache1) mallocSeq(ctx context.Context, conversationID string, size int64, seqs *[]int64) error {
var delMinSeqKey bool
_, err := getCache[string](ctx, s.rocks, cachekey.GetMallocSeqLockKey(conversationID), s.lockExpire, func(ctx context.Context) (string, error) {
res, err := s.mgo.Malloc(ctx, conversationID, s.getMongoStepSize(conversationID, size))
if err != nil {
return "", err
}
delMinSeqKey = res[0] == 1
if seqs != nil && size > 0 {
if len(*seqs) > 0 && (*seqs)[len(*seqs)-1]+1 == res[0] {
n := size - int64(len(*seqs))
*seqs = append(*seqs, res[:n]...)
res = res[n:]
} else {
*seqs = res[:size]
res = res[size:]
}
}
if err := s.push(ctx, cachekey.GetMallocSeqKey(conversationID), res); err != nil {
return "", err
}
return strconv.Itoa(int(time.Now().UnixMicro())), nil
})
if delMinSeqKey {
s.deleteMinSeqCache(ctx, conversationID)
}
return err
}
func (s *seqCache1) deleteMinSeqCache(ctx context.Context, conversationID string) error {
return s.rocks.TagAsDeleted2(ctx, cachekey.GetMallocMinSeqKey(conversationID))
}

@ -3,74 +3,68 @@ package redis
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time" "time"
) )
type RedisHash struct { func NewSeqConversationCacheRedis(rdb redis.UniversalClient, mgo database.SeqConversation) cache.SeqConversationCache {
NextSeq int64 return &seqConversationCacheRedis{
LastSeq int64 rdb: rdb,
mgo: mgo,
lockTime: time.Second * 3,
dataTime: time.Hour * 24 * 365,
minSeqExpireTime: time.Hour,
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
}
} }
func NewTestSeq() *SeqMalloc { type seqConversationCacheRedis struct {
mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)) rdb redis.UniversalClient
if err != nil { mgo database.SeqConversation
panic(err) rocks *rockscache.Client
} lockTime time.Duration
model, err := mgo.NewSeqMongo(mgocli.Database("openim_v3")) dataTime time.Duration
if err != nil { minSeqExpireTime time.Duration
panic(err) }
}
opt := &redis.Options{ func (s *seqConversationCacheRedis) getMinSeqKey(conversationID string) string {
Addr: "172.16.8.48:16379", return cachekey.GetMallocMinSeqKey(conversationID)
Password: "openIM123",
DB: 1,
}
rdb := redis.NewClient(opt)
if err := rdb.Ping(context.Background()).Err(); err != nil {
panic(err)
}
return &SeqMalloc{
rdb: rdb,
mgo: model,
//lockTime: time.Second * 30,
lockTime: time.Second * 60 * 60 * 24 * 1,
dataTime: time.Second * 60 * 60 * 24 * 7,
}
} }
type RedisSeq struct { func (s *seqConversationCacheRedis) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
Curr int64 if err := s.mgo.SetMinSeq(ctx, conversationID, seq); err != nil {
Last int64 return err
Lock *int64 }
if err := s.rocks.TagAsDeleted2(ctx, s.getMinSeqKey(conversationID)); err != nil {
return errs.Wrap(err)
}
return nil
} }
type SeqMalloc struct { func (s *seqConversationCacheRedis) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
rdb redis.UniversalClient return getCache(ctx, s.rocks, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) {
mgo database.Seq return s.mgo.GetMinSeq(ctx, conversationID)
lockTime time.Duration })
dataTime time.Duration
} }
func (s *SeqMalloc) getSeqMallocKey(conversationID string) string { func (s *seqConversationCacheRedis) getSeqMallocKey(conversationID string) string {
return cachekey.GetMallocSeqKey(conversationID) return cachekey.GetMallocSeqKey(conversationID)
} }
func (s *SeqMalloc) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) { func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) {
if lastSeq < currSeq { if lastSeq < currSeq {
return 0, errs.New("lastSeq must be greater than currSeq") return 0, errs.New("lastSeq must be greater than currSeq")
} }
// 0 成功 // 0: success
// 1 成功 锁过期,但未被其他人锁 // 1: success the lock has expired, but has not been locked by anyone else
// 2 已经被锁,但是锁的不是自己 // 2: already locked, but not by yourself
script := ` script := `
local key = KEYS[1] local key = KEYS[1]
local lockValue = ARGV[1] local lockValue = ARGV[1]
@ -97,12 +91,12 @@ return 0
return result, nil return result, nil
} }
// malloc size=0为获取当前seq size>0为分配seq // malloc size=0 is to get the current seq size>0 is to allocate seq
func (s *SeqMalloc) malloc(ctx context.Context, key string, size int64) ([]int64, error) { func (s *seqConversationCacheRedis) malloc(ctx context.Context, key string, size int64) ([]int64, error) {
// 0 成功 // 0: success
// 1 需要获取,并加锁 // 1: need to obtain and lock
// 2 已经被锁 // 2: already locked
// 3 超过最大值,并加锁 // 3: exceeded the maximum value and locked
script := ` script := `
local key = KEYS[1] local key = KEYS[1]
local size = tonumber(ARGV[1]) local size = tonumber(ARGV[1])
@ -156,7 +150,7 @@ return result
return result, nil return result, nil
} }
func (s *SeqMalloc) wait(ctx context.Context) error { func (s *seqConversationCacheRedis) wait(ctx context.Context) error {
timer := time.NewTimer(time.Second / 4) timer := time.NewTimer(time.Second / 4)
defer timer.Stop() defer timer.Stop()
select { select {
@ -167,7 +161,7 @@ func (s *SeqMalloc) wait(ctx context.Context) error {
} }
} }
func (s *SeqMalloc) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) { func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq) state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq)
if err != nil { if err != nil {
@ -191,13 +185,13 @@ func (s *SeqMalloc) setSeqRetry(ctx context.Context, key string, owner int64, cu
log.ZError(ctx, "set seq cache retrying still failed", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq) log.ZError(ctx, "set seq cache retrying still failed", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq)
} }
func (s *SeqMalloc) getMallocSize(conversationID string, size int64) int64 { func (s *seqConversationCacheRedis) getMallocSize(conversationID string, size int64) int64 {
if size == 0 { if size == 0 {
return 0 return 0
} }
var basicSize int64 var basicSize int64
if msgprocessor.IsGroupConversationID(conversationID) { if msgprocessor.IsGroupConversationID(conversationID) {
basicSize = 200 basicSize = 100
} else { } else {
basicSize = 50 basicSize = 50
} }
@ -205,7 +199,7 @@ func (s *SeqMalloc) getMallocSize(conversationID string, size int64) int64 {
return basicSize return basicSize
} }
func (s *SeqMalloc) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) {
if size < 0 { if size < 0 {
return 0, errs.New("size must be greater than 0") return 0, errs.New("size must be greater than 0")
} }
@ -227,7 +221,6 @@ func (s *SeqMalloc) Malloc(ctx context.Context, conversationID string, size int6
s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize) s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize)
return seq, nil return seq, nil
case 2: // locked case 2: // locked
fmt.Println("locked----->", "conversationID", conversationID, "size", size)
if err := s.wait(ctx); err != nil { if err := s.wait(ctx); err != nil {
return 0, err return 0, err
} }
@ -257,6 +250,6 @@ func (s *SeqMalloc) Malloc(ctx context.Context, conversationID string, size int6
return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size) return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size)
} }
func (s *SeqMalloc) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return s.Malloc(ctx, conversationID, 0) return s.Malloc(ctx, conversationID, 0)
} }

@ -0,0 +1,109 @@
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
)
func newTestSeq() *seqConversationCacheRedis {
mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))
if err != nil {
panic(err)
}
model, err := mgo.NewSeqConversationMongo(mgocli.Database("openim_v3"))
if err != nil {
panic(err)
}
opt := &redis.Options{
Addr: "172.16.8.48:16379",
Password: "openIM123",
DB: 1,
}
rdb := redis.NewClient(opt)
if err := rdb.Ping(context.Background()).Err(); err != nil {
panic(err)
}
return NewSeqConversationCacheRedis(rdb, model).(*seqConversationCacheRedis)
}
func TestSeq(t *testing.T) {
ts := newTestSeq()
var (
wg sync.WaitGroup
speed atomic.Int64
)
const count = 128
wg.Add(count)
for i := 0; i < count; i++ {
index := i + 1
go func() {
defer wg.Done()
var size int64 = 10
cID := strconv.Itoa(index * 1)
for i := 1; ; i++ {
//first, err := ts.mgo.Malloc(context.Background(), cID, size) // mongo
first, err := ts.Malloc(context.Background(), cID, size) // redis
if err != nil {
t.Logf("[%d-%d] %s %s", index, i, cID, err)
return
}
speed.Add(size)
_ = first
//t.Logf("[%d] %d -> %d", i, first+1, first+size)
}
}()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
ticker := time.NewTicker(time.Second)
for {
select {
case <-done:
ticker.Stop()
return
case <-ticker.C:
value := speed.Swap(0)
t.Logf("speed: %d/s", value)
}
}
}
func TestDel(t *testing.T) {
ts := newTestSeq()
for i := 1; i < 100; i++ {
var size int64 = 100
first, err := ts.Malloc(context.Background(), "100", size)
if err != nil {
t.Logf("[%d] %s", i, err)
return
}
t.Logf("[%d] %d -> %d", i, first+1, first+size)
time.Sleep(time.Second)
}
}
func TestSeqMalloc(t *testing.T) {
ts := newTestSeq()
t.Log(ts.GetMaxSeq(context.Background(), "100"))
}
func TestMinSeq(t *testing.T) {
ts := newTestSeq()
t.Log(ts.GetMinSeq(context.Background(), "10000000"))
}

@ -1,77 +0,0 @@
package redis
import (
"context"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestSeq(t *testing.T) {
ts := NewTestSeq()
var (
wg sync.WaitGroup
speed atomic.Int64
)
const count = 256
wg.Add(count)
for i := 0; i < count; i++ {
index := i + 1
go func() {
defer wg.Done()
var size int64 = 1
cID := strconv.Itoa(index * 100)
for i := 1; ; i++ {
first, err := ts.mgo.Malloc(context.Background(), cID, size) // mongo
//first, err := ts.Malloc(context.Background(), cID, size) // redis
if err != nil {
t.Logf("[%d-%d] %s %s", index, i, cID, err)
return
}
speed.Add(size)
_ = first
//t.Logf("[%d] %d -> %d", i, first+1, first+size)
}
}()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
ticker := time.NewTicker(time.Second)
for {
select {
case <-done:
ticker.Stop()
return
case <-ticker.C:
value := speed.Swap(0)
t.Logf("speed: %d/s", value)
}
}
//for i := 1; i < 1000000; i++ {
// var size int64 = 100
// first, err := ts.Malloc(context.Background(), "1", size)
// if err != nil {
// t.Logf("[%d] %s", i, err)
// return
// }
// t.Logf("[%d] %d -> %d", i, first+1, first+size)
// time.Sleep(time.Second / 4)
//}
}
func TestDel(t *testing.T) {
ts := NewTestSeq()
t.Log(ts.GetMaxSeq(context.Background(), "1"))
}

@ -5,13 +5,13 @@ import (
) )
type SeqCache interface { type SeqCache interface {
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error //SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) //GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error) //GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error //SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
SetMinSeqs(ctx context.Context, seqs map[string]int64) error //SetMinSeqs(ctx context.Context, seqs map[string]int64) error
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) //GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error) //GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error

@ -0,0 +1,10 @@
package cache
import "context"
type SeqConversationCache interface {
Malloc(ctx context.Context, conversationID string, size int64) (int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
}

@ -108,7 +108,7 @@ type CommonMsgDatabase interface {
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
} }
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cache.SeqCache, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
if err != nil { if err != nil {
return nil, err return nil, err
@ -129,28 +129,19 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seq cach
msgDocDatabase: msgDocModel, msgDocDatabase: msgDocModel,
msg: msg, msg: msg,
seq: seq, seq: seq,
seqConversation: seqConversation,
producer: producerToRedis, producer: producerToRedis,
producerToMongo: producerToMongo, producerToMongo: producerToMongo,
producerToPush: producerToPush, producerToPush: producerToPush,
}, nil }, nil
} }
//func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *tools.CronTaskConfig) (CommonMsgDatabase, error) {
// msgDocModel, err := database.NewMsgMongo(database)
// if err != nil {
// return nil, err
// }
// //todo MsgCacheTimeout
// msg := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline)
// seq := cache.NewSeqCache(rdb)
// return NewCommonMsgDatabase(msgDocModel, msg, seq, &config.KafkaConfig)
//}
type commonMsgDatabase struct { type commonMsgDatabase struct {
msgDocDatabase database.Msg msgDocDatabase database.Msg
msgTable model.MsgDocModel msgTable model.MsgDocModel
msg cache.MsgCache msg cache.MsgCache
seq cache.SeqCache seq cache.SeqCache
seqConversation cache.SeqConversationCache
producer *kafka.Producer producer *kafka.Producer
producerToMongo *kafka.Producer producerToMongo *kafka.Producer
producerToPush *kafka.Producer producerToPush *kafka.Producer
@ -349,10 +340,9 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver
} }
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
// TODO set SEQ currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
currentMaxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) if err != nil {
if err != nil && errs.Unwrap(err) != redis.Nil { log.ZError(ctx, "storage.seq.Malloc", err)
log.ZError(ctx, "storage.seq.GetMaxSeq", err)
return 0, false, err return 0, false, err
} }
lenList := len(msgs) lenList := len(msgs)
@ -362,9 +352,6 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
if lenList < 1 { if lenList < 1 {
return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
} }
if errs.Unwrap(err) == redis.Nil {
isNew = true
}
lastMaxSeq := currentMaxSeq lastMaxSeq := currentMaxSeq
userSeqMap := make(map[string]int64) userSeqMap := make(map[string]int64)
for _, m := range msgs { for _, m := range msgs {
@ -380,13 +367,6 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
} else { } else {
prommetrics.MsgInsertRedisSuccessCounter.Inc() prommetrics.MsgInsertRedisSuccessCounter.Inc()
} }
err = db.seq.SetMaxSeq(ctx, conversationID, currentMaxSeq)
if err != nil {
log.ZError(ctx, "storage.seq.SetMaxSeq error", err, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc()
}
err = db.seq.SetHasReadSeqs(ctx, conversationID, userSeqMap) err = db.seq.SetHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil { if err != nil {
log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
@ -519,8 +499,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
minSeq, err := db.seq.GetMinSeq(ctx, conversationID) minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
if userMinSeq > minSeq { if userMinSeq > minSeq {
@ -531,8 +511,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end) log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end)
return 0, 0, nil, nil return 0, 0, nil, nil
} }
maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq) log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq)
@ -572,11 +552,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
var successMsgs []*sdkws.MsgData var successMsgs []*sdkws.MsgData
log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd)
cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs) cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil { if err != nil && !errors.Is(err, redis.Nil) {
if err != redis.Nil { log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs)
log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs)
}
} }
successMsgs = append(successMsgs, cachedMsgs...) successMsgs = append(successMsgs, cachedMsgs...)
log.ZDebug(ctx, "get msgs from cache", "cachedMsgs", cachedMsgs) log.ZDebug(ctx, "get msgs from cache", "cachedMsgs", cachedMsgs)
@ -600,12 +577,12 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
minSeq, err := db.seq.GetMinSeq(ctx, conversationID) minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
if userMinSeq < minSeq { if userMinSeq < minSeq {
@ -649,7 +626,7 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
if minSeq == 0 { if minSeq == 0 {
return nil return nil
} }
return db.seq.SetMinSeq(ctx, conversationID, minSeq) return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
} }
func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) {
@ -803,7 +780,7 @@ func (db *commonMsgDatabase) DeleteMsgsBySeqs(ctx context.Context, conversationI
func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) { func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) {
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID)
if err != nil { if err != nil {
if err == redis.Nil { if err == redis.Nil {
log.ZDebug(ctx, "max seq is nil", "conversationID", conversationID) log.ZDebug(ctx, "max seq is nil", "conversationID", conversationID)
@ -812,7 +789,7 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u
} }
continue continue
} }
if err := db.seq.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { if err := db.seqConversation.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil {
log.ZError(ctx, "set min seq failed", err, "conversationID", conversationID, "minSeq", maxSeq+1) log.ZError(ctx, "set min seq failed", err, "conversationID", conversationID, "minSeq", maxSeq+1)
} }
} }
@ -823,29 +800,37 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u
//} //}
func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return db.seq.GetMaxSeqs(ctx, conversationIDs) result := make(map[string]int64)
for _, conversationID := range conversationIDs {
if result[conversationID] != 0 {
continue
}
seq, err := db.seqConversation.GetMaxSeq(ctx, conversationID)
if err != nil {
return nil, err
}
result[conversationID] = seq
}
return result, nil
} }
func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return db.seq.GetMaxSeq(ctx, conversationID) return db.seqConversation.GetMaxSeq(ctx, conversationID)
} }
func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return db.seq.SetMinSeq(ctx, conversationID, minSeq) return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
} }
func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return db.seq.SetMinSeqs(ctx, seqs) for conversationID, seq := range seqs {
if err := db.seqConversation.SetMinSeq(ctx, conversationID, seq); err != nil {
return err
}
}
return nil
} }
//func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
// return db.seq.GetMinSeqs(ctx, conversationIDs)
//}
//
//func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
// return db.seq.GetMinSeq(ctx, conversationID)
//}
func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)
} }
@ -895,11 +880,11 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
if err != nil { if err != nil {
return return
} }
minSeqCache, err = db.seq.GetMinSeq(ctx, conversationID) minSeqCache, err = db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil { if err != nil {
return return
} }
maxSeqCache, err = db.seq.GetMaxSeq(ctx, conversationID) maxSeqCache, err = db.seqConversation.GetMaxSeq(ctx, conversationID)
if err != nil { if err != nil {
return return
} }
@ -1011,33 +996,8 @@ func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, d
} }
} }
//func (db *commonMsgDatabase) ClearMsg(ctx context.Context, ts int64) (err error) {
// var (
// docNum int
// msgNum int
// start = time.Now()
// )
// for {
// msgs, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, 100)
// if err != nil {
// return err
// }
// if len(msgs) == 0 {
// return nil
// }
// for _, msg := range msgs {
// num, err := db.deleteOneMsg(ctx, ts, msg)
// if err != nil {
// return err
// }
// docNum++
// msgNum += num
// }
// }
//}
func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error { func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error {
dbSeq, err := db.seq.GetMinSeq(ctx, conversationID) dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil { if err != nil {
if errors.Is(errs.Unwrap(err), redis.Nil) { if errors.Is(errs.Unwrap(err), redis.Nil) {
return nil return nil
@ -1047,5 +1007,5 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin
if dbSeq >= seq { if dbSeq >= seq {
return nil return nil
} }
return db.seq.SetMinSeq(ctx, conversationID, seq) return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
} }

@ -3,6 +3,7 @@ package mgo
import ( import (
"context" "context"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
@ -10,16 +11,24 @@ import (
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
) )
func NewSeqMongo(db *mongo.Database) (*SeqMongo, error) { func NewSeqConversationMongo(db *mongo.Database) (database.SeqConversation, error) {
coll := db.Collection("seq") coll := db.Collection(database.SeqConversationName)
return &SeqMongo{coll: coll}, nil _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "conversation_id", Value: 1},
},
})
if err != nil {
return nil, err
}
return &seqConversationMongo{coll: coll}, nil
} }
type SeqMongo struct { type seqConversationMongo struct {
coll *mongo.Collection coll *mongo.Collection
} }
func (s *SeqMongo) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) {
if size < 0 { if size < 0 {
return 0, errors.New("size must be greater than 0") return 0, errors.New("size must be greater than 0")
} }
@ -29,7 +38,7 @@ func (s *SeqMongo) Malloc(ctx context.Context, conversationID string, size int64
filter := map[string]any{"conversation_id": conversationID} filter := map[string]any{"conversation_id": conversationID}
update := map[string]any{ update := map[string]any{
"$inc": map[string]any{"max_seq": size}, "$inc": map[string]any{"max_seq": size},
"$set": map[string]any{"min_seq": 1}, "$set": map[string]any{"min_seq": int64(0)},
} }
opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1}) opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1})
lastSeq, err := mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt) lastSeq, err := mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt)
@ -39,7 +48,19 @@ func (s *SeqMongo) Malloc(ctx context.Context, conversationID string, size int64
return lastSeq - size, nil return lastSeq - size, nil
} }
func (s *SeqMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { func (s *seqConversationMongo) MallocSeq(ctx context.Context, conversationID string, size int64) ([]int64, error) {
first, err := s.Malloc(ctx, conversationID, size)
if err != nil {
return nil, err
}
seqs := make([]int64, 0, size)
for i := int64(0); i < size; i++ {
seqs = append(seqs, first+i+1)
}
return seqs, nil
}
func (s *seqConversationMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "max_seq": 1})) seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "max_seq": 1}))
if err == nil { if err == nil {
return seq, nil return seq, nil
@ -50,7 +71,7 @@ func (s *SeqMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64,
} }
} }
func (s *SeqMongo) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { func (s *seqConversationMongo) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "min_seq": 1})) seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "min_seq": 1}))
if err == nil { if err == nil {
return seq, nil return seq, nil
@ -61,10 +82,10 @@ func (s *SeqMongo) GetMinSeq(ctx context.Context, conversationID string) (int64,
} }
} }
func (s *SeqMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { func (s *seqConversationMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
return mongoutil.UpdateOne(ctx, s.coll, bson.M{"conversation_id": conversationID}, bson.M{"$set": bson.M{"min_seq": seq}}, false) return mongoutil.UpdateOne(ctx, s.coll, bson.M{"conversation_id": conversationID}, bson.M{"$set": bson.M{"min_seq": seq}}, false)
} }
func (s *SeqMongo) GetConversation(ctx context.Context, conversationID string) (*model.Seq, error) { func (s *seqConversationMongo) GetConversation(ctx context.Context, conversationID string) (*model.SeqConversation, error) {
return mongoutil.FindOne[*model.Seq](ctx, s.coll, bson.M{"conversation_id": conversationID}) return mongoutil.FindOne[*model.SeqConversation](ctx, s.coll, bson.M{"conversation_id": conversationID})
} }

@ -14,5 +14,5 @@ const (
LogName = "log" LogName = "log"
ObjectName = "s3" ObjectName = "s3"
UserName = "user" UserName = "user"
SeqName = "seq" SeqConversationName = "seq"
) )

@ -2,7 +2,7 @@ package database
import "context" import "context"
type Seq interface { type SeqConversation interface {
Malloc(ctx context.Context, conversationID string, size int64) (int64, error) Malloc(ctx context.Context, conversationID string, size int64) (int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error) GetMinSeq(ctx context.Context, conversationID string) (int64, error)

@ -1,6 +1,6 @@
package model package model
type Seq struct { type SeqConversation struct {
ConversationID string `bson:"conversation_id"` ConversationID string `bson:"conversation_id"`
MaxSeq int64 `bson:"max_seq"` MaxSeq int64 `bson:"max_seq"`
MinSeq int64 `bson:"min_seq"` MinSeq int64 `bson:"min_seq"`

@ -0,0 +1,152 @@
package internal
import (
"context"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"gopkg.in/yaml.v3"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
const batchSize = 5
func readConfig[T any](dir string, name string) (*T, error) {
data, err := os.ReadFile(filepath.Join(dir, name))
if err != nil {
return nil, err
}
var conf T
if err := yaml.Unmarshal(data, &conf); err != nil {
return nil, err
}
return &conf, nil
}
func redisKey(rdb redis.UniversalClient, prefix string, fn func(ctx context.Context, key string, delKey map[string]struct{}) error) error {
var (
cursor uint64
keys []string
err error
)
ctx := context.Background()
for {
keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result()
if err != nil {
return err
}
delKey := make(map[string]struct{})
if len(keys) > 0 {
for _, key := range keys {
if err := fn(ctx, key, delKey); err != nil {
return err
}
}
}
if len(delKey) > 0 {
//if err := rdb.Del(ctx, datautil.Keys(delKey)...).Err(); err != nil {
// return err
//}
}
if cursor == 0 {
return nil
}
}
}
func Main(conf string) error {
redisConfig, err := readConfig[config.Redis](conf, cmd.RedisConfigFileName)
if err != nil {
return err
}
mongodbConfig, err := readConfig[config.Mongo](conf, cmd.MongodbConfigFileName)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
rdb, err := redisutil.NewRedisClient(ctx, redisConfig.Build())
if err != nil {
return err
}
mgocli, err := mongoutil.NewMongoDB(ctx, mongodbConfig.Build())
if err != nil {
return err
}
if _, err := mgo.NewSeqConversationMongo(mgocli.GetDB()); err != nil {
return err
}
coll := mgocli.GetDB().Collection(database.SeqConversationName)
const prefix = cachekey.MaxSeq
return redisKey(rdb, prefix, func(ctx context.Context, key string, delKey map[string]struct{}) error {
conversationId := strings.TrimPrefix(key, prefix)
delKey[key] = struct{}{}
maxValue, err := rdb.Get(ctx, key).Result()
if err != nil {
return err
}
seq, err := strconv.Atoi(maxValue)
if err != nil {
return fmt.Errorf("invalid max seq %s", maxValue)
}
if seq == 0 {
return nil
}
if seq < 0 {
return fmt.Errorf("invalid max seq %s", maxValue)
}
var (
minSeq int64
maxSeq = int64(seq)
)
minKey := cachekey.MinSeq + conversationId
delKey[minKey] = struct{}{}
minValue, err := rdb.Get(ctx, minKey).Result()
if err == nil {
seq, err := strconv.Atoi(minValue)
if err != nil {
return fmt.Errorf("invalid min seq %s", minValue)
}
if seq < 0 {
return fmt.Errorf("invalid min seq %s", minValue)
}
minSeq = int64(seq)
} else if !errors.Is(err, redis.Nil) {
return err
}
if maxSeq < minSeq {
return fmt.Errorf("invalid max seq %d < min seq %d", maxSeq, minSeq)
}
res, err := mongoutil.FindOne[*model.SeqConversation](ctx, coll, bson.M{"conversation_id": conversationId}, nil)
if err == nil {
if res.MaxSeq < int64(seq) {
_, err = coll.UpdateOne(ctx, bson.M{"conversation_id": conversationId}, bson.M{"$set": bson.M{"max_seq": maxSeq, "min_seq": minSeq}})
}
return err
} else if errors.Is(err, mongo.ErrNoDocuments) {
res = &model.SeqConversation{
ConversationID: conversationId,
MaxSeq: maxSeq,
MinSeq: minSeq,
}
_, err := coll.InsertOne(ctx, res)
return err
} else {
return err
}
})
}

@ -0,0 +1,16 @@
package main
import (
"flag"
"fmt"
"github.com/openimsdk/open-im-server/v3/tools/seq/internal"
)
func main() {
var config string
flag.StringVar(&config, "redis", "/Users/chao/Desktop/project/open-im-server/config", "config directory")
flag.Parse()
if err := internal.Main(config); err != nil {
fmt.Println("seq task", err)
}
}
Loading…
Cancel
Save