// Copyright © 2023 OpenIM. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package controller import ( "context" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/tx" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/stringutil" ) type ConversationDatabase interface { // UpdateUsersConversationField updates the properties of a conversation for specified users. UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error // CreateConversation creates a batch of new conversations. CreateConversation(ctx context.Context, conversations []*relationtb.Conversation) error // SyncPeerUserPrivateConversationTx ensures transactional operation while syncing private conversations between peers. SyncPeerUserPrivateConversationTx(ctx context.Context, conversation []*relationtb.Conversation) error // FindConversations retrieves multiple conversations of a user by conversation IDs. FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.Conversation, error) // GetUserAllConversation fetches all conversations of a user on the server. GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationtb.Conversation, error) // SetUserConversations sets multiple conversation properties for a user, creates new conversations if they do not exist, or updates them otherwise. This operation is atomic. SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error // SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is // transactional. SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error // CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error // GetConversationIDs retrieves conversation IDs for a given user. GetConversationIDs(ctx context.Context, userID string) ([]string, error) // GetUserConversationIDsHash gets the hash of conversation IDs for a given user. GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) // GetAllConversationIDs fetches all conversation IDs. GetAllConversationIDs(ctx context.Context) ([]string, error) // GetAllConversationIDsNumber returns the number of all conversation IDs. GetAllConversationIDsNumber(ctx context.Context) (int64, error) // PageConversationIDs paginates through conversation IDs based on the specified pagination settings. PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error) // GetConversationsByConversationID retrieves conversations by their IDs. GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) // GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria. GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) // GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages. GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) // GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) // FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error) // GetNotNotifyConversationIDs gets not notify conversationIDs by userID GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) // GetPinnedConversationIDs gets pinned conversationIDs by userID GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { return &conversationDatabase{ conversationDB: conversation, cache: cache, tx: tx, } } type conversationDatabase struct { conversationDB database.Conversation cache cache.ConversationCache tx tx.Tx } func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) (err error) { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() if conversation.GroupID != "" { cache = cache.DelSuperGroupRecvMsgNotNotifyUserIDs(conversation.GroupID).DelSuperGroupRecvMsgNotNotifyUserIDsHash(conversation.GroupID) } haveUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversation.ConversationID}) if err != nil { return err } if len(haveUserIDs) > 0 { _, err = c.conversationDB.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, fieldMap) if err != nil { return err } cache = cache.DelUsersConversation(conversation.ConversationID, haveUserIDs...) if _, ok := fieldMap["has_read_seq"]; ok { for _, userID := range haveUserIDs { cache = cache.DelUserAllHasReadSeqs(userID, conversation.ConversationID) } } if _, ok := fieldMap["recv_msg_opt"]; ok { cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := fieldMap["is_pinned"]; ok { cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) } cache = cache.DelConversationVersionUserIDs(haveUserIDs...) } NotUserIDs := stringutil.DifferenceString(haveUserIDs, userIDs) log.ZDebug(ctx, "SetUsersConversationFieldTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs) var conversations []*relationtb.Conversation now := time.Now() for _, v := range NotUserIDs { temp := new(relationtb.Conversation) if err = datautil.CopyStructFields(temp, conversation); err != nil { return err } temp.OwnerUserID = v temp.CreateTime = now conversations = append(conversations, temp) } if len(conversations) > 0 { err = c.conversationDB.Create(ctx, conversations) if err != nil { return err } cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...).DelConversations(conversation.ConversationID, NotUserIDs...) } return cache.ChainExecDel(ctx) }) } func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { _, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) if err != nil { return err } cache := c.cache.CloneConversationCache() cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...) if _, ok := args["recv_msg_opt"]; ok { cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := args["is_pinned"]; ok { cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) } return cache.ChainExecDel(ctx) } func (c *conversationDatabase) CreateConversation(ctx context.Context, conversations []*relationtb.Conversation) error { if err := c.conversationDB.Create(ctx, conversations); err != nil { return err } var ( userIDs []string notNotifyUserIDs []string pinnedUserIDs []string ) cache := c.cache.CloneConversationCache() for _, conversation := range conversations { cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) userIDs = append(userIDs, conversation.OwnerUserID) if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) } if conversation.IsPinned == true { pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID) } } return cache.DelConversationIDs(userIDs...). DelUserConversationIDsHash(userIDs...). DelConversationVersionUserIDs(userIDs...). DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...). DelConversationPinnedMessageUserIDs(pinnedUserIDs...). ChainExecDel(ctx) } func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() for _, conversation := range conversations { cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID, conversation.UserID) for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} { ownerUserID := v[0] userID := v[1] haveUserIDs, err := c.conversationDB.FindUserID(ctx, []string{ownerUserID}, []string{conversation.ConversationID}) if err != nil { return err } if len(haveUserIDs) > 0 { _, err := c.conversationDB.UpdateByMap(ctx, []string{ownerUserID}, conversation.ConversationID, map[string]any{"is_private_chat": conversation.IsPrivateChat}) if err != nil { return err } cache = cache.DelUsersConversation(conversation.ConversationID, ownerUserID) } else { newConversation := *conversation newConversation.OwnerUserID = ownerUserID newConversation.UserID = userID newConversation.ConversationID = conversation.ConversationID newConversation.IsPrivateChat = conversation.IsPrivateChat if err := c.conversationDB.Create(ctx, []*relationtb.Conversation{&newConversation}); err != nil { return err } cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID) } } } return cache.ChainExecDel(ctx) }) } func (c *conversationDatabase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.Conversation, error) { return c.cache.GetConversations(ctx, ownerUserID, conversationIDs) } func (c *conversationDatabase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationtb.Conversation, error) { return c.cache.GetConversation(ctx, ownerUserID, conversationID) } func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationtb.Conversation, error) { return c.cache.GetUserAllConversations(ctx, ownerUserID) } func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() cache = cache.DelConversationVersionUserIDs(ownerUserID). DelConversationNotNotifyMessageUserIDs(ownerUserID). DelConversationPinnedMessageUserIDs(ownerUserID) groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { return e.GroupID, e.GroupID != "" })) for _, groupID := range groupIDs { cache = cache.DelSuperGroupRecvMsgNotNotifyUserIDs(groupID).DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID) } var conversationIDs []string for _, conversation := range conversations { conversationIDs = append(conversationIDs, conversation.ConversationID) cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) } existConversations, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDs) if err != nil { return err } if len(existConversations) > 0 { for _, conversation := range conversations { err = c.conversationDB.Update(ctx, conversation) if err != nil { return err } } } var existConversationIDs []string for _, conversation := range existConversations { existConversationIDs = append(existConversationIDs, conversation.ConversationID) } var notExistConversations []*relationtb.Conversation for _, conversation := range conversations { if !datautil.Contain(conversation.ConversationID, existConversationIDs...) { notExistConversations = append(notExistConversations, conversation) } } if len(notExistConversations) > 0 { err = c.conversationDB.Create(ctx, notExistConversations) if err != nil { return err } cache = cache.DelConversationIDs(ownerUserID). DelUserConversationIDsHash(ownerUserID). DelConversationNotReceiveMessageUserIDs(datautil.Slice(notExistConversations, func(e *relationtb.Conversation) string { return e.ConversationID })...) } return cache.ChainExecDel(ctx) }) } // func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { // return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) //} func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID}) if err != nil { return err } notExistUserIDs := stringutil.DifferenceString(userIDs, existConversationUserIDs) var conversations []*relationtb.Conversation for _, v := range notExistUserIDs { conversation := relationtb.Conversation{ConversationType: constant.ReadGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversations = append(conversations, &conversation) cache = cache.DelConversations(v, conversationID).DelConversationNotReceiveMessageUserIDs(conversationID) } cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...) if len(conversations) > 0 { err = c.conversationDB.Create(ctx, conversations) if err != nil { return err } } _, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]any{"max_seq": 0}) if err != nil { return err } for _, v := range existConversationUserIDs { cache = cache.DelConversations(v, conversationID) } return cache.ChainExecDel(ctx) }) } func (c *conversationDatabase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { return c.cache.GetUserConversationIDs(ctx, userID) } func (c *conversationDatabase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) { return c.cache.GetUserConversationIDsHash(ctx, ownerUserID) } func (c *conversationDatabase) GetAllConversationIDs(ctx context.Context) ([]string, error) { return c.conversationDB.GetAllConversationIDs(ctx) } func (c *conversationDatabase) GetAllConversationIDsNumber(ctx context.Context) (int64, error) { return c.conversationDB.GetAllConversationIDsNumber(ctx) } func (c *conversationDatabase) PageConversationIDs(ctx context.Context, pagination pagination.Pagination) ([]string, error) { return c.conversationDB.PageConversationIDs(ctx, pagination) } func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) { return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs) } func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) { return c.conversationDB.GetConversationIDsNeedDestruct(ctx) } func (c *conversationDatabase) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { return c.cache.GetConversationNotReceiveMessageUserIDs(ctx, conversationID) } func (c *conversationDatabase) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) { return c.conversationDB.FindConversationUserVersion(ctx, userID, version, limit) } func (c *conversationDatabase) FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) { return c.cache.FindMaxConversationUserVersion(ctx, userID) } func (c *conversationDatabase) GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error) { conversationIDs, err := c.cache.GetUserConversationIDs(ctx, ownerUserID) if err != nil { return 0, nil, err } findConversationIDs := datautil.Paginate(conversationIDs, int(pagination.GetPageNumber()), int(pagination.GetShowNumber())) conversations := make([]*relationtb.Conversation, 0, len(findConversationIDs)) for _, conversationID := range findConversationIDs { conversation, err := c.cache.GetConversation(ctx, ownerUserID, conversationID) if err != nil { return 0, nil, err } conversations = append(conversations, conversation) } return int64(len(conversationIDs)), conversations, nil } func (c *conversationDatabase) GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) { conversationIDs, err := c.cache.GetUserNotNotifyConversationIDs(ctx, userID) if err != nil { return nil, err } return conversationIDs, nil } func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) { conversationIDs, err := c.cache.GetPinnedConversationIDs(ctx, userID) if err != nil { return nil, err } return conversationIDs, nil }