Merge branch 'openimsdk:main' into main

pull/2458/head
chao 1 year ago committed by GitHub
commit 57b35adcc8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -17,6 +17,7 @@ package group
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
@ -126,25 +127,8 @@ func (g *GroupNotificationSender) getGroupInfo(ctx context.Context, groupID stri
if len(ownerUserIDs) > 0 {
ownerUserID = ownerUserIDs[0]
}
return &sdkws.GroupInfo{
GroupID: gm.GroupID,
GroupName: gm.GroupName,
Notification: gm.Notification,
Introduction: gm.Introduction,
FaceURL: gm.FaceURL,
OwnerUserID: ownerUserID,
CreateTime: gm.CreateTime.UnixMilli(),
MemberCount: num,
Ex: gm.Ex,
Status: gm.Status,
CreatorUserID: gm.CreatorUserID,
GroupType: gm.GroupType,
NeedVerification: gm.NeedVerification,
LookMemberInfo: gm.LookMemberInfo,
ApplyMemberFriend: gm.ApplyMemberFriend,
NotificationUpdateTime: gm.NotificationUpdateTime.UnixMilli(),
NotificationUserID: gm.NotificationUserID,
}, nil
return convert.Db2PbGroupInfo(gm, ownerUserID, num), nil
}
func (g *GroupNotificationSender) getGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) {
@ -198,29 +182,6 @@ func (g *GroupNotificationSender) getGroupOwnerAndAdminUserID(ctx context.Contex
return datautil.Slice(members, fn), nil
}
//nolint:unused
func (g *GroupNotificationSender) groupDB2PB(group *model.Group, ownerUserID string, memberCount uint32) *sdkws.GroupInfo {
return &sdkws.GroupInfo{
GroupID: group.GroupID,
GroupName: group.GroupName,
Notification: group.Notification,
Introduction: group.Introduction,
FaceURL: group.FaceURL,
OwnerUserID: ownerUserID,
CreateTime: group.CreateTime.UnixMilli(),
MemberCount: memberCount,
Ex: group.Ex,
Status: group.Status,
CreatorUserID: group.CreatorUserID,
GroupType: group.GroupType,
NeedVerification: group.NeedVerification,
LookMemberInfo: group.LookMemberInfo,
ApplyMemberFriend: group.ApplyMemberFriend,
NotificationUpdateTime: group.NotificationUpdateTime.UnixMilli(),
NotificationUserID: group.NotificationUserID,
}
}
func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, appMangerLevel int32) *sdkws.GroupMemberFullInfo {
return &sdkws.GroupMemberFullInfo{
GroupID: member.GroupID,

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/singleflight"
"time"
@ -49,6 +50,7 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscac
}
values, err := fn(ctx, queryIds)
if err != nil {
log.ZError(ctx, "batchGetCache query database failed", err, "keys", keys, "queryIds", queryIds)
return nil, err
}
if len(values) == 0 {

@ -118,6 +118,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
t, err = fn(ctx)
if err != nil {
log.ZError(ctx, "getCache query database failed", err, "key", key)
return "", err
}
bs, err := json.Marshal(t)

@ -44,38 +44,38 @@ func (s *seqUserCacheRedis) getSeqUserReadSeqKey(conversationID string, userID s
return cachekey.GetSeqUserReadSeqKey(conversationID, userID)
}
func (s *seqUserCacheRedis) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserCacheRedis) GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMaxSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
return s.mgo.GetUserMaxSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if err := s.mgo.SetMaxSeq(ctx, conversationID, userID, seq); err != nil {
func (s *seqUserCacheRedis) SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if err := s.mgo.SetUserMaxSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID))
}
func (s *seqUserCacheRedis) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserCacheRedis) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMinSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
return s.mgo.GetUserMinSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.SetMinSeqs(ctx, userID, map[string]int64{conversationID: seq})
func (s *seqUserCacheRedis) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.SetUserMinSeqs(ctx, userID, map[string]int64{conversationID: seq})
}
func (s *seqUserCacheRedis) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserReadSeqKey(conversationID, userID), s.readExpireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
return s.mgo.GetUserReadSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
@ -85,10 +85,10 @@ func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID strin
return nil
}
func (s *seqUserCacheRedis) SetMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs {
if err := s.mgo.SetMinSeq(ctx, conversationID, userID, seq); err != nil {
if err := s.mgo.SetUserMinSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
keys = append(keys, s.getSeqUserMinSeqKey(conversationID, userID))
@ -96,7 +96,7 @@ func (s *seqUserCacheRedis) SetMinSeqs(ctx context.Context, userID string, seqs
return DeleteCacheBySlot(ctx, s.rocks, keys)
}
func (s *seqUserCacheRedis) setRedisReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
func (s *seqUserCacheRedis) setUserRedisReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
keySeq := make(map[string]int64)
for conversationID, seq := range seqs {
@ -121,16 +121,16 @@ func (s *seqUserCacheRedis) setRedisReadSeqs(ctx context.Context, userID string,
return nil
}
func (s *seqUserCacheRedis) SetReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
if len(seqs) == 0 {
return nil
}
if err := s.setRedisReadSeqs(ctx, userID, seqs); err != nil {
if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil {
return err
}
for conversationID, seq := range seqs {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
@ -138,13 +138,13 @@ func (s *seqUserCacheRedis) SetReadSeqs(ctx context.Context, userID string, seqs
return nil
}
func (s *seqUserCacheRedis) GetReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
func (s *seqUserCacheRedis) GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
res, err := batchGetCache2(ctx, s.rocks, s.readExpireTime, conversationIDs, func(conversationID string) string {
return s.getSeqUserReadSeqKey(conversationID, userID)
}, func(v *readSeqModel) string {
return v.ConversationID
}, func(ctx context.Context, conversationIDs []string) ([]*readSeqModel, error) {
seqs, err := s.mgo.GetReadSeqs(ctx, userID, conversationIDs)
seqs, err := s.mgo.GetUserReadSeqs(ctx, userID, conversationIDs)
if err != nil {
return nil, err
}

@ -3,13 +3,13 @@ package cache
import "context"
type SeqUser interface {
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
}

@ -334,7 +334,7 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver
func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
for userID, seq := range userSeqMap {
if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
@ -498,7 +498,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
// "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group.
// This ensures that their message retrieval starts from the point they joined.
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID)
userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err
}
@ -576,7 +576,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
}
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID)
userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err
}
@ -674,12 +674,12 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
if len(seqs) > 0 {
userMinSeq := seqs[len(seqs)-1] + 1
currentUserMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID)
currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil {
return nil, err
}
if currentUserMinSeq < userMinSeq {
if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err
}
}
@ -794,23 +794,23 @@ func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int
}
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
return db.seqUser.SetMinSeqs(ctx, userID, seqs)
return db.seqUser.SetUserMinSeqs(ctx, userID, seqs)
}
func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return db.seqUser.SetReadSeqs(ctx, userID, hasReadSeqs)
return db.seqUser.SetUserReadSeqs(ctx, userID, hasReadSeqs)
}
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetReadSeq(ctx, conversationID, userID, hasReadSeq)
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.seqUser.GetReadSeqs(ctx, userID, conversationIDs)
return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs)
}
func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
return db.seqUser.GetReadSeq(ctx, conversationID, userID)
return db.seqUser.GetUserReadSeq(ctx, conversationID, userID)
}
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {

@ -68,27 +68,27 @@ func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID
}
}
func (s *seqUserMongo) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserMongo) GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "max_seq")
}
func (s *seqUserMongo) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
func (s *seqUserMongo) SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "max_seq")
}
func (s *seqUserMongo) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserMongo) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "min_seq")
}
func (s *seqUserMongo) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
func (s *seqUserMongo) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "min_seq")
}
func (s *seqUserMongo) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserMongo) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "read_seq")
}
func (s *seqUserMongo) GetReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) {
func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) {
if len(conversationID) == 0 {
return map[string]int64{}, nil
}
@ -105,6 +105,6 @@ func (s *seqUserMongo) GetReadSeqs(ctx context.Context, userID string, conversat
return res, nil
}
func (s *seqUserMongo) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
func (s *seqUserMongo) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "read_seq")
}

@ -3,11 +3,11 @@ package database
import "context"
type SeqUser interface {
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error)
GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error)
}

@ -130,14 +130,14 @@ func Main(conf string, del time.Duration) error {
if err != nil {
return 0, err
}
return uSeq.GetReadSeq(ctx, conversationID, userID)
return uSeq.GetUserReadSeq(ctx, conversationID, userID)
},
SetSeq: func(ctx context.Context, id string, seq int64) error {
conversationID, userID, err := uSpitHasReadSeq(id)
if err != nil {
return err
}
return uSeq.SetReadSeq(ctx, conversationID, userID, seq)
return uSeq.SetUserReadSeq(ctx, conversationID, userID, seq)
},
},
{
@ -147,14 +147,14 @@ func Main(conf string, del time.Duration) error {
if err != nil {
return 0, err
}
return uSeq.GetMinSeq(ctx, conversationID, userID)
return uSeq.GetUserMinSeq(ctx, conversationID, userID)
},
SetSeq: func(ctx context.Context, id string, seq int64) error {
conversationID, userID, err := uSpitConversationUserMinSeq(id)
if err != nil {
return err
}
return uSeq.SetMinSeq(ctx, conversationID, userID, seq)
return uSeq.SetUserMinSeq(ctx, conversationID, userID, seq)
},
},
}

Loading…
Cancel
Save