Merge branch 'convincr' into allmerge

# Conflicts:
#	pkg/common/storage/database/name.go
pull/2409/head
withchao 1 year ago
commit 4875a34ff4

@ -50,3 +50,11 @@ func (o *ConversationApi) SetConversations(c *gin.Context) {
func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client, c)
}
func (o *ConversationApi) GetFullOwnerConversationIDs(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetFullOwnerConversationIDs, o.Client, c)
}
func (o *ConversationApi) GetIncrementalConversation(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetIncrementalConversation, o.Client, c)
}

@ -207,6 +207,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
conversationGroup.POST("/get_conversations", c.GetConversations)
conversationGroup.POST("/set_conversations", c.SetConversations)
conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs)
conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs)
conversationGroup.POST("/get_incremental_conversation", c.GetIncrementalConversation)
}
statisticsGroup := r.Group("/statistics")

@ -184,13 +184,23 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbcon
}
func (c *conversationServer) GetConversations(ctx context.Context, req *pbconversation.GetConversationsReq) (*pbconversation.GetConversationsResp, error) {
conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs)
conversations, err := c.getConversations(ctx, req.OwnerUserID, req.ConversationIDs)
if err != nil {
return nil, err
}
return &pbconversation.GetConversationsResp{
Conversations: conversations,
}, nil
}
func (c *conversationServer) getConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
conversations, err := c.conversationDatabase.FindConversations(ctx, ownerUserID, conversationIDs)
if err != nil {
return nil, err
}
resp := &pbconversation.GetConversationsResp{Conversations: []*pbconversation.Conversation{}}
resp.Conversations = convert.ConversationsDB2Pb(conversations)
return resp, nil
return convert.ConversationsDB2Pb(conversations), nil
}
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {

@ -0,0 +1,56 @@
package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
"github.com/openimsdk/protocol/conversation"
)
func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, req *conversation.GetFullOwnerConversationIDsReq) (*conversation.GetFullOwnerConversationIDsResp, error) {
vl, err := c.conversationDatabase.FindMaxConversationUserVersionCache(ctx, req.UserID)
if err != nil {
return nil, err
}
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
idHash := hashutil.IdHash(conversationIDs)
if req.IdHash == idHash {
conversationIDs = nil
}
return &conversation.GetFullOwnerConversationIDsResp{
Version: idHash,
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
ConversationIDs: conversationIDs,
}, nil
}
func (c *conversationServer) GetIncrementalConversation(ctx context.Context, req *conversation.GetIncrementalConversationReq) (*conversation.GetIncrementalConversationResp, error) {
opt := incrversion.Option[*conversation.Conversation, conversation.GetIncrementalConversationResp]{
Ctx: ctx,
VersionKey: req.UserID,
VersionID: req.VersionID,
VersionNumber: req.Version,
Version: c.conversationDatabase.FindConversationUserVersion,
CacheMaxVersion: c.conversationDatabase.FindMaxConversationUserVersionCache,
Find: func(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
return c.getConversations(ctx, req.UserID, conversationIDs)
},
ID: func(elem *conversation.Conversation) string { return elem.GroupID },
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*conversation.Conversation, full bool) *conversation.GetIncrementalConversationResp {
return &conversation.GetIncrementalConversationResp{
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
Delete: delIDs,
Insert: insertList,
Update: updateList,
}
},
}
return opt.Build()
}

@ -340,3 +340,8 @@ type FormDataMate struct {
Group string `json:"group"`
Key string `json:"key"`
}
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
//TODO implement me
panic("implement me")
}

@ -43,6 +43,7 @@ type thirdServer struct {
defaultExpire time.Duration
config *Config
}
type Config struct {
RpcConfig config.Third
RedisConfig config.Redis

@ -669,3 +669,8 @@ func (s *userServer) SortQuery(ctx context.Context, req *pbuser.SortQueryReq) (*
}
return &pbuser.SortQueryResp{Users: convert.UsersDB2Pb(users)}, nil
}
func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
//TODO implement me
panic("implement me")
}

@ -23,6 +23,7 @@ const (
SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
ConversationUserMaxKey = "CONVERSATION_USER_MAX:"
)
func GetConversationKey(ownerUserID, conversationID string) string {
@ -56,3 +57,7 @@ func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string {
func GetUserConversationIDsHashKey(ownerUserID string) string {
return ConversationIDsHashKey + ownerUserID
}
func GetConversationUserMaxVersionKey(userID string) string {
return ConversationUserMaxKey + userID
}

@ -54,4 +54,8 @@ type ConversationCache interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
}

@ -95,6 +95,10 @@ func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID strin
return cachekey.GetUserConversationIDsHashKey(ownerUserID)
}
func (c *ConversationRedisCache) getConversationUserMaxVersionKey(ownerUserID string) string {
return cachekey.GetConversationUserMaxVersionKey(ownerUserID)
}
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
@ -233,6 +237,19 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers
for _, conversationID := range conversationIDs {
cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID))
}
return cache
}
func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
cache.AddKeys(c.getConversationUserMaxVersionKey(userID))
}
return cache
}
func (c *ConversationRedisCache) FindMaxConversationUserVersion(ctx context.Context, userID string) (*model.VersionLog, error) {
return getCache(ctx, c.rcClient, c.getConversationUserMaxVersionKey(userID), c.expireTime, func(ctx context.Context) (*model.VersionLog, error) {
return c.conversationDB.FindConversationUserVersion(ctx, userID, 0, 0)
})
}

@ -66,6 +66,8 @@ type ConversationDatabase interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
// GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
// FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error)
FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error)
}
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@ -106,6 +108,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
if _, ok := fieldMap["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
}
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
}
NotUserIDs := stringutil.DifferenceString(haveUserIDs, userIDs)
log.ZDebug(ctx, "SetUsersConversationFieldTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs)
@ -137,7 +140,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
return err
}
cache := c.cache.CloneConversationCache()
cache = cache.DelUsersConversation(conversationID, userIDs...)
cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...)
if _, ok := args["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
}
@ -155,13 +158,14 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
userIDs = append(userIDs, conversation.OwnerUserID)
}
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ChainExecDel(ctx)
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).ChainExecDel(ctx)
}
func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.CloneConversationCache()
for _, conversation := range conversations {
cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID)
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
ownerUserID := v[0]
userID := v[1]
@ -207,6 +211,7 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.CloneConversationCache()
cache = cache.DelConversationVersionUserIDs(ownerUserID)
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
return e.GroupID, e.GroupID != ""
}))
@ -322,3 +327,11 @@ func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Contex
func (c *conversationDatabase) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
return c.cache.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
}
func (c *conversationDatabase) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) {
return c.conversationDB.FindConversationUserVersion(ctx, userID, version, limit)
}
func (c *conversationDatabase) FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) {
return c.cache.FindMaxConversationUserVersion(ctx, userID)
}

@ -22,7 +22,6 @@ import (
type Conversation interface {
Create(ctx context.Context, conversations []*model.Conversation) (err error)
Delete(ctx context.Context, groupIDs []string) (err error)
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error)
Update(ctx context.Context, conversation *model.Conversation) (err error)
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error)
@ -39,4 +38,5 @@ type Conversation interface {
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
}

@ -41,40 +41,71 @@ func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
if err != nil {
return nil, errs.Wrap(err)
}
return &ConversationMgo{coll: coll}, nil
version, err := NewVersionLog(db.Collection(database.ConversationVersionName))
if err != nil {
return nil, err
}
return &ConversationMgo{version: version, coll: coll}, nil
}
type ConversationMgo struct {
version database.VersionLog
coll *mongo.Collection
}
func (c *ConversationMgo) Create(ctx context.Context, conversations []*model.Conversation) (err error) {
return mongoutil.IncrVersion(func() error {
return mongoutil.InsertMany(ctx, c.coll, conversations)
}, func() error {
userConversation := make(map[string][]string)
for _, conversation := range conversations {
userConversation[conversation.OwnerUserID] = append(userConversation[conversation.OwnerUserID], conversation.ConversationID)
}
for userID, conversationIDs := range userConversation {
if err := c.version.IncrVersion(ctx, userID, conversationIDs, model.VersionStateInsert); err != nil {
return err
}
func (c *ConversationMgo) Delete(ctx context.Context, groupIDs []string) (err error) {
return mongoutil.DeleteMany(ctx, c.coll, bson.M{"group_id": bson.M{"$in": groupIDs}})
}
return nil
})
}
func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) {
if len(args) == 0 {
func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (int64, error) {
if len(args) == 0 || len(userIDs) == 0 {
return 0, nil
}
filter := bson.M{
"conversation_id": conversationID,
"owner_user_id": bson.M{"$in": userIDs},
}
if len(userIDs) > 0 {
filter["owner_user_id"] = bson.M{"$in": userIDs}
}
var rows int64
err := mongoutil.IncrVersion(func() error {
res, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args})
if err != nil {
return err
}
rows = res.ModifiedCount
return nil
}, func() error {
for _, userID := range userIDs {
if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateUpdate); err != nil {
return err
}
}
return nil
})
if err != nil {
return 0, err
}
return res.ModifiedCount, nil
return rows, nil
}
func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) {
return mongoutil.IncrVersion(func() error {
return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true)
}, func() error {
return c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate)
})
}
func (c *ConversationMgo) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) {
@ -178,3 +209,7 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co
options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}),
)
}
func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
return c.version.FindChangeLog(ctx, userID, version, limit)
}

@ -10,6 +10,7 @@ const (
GroupMemberName = "group_member"
GroupMemberVersionName = "group_member_version"
GroupJoinVersionName = "group_join_version"
ConversationVersionName = "conversation_version"
GroupRequestName = "group_request"
LogName = "log"
ObjectName = "s3"

Loading…
Cancel
Save