pull/2393/head
withchao 1 year ago
parent 1b1027192d
commit ef71d0cf62

@ -0,0 +1,19 @@
package cachekey
const (
MallocSeq = "MALLOC_SEQ:"
MallocSeqLock = "MALLOC_SEQ_LOCK:"
MallocMinSeqLock = "MALLOC_MIN_SEQ:"
)
func GetMallocSeqKey(conversationID string) string {
return MallocSeq + conversationID
}
func GetMallocSeqLockKey(conversationID string) string {
return MallocSeqLock + conversationID
}
func GetMallocMinSeqKey(conversationID string) string {
return MallocMinSeqLock + conversationID
}

@ -0,0 +1,246 @@
package redis
import (
"context"
"errors"
"fmt"
"github.com/dtm-labs/rockscache"
"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
"strconv"
"time"
)
var errLock = errors.New("lock failed")
type MallocSeq interface {
Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
}
func NewSeqCache1(rdb redis.UniversalClient, mgo database.Seq) MallocSeq {
opt := rockscache.NewDefaultOptions()
opt.EmptyExpire = time.Second * 3
opt.Delay = time.Second / 2
return &seqCache1{
rdb: rdb,
mgo: mgo,
rocks: rockscache.NewClient(rdb, opt),
lockExpire: time.Minute * 1,
seqExpire: time.Hour * 24 * 7,
minSeqExpire: time.Hour * 1,
groupMinNum: 1000,
userMinNum: 100,
}
}
type seqCache1 struct {
rdb redis.UniversalClient
rocks *rockscache.Client
mgo database.Seq
lockExpire time.Duration
seqExpire time.Duration
minSeqExpire time.Duration
groupMinNum int64
userMinNum int64
}
/*
1
2
3
4
5
6
7
8
9
10
*/
func (s *seqCache1) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
for i := 0; i < 10; i++ {
res, err := s.rdb.LIndex(ctx, cachekey.GetMallocSeqKey(conversationID), 0).Int64()
if err == nil {
return res, nil
} else if !errors.Is(err, redis.Nil) {
return 0, errs.Wrap(err)
}
if err := s.mallocSeq(ctx, conversationID, 0, nil); err != nil {
return 0, err
}
}
return 0, errs.New("get max seq timeout")
}
func (s *seqCache1) unlock(ctx context.Context, key string, owner string) error {
script := `
local value = redis.call("GET", KEYS[1])
if value == false then
return 0
end
if value == ARGV[1] then
redis.call("DEL", KEYS[1])
return 1
end
return 2
`
state, err := s.rdb.Eval(ctx, script, []string{key}, owner).Int()
if err != nil {
return errs.Wrap(err)
}
switch state {
case 0:
return errs.Wrap(redis.Nil)
case 1:
return nil
case 2:
return errs.New("not the lock holder")
default:
return errs.New(fmt.Sprintf("unknown state: %d", state))
}
}
func (s *seqCache1) initMallocSeq(ctx context.Context, conversationID string, size int64) ([]int64, error) {
owner := uuid.New().String()
ok, err := s.rdb.SetNX(ctx, cachekey.GetMallocSeqLockKey(conversationID), owner, s.lockExpire).Result()
if err != nil {
return nil, err
}
seq, err := s.mgo.Malloc(ctx, conversationID, size)
if err != nil {
return nil, err
}
seqs := make([]int64, 0, size)
for i := seq - size + 1; i <= seq; i++ {
seqs = append(seqs, i)
}
return seqs, nil
}
func (s *seqCache1) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return getCache[int64](ctx, s.rocks, cachekey.GetMallocMinSeqKey(conversationID), s.minSeqExpire, func(ctx context.Context) (int64, error) {
return s.mgo.GetMinSeq(ctx, conversationID)
})
}
func (s *seqCache1) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
if err := s.mgo.SetMinSeq(ctx, conversationID, seq); err != nil {
return err
}
return s.deleteMinSeqCache(ctx, conversationID)
}
func (s *seqCache1) Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error) {
if size <= 0 {
return nil, errs.Wrap(errors.New("size must be greater than 0"))
}
seqKey := cachekey.GetMallocSeqKey(conversationID)
lockKey := cachekey.GetMallocSeqLockKey(conversationID)
for i := 0; i < 10; i++ {
seqs, err := s.lpop(ctx, seqKey, lockKey, size)
if err != nil {
return nil, err
}
if len(seqs) < int(size) {
if err := s.mallocSeq(ctx, conversationID, size, &seqs); err != nil {
return nil, err
}
}
if len(seqs) >= int(size) {
return seqs, nil
}
}
return nil, errs.ErrInternalServer.WrapMsg("malloc seq failed")
}
func (s *seqCache1) push(ctx context.Context, seqKey string, seqs []int64) error {
script := `
redis.call("DEL", KEYS[1])
for i = 2, #ARGV do
redis.call("RPUSH", KEYS[1], ARGV[i])
end
redis.call("EXPIRE", KEYS[1], ARGV[1])
return 1
`
argv := make([]any, 0, 1+len(seqs))
argv = append(argv, s.seqExpire.Seconds())
for _, seq := range seqs {
argv = append(argv, seq)
}
err := s.rdb.Eval(ctx, script, []string{seqKey}, argv...).Err()
return errs.Wrap(err)
}
func (s *seqCache1) lpop(ctx context.Context, seqKey, lockKey string, size int64) ([]int64, error) {
script := `
local result = redis.call("LRANGE", KEYS[1], 0, ARGV[1]-1)
if #result == 0 then
return result
end
redis.call("LTRIM", KEYS[1], #result, -1)
if redis.call("LLEN", KEYS[1]) == 0 then
redis.call("DEL", KEYS[2])
end
return result
`
res, err := s.rdb.Eval(ctx, script, []string{seqKey, lockKey}, size).Int64Slice()
if err != nil {
return nil, errs.Wrap(err)
}
return res, nil
}
func (s *seqCache1) getMongoStepSize(conversationID string, size int64) int64 {
var num int64
if msgprocessor.IsGroupConversationID(conversationID) {
num = s.groupMinNum
} else {
num = s.userMinNum
}
if size > num {
num += size
}
return num
}
func (s *seqCache1) mallocSeq(ctx context.Context, conversationID string, size int64, seqs *[]int64) error {
var delMinSeqKey bool
_, err := getCache[string](ctx, s.rocks, cachekey.GetMallocSeqLockKey(conversationID), s.lockExpire, func(ctx context.Context) (string, error) {
res, err := s.mgo.Malloc(ctx, conversationID, s.getMongoStepSize(conversationID, size))
if err != nil {
return "", err
}
delMinSeqKey = res[0] == 1
if seqs != nil && size > 0 {
if len(*seqs) > 0 && (*seqs)[len(*seqs)-1]+1 == res[0] {
n := size - int64(len(*seqs))
*seqs = append(*seqs, res[:n]...)
res = res[n:]
} else {
*seqs = res[:size]
res = res[size:]
}
}
if err := s.push(ctx, cachekey.GetMallocSeqKey(conversationID), res); err != nil {
return "", err
}
return strconv.Itoa(int(time.Now().UnixMicro())), nil
})
if delMinSeqKey {
s.deleteMinSeqCache(ctx, conversationID)
}
return err
}
func (s *seqCache1) deleteMinSeqCache(ctx context.Context, conversationID string) error {
return s.rocks.TagAsDeleted2(ctx, cachekey.GetMallocMinSeqKey(conversationID))
}

@ -69,26 +69,26 @@ type CommonMsgDatabase interface {
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
// DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers.
DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
//SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
//SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
//GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
//GetMinSeq(ctx context.Context, conversationID string) (int64, error)
//GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
//GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
//SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
//SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error)
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
//GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error)
//GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error)
@ -349,6 +349,8 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver
}
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
// TODO set SEQ
currentMaxSeq, err := db.seq.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil {
log.ZError(ctx, "storage.seq.GetMaxSeq", err)
@ -817,9 +819,9 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u
}
}
func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return db.seq.SetMaxSeq(ctx, conversationID, maxSeq)
}
//func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
// return db.seq.SetMaxSeq(ctx, conversationID, maxSeq)
//}
func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return db.seq.GetMaxSeqs(ctx, conversationIDs)
@ -837,13 +839,13 @@ func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int
return db.seq.SetMinSeqs(ctx, seqs)
}
func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return db.seq.GetMinSeqs(ctx, conversationIDs)
}
func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return db.seq.GetMinSeq(ctx, conversationID)
}
//func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
// return db.seq.GetMinSeqs(ctx, conversationIDs)
//}
//
//func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
// return db.seq.GetMinSeq(ctx, conversationID)
//}
func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)

@ -0,0 +1,75 @@
package mgo
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewSeqMongo(db *mongo.Database) (*SeqMongo, error) {
coll := db.Collection("seq")
return &SeqMongo{coll: coll}, nil
}
type SeqMongo struct {
coll *mongo.Collection
}
func (s *SeqMongo) MallocSeq(ctx context.Context, conversationID string, size int64) (int64, error) {
if size <= 0 {
return 0, errors.New("size must be greater than 0")
}
filter := map[string]any{"conversation_id": conversationID}
update := map[string]any{
"$inc": map[string]any{"max_seq": size},
"$set": map[string]any{"min_seq": 1},
}
opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1})
return mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt)
}
func (s *SeqMongo) Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error) {
seq, err := s.MallocSeq(ctx, conversationID, size)
if err != nil {
return nil, err
}
seqs := make([]int64, 0, size)
for i := seq - size + 1; i <= seq; i++ {
seqs = append(seqs, i)
}
return seqs, nil
}
func (s *SeqMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "max_seq": 1}))
if err == nil {
return seq, nil
} else if IsNotFound(err) {
return 0, nil
} else {
return 0, err
}
}
func (s *SeqMongo) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "min_seq": 1}))
if err == nil {
return seq, nil
} else if IsNotFound(err) {
return 0, nil
} else {
return 0, err
}
}
func (s *SeqMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
return mongoutil.UpdateOne(ctx, s.coll, bson.M{"conversation_id": conversationID}, bson.M{"$set": bson.M{"min_seq": seq}}, false)
}
func (s *SeqMongo) GetConversation(ctx context.Context, conversationID string) (*model.Seq, error) {
return mongoutil.FindOne[*model.Seq](ctx, s.coll, bson.M{"conversation_id": conversationID})
}

@ -14,4 +14,5 @@ const (
LogName = "log"
ObjectName = "s3"
UserName = "user"
SeqName = "seq"
)

@ -0,0 +1,10 @@
package database
import "context"
type Seq interface {
Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
}

@ -0,0 +1,7 @@
package model
type Seq struct {
ConversationID string `bson:"conversation_id"`
MaxSeq int64 `bson:"max_seq"`
MinSeq int64 `bson:"min_seq"`
}

@ -24,6 +24,10 @@ import (
"google.golang.org/protobuf/proto"
)
func IsGroupConversationID(conversationID string) bool {
return strings.HasPrefix(conversationID, "g_") || strings.HasPrefix(conversationID, "sg_")
}
func GetNotificationConversationIDByMsg(msg *sdkws.MsgData) string {
switch msg.SessionType {
case constant.SingleChatType:

Loading…
Cancel
Save